
Learn RocketMQ with Me: Source Code Analysis of Transactional Message Commit and Check-Back

 3 years ago
source link: http://wuwenliang.net/2019/09/01/%E8%B7%9F%E6%88%91%E5%AD%A6RocektMQ%E4%B9%8B%E4%BA%8B%E5%8A%A1%E6%B6%88%E6%81%AF%E6%8F%90%E4%BA%A4%E5%8F%8A%E5%9B%9E%E6%9F%A5%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/

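This article is the final part of the transactional-message source code analysis. It covers the second phase of RocketMQ's two-phase transaction: committing/rolling back the transaction, and the transaction check-back.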

Message commit/rollback [client side]: endTransaction

At the end of the first article on transactional messages, we analyzed the logic of DefaultMQProducerImpl.endTransaction:

[DefaultMQProducerImpl.endTransaction]
......
String transactionId = sendResult.getTransactionId();
// Look up the broker address
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
    case COMMIT_MESSAGE:
        requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
        break;
    case ROLLBACK_MESSAGE:
        requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
        break;
    case UNKNOW:
        requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
        break;
    default:
        break;
}
// Set the producer group
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// Set the queue offset
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
// Set the message id
requestHeader.setMsgId(sendResult.getMsgId());

String remark = localException != null ? 
        ("executeLocalTransactionBranch exception: " + localException.toString())
        : null;
// Send the end-transaction request (oneway)
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());

The main logic of the code can be summarized as follows:

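  1. Get the broker's IP and port from the message queue the message was sent to
  2. Build the end-transaction request header and set the transaction id and the commitLog offset
  3. Based on the actual outcome of the local transaction, set the commit / rollback / do-nothing flag on the request header
  4. Send the end-transaction request to the broker through the MQClientInstance held by the current client (a oneway call, so no response is awaited)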
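The mapping in step 3 can be sketched in isolation. This is a minimal, self-contained sketch, not RocketMQ code: the nested enum stands in for LocalTransactionState, and the constants are assumed to mirror MessageSysFlag in RocketMQ 4.x, where the transaction type occupies bits 2 and 3 of the sysFlag.

```java
final class EndTxFlagSketch {
    // Stand-in for org.apache.rocketmq.client.producer.LocalTransactionState
    enum LocalTxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Assumed to mirror MessageSysFlag (RocketMQ 4.x): transaction type in bits 2-3
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // Same decision as the switch in endTransaction
    static int commitOrRollback(LocalTxState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE;
            default:
                // UNKNOW: no decision yet, the half message stays pending
                return TRANSACTION_NOT_TYPE;
        }
    }

    public static void main(String[] args) {
        // prints COMMIT_MESSAGE -> 8, ROLLBACK_MESSAGE -> 12, UNKNOW -> 0 (one per line)
        for (LocalTxState s : LocalTxState.values()) {
            System.out.println(s + " -> " + commitOrRollback(s));
        }
    }
}
```

Note that UNKNOW still produces a request, just with TRANSACTION_NOT_TYPE; resolving such a transaction is left to the check-back described later.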

Message commit/rollback [server side]: EndTransactionProcessor.processRequest

When the broker starts, it registers EndTransactionProcessor to handle the end-transaction requests sent by clients. The call chain is:

BrokerStartup.main(String[] args)
    |-BrokerStartup.start(BrokerController controller)
    |-BrokerStartup.createBrokerController(String[] args) returns BrokerController
        |-BrokerController.initialize()
            |-BrokerController.registerProcessor()
                |-RemotingServer.registerProcessor(
                        final int requestCode,                   (RequestCode.END_TRANSACTION,)
                        final NettyRequestProcessor processor,   (new EndTransactionProcessor(this))
                        final ExecutorService executor);         (this.endTransactionExecutor)
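The registration boils down to a table that maps a request code to a (processor, executor) pair. The sketch below is a simplified imitation of that idea, not the NettyRemotingServer implementation; Processor, dispatch and the REQUEST_CODE_NOT_SUPPORTED string are hypothetical names, and 37 is assumed to be the value of RequestCode.END_TRANSACTION.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ProcessorTableSketch {
    static final int END_TRANSACTION = 37; // assumed value of RequestCode.END_TRANSACTION

    // Hypothetical stand-in for NettyRequestProcessor
    interface Processor {
        String process(int requestCode, String body);
    }

    // A processor together with the executor it runs on
    private static final class Entry {
        final Processor processor;
        final ExecutorService executor;

        Entry(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new HashMap<>();

    // Counterpart of registerProcessor(requestCode, processor, executor)
    void registerProcessor(int requestCode, Processor processor, ExecutorService executor) {
        table.put(requestCode, new Entry(processor, executor));
    }

    // Find the processor for the request code and run it on its own executor
    String dispatch(int requestCode, String body) {
        Entry entry = table.get(requestCode);
        if (entry == null) {
            return "REQUEST_CODE_NOT_SUPPORTED";
        }
        Future<String> result = entry.executor.submit(() -> entry.processor.process(requestCode, body));
        try {
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService endTransactionExecutor = Executors.newSingleThreadExecutor();
        ProcessorTableSketch server = new ProcessorTableSketch();
        server.registerProcessor(END_TRANSACTION, (code, body) -> "handled " + body, endTransactionExecutor);
        System.out.println(server.dispatch(END_TRANSACTION, "commit")); // handled commit
        System.out.println(server.dispatch(999, "x"));                // REQUEST_CODE_NOT_SUPPORTED
        endTransactionExecutor.shutdown();
    }
}
```

Giving each processor its own executor (here endTransactionExecutor) keeps slow end-transaction handling from blocking other request types.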

The core logic of EndTransactionProcessor is as follows:

[EndTransactionProcessor.processRequest]
OperationResult result = new OperationResult();
// The MessageSysFlag says: commit
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // Commit: look up the half message
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // Rebuild the original message from the half message
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            // Delete the half message (logically)
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} 
// The MessageSysFlag says: rollback
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // Rollback: look up the half message
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // Delete the half message (logically)
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}
// Build the response
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;

To summarize the logic of EndTransactionProcessor:

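  1. Check the MessageSysFlag; if it is the commit type, perform the commit:
    1. Call TransactionalMessageService.commitMessage, which looks up the half message by its commitLog offset
    2. Call endMessageTransaction to restore the original topic and queue, i.e. rebuild the original message
    3. Call sendFinalMessage to persist the restored message to the commitLog again, so that it is dispatched to the real consume queue and can be consumed
    4. Call deletePrepareMessage to delete the prepare message. Internally this writes an op record for the prepare message to the RMQ_SYS_TRANS_OP_HALF_TOPIC topic, marking it as processed; the check-back relies on these records
  2. If the MessageSysFlag is the rollback type, perform the rollback:
    1. Call TransactionalMessageService.rollbackMessage, which looks up the half message in the same way
    2. Call deletePrepareMessage to delete the prepare half message, exactly as in the commit case: an op record is written to RMQ_SYS_TRANS_OP_HALF_TOPIC, marking the message as processed.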
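The branching above can be modelled with in-memory collections. In this sketch (all names hypothetical, not broker code) halfStore plays the role of RMQ_SYS_TRANS_HALF_TOPIC indexed by commitLog offset, realTopic the user's topic, and opLog RMQ_SYS_TRANS_OP_HALF_TOPIC; the flag values are assumed to match MessageSysFlag.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class EndTxProcessorSketch {
    static final int COMMIT = 0x2 << 2;   // assumed MessageSysFlag.TRANSACTION_COMMIT_TYPE
    static final int ROLLBACK = 0x3 << 2; // assumed MessageSysFlag.TRANSACTION_ROLLBACK_TYPE

    // A stored half message: its offset in the half consume queue plus the payload
    static final class HalfMsg {
        final long queueOffset;
        final String body;

        HalfMsg(long queueOffset, String body) {
            this.queueOffset = queueOffset;
            this.body = body;
        }
    }

    final Map<Long, HalfMsg> halfStore = new HashMap<>(); // commitLog offset -> half message
    final List<String> realTopic = new ArrayList<>();     // what consumers can see
    final List<Long> opLog = new ArrayList<>();           // queue offsets of finished half messages

    String endTransaction(long commitLogOffset, int commitOrRollback) {
        if (commitOrRollback != COMMIT && commitOrRollback != ROLLBACK) {
            return "IGNORED"; // e.g. TRANSACTION_NOT_TYPE: wait for the check-back
        }
        // commitMessage / rollbackMessage: only a lookup by commitLog offset
        HalfMsg half = halfStore.get(commitLogOffset);
        if (half == null) {
            return "SYSTEM_ERROR";
        }
        if (commitOrRollback == COMMIT) {
            // endMessageTransaction + sendFinalMessage: make the message visible
            realTopic.add(half.body);
        }
        // deletePrepareMessage: append an op record, the half message itself stays
        opLog.add(half.queueOffset);
        return "SUCCESS";
    }

    public static void main(String[] args) {
        EndTxProcessorSketch broker = new EndTxProcessorSketch();
        broker.halfStore.put(1000L, new HalfMsg(0, "order-1"));
        broker.halfStore.put(2000L, new HalfMsg(1, "order-2"));
        broker.endTransaction(1000L, COMMIT);
        broker.endTransaction(2000L, ROLLBACK);
        System.out.println(broker.realTopic + " " + broker.opLog); // [order-1] [0, 1]
    }
}
```

Neither branch removes anything from halfStore: as in the broker, "deleting" a half message only means appending its queue offset to the op log.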

Let's look at the sub-steps involved in more detail:

Committing the prepare message: commitMessage

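First, commitMessage, which performs the commit step. As the code shows, it simply looks up the half message by its commitLog offset: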

[TransactionalMessageServiceImpl.commitMessage]
@Override
public OperationResult commitMessage(EndTransactionRequestHeader requestHeader) {
    return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}

private OperationResult getHalfMessageByOffset(long commitLogOffset) {
    OperationResult response = new OperationResult();
    // Look up the MessageExt by its physical offset commitLogOffset
    MessageExt messageExt = this.transactionalMessageBridge.lookMessageByOffset(commitLogOffset);
    // Put the message into the OperationResult
    if (messageExt != null) {
        response.setPrepareMessage(messageExt);
        response.setResponseCode(ResponseCode.SUCCESS);
    } 
    ... handling of a null messageExt omitted ...
    return response;
}

endMessageTransaction: restore the message's original topic and queue, i.e. rebuild the original message

[EndTransactionProcessor.endMessageTransaction()]
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
    // Create a new MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    // Restore the original topic from the properties
    msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    // Restore the original queue id from the properties
    msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    // Copy the body and the other message attributes
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
    msgInner.setWaitStoreMsgOK(false);
    msgInner.setTransactionId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    msgInner.setSysFlag(msgExt.getSysFlag());
    TopicFilterType topicFilterType =
        (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
            : TopicFilterType.SINGLE_TAG;
    long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
    msgInner.setTagsCode(tagsCodeValue);
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
    return msgInner;
}
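The essential part of endMessageTransaction, together with the resetTransactionValue call in processRequest, fits in a few lines. This sketch uses a hypothetical, heavily reduced Msg class; the property keys REAL_TOPIC and REAL_QID are assumed to be the values of MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID, and the mask assumes the transaction type lives in bits 2 and 3 of sysFlag.

```java
import java.util.HashMap;
import java.util.Map;

final class RestoreTopicSketch {
    static final String REAL_TOPIC = "REAL_TOPIC"; // assumed MessageConst.PROPERTY_REAL_TOPIC
    static final String REAL_QID = "REAL_QID";     // assumed MessageConst.PROPERTY_REAL_QUEUE_ID
    static final int TX_TYPE_MASK = 0x3 << 2;      // assumed bits of the transaction type

    // Hypothetical, heavily reduced message
    static final class Msg {
        String topic;
        int queueId;
        int sysFlag;
        final Map<String, String> properties = new HashMap<>();
    }

    static Msg restore(Msg half, int commitOrRollback) {
        Msg m = new Msg();
        // Original topic and queue come back from the properties
        m.topic = half.properties.get(REAL_TOPIC);
        m.queueId = Integer.parseInt(half.properties.get(REAL_QID));
        // Like MessageSysFlag.resetTransactionValue: clear the type bits, set the new type
        m.sysFlag = (half.sysFlag & ~TX_TYPE_MASK) | commitOrRollback;
        // Copy the remaining properties, then drop the bookkeeping ones
        m.properties.putAll(half.properties);
        m.properties.remove(REAL_TOPIC);
        m.properties.remove(REAL_QID);
        return m;
    }

    public static void main(String[] args) {
        Msg half = new Msg();
        half.topic = "RMQ_SYS_TRANS_HALF_TOPIC";
        half.queueId = 0;
        half.sysFlag = 0x1 << 2; // prepared
        half.properties.put(REAL_TOPIC, "TopicTest");
        half.properties.put(REAL_QID, "3");
        Msg restored = restore(half, 0x2 << 2); // commit
        System.out.println(restored.topic + " " + restored.queueId + " " + restored.sysFlag); // TopicTest 3 8
    }
}
```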

sendFinalMessage: forward the original message to its real consume queue so that consumers can consume it

[EndTransactionProcessor.sendFinalMessage]
final PutMessageResult putMessageResult =
        this.brokerController.getMessageStore().putMessage(msgInner);
... subsequent result checks omitted ...

In the end, DefaultMessageStore.putMessage persists the restored original message to the commitLog once more.

deletePrepareMessage: delete the prepare message (not a physical deletion)

[TransactionalMessageServiceImpl.deletePrepareMessage]
@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
    if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
        log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", 
        msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
        return true;
    } else {
        log.error("Transaction op message write failed. messageId is {}, queueId is {}", 
        msgExt.getMsgId(), msgExt.getQueueId());
        return false;
    }
}

As you can see, the logical deletion is implemented by transactionalMessageBridge.putOpMessage:

[TransactionalMessageBridge.putOpMessage]
public boolean putOpMessage(MessageExt messageExt, String opType) {
    MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
        this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
    if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
        // Write an op record for this prepare message to RMQ_SYS_TRANS_OP_HALF_TOPIC
        return addRemoveTagInTransactionOp(messageExt, messageQueue);
    }
    return true;
}

addRemoveTagInTransactionOp stores the op record in RMQ_SYS_TRANS_OP_HALF_TOPIC: an op message tagged REMOVETAG whose body is the half message's queue offset.

[TransactionalMessageBridge.addRemoveTagInTransactionOp]
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
    Message message = new Message(
        TransactionalMessageUtil.buildOpTopic(), 
        TransactionalMessageUtil.REMOVETAG,
        String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
    writeOp(message, messageQueue);
    return true;
}

The op message is finally written by writeOp:

private void writeOp(Message message, MessageQueue mq) {
    MessageQueue opQueue;
    ... opQueue lookup omitted ...
    if (opQueue == null) {
        opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
    }
    putMessage(makeOpMessageInner(message, opQueue));
}

putMessage persists the op message to the commitLog under the topic RMQ_SYS_TRANS_OP_HALF_TOPIC. Here is the method that returns the topic name, TransactionalMessageUtil.buildOpTopic():

[TransactionalMessageUtil.buildOpTopic]
public static String buildOpTopic() {
    return MixAll.RMQ_SYS_TRANS_OP_HALF_TOPIC;
}

This pattern is worth borrowing: wrap a constant in a static method. That concludes the analysis of committing the prepare message.

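The half message itself is only physically removed when its commitLog file is deleted by the regular file cleanup mechanism.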
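The op message format can be sketched as an encode/decode pair. Assumptions: REMOVETAG is taken to be "d" and the charset UTF-8 (TransactionalMessageUtil's values, to my knowledge); OpMessage is a hypothetical stand-in for Message. The decode side mirrors what fillOpRemoveMap does when it reads op messages back during the check-back.

```java
import java.nio.charset.StandardCharsets;

final class OpMessageSketch {
    static final String OP_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
    static final String REMOVETAG = "d"; // assumed TransactionalMessageUtil.REMOVETAG

    // Hypothetical stand-in for org.apache.rocketmq.common.message.Message
    static final class OpMessage {
        final String topic;
        final String tags;
        final byte[] body;

        OpMessage(String topic, String tags, byte[] body) {
            this.topic = topic;
            this.tags = tags;
            this.body = body;
        }
    }

    // Write side (addRemoveTagInTransactionOp): the body is the half message's queue offset as text
    static OpMessage buildRemoveOp(long halfQueueOffset) {
        return new OpMessage(OP_TOPIC, REMOVETAG,
            String.valueOf(halfQueueOffset).getBytes(StandardCharsets.UTF_8));
    }

    // Read side (fillOpRemoveMap): recover the half-queue offset; the broker only logs unknown tags
    static long decodeHalfOffset(OpMessage op) {
        if (!REMOVETAG.equals(op.tags)) {
            throw new IllegalArgumentException("illegal tag in op message: " + op.tags);
        }
        return Long.parseLong(new String(op.body, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        OpMessage op = buildRemoveOp(42L);
        // RMQ_SYS_TRANS_OP_HALF_TOPIC tag=d halfOffset=42
        System.out.println(op.topic + " tag=" + op.tags + " halfOffset=" + decodeHalfOffset(op));
    }
}
```

Storing only an offset keeps op messages tiny; the half message body is never copied.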

Rolling back the prepare message: rollbackMessage

Next, the rollback logic for the prepare message.

First, TransactionalMessageService.rollbackMessage performs the half-message rollback:

[TransactionalMessageServiceImpl.rollbackMessage]
@Override
public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader) {
    return getHalfMessageByOffset(requestHeader.getCommitLogOffset());
}

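The logic is the same as in commitMessage: it looks up the MessageExt by its physical offset commitLogOffset. Once the message is found, deletePrepareMessage deletes the prepare message in the same way as for a commit; that code was analyzed above and is not repeated here.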

Transactional message check-back logic

Finally, let's analyze an important part of the second phase: the transaction check-back.

Initializing the check-back service

The check-back is implemented by the TransactionalMessageCheckService thread, which is initialized through the following call chain:

BrokerStartup.main(String[] args)
    |-BrokerStartup.start(BrokerController controller)
    |-BrokerStartup.createBrokerController(String[] args) returns BrokerController
        |-BrokerController.initialize()
            |-BrokerController.initialTransaction();
                |-this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);

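Most of the broker's key asynchronous components are set up through this call chain, including (but not limited to) the registration of EndTransactionProcessor, the commit/rollback processor discussed above.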

Once the broker has finished starting, the check-back thread TransactionalMessageCheckService has been loaded as well.

The check-back logic

Let's look at the run method of the check-back thread TransactionalMessageCheckService, which drives the core check-back logic:

[TransactionalMessageCheckService.java]
@Override
public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

checkInterval is the interval between check-back runs, 60 seconds by default:

[BrokerConfig.java]
@ImportantField
private long transactionCheckInterval = 60 * 1000;

The value of checkInterval can be changed by setting transactionCheckInterval in the broker.conf file; the unit is milliseconds.

Next, let's step into the waitForRunning method:

[TransactionalMessageCheckService.java]
protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }
    ......
}

which calls onWaitEnd. TransactionalMessageCheckService overrides the onWaitEnd method of its parent class ServiceThread; here is the logic:

[TransactionalMessageCheckService.java]
@Override
protected void onWaitEnd() {
    // transactionTimeOut: minimum age of a half message before it can be checked back (6 seconds by default)
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
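    // Maximum number of check-backs, 15 by default;
    // if the transaction state is still unknown after that many checks, RocketMQ discards the message, which amounts to a rollback.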
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // Run the check-back
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

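So by default the check-back runs once every 60 s; a half message only becomes eligible for check-back once it is older than transactionTimeOut (6 s by default); and a message is checked at most 15 times, after which it is discarded, which is effectively a rollback of the transaction.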
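The wake-up/timeout pattern behind waitForRunning and onWaitEnd can be imitated with a CountDownLatch. This is a simplified sketch, not ServiceThread itself (which reuses a resettable latch); the class and the checks counter are hypothetical, and onWaitEnd only counts instead of calling TransactionalMessageService.check. In the broker, run() calls waitForRunning(checkInterval) in a loop until the thread is stopped.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class CheckLoopSketch {
    private final AtomicBoolean hasNotified = new AtomicBoolean(false);
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);
    final AtomicInteger checks = new AtomicInteger(); // how many times onWaitEnd ran

    // Request an immediate run
    void wakeup() {
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown();
        }
    }

    // A pending wakeup is consumed at once; otherwise wait up to intervalMs
    void waitForRunning(long intervalMs) {
        if (hasNotified.compareAndSet(true, false)) {
            onWaitEnd();
            return;
        }
        waitPoint = new CountDownLatch(1);
        try {
            waitPoint.await(intervalMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            hasNotified.set(false);
            onWaitEnd();
        }
    }

    // In the broker this is where TransactionalMessageService.check(...) is called
    void onWaitEnd() {
        checks.incrementAndGet();
    }

    public static void main(String[] args) {
        CheckLoopSketch service = new CheckLoopSketch();
        service.waitForRunning(50);     // times out after ~50 ms, then checks once
        service.wakeup();
        service.waitForRunning(60_000); // returns immediately because of the wakeup
        System.out.println(service.checks.get()); // 2
    }
}
```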

The check-back logic: TransactionalMessageService.check

[TransactionalMessageServiceImpl.check]
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
    log.warn("The queue of topic is empty :" + topic);
    return;
}

The code above first fetches all queues of the half-message topic RMQ_SYS_TRANS_HALF_TOPIC.

......
// Iterate over the queues
for (MessageQueue messageQueue : msgQueues) {
    long startTime = System.currentTimeMillis();
    MessageQueue opQueue = getOpQueue(messageQueue);
    // Consume offset of the half queue (the check progress)
    long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
    // Consume offset of the op queue (records of deleted half messages)
    long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
    log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
    if (halfOffset < 0 || opOffset < 0) {
        log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
            halfOffset, opOffset);
        continue;
    }

    List<Long> doneOpOffset = new ArrayList<>();
    HashMap<Long, Long> removeMap = new HashMap<>();

    // Find out which half messages have already been deleted
    PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
    if (null == pullResult) {
        log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
            messageQueue, halfOffset, opOffset);
        continue;
    }

fillOpRemoveMap determines which half messages have already been deleted:

[TransactionalMessageServiceImpl.fillOpRemoveMap]
......
List<MessageExt> opMsg = pullResult.getMsgFoundList();
if (opMsg == null) {
    log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
    return pullResult;
}
for (MessageExt opMessageExt : opMsg) {
    Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
    if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
        if (queueOffset < miniOffset) {
            doneOpOffset.add(opMessageExt.getQueueOffset());
        } else {
            removeMap.put(queueOffset, opMessageExt.getQueueOffset());
        }
    } else {
        log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
    }
}
return pullResult;

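fillOpRemoveMap compares miniOffset, the current consume offset of the half queue (the halfOffset passed in by check), with queueOffset, the half-message offset stored in the body of each op message.

If queueOffset >= miniOffset, the half message has already been committed or rolled back, but the check loop has not reached it yet, so it is recorded in removeMap (half offset -> op offset). If queueOffset < miniOffset, the check progress has already passed that half message, so the op message's own offset is added to doneOpOffset.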
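The split performed by fillOpRemoveMap can be reproduced without any broker code. In this sketch the hypothetical Op class holds an op message's own offset in the op queue and the half-queue offset decoded from its body; miniOffset is the half queue's current consume offset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OpRemoveMapSketch {
    // One op message: its own offset in the op queue, and the half-queue offset from its body
    static final class Op {
        final long opQueueOffset;
        final long halfQueueOffset;

        Op(long opQueueOffset, long halfQueueOffset) {
            this.opQueueOffset = opQueueOffset;
            this.halfQueueOffset = halfQueueOffset;
        }
    }

    // Same split as fillOpRemoveMap; miniOffset is the half queue's current consume offset
    static void fill(List<Op> ops, long miniOffset, Map<Long, Long> removeMap, List<Long> doneOpOffset) {
        for (Op op : ops) {
            if (op.halfQueueOffset < miniOffset) {
                // The check progress is already past this half message: the op record is used up
                doneOpOffset.add(op.opQueueOffset);
            } else {
                // Finished, but the check loop has not reached it yet: remember to skip it
                removeMap.put(op.halfQueueOffset, op.opQueueOffset);
            }
        }
    }

    public static void main(String[] args) {
        List<Op> ops = new ArrayList<>();
        ops.add(new Op(0, 3));
        ops.add(new Op(1, 5));
        ops.add(new Op(2, 7));
        Map<Long, Long> removeMap = new HashMap<>();
        List<Long> doneOpOffset = new ArrayList<>();
        fill(ops, 5, removeMap, doneOpOffset);
        System.out.println(doneOpOffset + " " + removeMap); // [0] {5=1, 7=2}
    }
}
```

With miniOffset = 5, half offsets 5 and 7 land in removeMap, so the check loop will skip them when it reaches them instead of triggering a check-back.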

Back to the check method:

while (true) {
        ......
    // Already committed/rolled back: skip it and move on to the next offset
    if (removeMap.containsKey(i)) {
        log.info("Half offset {} has been committed/rolled back", i);
        removeMap.remove(i);
    } else {
        // Read the half message
        GetResult getResult = getHalfMsg(messageQueue, i);
        MessageExt msgExt = getResult.getMsg();
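        ... handling of a missing half message omitted ...
        // Past the retention time (needSkip, 3 days by default) or checked too often (needDiscard, 15 by default):
        // discard it and continue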
        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
            listener.resolveDiscardMsg(msgExt);
            newOffset = i + 1;
            i++;
            continue;
        }
        // Stored after this check run started: leave it for the next run
        if (msgExt.getStoreTimestamp() >= startTime) {
            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                new Date(msgExt.getStoreTimestamp()));
            break;
        }

        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
        long checkImmunityTime = transactionTimeout;
        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
        if (null != checkImmunityTimeStr) {
            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
            // Younger than the check immunity time: skip it and continue with the next one
            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                    newOffset = i + 1;
                    i++;
                    continue;
                }
            }
        } 

The logic of this code is summarized below:

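  1. If removeMap contains the offset, the message has already been processed and only the half queue's progress has not caught up; skip it
  2. If the message has been checked 15 times or has exceeded the retention time, discard it (listener.resolveDiscardMsg)
  3. Messages stored after the current check run started are left alone for now and checked in a later run
  4. Messages younger than the check immunity time (transactionTimeOut by default, or the per-message PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS) are skipped for now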

            List<MessageExt> opMsg = pullResult.getMsgFoundList();
            boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                || (valueOfCurrentMinusBorn <= -1);
    
            if (isNeedCheck) {
                // Re-append the half message to the topic RMQ_SYS_TRANS_HALF_TOPIC
                if (!putBackHalfMsgQueue(msgExt, i)) {
                    continue;
                }
                // Send a check-back request to a producer of the message's producer group
                listener.resolveHalfMsg(msgExt);
            } else {
                pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                    messageQueue, pullResult);
                continue;
            }
        }
        newOffset = i + 1;
        i++;
    }
    // Save the check progress (consume offset) of the half queue
    if (newOffset != halfOffset) {
        transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
    }
    

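For every message that needs a check-back, this loop first re-appends the half message to RMQ_SYS_TRANS_HALF_TOPIC (so that a copy is still there to be examined in a later run if this check-back yields no decision) and then sends a check-back request to the client. If it cannot decide yet, it pulls more op messages and looks at the same offset again. Finally it saves the consume progress of the half queue.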
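The three decisions in this loop can be written as pure predicates. The sketch follows the expressions shown above; the parameter names are hypothetical, needSkip assumes the retention limit is fileReservedTime hours (72 by default, hence the "3 days"), and the check-count bookkeeping that the real needDiscard performs is omitted.

```java
final class CheckDecisionSketch {
    // needDiscard: the message has already been checked checkMax times (15 by default)
    static boolean needDiscard(int checkTimes, int checkMax) {
        return checkTimes >= checkMax;
    }

    // needSkip: older than the commitLog retention (fileReservedTime hours, 72 by default)
    static boolean needSkip(long ageMs, long fileReservedHours) {
        return ageMs > fileReservedHours * 3600L * 1000L;
    }

    // isNeedCheck, written like the expression in check();
    // lastOpBornMs is null when the last pull found no op messages
    static boolean isNeedCheck(Long lastOpBornMs, long ageMs, long checkImmunityMs,
                               long startTimeMs, long transactionTimeoutMs) {
        return (lastOpBornMs == null && ageMs > checkImmunityMs)
            || (lastOpBornMs != null && lastOpBornMs - startTimeMs > transactionTimeoutMs)
            || ageMs <= -1; // born timestamp later than the broker clock
    }

    public static void main(String[] args) {
        System.out.println(needDiscard(15, 15));                        // true
        System.out.println(needSkip(73L * 3600 * 1000, 72));            // true
        System.out.println(isNeedCheck(null, 10_000, 6_000, 0, 6_000)); // true
        System.out.println(isNeedCheck(null, 1_000, 6_000, 0, 6_000));  // false
    }
}
```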

Client-side check-back handling

On the client, ClientRemotingProcessor handles the check-back request in its checkTransactionState method:

[ClientRemotingProcessor.java]
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    // Decode the request header
    final CheckTransactionStateRequestHeader requestHeader =
        (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
    final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
    if (messageExt != null) {
        if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
            messageExt.setTopic(NamespaceUtil
                .withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
        }
        String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (null != transactionId && !"".equals(transactionId)) {
            messageExt.setTransactionId(transactionId);
        }
        final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        if (group != null) {
            // Pick a transactional producer instance of this group
            MQProducerInner producer = this.mqClientFactory.selectProducer(group);
            if (producer != null) {
                final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                // Run the check-back
                producer.checkTransactionState(addr, messageExt, requestHeader);
            } else {
                log.debug("checkTransactionState, pick producer by group[{}] failed", group);
            }
        ... error logging omitted ...
    return null;
}

The client picks a producer instance and performs the actual check-back by calling producer.checkTransactionState(addr, messageExt, requestHeader).

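The check-back runs asynchronously on a client-side thread pool and ends up calling back the application's TransactionListener.checkLocalTransaction method (or, with the legacy API, TransactionCheckListener.checkLocalTransactionState).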

// Build a check-back task
Runnable request = new Runnable() {
    private final String brokerAddr = addr;
    private final MessageExt message = msg;
    private final CheckTransactionStateRequestHeader checkRequestHeader = header;
    private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

    @Override
    public void run() {

It first gets the transaction listeners configured on the client:

TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable exception = null;
    try {
        if (transactionCheckListener != null) {

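The check-back itself calls transactionCheckListener.checkLocalTransactionState(message) (legacy API) or, with the newer API, transactionListener.checkLocalTransaction(message):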

        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
    } else if (transactionListener != null) {
        log.debug("Used new check API in transaction message");
        localTransactionState = transactionListener.checkLocalTransaction(message);
    } else {
        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
    }
} catch (Throwable e) {
    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
    exception = e;
}

The local transaction state obtained from the check-back is then sent to the broker, so that the broker can commit or roll back the half message:

        this.processTransactionState(
            localTransactionState,
            group,
            exception);
    } else {
        log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
    }
}

processTransactionState submits the check-back result to the broker:

private void processTransactionState(
    // Local transaction state
    final LocalTransactionState localTransactionState,
    final String producerGroup,
    final Throwable exception) {

    final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
    thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
    thisHeader.setProducerGroup(producerGroup);
    thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
    thisHeader.setFromTransactionCheck(true);

    String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    if (uniqueKey == null) {
        uniqueKey = message.getMsgId();
    }
    thisHeader.setMsgId(uniqueKey);
    thisHeader.setTransactionId(checkRequestHeader.getTransactionId());

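This builds the end-transaction request header EndTransactionRequestHeader and sets the MessageSysFlag according to the local transaction state returned by the check-back. The handling is the same as for the local transaction in the half-message sending phase.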

switch (localTransactionState) {
    case COMMIT_MESSAGE:
        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
        break;
    case ROLLBACK_MESSAGE:
        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
        log.warn("when broker check, client rollback this transaction, {}", thisHeader);
        break;
    case UNKNOW:
        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
        log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
        break;
    default:
        break;
}

String remark = null;
if (exception != null) {
    remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}

try {

endTransactionOneway sends the check-back result to the broker, exactly as at the beginning of this article (here with a fixed timeout of 3000 ms):

            DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                3000);
        } catch (Exception e) {
            log.error("endTransactionOneway exception", e);
        }
    }
};

In fact, the check-back task is submitted to the check-back thread pool configured on the client, as this line shows:

// Submit the check-back task to the check-back thread pool
this.checkExecutor.submit(request);

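So the check-back task ultimately runs on the TransactionMQProducer's check-back thread pool and calls the checkLocalTransaction method implemented by the application's TransactionListener, which returns the real transaction state.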
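From the application's point of view, the check-back only needs a listener that can answer "did this transaction commit?". The sketch below is hypothetical and self-contained: CheckListener stands in for TransactionListener.checkLocalTransaction, LogBackedListener assumes the application records committed transaction ids in a local transaction log, and checkAndWait imitates submitting the task to the check executor (the real task sends the state with endTransactionOneway instead of returning it).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CheckBackSketch {
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Hypothetical stand-in for TransactionListener.checkLocalTransaction(MessageExt)
    interface CheckListener {
        State check(String transactionId);
    }

    // Application side: answers from a local transaction log (assumed to be written
    // in the same database transaction as the business data)
    static final class LogBackedListener implements CheckListener {
        final Map<String, State> txLog = new ConcurrentHashMap<>();

        @Override
        public State check(String transactionId) {
            return txLog.getOrDefault(transactionId, State.UNKNOW);
        }
    }

    // Imitates checkExecutor.submit(request): run the check on the pool and wait for it
    static State checkAndWait(ExecutorService checkExecutor, CheckListener listener, String transactionId) {
        Future<State> future = checkExecutor.submit(() -> {
            try {
                return listener.check(transactionId);
            } catch (RuntimeException e) {
                return State.UNKNOW; // as in the original: an exception leaves the state UNKNOW
            }
        });
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return State.UNKNOW;
        } catch (ExecutionException e) {
            return State.UNKNOW;
        }
    }

    public static void main(String[] args) {
        ExecutorService checkExecutor = Executors.newSingleThreadExecutor();
        LogBackedListener listener = new LogBackedListener();
        listener.txLog.put("tx-1", State.COMMIT_MESSAGE);
        System.out.println(checkAndWait(checkExecutor, listener, "tx-1")); // COMMIT_MESSAGE
        System.out.println(checkAndWait(checkExecutor, listener, "tx-2")); // UNKNOW
        checkExecutor.shutdown();
    }
}
```

Returning UNKNOW for an id that is not (yet) in the log lets the broker ask again later; after transactionCheckMax attempts the half message is discarded, which amounts to a rollback.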

Finally, let's summarize how RocketMQ implements transactional messages:

RocketMQ transactional messages are based on the idea of two-phase commit, combined with a transaction state check-back mechanism.

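Two-phase commit: first a prepare (half) message is sent; then, depending on whether the local transaction commits or rolls back, a commit/rollback command for the half message is sent to the broker.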

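On the broker, a scheduled task (running once a minute by default) compares the prepare message queue (topic RMQ_SYS_TRANS_HALF_TOPIC) with the op queue (topic RMQ_SYS_TRANS_OP_HALF_TOPIC, which records messages already committed or rolled back), sends check-back requests to the client for the prepare messages that need them, and finally commits or rolls back each half message according to the check-back result.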

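This concludes the analysis of transactional message commit/rollback and check-back, and with it the source code analysis of transactional messages.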



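Copyright notice:

Original writing takes effort; copying it without credit is shameful. Unless otherwise noted, all articles on this blog are original; when reposting, please credit this article with a link to its address.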


