6

Springboot 整合 RabbitMQ 消息队列

 2 years ago
source link: https://blog.51cto.com/u_15558033/5584970
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Fanout 模式

生产者工程

POM依赖

可以在创建工程时直接选择添加依赖。

Springboot 整合 RabbitMQ 消息队列_spring
Springboot 整合 RabbitMQ 消息队列_spring_02

application文件

因为rabbitmq具有默认地址及用户信息,所以如果是本地rabbitmq可以不需要进行配置。

Springboot 整合 RabbitMQ 消息队列_spring_03
Springboot 整合 RabbitMQ 消息队列_配置文件_04

RabbitMQ配置文件

在使用相关交换机及队列时,我们需要实现声明交换机及队列,如果没有对应信息,则启动项目会失败。所以在使用springboot整合rabbitmq时,我们可以通过配置文件来进行交换机、队列的声明及二者之间的关系绑定。 由于目前在演示Fanout模式,所以使用FanoutExchange来声明交换机,其他模式则使用相对应的TopicExchange,DirectExchange来声明。

@Configuration
public class RabbitMQConfiguration {

//声明fanout模式的交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order_exchange", true, false);
}

//声明队列
@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue", true);
}

@Bean
public Queue emailQueue() {
return new Queue("email.fanout.queue", true);
}

@Bean
public Queue duanxinQueue() {
return new Queue("duanxin.fanout.queue", true);
}
//绑定

@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}

@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}

@Bean
public Binding duanxinBinding() {
return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
}
}

生产者业务代码

这部分代码就简单的通过调用rabbitTemplate来进行消息的分发。@Service public class OrderService {

@Autowired
private RabbitTemplate rabbitTemplate;

public void makeOrder() {
// 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("下单成功:" + orderId);
// 通过MQ完成消息的分发
// 参数1:交换机 ;参数2:路由key/队列名;参数3:消息内容
String exchangeName = "fanout_order_exchange";
rabbitTemplate.convertAndSend(exchangeName, "", orderId);
}
}

消费者工程和生产者工程类似,我们首先需要引入依赖,然后在application文件中进行相关的配置即可开始编写代码。 在消费者工程中我们也可以编写rabbitmq的配置文件来进行交换机及队列的声明。建议在消费端编写配置文件,因为消费端是先启动的工程,如果交换机和队列未创建会导致工程启动失败。 消息监听

我们通过RabbitListener注解来监听消息队列。需要注意的是我们需要通过Component注解将该监听交给spring管理,否则不能正常接收服务端的消息。 这边只给出一个email的消息监听,上文生产者声明的duanxin,sms队列可以自行创建,只需要修改队列名即可。@Service public class OrderService {

@RabbitListener(queues = {"email.fanout.queue"})
@Component
public class FanoutEmailService {
@RabbitHandler
public void receive(String message) {
System.out.println("email fanout -----》接收到" + message);
}
}

首先启动消费者工程,然后在生产者工程中创建测试类发送消息即可。

@SpringBootTest class SpringbootOrderRabbitmqProducerApplicationTests {

@Autowired
private OrderService orderService;

@Test
void contextLoads() {
orderService.makeOrder();
}
}

当发送消息后,我们可以在控制台中发现消费者成功接受消息。

Springboot 整合 RabbitMQ 消息队列_spring_05

Direct 模式

建立工程的步骤和上文相同。

配置和上文基本相同,由于该部分测试direct模式,所以需要使用DirectExchange创建交换机。需要注意的是该类中的方法名不能和上文rabbitmq的配置文件中的方法名相同,因为我们使用bean注解将其交给spring管理,如果名字相同,则会启动项目失败。

@Configuration
public class DirectRabbitMQConfiguration {

//声明direct模式的交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct_order_exchange", true, false);
}

//声明队列
@Bean
public Queue smsDirectQueue() {
return new Queue("sms.direct.queue", true);
}

@Bean
public Queue emailDirectQueue() {
return new Queue("email.direct.queue", true);
}

@Bean
public Queue duanxinDirectQueue() {
return new Queue("duanxin.direct.queue", true);
}
//绑定

@Bean
public Binding smsDirectBinding() {
return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");
}

@Bean
public Binding emailDirectBinding() {
return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");
}

@Bean
public Binding duanxinDirectBinding() {
return BindingBuilder.bind(duanxinDirectQueue()).to(directExchange()).with("duanxin");
}
}
@Service
public class OrderService {

@Autowired
private RabbitTemplate rabbitTemplate;

public void makeOrderDirect() {
// 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("下单成功:" + orderId);
String exchangeName = "direct_order_exchange";
rabbitTemplate.convertAndSend(exchangeName, "sms", orderId);
rabbitTemplate.convertAndSend(exchangeName, "email", orderId);
}

}

和上文相同,只需注意队列名即可。

@RabbitListener(queues = {"email.direct.queue"})
@Component
public class DirectEmailService {
@RabbitHandler
public void receive(String message) {
System.out.println("email direct -----》接收到" + message);
}
}

Topic 模式

上文中个模式都是通过配置文件来声明交换机,队列及绑定二者之间的关系;实际上我们还可以通过注解的方式来声明交换机及注解。

由于使用注解方式声明,所以我们不需要创建配置文件,直接编写业务代码即可。测试的时候我们只需修改路由名即可,具体如何修改,请前往文章开头链接查看各模式是如何使用的。

@Service
public class OrderService {

@Autowired
private RabbitTemplate rabbitTemplate;

public void makeOrderTopic() {
// 保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("下单成功:" + orderId);
String exchangeName = "topic_order_exchange";
String routingKey = "com.email";
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
}
}

代码和上文基本相同,区别在于我们直接在RabbitListener注解中将队列和交换机进行绑定。需要注意的是各参数中都是使用字符串。 value对应的是队列,相应的参数分别是队列名、持久化、自动删除。 exchange对应的交换机,相应的参数分别是交换机名以及交换机类型。 key对应的是路由名。

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "email.topic.queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "*.email.#"
))
@Component
public class TopicEmailService {
@RabbitHandler
public void receive(String message) {
System.out.println("email topic -----》接收到" + message);
}
}

以上便是springboot 整合 rabbitmq的两种方式。但是在日常开发中更推荐使用配置文件的形式来实现,因为在配置文件中可以更好的处理过期时间、死信队列等消息队列中的高级特性。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK