5

解决死信队列消息过期非异步问题,RabbitMQ 延时消息更优解——插件大法(Docker版)

 3 years ago
source link: https://www.skypyb.com/2020/01/jishu/1323/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

解决死信队列消息过期非异步问题,RabbitMQ 延时消息更优解——插件大法(Docker版)

上一篇文章:  RabbitMQ 死信机制真的可以作为延时任务这个场景的解决方案吗? 里最终得出的结论:

RabbitMQ 死信机制可以作为延时任务这个场景的解决方案

但是,由于 RabbitMQ 消息死亡并非异步化,而是阻塞的。所以无法作为复杂延时场景——需要每条消息的死亡相互独立这种场景  下的解决方案。

如果说,果真我的业务就是有这个需求呢?

既需要延时触发、也可以满足延时时间不定长

一、解决方案

本身 RabbitMQ 没有这种功能,不过仍然可以使用 RabbitMQ 解决这个场景。

那就是使用插件大法。这也应该是使用 RabbitMQ 时,除了管控台插件外用的最多的一个插件。

需要用到的插件就是这个: rabbitmq_delayed_message_exchange 插件

见名思意,延时消息交换机;  对,他的实现方式已经和队列已经无关了。

这个插件启用后的作用是在原来的 direct、topic、fanout 等这些 exchange 基础上,又新加了一个 exchange 。这个 exchange 的类型是 x-delayed-message

只要发送消息时指定的是这个交换机,那么只需要在消息 header 中指定参数 x-delay [:毫秒值] 就能够实现每条消息的异步延时

二、如何安装插件

之前安装 RabbitMQ 的时候,那是真的搞了我一段时间。可以看这篇文章 –> RabbitMQ基本简介与我亲身经历的安装流程(CentOS7)

现在都 0202 年了,还这么搞就太挫了。其实早在N久之前我电脑上的 RabbitMQ 就已经改成了 docker 运行。

那么就在这次插件安装过程中来顺便说一下。

首先 docker 安装 RabbitMQ 很简单,这是安装并运行 RabbitMQ3.7.7 的命令

还自带了管控台插件,只需配置好端口映射/文件夹映射,并设置下默认账号密码就完事了

docker run --name rabbitmq -dt\
  -v /opt/dockerdata/rabbitmq:/var/lib/rabbitmq\
  -p 5672:5672 -p 15672:15672\
  --hostname rabbit\
  -e RABBITMQ_DEFAULT_VHOST=/\
  -e RABBITMQ_DEFAULT_USER=admin\
  -e RABBITMQ_DEFAULT_PASS=614\
  rabbitmq:3.7.7-management

好,只要有了RabbitMQ 接下来就只需要安装插件并启用了。

这是插件的github地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

先将兼容版本的插件下载到本地, 然后复制进docker容器内执行 rabbitmq-plugins 命令启用就OK了

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins
rm rabbitmq_delayed_message_exchange-3.8.0.ez
docker exec -it rabbitmq bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

三、在项目中使用

那么现在万事俱备了,就只需要在项目中实际的使用起来,测试一下是否真能达到想要的效果。

下面的代码都会排除干扰项,删掉之前写的所有和本次无关的配置,以便于阅读

3.1、 首先,当然还是先创建绑定关系。

* Rabbitmq的绑定配置,绑定Exchange、MQ、RoutingKey
* Create by skypyb on 2019-11-16
@Configuration
public class RabbitBindConfig {
    public final static String SKYPYB_DELAY_EXCHANGE = "skypyb-delay-exchange";
    public final static String SKYPYB_DELAY_QUEUE = "skypyb-delay-queue";
    public final static String SKYPYB_DELAY_KEY = "skypyb.key.delay";
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //自定义交换机
        return new CustomExchange(SKYPYB_DELAY_EXCHANGE, "x-delayed-message", false, true, args);
    @Bean
    public Queue delayQueue() {
        return new Queue(SKYPYB_DELAY_QUEUE, false, false, true);
    @Bean
    public Binding bindingDelayExchangeAndQueue() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(SKYPYB_DELAY_KEY).noargs();

值得注意的是,在使用插件给我们带来的新的延迟交换机时, 由于 SpringAMQP 中并没有内置这种模型,所以需要创建 CustomExchange,也就是自定义交换机。

并且需要设置 CustomExchange 的类型为 x-delayed-message

至于队列和绑定关系的设置,该怎么配就怎么配。

在创建绑定关系时,最终需要调用一下 noargs() 方法,BindingBuilder 在绑定 CustomExchange 时 with() 方法返回值并不会是 Binding 类。

3.2、消费者编写

在绑定关系创建完毕之后,对应的消费者也是需要的。

其实这个消费者没有任何特殊的地方,毕竟使用了此插件也只是交换机和发消息时要做出改变,队列本身是没有变化的。

@RabbitListener(queues = {RabbitBindConfig.SKYPYB_DELAY_QUEUE})
@Component
public class DelayReceiver {
    private Logger logger = LoggerFactory.getLogger(DelayReceiver.class);
    @RabbitHandler
    public void onDelayMessage(@Payload String message,
                              @Headers Map<String, Object> headers,
                              Channel channel) throws IOException {
        logger.info("监听延时交换机, 收到消息: {}", message);
        //delivery tag可以从headers中get出来
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED);
            channel.basicNack(deliveryTag, false, !redelivered);

3.3、测试延时消息发送

那么现在是 “真” 万事俱备。

写个测试类,来往指定的交换机里发送消息。这里当然是向我们刚创建的延时交换机发消息啦。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitmqTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private Logger logger = LoggerFactory.getLogger(RabbitmqTest.class);
    @Test
    public void testDelay() {
        rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_DELAY_EXCHANGE,
                RabbitBindConfig.SKYPYB_DELAY_KEY, "消息体-5s",
                (msg) -> {
                    msg.getMessageProperties().setDelay(5000);
                    return msg;
        rabbitTemplate.convertAndSend(
                RabbitBindConfig.SKYPYB_DELAY_EXCHANGE,
                RabbitBindConfig.SKYPYB_DELAY_KEY,
                "消息体-3s",
                (msg) -> {
                    msg.getMessageProperties().setDelay(3000);
                    return msg;
        logger.info("-----消息发送完毕-----");

在发送消息的地方,也是需要做出处理的。

可以通过以下方法来设置消息的 Header。来达到指定延时时间的目的。

message.getMessageProperties().setHeader("x-delay",3000);

但是有一点很奇妙的是 SpringAMQP 他居然自己集成了对应的API (那为啥不集成延时交换机的API? )

所以可以通过这个方式来设置延时时间:

message.getMessageProperties().setDelay(3000);

最后,代码均编写完毕。

启动消费者服务用以监听队列,然后启动测试类观察消息投放,

最终控制台打印:

2020-01-18 12:26:28.808 INFO 24592 — [ main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–

2020-01-18 12:26:31.827 INFO 22844 — [cTaskExecutor-1] c.s.rabbitmq.controller.DelayReceiver : 监听延时交换机, 收到消息: 消息体-3s

2020-01-18 12:26:33.813 INFO 22844 — [cTaskExecutor-3] c.s.rabbitmq.controller.DelayReceiver : 监听延时交换机, 收到消息: 消息体-5s

可以看到其完美的符合了需求。

延时任务这个场景具体的解决方案也就差不多到这了。

死信机制除了比较复杂的延时场景以外,其实也可以满足大多数需求。

那么若是遇到了死信也解决不了的延时场景,RabbitMQ 本身的机制无法实现的话,那么我们可以靠插件来实现对应的需求。

确确实实,RabbitMQ 的这个延时交换机插件还是有点东西的,也难怪 Spring 给他集成了对应的 API。

看了这两篇文章的人,以后若是遇到对应的场景,该用什么就不用我多说了吧 (


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK