22

深入理解 RocketMQ 消息查询机制

 4 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzA5MDA5Njk0NQ%3D%3D&%3Bmid=2456618949&%3Bidx=1&%3Bsn=b10afe7e39a894f4a2c43a4a3fc7bd9e&%3Bchksm=8789739bb0fefa8d718e6a0df8ab3fb517d61f65e2cee5f9690b36701117a1a31a854110a120&%3Btoken=821287329&%3Blang=zh_CN
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

miyU7zZ.png!web

在实际开发中,经常需要查看MQ中消息的内容, RocketMQ提供了多种消息查询方式,给开发和运维带来了极大的便利,一些其他消息中间件,如Kafka,并不具备消息查询能力。

本文对RocketMQ提供到的查询机制和背后原理进行深入的介绍。 文章主要包括3个部分:

  • 消息查询介绍: 介绍消息查询中使用到的 Message Key Unique Key Message Id 的区别

  • 消息查询工具: 分别介绍命令行工具、管理平台、客户端API这三种工具的详细用法,以及如何让消费者重新消费特定的消息。

  • 实现原理: 介绍Message Key & Unique Key与Message Id的实现机制上区别,Unique Key在精确一次消费(Exactly Once)语义下的作用,以及为什么Message Id查询效率更高。

1 消息查询介绍

RocketMQ提供了3种消息查询方式:            

  • 按照Message Key 查询: 消息的key是业务开发同学在发送消息之前自行指定的,通常会把具有业务含义,区分度高的字段作为消息的key,如用户id,订单id等。

  • 按照Unique Key查询: 除了业务开发同学明确的指定消息中的key,RocketMQ生产者客户端在发送发送消息之前,会自动生成一个UNIQ_KEY,设置到消息的属性中, 从逻辑上唯一代表一条消息

  • 按照Message Id 查询: Message Id 是消息发送后,在Broker端生成的,其包含了Broker的地址,和在CommitLog中的偏移信息,并会将Message Id作为发送结果的一部分进行返回。Message Id中属于精确匹配, 从物理上唯一代表一条消息 ,查询效率更高。

RocketMQ有意弱化Unique Key与Message Id的区别,有时都称之为Message Id 。在通过RocketMQ的命令行工具或管理平台进行查询时,二者可以通用。在根据Unique Key进行查询时,本身是有可能查询到多条消息的,但是查询工具会进行过滤,只会返回一条消息。 种种情况导致很多RocketMQ的用户,并未能很好对二者进行区分

业务开发同学在使用RocketMQ时,应该养成良好的习惯,在发送/消费消息时,将这些信息记录下来,通常是记录到日志文件中,以便在出现问题时进行排查。

以生产者在发送消息为例,通常由以下3步组成:

mU3IZfy.png!web

第1步: 构建消息

构建消息对象Message,在这里我们通过setKeys方法设置消息的key,如果有多个key可以使用空格" "进行分割

第2步: 发送消息

发送消息,会返回一个SendResult对象表示消息发送结果。

第3步: 打印发送结果

结果中包含Unique Key和Message Id,如下所示:

3AvMBfr.png!web

其中:

  • sendStatus: 表示消息发送结果的状态       

  • msgId: 注意这里的命名虽然是msgId,但实际上其是Unique Key

  • offsetMsgId: Broker返回的Message ID 。 在后文中,未进行特殊说明的情况下,Message ID总是表示offsetMsgId

  • messageQueue: 消息发送到了哪个的队列,如上图显示发送到broker-a的第0个的队列 

  • queueOffset: 消息在队列中的偏移量,每次发送到一个队列时,offset+1

事实上,用户主动设置的Key以及客户端自动生成的Unique Key,最终都会设置到Message对象的properties属性中,如下图所示:

3iAfAj3.png!web

其中:

  • KEYS: 表示用户通过setKeys方法设置的消息key,

  • UNIQ_KEY: 表示消息发送之前由RocketMQ客户端自动生成的Unique Key。 细心的读者发现了其值与上述打印SendResult结果中的msgId字段的值是一样的,这验证了前面所说的msgId表示的实际上就是Unique Key的说法。

在了解如何主动设置Key,以及如何获取自动生成的Unique Key和Message Id后,就可以利用一些工具来进行查询。

2 消息查询工具

RocketMQ提供了3种方式来根据Message Key、Unique Key、Message Id来查询消息,包括:

  • 命令行工具: 主要是运维同学使用

  • 管理平台: 运维和开发同学都可以使用

  • 客户端API: 主要是开发同学使用

这些工具除了可以查询某条消息的内容, 还支持将查询到的历史消息让消费者重新进行消费 ,下面分别进行讲述。

2.1 命令行工具

RocketMQ自带的mqadmin命令行工具提供了一些子命令,用于查询消息,如下:

Mzya6rR.png!web

此外,还有一个queryMsgByOffset子命令,不在本文讲述范畴内

2.1.1 按照Message Key查询

mqadmin工具的queryMsgByKey子命令提供了根据key进行查询消息的功能。 注意,由于一个key可能对应多条消息,查询结果只会展示出这些消息对应的Unique Key,需要根据Unique Key再次进行查询。

queryMsgByKey子命令使用方法如下所示:

emEzUnJ.png!web

例如,要查询在TopicA中,key为Key1的消息

A3Y7VbA.png!web

这里,输出结果中包含了4条记录。 其中:

  • Message ID列: 这里这一列的名字显示有问题,实际上其代表的是Unique Key

  • QID列: 表示队列的ID

  • Offset: 消息在在队列中的偏移量

在查询到Unique Key之后,我们就可以使用另外一个命令: queryMsgByUniqueKey,来查询消息的具体内容。

2.1.2 按照Unique Key查询

mqadmin工具的queryMsgByUniqueKey的子命令有2个功能:

  • 根据Unique Key查询消息,并展示结果

  • 让消费者重新消费Unique Key对应的消息 

我们将分别进行讲述。 queryMsgByUniqueKey子命令的使用方式如下:

yIBvA3y.jpg!web

这里对-i 参数进行下特殊说明,其即可接受Unique Key,即SendResult中的msgId字段; 也可以接受Message Id,即SendResult中的offsetMsgId字段。

根据Unique Key查询消息:

2umqUvj.jpg!web

对于消息体的内容,会存储到 Message Body Path 字段指定到的路径中。

可通过cat命令查看(仅适用于消息体是字符串):

$ cat /tmp/rocketmq/msgbodys/C0A80103511B18B4AAC24296D2CB0000

message body

指定消费者重新消费:

queryMsgByUniqueKey子命令还接收另外两个参数: -g参数用于指定消费者组名称,-d参数指定消费者client id。 指定了这两个参数之后,消息将由消费者直接消费,而不是打印在控制台上。

首先,通过consumerStatus命令,查询出消费者组下的client id信息,如:

e2e6buU.png!web

这里显示了消费者组group_X下面只有一个消费者,client id为192.168.1.3@26868。

接着我们可以在queryMsgByUniqueKey子命令中,添加-g和-d参数,如下所示:

bmieA3a.png!web

以看到,这里并没有打印出消息内容,取而代之的是消息消费的结果。 在内部,主要是分为3个步骤来完成让指定消费者来消费这条消息,如下图所示:

nIBjyay.png!web

第1步:

命令行工具给所有Broker发起 QUERY_MESSAGE 请求查询消息,因为并不知道UNIQ_KEY这条消息在哪个Broker上,且最多只会返回一条消息,如果超过1条其他会过滤掉;如果查询不到就直接报错。

第2步:

根据消息中包含了Store Host信息,也就是消息存储在哪个Broker上,接来下命令行工具会直接给这个Broker发起 CONSUME_MESSAGE_DIRECTLY 请求,这个请求会携带msgId,group和client id的信息

第3步:

Broker接收到这个请求,查询出消息内容后,主动给消费者发送 CONSUME_MESSAGE_DIRECTLY 通知请求,注意虽然与第2步使用了同一个请求码,但不同的是这个请求中包含了消息体的内容,消费者可直接处理。注意:这里并不是将消息重新发送到Topic中,否则订阅这个Topic的所有消费者组,都会重新消费这条消息。

2.1.3 根据Message Id查询

前面讲解生产者发送消息后,返回的SendResult对象包含一个offsetMsgId字段,这也就是我们常规意义上所说的Message Id,我们也可以根据这个字段来查询消息。

根据Message Id查询使用queryMsgById子命令,这个命令有3个作用:

  • 根据Message Id查询消息

  • 通知指定消费者重新消费这条消息,与queryMsgByUniqueKey类似,这里不再介绍

  • 将消息重新发送到Topic中,所有消费者组都将重新消费 

queryMsgById子命令用法如下所示:

2I3An2N.jpg!web

参数说明如下:

-d和-g参数: 类似于queryMsgById命令,用于将消息发送给某个消费者进行重新消费

-i 参数: 指定Message Id,即SendResult对象的offsetMsgId字段,多个值使用逗号","分割。

-s参数: 是否重新发送消息到Topic。 如果同时指定了-d和-g参数,-s参数不生效。

根据Message Id查询消息:

下图根据SendResult的offsetMsgId字段,作为-i参数,来查询一条消息:

640?wx_fmt=png

与queryMsgByUniqueKey子命令输出基本类似,主要是在输出开头多出了OffsetID字段,即offsetMsgId。 需要注意的是,queryMsgById不能接受Unqiue Key作为查询参数。

重新发送消息到topic:

在指定-s参数后,消息将重新发送到topic ,如下(输出进行了格式化):

zQnU7nV.jpg!web

可以看到,这里因为消息是重新发送到了Topic中,因此与我们之前使用生产者发送消息一样,输出的是一个SendResult。 在这种情况下,订阅这个Topic的所有消费者组都会重新消费到这条消息。

在实际开发中,如果多个消费者组订阅了某个Topic的消息,如果所有的消费者都希望重新消费,那么就应该使用-s参数。如果只是某个消费者希望重新消费,那么应该指定-g和-d参数。

另外,我们看到发送前打印的originalMsgId和发送后SendResult中的offsetMsgId值并不一样,这是因为消息发送到Topic重新进行了存储,因此值不相同。 这也是为什么我们说Message Id可以唯一对应一条消息的原因。

而输出的SendResult结果中的msgId,即Unique Key,并没有发生变化,因此尽管名字是Unique Key,但是实际上还是有可能对应多条消息的。 而前面根据queryMsgByUniqueKey查询之所以只有一条消息,实际上是进行了过滤。

2.2 管理平台

RocketMQ提供的命令行工具,虽然功能强大,一般是运维同学使用较多。 通过RocketMQ提供的管理平台进来行消息查询,则对业务开发同学更加友好。

在管理平台的 消息 一栏,有3个TAB,分别用于:根据Topic时间范围查询、Message Key查询、Message Id查询,下面分别进行介绍。

根据Topic时间范围查询:

按 Topic 查询属于范围查询,不推荐使用,因为时间范围内消息很多,不具备区分度。 查询时,尽可能设置最为精确的时间区间,以便缩小查询范围,提高速度。 最多返回2000条数据。

jeE773U.jpg!web

根据Message Key查询:

按 Message Key 查询属于模糊查询,仅适用于没有记录 Message ID 但是设置了具有区分度的 Message Key的情况。 目前,根据Message Key查询,有一个很大局限性: 不能指定时间范围,且最多返回64条数据。 如果用户指定的key重复率比较高的话,就有可能搜不到。

ri63Ivi.jpg!web

根据Message Id查询:

按 Message ID 查询属于精确查询,速度快,精确匹配,只会返回一条结果,推荐使用。 在这里,传入Unique Key,offsetMsgId都可以。

查看消息详情:

在按照Topic 时间范围查询,按照Message Key查询,结果列表有一个 Message Detail 按钮,点击可以看到消息详情:包括消息key、tag、生成时间,消息体内容等。在详情页面,也可以将消息直接发送给某个消费者组进行重新消费。

需要注意的是,在消息体展示的时候,只能将消息体转换成字符串进行展示,如果消息的内容是protobuf、thrift、hessian编码的,那么将显示一堆乱码。 如果公司内部有统一的IDL/Schema管理平台,则可以解决这个问题,通过为每个Topic关联一个IDL,在消息展示时,可以根据IDL反序列化后在进行展示

2.3 客户端API

除了通过命令行工具和管理平台,还可以通过客户端API的方式来进行查询,这其实是最本质的方式,命令行工具和管理平台的查询功能都是基于此实现。

MQAdmin接口中,定义了以下几个方法用于消息查询:

fEvEneR.jpg!web

对于MQAdmin接口,可能部分同学比较陌生。 不过我们常用的DefaultMQProducer、DefaultMQPushConsumer等,都实现了此接口,因此都具备消息查询的能力,如下所示:

yQf2Enz.jpg!web

对于命令行工具,底层实际上是基于MQAdminExt接口的实现来完成的。

细心的读者会问,相同的查询功能在在多处实现是不是太麻烦了?事实上,这只是对外暴露的接口,在内部,实际上都是基于 MQAdminImpl 这个类来完成的。

viewMessage方法:

两种viewMessage方法重载形式,都只会返回单条消息。 下面以生产者搜索为例,讲解如何使用API进行查询:

//根据UniqueKey查询

String uniqueKey = "C0A8010354C418B4AAC242A281360000";

MessageExt msg = producer.viewMessage("TopicA", uniqueKey);


//打印结果:这里仅输出Unique Key与offsetMsgId

MessageClientExt msgExt= (MessageClientExt) msg;

System.out.println("Unique Key:"+msgExt.getMsgId()

+"\noffsetMsgId:"+msgExt.getOffsetMsgId());

输出结果如下:

Unique Key:C0A8010354C418B4AAC242A281360000

offsetMsgId:C0A8010300002A9F000000000007BF94

如果我们把offsetMsgId当做方法参数传入,也可以查询到相同的结果。 这是因为,在方法内部实际上是分两步进行查询的:

  • 首先,先把参数当做offsetMsgId,即Message Id进行查询

  • 如果失败,再尝试当做Unique Key进行查询。

源码如下所示:

DefaultMQProducer#viewMessage(String,String)

2uqi6fe.jpg!web

前面提到,Unique Key只是从逻辑上代表一条消息,实际上在Broker端可能存储了多条, 因此在当做Unique Key进行查询时,会进行过滤,只取其中一条。 源码如下所示:

MQAd minImpl#queryMessageByUniqKey

BBbqmuF.jpg!web

我们也可以通过另外只接收一个参数的viewMessage方法进行查询,但是需要注意的是,参数只能是offsetMsgId,不能是Unique Key。

String offsetMsgId = "C0A8010300002A9F000000000007BF94";

producer.viewMessage(offsetMsgId);

queryMessage方法:

其是根据消息Key进行查询,这里不再介绍API如何使用。 则与前面两种viewMessage方法重载不同,其返回的是一个QueryResult对象,包含了多条消息。

主要是注意这个方法接收时间范围参数,相比较于管理平台更加灵活。 管理平台按照消息Key查询,默认最多返回64条消息,且不能支持指定时间范围,如果消息Key重复度较高,那么可能有些消息搜索不到。 如果是在指定时间范围内返回64条消息,如果没有发现想找到的消息,再选择其他时间范围,则可以规避这个问题。

3 实现原理

Unqiue Key & Message Key都需要利用RocketMQ的哈希索引机制来完成消息查询,由于建立索引有一定的开销,因此Broker端提供了相关配置项来控制是否开启索引。 关于RocketMQ索引机制将在后面的文章进行详细的介绍。

Message Id是在Broker端生成的,其包含了Broker地址和commit Log offset信息,可以精确匹配一条消息,查询消息更好。 下面分别介绍 Unqiue Key & Message Id的生成和作用。

3.1 Unique Key生成与作用

3.1.1 Unique Key生成

Unique Key是生产者发送消息之前,由RocketMQ 客户端自动生成的,具体来说,RocketMQ发送消息之前,最终都要通过以下方法:

DefaultMQProducerImpl#sendKernelImpl

a67biqz.jpg!web

如上所示,如果不是批量消息,会通过MessageClientIDSetter的setUniqID方法

为消息设 Uniq ue key,该方法实现如下所示:

MessageClientIDSetter#setUniqID

uqMfInV.png!web

如果消息的Unique Key属性为null,就通过 createUniqID() 方法为消息创建一个新的Unique Key,并设置到消息属性中。之所以要判断Unique Key是否为null与其作用有关。

3.1.2 Unique Key作用

了解Unique Key的作用对于我们理解消息重复的原因有很大的帮助 。RocketMQ并不保证消息投递过程中的Exactly Once语义,即消息只会被精确消费一次,需要消费者自己做幂等。而通常导致消息重复消费的原因,主要包括:

导致生产者对于无序消息发送重复的原因可能是: 一条消息已被成功发送到服务端并完成持久化,由于网络超时此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败,此时生产者将再次尝试发送消息。

在重试发送时,sendKernelImpl会被重复调用,意味着setUniqID方法会被重复调用,不过由于setUniqID方法实现中进行判空处理,因此重复设置Unique Key。在这种情况下,消费者后续会收到两条内容相同并且 Unique Key 也相同的消息(offsetMsgId不同,因为对Broker来说存储了多次)

那么消费者如何判断,消费重复是因为重复发送还是Rebalance导致的重复消费呢?

消费者实现MessageListener接口监听到的消息类型是MessageExt,可以将其强制转换为MessageClientExt,之后调用getMsgId方法获取Unique Key,调用getOffsetMsgId获得Message Id。 如果多消息的Unique Key相同,但是offsetMsgId不同,则有可能是因为重复发送导致。

3.1.3 批量发送模式下的Unique Key

DefaultMQProducer提供了批量发送消息的接口:

public SendResult send(Collection<Message> msgs)

在内部,这批消息首先会被构建成一个 MessageBatch 对象。在前面sendKernelImpl 法中我 们也看到了,对于MessageBatch对象,并不会设置Unique Key。这是因为在将批量消息转换成MessageBatch时,已经设置过了。

可能有一部分同学会误以为一个批量消息中每条消息Unique Key是相同的,其实不然,每条消息Unique Key都不同。

这里通过一个批量发送案例进行说明:

//构建批量消息

ArrayList<Message> msgs = new ArrayList<>();

Message msg1 = new Message("Topic_S",("message1").getBytes());

Message msg2 = new Message("Topic_S",("message2").getBytes());

msgs.add(msg1);

msgs.add(msg2);


//发送

SendResult result = producer.send(msgs);

//打印


System.out.println(result);

输出如下所示:

aMfmean.jpg!web

可以看到,此时输出的msgId(即Unique Key)和offsetMsgId都会包含多个值。

户端给批量消息中每条消息设置不同的Unqiue Key, 可参考DefaultMQProducer#batch方法源码。

3.2 Message Id生成

SendResult中的offsetMsgId,即常规意义上我们所说的Message Id是在Broker端生成的,用于唯一标识一条消息,在根据Message Id查询的情况下,最多只能查询到一条消息。

Message Id总共 16 字节,包含消息存储主机地址,消息 Commit Log offset。 如下图所示:

rI3EZzj.png!web

注:这里是《rocketmq develop guide》中的图重新画了一下,这是绝大部分用户未能很好的区分Message Id与Unique Key的根本原因,因为称Broker返回的offsetMsgId为Message Id,而在很多其他地方又进行了混用。

RocketMQ内部通过一个MessageId对象进行表示:

public class MessageId {

private SocketAddress address; //broker地址

private long offset; //commit log offset

并提供了一个MessageDecoder对象来创建或者解码MessageId。

public static String createMessageId(final ByteBuffer input,

final ByteBuffer addr, final long offset)

public static MessageId decodeMessageId(final String msgId)

Broker端在顺序存储消息时,首先会通过createMessageId方法创建msgId。 源码如下所示:

CommitLog.DefaultAppendMessageCallback#doAppend

yYj63ai.jpg!web

而客户端在根据msgId向Broker查询消息时,首先会将通过MessageDecoder的decodeMessageId方法,之后直接向这个broker进行查询指定位置的消息。

参见: MQAdminImpl#viewMessage

6jaEju3.jpg!web

由于根据Message Id进行查询,实际上是直接从特定Broker的CommitLog中的指定位置进行查询的,属于精确匹配,并不像用户设置的key,或者Unique Key那么样,需要使用到哈希索引机制,因此效率很高。

4 总结

  • RocketMQ提供了3种消息查询方式: Message Key & Unique Key & Message Id

  • RocketMQ提供了3种消息查询工具: 命令行、管理平台、客户端API,且支持将查询到让特定/所有消费者组重新消费

  • RocketMQ有意对用户屏蔽Unique Key & Message Id区别,很多地方二者可以通用

  • Message Key & Unique Key 需要使用到哈希索引机制,有额外的索引维护成本

  • Message Id由Broker和commit log offset组成,属于精确匹配,查询效率更高

长按二维码,关注我,加好友,进群交流

En6nqub.jpg!web

往期精彩

消息中间件的四种投递模式对比

RocketMQ NameServer深入剖析

深入理解RocketMQ Rebalance机制

数据库中间件详解

异地多活场景下的数据同步之道

mysql binlog应用场景与原理深度剖析

InnoDB MVCC 机制

深入理解数据库编程中的超时设置

可靠消息一致性的奇淫技巧

详解HTTP 与TCP中Keep-Alive机制的区别

TCP粘包、拆包与通信协议详解

剖析Spring多数据源


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK