3

开源基础软件社区

 2 years ago
source link: https://ost.51cto.com/posts/16975
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

RabbitMQ流程 原创 精华


简单介绍一个消息推送到接收的流程,提供一个简单的图:

RabbitMQ流程-开源基础软件社区

常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的,所以对应的消息推送/接收模式也会有以下几种:

Direct Exchange

定向交换机,根据消息携带的路由键将消息投递给对应队列。

大致流程,有一个队列绑定到一个定向交换机上,同时赋予一个路由键 routing key 。

然后当一个消息携带着路由值为key,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值key去寻找绑定值也是key的队列。

Fanout Exchange

广播交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

Topic Exchange

通配符交换机,这个交换机其实跟定向交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。

简单地介绍下规则:

*(星号) 用来表示一个单词 (必须出现的)

#(井号) 用来表示任意数量(零个或多个)单词

通配的绑定键是跟队列进行绑定的,举个小例子

队列Q1 绑定键为 .Army.          队列Q2绑定键为  Army.#

如果一条消息携带的路由键为 H.Army.I,那么队列Q1将会收到;

如果一条消息携带的路由键为Army.HH.II,那么队列Q2将会收到;

通配符交换机是非常强大的,点解敢讲?

当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。

当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时通配符交换机就拥有的定向交换机的行为。

所以通配符交换机也就实现了广播交换机的功能,和定向交换机的功能。

另外还有 Header Exchange 头交换机,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机,这几个就不做讲述,该篇简单讲一下Topic Exchange使用,下篇讲RabbitMQ Bridge项目,如何开发一个公用的RabbitMQ后台,灵活新增交换机,队列,消费类。

本次实例创建一个springboot项目,一个rabbitmq-provider(生产者),一个rabbitmq-consumer(消费者)。

pom.xml里用到的jar依赖:

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

application.yml:

server:
  port: 8088
spring:
  #给项目来个名字
  application:
    name: rabbitmq-bridge
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: harmony
    password: harmony123
    #虚拟host 可以不设置,使用server默认host
    virtual-host: HarmonyOS

里面的虚拟host配置项不是必须的,我自己在rabbitmq服务上创建了自己的虚拟host,所以我配置了;你们不创建,就不用加这个配置项。

那么怎么建一个单独的host呢? 假如我就是想给某个项目接入,使用一个单独host,顺便使用一个单独的账号,就好像我文中配置的 harmony 这样, 都在另一篇 RabbitMQ简单使用 有介绍过,可以移步过去。

1. 创建一个交换机,队列,路由Key关系的类,如下:


@Slf4j
@Configuration
public class TopicExchangeConfig {

    public final static String ARMY_KEY = "topic.army";
    public final static String HARMONY_KEY = "topic.harmony";
    public final static String TOPIC_EXCHANGE = "topicExchange";

    // 创建一个ARMY_KEY队列
    @Bean(ARMY_KEY)
    public Queue SampleTopicQueueArmy() {
        return new Queue(ARMY_KEY,true);
    }
    // 创建一个HARMONY_KEY队列
    @Bean(HARMONY_KEY)
    public Queue SampleTopicQueueHarmony() {
        return new Queue(HARMONY_KEY,true);
    }
    // 创建一个通配符交换机
    @Bean(TOPIC_EXCHANGE)
    public TopicExchange SampleTopicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE,true,false);
    }
    // ARMY_KEY队列与交换机绑定,并设置路由值为topic.army
    @Bean
    Binding bindingTopicArmy(@Qualifier(ARMY_KEY) Queue queue,
                                      @Qualifier(TOPIC_EXCHANGE) TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("topic.army");
    }
    // HARMONY_KEY队列与交换机绑定,并设置路由值为topic.#通配
    @Bean
    Binding bindingTopicHarmony(@Qualifier(HARMONY_KEY) Queue queue,
                             @Qualifier(TOPIC_EXCHANGE) TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("topic.#");
    }

}

2. 创建一个发送信息Controller,如下:


@RestController
@Slf4j
public class SenderController {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage1")
    public void sendMessage1() {
        log.info("----------------------sendMessage1----------------------------");
        log.info("发一条路由值为topic.army,交换机为topicExchange的信息");

        Map<String, Object> params = new HashMap<>();
        params.put("routerKey", "topic.army");

        // 发一条路由值为topic.army,交换机为topicExchange的信息
        rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "topic.army", params);
    }


    @GetMapping("/sendMessage2")
    public void sendMessage2() {
        log.info("----------------------sendMessage2----------------------------");
        log.info("发一条路由值为topic.harmony,交换机为topicExchange的信息");

        Map<String, Object> params = new HashMap<>();
        params.put("routerKey", "topic.harmony");

        // 发一条路由值为topic.harmony,交换机为topicExchange
        rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "topic.harmony", params);
    }

}

3. 创建两个监听类,一个是监听topic.army队列, 一个是监听topic.harmony队列,如下:

@Slf4j
@Component
@RabbitListener(queues = "topic.army")
public class TopicArmyReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        log.info("TopicArmyReceiver消费者收到消息  : " + testMessage.toString());
    }
}
@Slf4j
@Component
@RabbitListener(queues = "topic.harmony")
public class TopicHarmonyReceiver {

    @RabbitHandler
    public void process(Map testMessage) {
        log.info("TopicHarmonyReceiver消费者收到消息  : " + testMessage.toString());
    }
}

4. 项目启动起来后,先调用/sendMessage1接口:

然后看控制台输出情况:
TopicArmyReceiver监听队列1,绑定键为:topic.army
TopicHarmonyReceiver监听队列2,绑定键为:topic.#
而当前推送的消息,携带的路由键为:topic.army
所以可以看到两个监听消费者receiver都成功消费到了消息,因为这两个recevier监听的队列的绑定键都能与这条消息携带的路由键匹配上。

RabbitMQ流程-开源基础软件社区

接下来调用/sendMessage2接口:
然后看控制台输出情况:
TopicArmyReceiver监听队列1,绑定键为:topic.army
TopicHarmonyReceiver监听队列2,绑定键为:topic.#
而当前推送的消息,携带的路由键为:topic.harmony
所以可以看到两个监听消费者只有TopicHarmonyReceiver成功消费到了消息。​

RabbitMQ流程-开源基础软件社区

5. 总结:一般情况下使用Topic Exchange就可以得到我们想要的结果,下一篇将完整开发一个MQBridge项目,从而部署后,可以使用到项目中去。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK