7

【笔记】SpringBoot项目整合SpringAMQP

 1 year ago
source link: https://feiju12138.github.io/2022/11/02/SpringBoot%E9%A1%B9%E7%9B%AE%E6%95%B4%E5%90%88SpringAMQP/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

【笔记】SpringBoot项目整合SpringAMQP

2022-11-022022-11-03Java后端学习指北

2

SpringBoot项目整合SpringAMQP学习笔记

pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 因为生产者和消费者都需要连接消息队列,所以需要相同的配置

spring.rabbitmq.addresses:RabbitMQ的地址
spring.rabbitmq.port:RabbitMQ的端口,默认为5672
spring.rabbitmq.virtual-host:RabbitMQ的虚拟主机
spring.rabbitmq.username:RabbitMQ的用户名
spring.rabbitmq.password:RabbitMQ的密码

src/main/resource/application.yml

spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin

简单队列(Simple Queue)

  • 一个生产者向队列中发送消息
  • 一个消费者从队列中获取消息,每次获取消息后,阅后即焚
  • 通过convertAndSend()方法发送消息

"queuename":消息队列名
"message":消息内容

src/test/java/com/ApplicationTests.java

package com;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() {
rabbitTemplate.convertAndSend("queuename", "message");
}
}

@RabbitListener(queues = "")@RabbitListener(queues = {"", """}):指定队列名,可以指定多个队列名

src/main/java/com/listener/SpringRabbitMQListener.java

package com.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitMQListener {

@RabbitListener(queues = "queuename")
public void receive(String message) {
System.out.println("queuename: " + message);
}
}

工作队列(Work Queue)

  • 一个生产者向队列中发送消息
  • 多个消费者从队列中获取消息,每次获取消息后,阅后即焚

"queuename":消息队列名
"message":消息内容

src/test/java/com/ApplicationTests.java

package com;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() {
rabbitTemplate.convertAndSend("queuename", "message");
}
}

添加额外配置

  • RabbitMQ内部有预取机制,默认预取消息的数量是无限
  • 为了防止消费者提前将消息拿走后再处理,而忽略了自身能力,可以修改预取消息数量

spring.rabbitmq.listener.simple.prefetch:配置预取消息数量。如果设置为1,消费者每次只能先拿到1条消息,处理完后才会拿下一条消息

src/main/resource/application.yml

spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
prefetch: 1

src/main/java/com/listener/SpringRabbitMQListener.java

package com.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitMQListener {

@RabbitListener(queues = "queuename")
public void receive1(String message) {
System.out.println("queuename: " + message);
}

@RabbitListener(queues = "queuename")
public void receive2(String message) {
System.out.println("queuename: " + message);
}
}

发布订阅模式

  • 一个生产者向交换机(exchange)中发送消息
  • 交换机会将消息拷贝多份,发布给多个订阅的队列
  • 一个或多个消费者从对应的队列中获取消息,每次获取消息后,阅后即焚
  • 交换机只能做消息的转发,如果转发没有成功,则消息会丢失

广播方式(Fanout)

  • 交换机会将消息路由到每一个与其绑定的消息队列

消息队列绑定到交换机

  • 本案例通过声明Bean的方式声明交换机、声明消息队列、消息队列绑定交换机

src/main/java/com/config/FanoutConfig.java

package com.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

@SpringBootConfiguration
public class FanoutConfig {

// 声明1个交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("exchangename");
}

// 声明2个消息队列
@Bean
public Queue queue1() {
return new Queue("queuename1");
}
@Bean
public Queue queue2() {
return new Queue("queuename2");
}

// 2个消息队列绑定到交换机
@Bean
public Binding binding1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
public Binding binding2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}
  • 生产者在发送消息时需要指定交换机

"exchangename":交换机名
"message":消息内容

src/test/java/com/ApplicationTests.java

package com;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() {
rabbitTemplate.convertAndSend("exchangename", "", "message");
}
}

src/main/java/com/listener/SpringRabbitMQListener.java

package com.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitMQListener {

@RabbitListener(queues = "queuename1")
public void receive1(String message) {
System.out.println("queuename1: " + message);
}

@RabbitListener(queues = "queuename2")
public void receive2(String message) {
System.out.println("queuename2: " + message);
}
}

路由方式(Direct)

  • 交换机会根据规则路由到指定的消息队列,并不一定会转发给所有绑定的消息队列
  • 每一个消息队列指定一个或多个BindingKey作为路由的依据
  • 在交换机上会指定一个RoutingKey作为路由的条件
  • RoutingKeyBindingKey匹配成功时,才进行消息的转发

convertAndSend():第一个参数是交换机名,第二个参数是BindingKey,第三个参数是消息内容

package com;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send1() {
rabbitTemplate.convertAndSend("exchangename", "BindingKey1", "message");
}

@Test
public void send2() {
rabbitTemplate.convertAndSend("exchangename", "BindingKey2", "message");
}

@Test
public void send3() {
rabbitTemplate.convertAndSend("exchangename", "BindingKey3", "message");
}
}
  • 本案例通过注解的方式声明交换机、声明消息队列、消息队列绑定交换机
  • 在消费者上,通过注解,将消息队列绑定交换机

value = @Queue():声明消息队列

name = "":指定消息队列名

exchange = @Exchange():声明交换机

name = "exchangename":指定交换机名
type = "direct"type = ExchangeTypes.DIRECT:指定交换机类型,可以指定字符串,也可以指定常量。如果不指定,默认是路由方式Direct

key = {"BindingKey1", "BindingKey3"}:指定BindingKey,可以指定一个或多个

package com.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitMQListener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "queuename1"),
exchange = @Exchange(name = "exchangename", type = "direct"),
key = {"BindingKey1", "BindingKey3"}
))
public void receive1(String message) {
System.out.println("queuename1: " + message);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "queuename2"),
exchange = @Exchange(name = "exchangename", type = ExchangeTypes.DIRECT),
key = {"BindingKey2", "BindingKey3"}
))
public void receive2(String message) {
System.out.println("queuename2: " + message);
}
}

话题方式(Topic)

  • 话题方式与路由方式基本相同,只是引入了通配符匹配
  • 交换机会根据规则路由到指定的消息队列,并不一定会转发给所有绑定的消息队列
  • 每一个消息队列指定一个或多个BindingKey作为路由的依据,与路由方式不同的是,话题方式要求BindingKey是以多个单词组合构成,多个单词之间用.分隔
  • 在交换机上会指定一个RoutingKey作为路由的条件,可以使用通配符
    • #:表示0个或多个任意单词
    • *:表示1个任意单词
  • RoutingKeyBindingKey匹配成功时,才进行消息的转发

convertAndSend():第一个参数是交换机名,第二个参数是BindingKey(多个单词之间由.分隔),第三个参数是消息内容

package com;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() {
rabbitTemplate.convertAndSend("exchangename", "key.key", "message");
}
}
  • 与路由方式的消费者基本相同,只是在指定BindingKey时可以使用通配符
  • 本案例通过注解的方式声明交换机、声明消息队列、消息队列绑定交换机
  • 在消费者上,通过注解,将消息队列绑定交换机

value = @Queue():声明消息队列

name = "":指定消息队列名

exchange = @Exchange():声明交换机

name = "exchangename":指定交换机名
type = "topic"type = ExchangeTypes.TOPIC:指定交换机类型,可以指定字符串,也可以指定常量

key = {"key.#", "#.key"}:指定BindingKey,可以指定一个或多个,可以使用通配符

package com.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitMQListener {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "queuename1"),
exchange = @Exchange(name = "exchangename", type = "topic"),
key = {"key.#"}
))
public void receive1(String message) {
System.out.println("queuename1: " + message);
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "queuename2"),
exchange = @Exchange(name = "exchangename", type = ExchangeTypes.TOPIC),
key = {"#.key"}
))
public void receive2(String message) {
System.out.println("queuename2: " + message);
}
}

将消息转换方式改为JSON

  • 默认的消息转换方式是JDK的序列化和反序列化方式,但是这种方式带来的问题是消息内容过长,同时JDK序列化可能会被漏洞利用
  • 可以将消息转换方式改为JSON的方式

pom.xml

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

创建配置类

src/main/java/com/ApplicationConfig.java

package com;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

@SpringBootConfiguration
public class ApplicationConfig {

@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}

发送和接收消息

src/test/java/com/ApplicationTests.java

package com;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void send() {
Map<String, Object> map = new HashMap<>();
map.put("key", "value");
rabbitTemplate.convertAndSend("queuename", map);
}
}

src/main/java/com/listener/SpringRabbitMQListener.java

package com.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class SpringRabbitMQListener {

@RabbitListener(queues = "queuename")
public void receive(Map<String, Object> map) {
System.out.println("queuename: " + map);
}
}

哔哩哔哩——黑马程序员


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK