8

RabbitMQ延迟消息:死信队列 | 延迟插件 | 二合一用法+踩坑手记+最佳使用心得 - 福隆...

 2 years ago
source link: https://www.cnblogs.com/fulongyuanjushi/p/16535150.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

前段时间写过一篇:

# RabbitMQ:消息丢失 | 消息重复 | 消息积压的原因+解决方案+网上学不到的使用心得

很多人加了我好友,说很喜欢这篇文章,也问了我一些问题。

因为最近工作比较忙,隔了一段时间没写,忙完后专门花时间把RabbitMQ剩下的一个重要技术点通过案例的方式整理出来,就是延迟消息的用法。

延迟消息含义不解释了,就是字面意思。

用法一共两种方式,死信队列和延迟插件,两种各有利弊,我会一一陈述并给出最佳用法。

死信队列方式

死信队列不要理解成很玄乎的东西,它就是普通队列绑定了死信交换机,而且配置参数还是固定的,无需动脑,作用的话你想象成回收站就好了,被拒绝或超时的消息就往这里边丢,然后还能继续被消费,就这么简单。

1、原理图解

4237caf329e440aeaf93544f67148071~tplv-k3u1fbpfcp-watermark.image?

2、引入MQ

0836a7113e40483fb1b398306eb3aa10~tplv-k3u1fbpfcp-watermark.image?

3、声明交换机和队列

声明普通交换机、队列、路由,这里我们声明两个预备延迟的队列,名称分别包含5s和15min,用来区分延迟消息是否达到预期效果。

我们接下来所有交换机和队列都是以Direct模式来创建的,也就是点对点方式,具体原因后面会讲。

另外,注意这里注释的延迟交换机、队列,都是为了特别说明,其实还是普通队列,参考上面的原理图解。

530470928c604f8eb231d5c42a041d01~tplv-k3u1fbpfcp-watermark.image?

创建交换机、队列、绑定关系。

c1eb460a242149388021ca2a7d76ebdc~tplv-k3u1fbpfcp-watermark.image?

聪明的小伙伴应该能发现,上面这段代码只有交换机和绑定队列的关系,却没有创建队列。

没错,接下来就是重点部分,创建队列时,要绑定死信交换机,这样就变成了一个死信队列。

可以看到,5s和15min的队列绑定的都是同一个死信交换机,只是路由规则、消息过期时间TTL不同。

这样,在项目启动后,RabbitMQ就会创建出两个具备不同过期时间的死信队列,后面会有截图专门给大家看。

79d9aab2d2444a99978c0f5509ce05af~tplv-k3u1fbpfcp-watermark.image?

绑定后的效果,在项目启动后RabbitMQ会把交换机和队列都创建出来,在控制台就能看到。

bf29de5dc56e4a1f8108c54621187eda~tplv-k3u1fbpfcp-watermark.image?

普通队列绑定死信交换机和对应的路由规则后,我们接下来就把死信交换机、路由规则、队列创建出来即可,其实和创建普通队列没区别。

ceb19456630d428a87611f0b363c8aa5~tplv-k3u1fbpfcp-watermark.image?

4、yml配置

为了演示方便,我们的生产者和消费者是写在同一个项目中的,所以配置文件没有区别。但是在线上环境中,为了解耦生产者和消费者往往是分开的。

1a832c8f68734c7395f496069b6fe7e4~tplv-k3u1fbpfcp-watermark.image?

这里可以发现,我们给RabbitMQ开启了消息确认机制,读过开头提过那篇文章的小伙伴应该知道,线上环境我们为了提高性能一般是不打开确认机制的,这里之所以打开,是为了演示消息的投递情况,同时也为了特别讲后面延迟插件会出现的一个问题。

5、创建生产者

这里加了诸如消息唯一ID、消息确认机制的写法,单纯为了展示给大家看,实际上你可以不加。

93a45628c4164285802bdd84fda43394~tplv-k3u1fbpfcp-watermark.image?

6、创建消费者

这里注意,监听的队列也就是我们前面声明的死信队列,因为过期的消息都通过绑定的死信交换机转发到了里面,如果对过程有疑惑,可以回到开头的图解那里对着图片来看。

4cd4848876ff4b21a61cd3c2dcfc6aee~tplv-k3u1fbpfcp-watermark.image?

7、测试接口

分别创建了5s延迟和15min延迟的测试接口

c04ac3d97b8c4794a39ed8a221e17049~tplv-k3u1fbpfcp-watermark.image?

e2d9f037071d418d80984e5cdd062c36~tplv-k3u1fbpfcp-watermark.image?

延迟插件方式

插件方式,要比死信队列方式简单得多,只需要安装插件,启动插件功能,然后创建延迟队列即可。

1、安装插件

这里给出源码安装方式和docker安装方式,大家根据各自情况自己选。

1)、源码安装

下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

本次演示下载的是3.8.0版本插件,它能兼容3.7.x和3.8.x的RabbitMQ

直通车:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0

e6056833b2f24ddc8d797019b5af6352~tplv-k3u1fbpfcp-watermark.image?

提醒一下,要下载和RabbitMQ对应的版本,否则延迟队列不会生效。这里面有些版本是兼容低版本MQ的,可以点击进去具体查看支持的版本。

1f9a2256c0ba4a0dac205449da6dee92~tplv-k3u1fbpfcp-watermark.image?

将下载好的插件上传到RabbitMQ的plugins下:rabbitmq_server-3.7.24/plugins

然后开启延迟队列:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

这样就可以开始愉快的使用啦,具体的操作和图示在下面的docker安装步骤里展示出来,这俩的操作没啥区别。

2)、docker安装

首先,还是一样的步骤,把下载好的插件上传到RabbitMQ服务器上,因为是docker方式,所以你还得把它上传到docker容器中。

复制文件到docker容器中:

进入容器查看是否上传成功

414117c4b2b9426088bd118a6c0b3b2f~tplv-k3u1fbpfcp-watermark.image?

开启延迟插件,开启成功后就是下面的提示效果。

5b3236b9afb84f1d8f59f03512f086a3~tplv-k3u1fbpfcp-watermark.image?

重启RabbitMQ,不重启不会生效,如果重启没效果,你可能需要kill进程再启动。

a3fe22b911204bf98b3600c4b279c48e~tplv-k3u1fbpfcp-watermark.image?

打开控制台界面,如果看到创建交换机的选项中有了x-delayed-message,就表示延迟插件安装和启动成功。

85d07021858147bf80526ec185309d30~tplv-k3u1fbpfcp-watermark.image?

2、yml配置

这里我专门画了红框,是因为这个配置在延迟插件中有一个作用,可以加也可以不加,后面的踩坑手记会单独解释。

e9705ecb3522430192ddba8f58266a55~tplv-k3u1fbpfcp-watermark.image?

3、声明交换机和队列

这里只需注意一点,交换机创建时要设置为延迟交换机,也就是setDelayed(true)

55853d1ab177480e84f6aa7dd9acaf2d~tplv-k3u1fbpfcp-watermark.image?

4、消息处理器工具类

我们通过MessagePostProcessor这个钩子来设置持久化模式和延迟时间,项目中可能多个地方会用到,所以单独抽取出来通过工具类获取对象。

9d57a405d5c54e2aadc6fcdb97f5fc0b~tplv-k3u1fbpfcp-watermark.image?

5、创建生产者

这个写法是固定的,可以直接复制到自己的项目中使用,延迟时间自己定义。

563f89a8a6e24c9992fc9a8db6cea74b~tplv-k3u1fbpfcp-watermark.image?

6、创建消费者

直接监听这个延迟队列即可,没有特别的地方。

bdb2b113c8b148e1bf9fbabe31937161~tplv-k3u1fbpfcp-watermark.image?

7、测试接口

这里我们测试这个6s延迟的消息是否成功

8882e1f4b566435bb612ee2175fcd206~tplv-k3u1fbpfcp-watermark.image?

可以看到,刚好6s后消费了消息,延迟效果没问题。

f6addff6c5384f8f9fa4a363afb85aba~tplv-k3u1fbpfcp-watermark.image?

1、死信队列坑点

其实死信队列的坑点比插件要少多了,但是死信队列没有插件那么简单直接好理解。

1)、原理一定要弄明白,否则你连交换机和队列怎么创建怎么绑定都不知道;

2)、绑定死信交换机时,x-dead-letter这些参数对应的值一定要写对,尤其是路由,写错了会导致过期消息不进入死信队列,找半天原因都找不到;

2、延迟插件坑点

延迟插件的坑点就真的多了,笔者用死信队列方式几乎是1次就成功,插件反倒折腾了我好几个回合。

1)、延迟插件的版本一定要下载对,和RabbitMQ本身版本对应,最好是点进去看下说明,一般都会告诉你兼容哪个版本,如果下载错了,我只能为你默哀;

2)、不要下载过高版本的RabbitMQ和插件,你可能会疯掉;

3)、开启插件后一定要重启RabbitMQ,否则不生效,如果重启也没生效,你可以尝试下kill掉MQ的进程,然后再启动;

4)、开启消息确认机制后,你会发现延迟插件很特殊的一个地方,就是每次投递消息都会进入returncallback回调中。

我着重来说明下最后一个坑点,也谈不上是坑点,只是你以后使用过程中很可能会产生疑惑。

还记得前文中yml文件画一个红框的配置吗,我们把那个参数注释掉看看效果。

5b7cf946d86b43538fcf093777ed3275~tplv-k3u1fbpfcp-watermark.image?

你会发现,一旦开启了消息确认机制,延迟消息每次都会先进入returncallback回调,然后才会投递成功,你们可以自己试一试,每次都会这样。

原因是什么呢?

这里就要提到延迟插件的原理了,从前文创建延迟交换机那里就可以看到,是给交换机设置了delayed:true,因为延迟消息实际上是挂载到交换机上,不会马上就通过路由投递出去

那么我们再来看看,上面这个图中returncallback回调打印出来的返回信息:replyText=NO_ROUTE,很明显了吧,说是没有路由,所以消息确认失败了,因为延迟插件没有马上通过路由投递。

那又有疑问了,没有马上投递,为什么会进入returncallback回调呢?

下面这张图是源码中的一段,告诉你了为什么。

56c085cbc5a6423f8325c343d9e0887e~tplv-k3u1fbpfcp-watermark.image?

因为有个mandatory参数,如果不配置它的话,它是为null的,当为null的时候,传递的值是this.properties.isPublisherReturns()

官方对mandatory的解释如下:

Enable mandatory messages. If a mandatory message cannot be routed to a queue by the server, it will return an unroutable message with a Return method.

翻译过来:

开启强制性的消息。如果强制消息不能被服务器路由到队列,它将使用return方法返回一个不可路由的消息。

所以,mandatory为true表示开启强制投递,为false表示不强制,而且这个值可以为null。

而我们前面yml文件中开启了消息确认机制:publisher-returns: true

所以,每次一定会走returncallback回调。

因此,我们要么不开启消息确认机制,要么就把mandatory设置为false,这样延迟插件的消息就不会每次走一遍回调了

最佳使用心得

1)、延迟插件方式更方便,但我建议首选死信队列方式,因为死信队列在RabbitMQ的使用中占比还挺重的,它不仅可以用在延迟消息,还可以用在其他很多地方,另外死信队列是必学的,并且拿来即用免去了安装插件带来的风险;

2)、死信队列在延迟消息这块其实是有隐形BUG的,它在多个延迟时间场景同时存在的情况下有先后执行顺序的问题,可能出现15min的延迟消息在前面,导致5s的延迟消息要等待15min的执行完了再执行,这是RabbitMQ本身的有序机制导致的,只对队尾消息判定,所以我们在使用时一定要做延迟消息隔离,不同延迟场景要分开处理;

3)、延迟插件就没有上述死信队列的问题,已经专门处理了复杂场景下的有序问题,所以一个项目在开始之初决定要使用RabbitMQ的时候,不管未来用不用得上,请务必就在安装时顺便也把延迟插件也装上,避免项目中期忽然想用时安装插件必须重启RabbitMQ带来的未知风险;

4)、大部分情况下,死信队列完全足够,但切记项目中不要频繁使用延迟队列,基本上用到的场景会很少,比如定时关单、定时释放锁资源等等,特定的对延迟时间准确性要求很高的场景才用,其他还是以分布式任务调度为主,延迟队列太多会引起不必要的认知混乱;

5)、我个人的经验,延迟消息的场景下,交换机最好使用Direct点对点模式,我们公司曾经出现过同事使用Topic模式,自以为路由规则匹配写的没问题,实际上导致规则冲突消息被其他消费者给消费掉了,MQ流转消息本身是静默处理的,所以他找死都找不到原因,直到后来被其他同事偶然发现才解决。所以对于这种单一场景最保险的方式还是Direct,一对一总不会有问题。

总结下来就以下几点:

1)、不管用不用,在安装RabbitMQ时就顺便把延迟插件也装上;

2)、推荐以死信队列方式为主;

3)、不要太多地方使用延迟队列;

4)、交换机模式使用Direct点对点。

最后,我会把本次案例的代码地址放在评论中,两种实现方式都有,可以直接运行起来,想要学习的可以下载来看看。

036ba551c8cf4a18ad6114b1db23b9ed~tplv-k3u1fbpfcp-watermark.image?

原创文章纯手打,觉得有一滴滴帮助就请举手之劳点个推荐吧~

持续分享工作中的真实经验和心得体会,喜欢的话就点个关注吧~

__EOF__


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK