9

一文讲透 RocketMQ 消费者是如何负载均衡的 - 勇哥编程游记

 1 year ago
source link: https://www.cnblogs.com/makemylife/p/17367118.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。

集群消费同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。

up-86169443c61dd3759dbb42303370c1d86cd.png

广播消费:当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

up-5a6025f1a89369e9e4f83c24e3cba4e4ac0.png

我们重点讲解下集群消费的消费流程 ,因为集群消费是使用最普遍的消费模式,理解了集群消费,广播消费也就能顺理成章的掌握了。

up-e0cd5ee091baccad0968b0d560da9221f79.png

集群消费示例代码里,启动消费者,我们需要配置三个核心属性:消费组名订阅主题消息监听器,最后调用 start 方法启动。

消费者启动后,我们可以将整个流程简化成:

up-7e05362aed004822005a9960245c49197cf.png

4 负载均衡

消费端的负载均衡是指将 Broker 端中多个队列按照某种算法分配给同一个消费组中的不同消费者

负载均衡是每个客户端独立进行计算,那么何时触发呢 ?

up-68b2cafcdf5204730ef05ebdfc7f6973f7c.png
  • 消费端启动时,立即进行负载均衡;

  • 消费端定时任务每隔 20 秒触发负载均衡;

  • 消费者上下线,Broker 端通知消费者触发负载均衡。

负载均衡流程如下:

1、发送心跳

消费者启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(消息消费分组名称订阅关系集合消息通信模式客户端实例编号等信息)。

Broker 端在收到消费者的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量 consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量 channelInfoTable 中,为之后做消费端的负载均衡提供可以依据的元数据信息。

2、启动负载均衡服务

下图展示了按照主题负载均衡的代码片段:

up-c1c301218e89de0128114e8b42c5aefb01e.png

负载均衡服务会根据消费模式为”广播模式”还是“集群模式”做不同的逻辑处理,这里主要来看下集群模式下的主要处理流程:

(1) 获取该主题下的消息消费队列集合;

(2) 查询 Broker 端获取该消费组下消费者 Id 列表;

(3) 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列;

up-1958c74ffbe61d28fd726eb8b3dd699ccbb.png

这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range ,最后遍历整个 range 而计算出当前消费端应该分配到的记录。

(4) 分配到的消息队列集合与 processQueueTable 做一个过滤比对操作

up-e18c2856d2749894b20deeb78c22fb3521a.png

消费者实例内 ,processQueueTable 对象存储着当前负载均衡的队列 ,以及该队列的消费快照。

标红的部分表示与分配到的消息队列集合互不包含,则需要将这些红色队列 Dropped 属性为 true , 然后从 processQueueTable 对象中移除。

绿色的部分表示与分配到的消息队列集合的交集,processQueueTable 对象中已经存在该队列。

黄色的部分表示这些队列需要添加到 processQueueTable 对象中,创建这些队列的消费快照。最后创建拉取消息请求列表,并将请求分发到消息拉取服务,进入拉取消息环节。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK