14

跟我学RocketMQ之事务消息存储源码解析

 3 years ago
source link: http://wuwenliang.net/2019/08/31/%E8%B7%9F%E6%88%91%E5%AD%A6RocketMQ%E4%B9%8B%E4%BA%8B%E5%8A%A1%E6%B6%88%E6%81%AF%E5%AD%98%E5%82%A8%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

我们接着对RocketMQ的事务消息的存储阶段源码进行解析。

事务消息正式发送阶段

首先接着上文,介绍一下事务消息正式发送阶段。

在DefaultMQProducerlmpl.sendKernelImpl方法中设置消息类型为事务消息:

final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

如果消息类型的确是事务消息,则设置sysFlag为事务消息标识== 0x1 << 2。方便broker对消息进行识别。

broker存储事务消息

broker端收到消息后,对消息进行处理,如果消息类型为事务半消息 (prepare消息)则执行半消息存储方法prepareMessage,否则按照普通消息进行处理(普通消息存储执行putMessage方法)。

具体逻辑如下:

[SendMessagePocessor.sendMessage]
// 解码消息发送请求头中属性为Map
Map<String, String> oriProps = MessageDecoder
    .string2messageProperties(requestHeader.getProperties());
// 获取属性PROPERTY_TRANSACTION_PREPARED(TRAN_MSG)的值,这个属性在客户端进行事务消息发送时设置
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 如果traFlag不为空且true,说明是事务消息

if (traFlag != null && Boolean.parseBoolean(traFlag)) {
    // 如果broker不允许接受事务消息则响应“broker拒绝接受事务消息”,默认为允许接受
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    // 执行事务消息存储
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    // 执行普通消息存储
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}   

总结一下:

  1. 对请求头requestHeader的属性值解码为Map,读取其中的事务消息属性,判断是否为true
  2. 如果是事务消息则判断broker是否能够接受事务消息。

broker可以通过配置属性rejectTransactionMessage为true/false来决定是否能够接受事务消息请求,默认为false即允许接受事务消息。

prepareMessage

我们进入prepareMessage方法查询具体的事务消息存储逻辑。

[TransactionalMessageServiceImpl.prepareMessage]
@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.putHalfMessage(messageInner);
}

继续查看transactionalMessageBridge.putHalfMessage(messageInner);

MessageExtBrokerInner对象为将请求RemotingCommand转换后的broker端对消息的封装实体。

[TransactionalMessageBridge.putHalfMessage]
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // 备份消息原主题
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    // 备份消息原队列id
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    // 重置sysFlag值为
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // 设置topic为RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // 设置队列id为0
    msgInner.setQueueId(0);
    // 转存消息属性Map为字符串形式
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

这里是RocektMQ对事务消息处理过程的一个巧妙之处。

RocketMQ对事务消息进行了主题更换操作,备份了原先的topic、队列id之后,将事务消息的topic统一更换为 RMQ_SYS_TRANS_HALF_TOPIC,队列id统一更换为0。

通过store.putMessage(parseHalfMessageInner(messageInner)); 对消息进行了存储,这里可以看到,对事务消息进行真正的存储的时候是按照普通消息进行的。但此时topic及队列id已经更换为事务消息的topic及队列id。

通过这个操作,使得事务消息在提交之前不会被消费者消费到。

RocektMQ会通过定时任务起线程去消费该事务topic下的消息,当消息满足提交条件,则将该消息的主题和队列id进行恢复(之前已经备份过),最终会被消息的消费者消费到,这个思路在定时消息的实现上也用到了。

我们能够发现,事务消息最终落盘其实还是按照普通消息的方式落盘,区别只是对topic和队列id进行了变换,以便该事务消息在提交之前不会被消费者消费到,借此保证消息的提交与回滚与本地事务的提交与回滚是同时成功、同时失败的。

关于消息的持久化(消息落盘)的具体过程我们在后续的消息存储源码分析中会专门说到,此处就简单的看一下,不进行展开了。

消息持久化是RocketMQ的store模块实现,具体的代码段如下:

[DefaultMessageStore.java]
PutMessageResult result = this.commitLog.putMessage(msg);

最终是通过commitLog.putMessage(msg)实现了消息的最终持久化,我们后续会详细分析。

本文我们对事务消息如何发送到broker,及broker如何对事务消息进行预处理并落盘的主要过程进行了分析。

事务消息发送的第一阶段就分析完毕了;事务消息系列的下一篇文章,我们将对事务消息的第二阶段:事务消息提交/回滚以及事务消息回查过程进行解析,我们下文见。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK