4

Rocketmq学习2——Rocketmq消息过滤&事务消息&延迟消息原理源码浅析 - Cuzzz

 8 months ago
source link: https://www.cnblogs.com/cuzzz/p/17966672
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

系列文章目录和关于我

零丶引入#

《Rocketmq学习1——Rocketmq架构&消息存储&刷盘机制》中我们学习了rocketmq的架构,以及消息存储设计,在此消息存储设计之上,rocketmq提供了诸如:延时消息、事务消息、消息过滤、消息回溯等高级特性。这一篇将对这些高级特性的原理进行浅显地学习。

这一篇不会展示这些高级特性怎么使用,如何使用可用查看rocketmq-example源码

一丶消息过滤#

RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,在kafka中,如果想实现消息过滤,需要消费者拿到消息后,反序列化消息识别其中的tag进行过滤。

但是RocketMQ是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正是基于这个字段值的。

2605549-20240115235323880-1506994275.png

主要支持如下2种的过滤方式
(1) Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费

image-20240115223750553

如上是tag消息过滤的大致逻辑,可用看到最终还是从commitLog中根据偏移量获取消息,那么为什么rocketmq不解析一下消息内容,再次根据tag字符串进行比较昵?

这是因为这里使用了MappedByteBuffer避免将整个CommitLog读取到内存中,如果试图将消息读取到内存中,比较tag的话,maybe出现磁盘IO和内核态和用户态的切换(如果这个消息没有被预先加载到物理内存中,操作系统会触发一个缺页中断,这时候会从用户态切换到内核态,从磁盘上读取消息,然后加载到物理内容,然后再从内核态切换到用户态)

image-20240115224755137

(2) SQL92的过滤方式:这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。

image-20240115225806895

大致原理是,根据消息属性中获取序列化的布隆过滤器数据,如果布隆过滤器表示不符合那么肯定是不符合,如果符合那么需要进一步进行过滤。

二丶事务消息#

1.事务消息大致流程#

RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

2605549-20240115230354837-1991283794.png

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

  1. 事务消息发送及提交:

    1. 发送消息(half消息):这一阶段的消息对消费者来说是不可见的,RocketMQ事务消息是这样实现half消息不可见的:

      如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

      这里可看到生产者组的作用:如果生产者服务器A和B是一个生产者组,生产者A挂了,rocketmq会请求生产者B来回程事务提交状态

    2. 服务端响应消息写入结果。

    3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

    4. 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

  2. 补偿流程:补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况

    1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
    2. Producer收到回查消息,检查回查消息对应的本地事务的状态
    3. 根据本地事务状态,重新Commit或者Rollback

可用看到rocketmq通过主动会查实现最终一致性,但是不会无限制的重试下去,默认回查15次,如果15次回查还 是无法得知事务状态,rocketmq默认回滚该消息。

如下如果发送事务消息,那么会在消息中标记是一个事务消息

image-20240115231511189

在Broker端,如果根据此字段可得知是否时事务消息,如果是,那么会有存储为half消息

image-20240115231929395

如上,可看到如果是事务消息会备份原topic,然后替换为事务topic,然后使用Store进行存储。

2.Commit和Rollback操作以及Op消息的引入#

在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。

3.Op消息的存储和对应关系#

RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—TransactionalMessageUtil.buildOpTopic();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。

2605549-20240115233333256-1700275924.png

4.Half消息的索引构建#

在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

5.如何处理二阶段失败的消息?#

如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。

image-20240115233603884

值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。

三丶延迟消息#

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。基本实现方式和事务消息类似

broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意, messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可: msg.setDelayLevel(level)。level有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟 的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

image-20240115234446106

如下是rocketmq基于调度线程池,实现定时任务处理延迟消息

image-20240115234933476

#


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK