2

SpringBoot整合RabbitMQ实现六种工作模式 - 小码code

 2 years ago
source link: https://www.cnblogs.com/jeremylai7/p/16527309.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

SpringBoot整合RabbitMQ实现六种工作模式

RabbitMQ主要有六种工作模式,本文整合SpringBoot分别介绍工作模式的实现。

消息生产者或者发送者,使用P表示:

2448954-20220728090003892-700599624.png

消息从生产端发送到消费端,一定要通过队列转发,使用queue_name表示:

2448954-20220728090018891-1897552678.png

消费的消费者或者接收者,使用C表示,如果有多个消费者也可以用C1C2表示:

2448954-20220728090034559-520399712.png

SpringBoot整合RabbitMQ基本配置

  1. 添加maven依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>
  1. 添加application.yml 配置
spring:
  rabbitmq:
    host: 192.168.3.19
    port: 5672
    username: admin
    password: 123456

生产端发送消息,调用RabbitTemplate发送消息,比如:

@Autowired
private RabbitTemplate rabbitTemplate;

public String send() {
  rabbitTemplate.convertAndSend("routingKey","send message");
}

消费消息使用队列监听注解@RabbitListener,添加队列名称就能消费发送到队列上的消息了:

@RabbitListener(queuesToDeclare = @Queue("queue_name"))
public void consume(String message) {
  // 接收消息
}

1. 简单(simple)模式

2448954-20220728090112507-1261708797.png

最简单的消息发送

  • 生产者是消费者是一一对应,也叫做点对点模式,生产者发送消息经过队列直接发送给消费者。
  • 生产者和消费者在发送和接收消息时,只需要指定队列名称,而不需要指定Exchange 交换机。

代码示例

生产消息:

@GetMapping("/simple-send")
public String simpleSend() {
  rabbitTemplate.convertAndSend("simple","this is news");
  return "ok";
}
@RabbitListener(queuesToDeclare = @Queue("simple"))
public void consume(String message) {
  System.out.println(message);
}
this is news

无需创建交换机和绑定队列,只需要匹配发送端和消费端的队列名称就能成功发送消息。

2. 工作模式

2448954-20220728090133322-1291689851.png

在多个消费者之间分配任务

  • 工作模式简单模式差不多,只需要生产端、消费端、队列。
  • 不同在于一个生产者、一个队列对应多个消费者,也就是一对多的关系。
  • 在多个消费者之间分配消息(竞争消费者模式),类似轮询发送消息,每个消息都只发给一个消费者。

代码示例

  • 生产消息:
@GetMapping("/work-send")
public String simpleSend() {
  rabbitTemplate.convertAndSend("work","this is news");
  return "ok";
}
  • 消费消息:
@RabbitListener(queuesToDeclare = @Queue("work"))
public void consume(String message) {
  System.out.println("first:" + message);
}

@RabbitListener(queuesToDeclare = @Queue("work"))
public void consumeSecond(String message) {
  System.out.println("second:" + message);
}

创建一个生产者,两个消费者,发送两条消息,两个消费者分别接收到消息,输出:

first:this is news
second:this is news

两个消费者,轮流消费消息。类似nginx负载均衡

3. 发布订阅模式

2448954-20220728090200773-1285288570.png

一次向多个消费者发送消息

  • 发布订阅类似广播消息,每个消息可以同时发送给订阅该消息的消费者,
  • 上图中的X表示交换机,使用的扇形交换机(fanout),它将发送的消息发送到所有绑定交换机的队列。

代码示例

  • 创建队列、交换机以及绑定:
@Bean
public FanoutExchange fanoutExchange() {
  return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");
}

@Bean
public Queue psFirstQueue() {
  return new Queue("psFirstQueue");
}

@Bean
public Queue psSecondQueue() {
  return new Queue("psSecondQueue");
}

@Bean
public Queue psThirdQueue() {
  return new Queue("psThirdQueue");
}

@Bean
public Binding routingFirstBinding() {
  return BindingBuilder.bind(psFirstQueue()).to(fanoutExchange());
}

@Bean
public Binding routingSecondBinding() {
  return BindingBuilder.bind(psSecondQueue()).to(fanoutExchange());
}

@Bean
public Binding routingThirdBinding() {
  return BindingBuilder.bind(psThirdQueue()).to(fanoutExchange());
}
  • 上面定义一个交换机fanoutExchange
  • 分别绑定三个队列psFirstQueuepsSecondQueuepsThirdQueue
  • 队列绑定交换机不需要routingKey,直接绑定即可。
2448954-20220728090438992-807358892.png
@GetMapping("/publish-sub-send")
public String publishSubSend() {
  rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, "publish/subscribe hello");
  return "ok";
}

无需指定routingKey,设置为null

@RabbitListener(queues = "psFirstQueue")
public void pubsubQueueFirst(String message) {
  System.out.println("【first】:" + message);
}

@RabbitListener(queues = "psSecondQueue")
public void pubsubQueueSecond(String message) {
  System.out.println("【second】:" + message);
}

@RabbitListener(queues = "psThirdQueue")
public void pubsubQueueThird(String message) {
  System.out.println("【third】:" + message);
}
  • 输出:
【first】: publish/subscribe hello
【second】: publish/subscribe hello
【third】: publish/subscribe hello

发送一条消息,绑定的队列都能接收到消息。

4. 路由模式

2448954-20220728090231940-784298202.png

根据routingKey有选择性的接收消息

  • 每个队列根据不同routingKey绑定交换机
  • 消息发送到交换机后通过routingKey发送给特定的队列,然后传到消费者消费。
  • 交换由扇形交换机(fanout)改成直连交换机(direct)。

代码示例

  • 创建队列、交换机以及绑定:
@Bean
public Queue routingFirstQueue() {
    return new Queue("routingFirstQueue");
}

@Bean
public Queue routingSecondQueue() {
    return new Queue("routingSecondQueue");
}

@Bean
public Queue routingThirdQueue() {
    return new Queue("routingThirdQueue");
}

@Bean
public DirectExchange routingExchange() {
    return new DirectExchange("routingExchange");
}

@Bean
public Binding routingFirstBind() {
    return BindingBuilder.bind(routingFirstQueue()).to(routingExchange()).with("firstRouting");
}

@Bean
public Binding routingSecondBind() {
    return BindingBuilder.bind(routingSecondQueue()).to(routingExchange()).with("secondRouting");
}

@Bean
public Binding routingThirdBind() {
    return BindingBuilder.bind(routingThirdQueue()).to(routingExchange()).with("thirdRouting");
}
  • 创建一个交换机,根据不同的路由规则匹配不同的队列routingExchange,根据不同的routingKey绑定不同的队列:
  • firstRouting路由键绑定routingFirstQueue队列。
  • secondRouting路由键绑定routingSecondQueue队列。
  • thirdRouting路由键绑定routingThirdQueue队列。
2448954-20220728090457812-1654252138.png
  • 生产消息:
@GetMapping("/routing-first")
public String routingFirst() {
    // 使用不同的routingKey 转发到不同的队列
    rabbitTemplate.convertAndSend("routingExchange","firstRouting"," first routing message");
    rabbitTemplate.convertAndSend("routingExchange","secondRouting"," second routing message");
    rabbitTemplate.convertAndSend("routingExchange","thirdRouting"," third routing message");
    return "ok";
}
  • 消费消息:
@RabbitListener(queues = "routingFirstQueue")
public void routingFirstListener(String message) {
    System.out.println("【routing first】" + message);
}

@RabbitListener(queues = "routingSecondQueue")
public void routingSecondListener(String message) {
    System.out.println("【routing second】" + message);
}

@RabbitListener(queues = "routingThirdQueue")
public void routingThirdListener(String message) {
    System.out.println("【routing third】" + message);
}

输出:

【routing first】first routing message
【routing second】second routing message
【routing third】third routing message

分析:

rabbitTemplate.convertAndSend("routingExchange","firstRouting"," first routing message");

消息从生产者指定firstRouting路由键,找到对应的绑定队列routingFirstQueue,就被routingFirstQueue队列消费了。

5. 主题模式

2448954-20220728090253089-1432660541.png

基于某个主题接收消息

路由模式发送的消息,是需要指定固定的routingKey,如果想要针对一类路由。
比如:

  • 只接收以.com 结尾的消息。
  • www.开头的消息。

主题模式就派上场了,路由模式主题模式类似,路由模式是设置特定的routingKey绑定唯一的队列,而主题模式的是使用通配符匹配一个或者多个队列。

代码示例

  • 创建交换机和队列:
@Bean
public Queue topicFirstQueue() {
    return new Queue("topicFirstQueue");
}

@Bean
public Queue topicSecondQueue() {
    return new Queue("topicSecondQueue");
}

@Bean
public Queue topicThirdQueue() {
    return new Queue("topicThirdQueue");
}

@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topicExchange");
}
  • 使用通配符绑定交换机和交换机:
@Bean
public Binding topicFirstBind() {
    // .com 为结尾
    return BindingBuilder.bind(topicFirstQueue()).to(topicExchange()).with("*.com");
}

@Bean
public Binding topicSecondBind() {
    // www.为开头
    return BindingBuilder.bind(topicSecondQueue()).to(topicExchange()).with("www.#");
}

通配符有两种,*#,

  • * 表示可以匹配一个
  • # 表示可以匹配多个

比如:

  • #.com表示接收多个.com结尾的字段。

    • 例如: taobao.comwww.taobao.comwww.jd.com
  • *.com表示接收一个.com结尾的字段。

    • 例如: taobao.comjd.com
    • 多个字段是无法匹配的,比如www.taobao.comcn.taobao.com
  • www.#可以匹配多个www开头的字段。

    • 例如www.taobaowww.jd
  • www.*可以匹配一个www开头的字段。

    • 例如:www.taobaowww.jd
    • 多个字段是无法匹配的,比如www.taobao.comwww.jd.com
  • 生产消息:

@GetMapping("/topic-first-send")
public String topicFirstSend() {
    rabbitTemplate.convertAndSend("topicExchange","www.taobao.com","www.taobao.com");
    rabbitTemplate.convertAndSend("topicExchange","taobao.com","taobao.com");
    rabbitTemplate.convertAndSend("topicExchange","www.jd","www.jd");
    return "topic ok";
}
  • 消费消息:
@RabbitListener(queues = "topicFirstQueue")
public void topicFirstListener(String message) {
    System.out.println("【topic first】" + message);
}

@RabbitListener(queues = "topicSecondQueue")
public void topicSecondListener(String message) {
    System.out.println("【topic second】" + message);
}
  • 输出:
【topic second】www.taobao.com
【topic first】taobao.com
【topic second】www.jd

www.#可以匹配多个以www.开头的路由键,例如www.taobao.comwww.jd。而*.com只能匹配一个以.com结尾的路由键,例如taobao.com,而无法匹配www.taobao.com

6. RPC模式

2448954-20220728090314118-1069419873.png

消息有返回值

  • PRC模式和上面的几种模式唯一不同的点在于,该模式可以收到消费端的返回值
  • 生成端接收消费端的返回值。

代码示例

  • 消费端添加返回值:
@RabbitListener(queuesToDeclare =@Queue("rpcQueue"))
public String rpcListener(String message) {
  System.out.println("【rpc接收消息】" + message);
  return "rpc 返回" + message;
}
  • 生产端发送消息:
@GetMapping("/rpc-send")
	public void rpcSend() {
		Object receive = rabbitTemplate.convertSendAndReceive("rpcQueue","rpc send message");
		System.out.println("【发送消息消息】" + receive);
	}
【rpc接收消息】rpc send message
【发送端接收消息】rpc 返回rpc send message

交换机类型

上面的 订阅发布模式路由模式以及主题模式使用到了不同的交换机,分别是:

  • 直连交换机 Direct
  • 扇形交换机 Fanout
  • 主题交换器 Topic
2448954-20220728090332611-183987311.png

Direct Exchange(直连)

2448954-20220728090349735-1858101104.png

直连交换机被应用在路由模式下,该交换机需要通过特定的routingKey来绑定队列,交换机只有接收到了匹配的routingKey才会将消息转发到对应的队列中,否则就不会转发消息。

路由模式使用直连交换机,该模式下根据routingKey绑定特定的队列。

Fanout Exchange(扇形)

2448954-20220728090403774-469023427.png

扇形交换机没有路由键的概念,只需将队列绑定在交换机上,发送到交换机上的消息会转发到交换机所以绑定的队列里面,类似广播,只要打开收音机都能接收到广播消息。扇形交换机应用于发布订阅模式

Topic Exchange(主题)

2448954-20220728090420982-912390682.png

主题模式是将路由键根据一个主题进行分类,和直连模式不同的是,直连模式绑定特定的路由键,而主题模式使用通配符绑定路由键,绑定键有两种:

  • * 表示可以匹配仅一个
  • # 表示可以匹配零个或多个

整合SpringBoot实现RabbitMQ六种工作模式,并详细讲解RabbitMQ六种工作模式:

  • 简单模式
    • 无需创建交换机,匹配生产端和消费的routingKey即可。
  • 工作模式
    • 多个消费端公平竞争同一个消息。
  • 发布订阅模式
    • 一次向多个消费者发送消息。
  • 路由模式
    • 根据特定的路由键转发消息。
  • 主题模式
    • 根据通配符,匹配路由键转发消息。
  • RPC模式
    • 生产端接收消费端发送的返回值。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK