29

跟我学RocketMQ之消息重试

 3 years ago
source link: http://wuwenliang.net/2019/03/28/%E8%B7%9F%E6%88%91%E5%AD%A6RocketMQ%E4%B9%8B%E6%B6%88%E6%81%AF%E9%87%8D%E8%AF%95/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
跟我学RocketMQ之消息重试 | 朝·闻·道

本文中,我将讲解RocketMQ使用过程中,如何进行消息重试。

首先,我们需要明确,只有当消费模式为 MessageModel.CLUSTERING(集群模式) 时,Broker才会自动进行重试,对于广播消息是不会重试的。

集群消费模式下,当消息消费失败,RocketMQ会通过消息重试机制重新投递消息,努力使该消息消费成功。

当消费者消费该重试消息后,需要返回结果给broker,告知broker消费成功(ConsumeConcurrentlyStatus.CONSUME_SUCCESS)或者需要重新消费(ConsumeConcurrentlyStatus.RECONSUME_LATER)。

这里有个问题,如果消费者业务本身故障导致某条消息一直无法消费成功,难道要一直重试下去吗?

答案是显而易见的,并不会一直重试。

事实上,对于一直无法消费成功的消息,RocketMQ会在达到最大重试次数之后,将该消息投递至死信队列。然后我们需要关注死信队列,并对该死信消息业务做人工的补偿操作。

那如何返回消息消费失败呢?

RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试。

  1. 业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  2. 业务消费方返回null
  3. 业务消费方主动/被动抛出异常

前两种情况较容易理解,当返回ConsumeConcurrentlyStatus.RECONSUME_LATER或者null时,broker会知道消费失败,后续就会发起消息重试,重新投递该消息。

注意 对于抛出异常的情况,只要我们在业务逻辑中显式抛出异常或者非显式抛出异常,broker也会重新投递消息,如果业务对异常做了捕获,那么该消息将不会发起重试。因此对于需要重试的业务,消费方在捕获异常的时候要注意返回ConsumeConcurrentlyStatus.RECONSUME_LATER或null并输出异常日志,打印当前重试次数。(推荐返回ConsumeConcurrentlyStatus.RECONSUME_LATER

RocketMQ重试时间窗

这里介绍一下Apache RocketMQ的重试时间窗,当消息需要重试时,会按照该规则进行重试。

我们可以在RocketMQ源码的MessageStoreConfig.java中找到消息重试配置:

MessageStoreConfig.java
// 消息延迟级别定义
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

默认重试次数与重试时间间隔对应关系表如下:

重试次数 距离第一次发送的时间间隔 1 10s 2 30s 3 1m 4 2m 5 3m 6 4m 7 5m 8 6m 9 7m 10 8m 11 9m 12 10m 13 20m 14 30m 15 1h 16 2h

可以看到,RocketMQ采用了“时间衰减策略”进行消息的重复投递,即重试次数越多,消息消费成功的可能性越小。

【源码分析】

在RocketMQ的客户端源码DefaultMQPushConsumerImpl.java中,对重试机制做了说明,源码如下:

private int getMaxReconsumeTimes() {
    // default reconsume times: 16
    if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
        return 16;
    } else {
        return this.defaultMQPushConsumer.getMaxReconsumeTimes();
    }
}

解释下,首先判断消费端有没有显式设置最大重试次数 MaxReconsumeTimes, 如果没有,则设置默认重试次数为16,否则以设置的最大重试次数为准。

当消息消费失败,服务端会发起消费重试,具体逻辑在broker源码的 SendMessageProcessor.java 中的consumerSendMsgBack方法中涉及,源码如下:

if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
    || delayLevel < 0) {
    newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
        DLQ_NUMS_PER_GROUP,
        PermName.PERM_WRITE, 0
    );
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return response;
    }
} else {
    if (0 == delayLevel) {
        delayLevel = 3 + msgExt.getReconsumeTimes();
    }

    msgExt.setDelayTimeLevel(delayLevel);
}

判断消息当前重试次数是否大于等于最大重试次数,如果达到最大重试次数,或者配置的重试级别小于0,则获取死信队列的Topic,后续将超时的消息send到死信队列中。

正常的消息会进入else分支,对于首次重试的消息,默认的delayLevel是0,rocketMQ会将给该level + 3,也就是加到3,这就是说,如果没有显示的配置延时级别,消息消费重试首次,是延迟了第三个级别发起的重试,从表格中看也就是距离首次发送10s后重试。

当延时级别设置完成,刷新消息的重试次数为当前次数加1,broker将该消息刷盘,逻辑如下

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
// 刷新消息的重试次数为当前次数加1
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
// 消息刷盘
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

那么什么是msgInner呢,即:MessageExtBrokerInner,也就是对重试的消息,rocketMQ会创建一个新的 MessageExtBrokerInner 对象,它实际上是继承了MessageExt。

我们继续进入消息刷盘逻辑,即putMessage(msgInner)方法,实现类为:DefaultMessageStore.java, 核心代码如下:

long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessage(msg);

主要关注 this.commitLog.putMessage(msg); 这句代码,通过commitLog我们可以认为这里是真实刷盘操作,也就是消息被持久化了。

我们继续进入commitLog的putMessage方法,看到如下核心代码段:

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // 处理延时级别
    if (msg.getDelayTimeLevel() > 0) {
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }
        // 更换Topic
        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        // 队列ID为延迟级别-1
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Backup real topic, queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        // 重置topic及queueId
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}

ScheduleMessageService.java
public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}   

可以看到,如果是重试消息,在进行延时级别判断时候,返回true,则进入分支逻辑,通过这段逻辑我们可以知道,对于重试的消息,rocketMQ并不会从原队列中获取消息,而是创建了一个新的Topic进行消息存储的。也就是代码中的SCHEDULE_TOPIC,看一下具体是什么内容

public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

emmm,这个名字比较有意思,就叫 SCHEDULE_TOPIC_XXXX

到这里我们可以得到一个结论:

对于所有消费者消费失败的消息,rocketMQ都会把重试的消息 重新new出来(即上文提到的MessageExtBrokerInner对象),然后投递到主题 SCHEDULE_TOPIC_XXXX 下的队列中,然后由定时任务进行调度重试,而重试的周期符合我们在上文中提到的delayLevel周期,也就是:

MessageStoreConfig.java
// 消息延迟级别定义
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

同时为了保证消息可被找到,也会将原先的topic存储到properties中,也就是如下这段代码

// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));    

这里将原先的topic和队列id做了备份。

死信的业务处理方式

默认的处理机制中,如果我们只对消息做重复消费,达到最大重试次数之后消息就进入死信队列了。

我们也可以根据业务的需要,定义消费的最大重试次数,每次消费的时候判断当前消费次数是否等于最大重试次数的阈值。

如:重试三次就认为当前业务存在异常,继续重试下去也没有意义了,那么我们就可以将当前的这条消息进行提交,返回broker状态ConsumeConcurrentlyStatus.CONSUME_SUCCES,让消息不再重发,同时将该消息存入我们业务自定义的死信消息表,将业务参数入库,相关的运营通过查询死信表来进行对应的业务补偿操作。

RocketMQ 的处理方式为将达到最大重试次数(16次)的消息标记为死信消息,将该死信消息投递到 DLQ 死信队列中,业务需要进行人工干预。实现的逻辑在 SendMessageProcessor 的 consumerSendMsgBack 方法中,大致思路为首先判断重试次数是否超过16或者消息发送延时级别是否小于0,如果已经超过16或者发送延时级别小于0,则将消息设置为新的死信。死信 topic 为:%DLQ%+consumerGroup。

我们接着看一下死信的源码实现机制。

死信源码分析

private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumerSendMsgBackRequestHeader requestHeader =
            (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

        ......

        // 0.首先判断重试次数是否大于等于16,或者消息延迟级别是否小于0
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
            || delayLevel < 0) {
            // 1. 如果满足判断条件,设置死信队列topic= %DLQ%+consumerGroup
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE, 0
            );
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return response;
            }
        } else {
            // 如果延迟级别为0,则设置下一次延迟级别为3+当前重试消费次数,达到时间衰减效果
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }

            msgExt.setDelayTimeLevel(delayLevel);
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

        // 3.死信消息投递到死信队列中并落盘
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        ......
        return response;
    }

我们总结一下死信的处理逻辑:

  1. 首先判断消息当前重试次数是否大于等于16,或者消息延迟级别是否小于0
  2. 只要满足上述的任意一个条件,设置新的topic(死信topic)为:%DLQ%+consumerGroup
  3. 进行前置属性的添加
  4. 将死信消息投递到上述步骤2建立的死信topic对应的死信队列中并落盘,使消息持久化。

发送失败如何重试

上文中,我们讲解了对于消费失败的重试策略,这个章节中我们来了解下消息发送失败如何进行重试。

当发生网络抖动等异常情况,Producer生产者侧往broker发送消息失败,即:生产者侧没收到broker返回的ACK,导致Consumer无法进行消息消费,这时RocketMQ会进行发送重试。

使用DefaultMQProducer进行普通消息发送时,我们可以设置消息发送失败后最大重试次数,并且能够灵活的配合超时时间进行业务重试逻辑的开发,使用的API如下:

/**设置消息发送失败时最大重试次数*/
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
    this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}

/**同步发送消息,并指定超时时间*/
public SendResult send(Message msg,
                    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg, timeout);
}

通过API可以看出,生产者侧的重试是比较简单的,例如:设置生产者在3s内没有发送成功则重试3次的代码如下:

/**同步发送消息,如果3秒内没有发送成功,则重试3次*/
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducerGroup");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 3000L);

本文中,我们主要介绍了RocketMQ的消息重试机制,该机制能够最大限度的保证业务能够往我们期望的方向流转。

这里还需要注意,业务重试的时候我们的消息消费端需要保证消费的 幂等性, 关于消息消费的幂等如何处理,我们在后续的文章会展开讲解。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK