11

跟我学RocketMQ之定时消息源码解析

 3 years ago
source link: http://wuwenliang.net/2019/09/15/%E8%B7%9F%E6%88%91%E5%AD%A6RocketMQ%E4%B9%8B%E5%AE%9A%E6%97%B6%E6%B6%88%E6%81%AF%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文我们单独对RocketMQ的定时消息进行源码解析。

同事务消息类似,RocketMQ定时消息也是通过Topic替换,后台线程异步发送实现的。具体逻辑是通过org.apache.rocketmq.store.schedule.ScheduleMessageService实现的。

定时消息原理概述

在正式进行源码分析之前,我们先从概念上对定时消息做一个较为宏观的认知。

RocketMQ支持指定级别的消息延迟,默认为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

RocketMQ消息重试以及定时消息均是通过定时任务实现的。重试消息以及定时消息在存入commitLog之前会判断重试次数,如果大于0,则会将消息的topic设置为SCHEDULE_TOPIC_XXXX。

ScheduleMessageService在实例化之后会对SCHEDULE_TOPIC_XXXX主题下的消息进行定时调度,从而实现定时投递。

ScheduleMessageService源码解析

我们接着对ScheduleMessageService进行解析,了解RocketMQ具体是如何实现定时消息机制的。

在正式分析之前,先对ScheduleMessageService的重要成员变量做一下了解:

delayLevelTable,记录了对延迟级别的解析结果,key=延迟级别,value=对应延迟级别的毫秒数

private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
    new ConcurrentHashMap<Integer, Long>(32);

offsetTable,延迟级别对应的消费进度,key=延迟级别,value=对应延迟级别下的消费进度

private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
    new ConcurrentHashMap<Integer, Long>(32);

ScheduleMessageService的初始化是在DefaultMessageStore实现的,具体的调用链如下:

BrokerStartup
    |-main
        |-start
            |-createBrokerController
                |-BrokerController.initialize()    
                |-controller.start()
                    |-DefaultMessageStore.start()
                        |-new ScheduleMessageService(this)
                        |-scheduleMessageService.start()

从调用链可以看出,当broker启动完成,ScheduleMessageService就开始对定时消息进行调度。

对于ScheduleMessageService我们主要关注:

  • load()方法
  • start()方法

ScheduleMessageService.load()

首先关注一下load()方法逻辑。

[ScheduleMessageService.java]
public boolean load() {
    boolean result = super.load();
    result = result && this.parseDelayLevel();
    return result;
}

load()方法的逻辑比较清晰,它的主要职责为:

  1. 通过super.load()方法获取配置文件,加载延迟消息的消费进度
  2. 初始化delayLevelTable

RocketMQ将延时消息的消费进度存储于 ${RocketMQ_Home}/store/config/delayOffset.json下。

我们重点看一下parseDelayLevel();如何完成解析延时配置,并组装为delayLevelTable的。

[ScheduleMessageService.java]
public boolean parseDelayLevel() {
    // 初始化一个时间单位map,key为秒、分、时、天;value为对应单位的毫秒数
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    // 从defaultMessageStore中获取配置文件,从配置文件中获取延迟级别配置串,即:messageDelayLevel
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {

        // 根据空格进行拆分,分解为String数组
        String[] levelArray = levelString.split(" ");

        // 遍历String数组
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);

            // key=延迟级别,等于下标+1
            int level = i + 1;
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            // value=单位对应毫秒数 * 解析得到的时间单位
            long delayTimeMillis = tu * num;
            // 存放到delayLevelTable
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }
    return true;
}

这段代码很好理解,就是对配置中的延时串通过空格进行分割为数组,按照下标及单位,计算得到每个等级对应的毫秒数,最终存放在delayLevelTable中实现delayLevelTable的初始化,便于后续在代码逻辑中进行使用。

如果没有设置则使用代码中的默认值。

ScheduleMessageService.start()

我们接着看一下start()方法的逻辑,该方法是延迟消息(定时消息)调度的核心逻辑。

[ScheduleMessageService.java]
public void start() {
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);

start方法的核心思想为

对不同的延迟级别创建对应的定时任务,通过定时任务对持久化的消息队列的进度进行存储。

// 首先对delayLevelTable进行迭代,取出每一个级别及其对应的延时长度。
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
    Integer level = entry.getKey();
    Long timeDelay = entry.getValue();
    Long offset = this.offsetTable.get(level);
    // 获取该级别对应的消费进度offset,如果不存在则设置为0
    if (null == offset) {
        offset = 0L;
    }

    // 如果延时不为空,则延迟1秒执行定时任务
    if (timeDelay != null) {
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
    }
}

这里简单总结一下,首先对delayLevelTable进行遍历,获取对应延迟级别level对应的消费进度,默认进度不存在,每个延迟级别对应的消费进度都从0开始。

创建定时任务开始进行调度,每个定时任务初始都延迟1秒开始进行调度。后续则使用对应的延迟级别进行调度。

注意:延时级别与消费队列的关系为:消息队列id=延时级别-1,具体逻辑在queueId2DelayLevel方法中。

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

这段代码的核心逻辑为,执行定时任务,每隔10s进行一次消费进度的持久化操作。具体的持久化刷盘频率可以通过flushDelayOffsetInterval参数进行配置。

定时任务实现:DeliverDelayedMessageTimerTask

上面的分析中我们得知,RocketMQ对定时消息的每一个延迟级别都设置了一个定时任务,这个定时任务识通过DeliverDelayedMessageTimerTask实现的。

DeliverDelayedMessageTimerTask继承了TimerTask,我们直接看它的run()方法实现。

@Override
public void run() {
    try {
        if (isStarted()) {
            this.executeOnTimeup();
        }
    } catch (Exception e) {
        // XXX: warn and notify me
        log.error("ScheduleMessageService, executeOnTimeup exception", e);
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
            this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
}

可以看到,核心是executeOnTimeup()方法,当执行异常,延迟10s后继续执行调度。

我们进入executeOnTimeup()方法。

executeOnTimeup()

首先根据topic=SCHEDULE_TOPIC_XXXX,延迟级别转换为队列id,查询到当前的消费队列。

ConsumeQueue cq =
    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
        delayLevel2QueueId(delayLevel));

根据当前的offset从消费队列中获取当前所有的有效消息,如果未能获取到则更新拉取进度,等待定时任务下次进行尝试。

for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    long offsetPy = bufferCQ.getByteBuffer().getLong();
    int sizePy = bufferCQ.getByteBuffer().getInt();
    long tagsCode = bufferCQ.getByteBuffer().getLong();

    if (cq.isExtAddr(tagsCode)) {
        if (cq.getExt(tagsCode, cqExtUnit)) {
            tagsCode = cqExtUnit.getTagsCode();
        } else {
            //can't find ext content.So re compute tags code.
            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                tagsCode, offsetPy, sizePy);
            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
        }
    }

定时任务每次执行到这里都进行时间比较,计算延迟时间与当前时间的差值,如果延迟时间-当前时间<=0说明该延迟消息应当被处理,使其能够被消费者消费。

long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

根据消息偏移量及消息大小从commitLog中查询消息,如果查到,则开始执行正式的消息消费准备工作。

if (countdown <= 0) {
    MessageExt msgExt =
         ScheduleMessageService.
            this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

对消息执行重新存储操作,恢复原先的队列以及消息topic,再将消息重新持久化到commitLog中,此时的消息已经能够被消费者拉取到。

if (msgExt != null) {
         try {
             MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);

            PutMessageResult putMessageResult =
                ScheduleMessageService.this.writeMessageStore
                                    .putMessage(msgInner);

我们重点看一下messageTimeup(msgExt)方法是如何进行消息的恢复操作。

messageTimeup(msgExt)恢复原消息主题及队列

private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
        // 建立一个新的MessageExtBrokerInner实体
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());

        ...省略属性设置...

        msgInner.setWaitStoreMsgOK(false);
        // 清理消息延迟级别属性
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

        // 恢复消息原主题
        msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

        // 恢复消息原队列id
        String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
        int queueId = Integer.parseInt(queueIdStr);
        msgInner.setQueueId(queueId);

        return msgInner;
    }

经过上述操作,定时消息已经还原为普通消息。

我们继续回到 executeOnTimeup() 方法中,通过

PutMessageResult putMessageResult = 
ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);

将还原后的消息重新持久化到commitLog中。

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
        this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);

更新当前延迟队列的消息拉取进度,继续处理后续的消息。

本文我们完整的对RocketMQ的定时消息实现方式进行了分析,我们总结一下它的完整流程:

  1. 消息发送方发送消息,设置delayLevel。
  2. 如果delayLevel大于0,表明是一条延时消息,broker处理该消息,将消息的主题、队列id进行备份后,改变消息的主题为SCHEDULE_TOPIC_XXXX,队列id=延迟级别-1,将消息持久化。
  3. 通过定时任务ScheduleMessageService对定时消息进行处理,每隔1s从上次拉取偏移量取出所有的消息进行处理
  4. 从消费队列中解析出消息的物理偏移量,从而从commitLog中取出消息
  5. 根据消息的属性重建消息,恢复消息的topic、原队列id,将消息的延迟级别属性delayLevel清除掉,再次保存到commitLog中
  6. 将消息转发到原主题对应的消费队列中,此时消费者可以对该消息进行消费。


版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK