35

跟我学RocketMQ之消息拉取源码解析

 3 years ago
source link: http://wuwenliang.net/2019/08/20/%E8%B7%9F%E6%88%91%E5%AD%A6RocketMQ%E4%B9%8B%E6%B6%88%E6%81%AF%E6%8B%89%E5%8F%96%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

我们继续对消息消费流程的源码进行解析。

本文主要针对push模式下的消息拉取流程进行解析。我们重点分析集群消费模式,对于广播模式其实很好理解,每个消费者都需要拉取主题下面的所有消费队列的消息。

在集群消费模式下,同一个消费者组内包含了多个消费者实例,同一个topic下存在多个消费队列。对于单个消费者组,其内部维护了一个线程池进行消息消费,这部分内容可以移步 跟我学RocketMQ之消息消费源码解析(2)

之前我们已经研究了消费者的初始化流程,在启动MQClientInstance过程中,启动了一个消息拉取线程PullMessageService进行消息拉取工作。

PullMessageService启动

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

    ...省略其他...
    this.pullMessageService = new PullMessageService(this);
    ...省略其他...
    this.rebalanceService = new RebalanceService(this);
}

可以看到在MQClientInstance的构造初始化过程中,启动了PullMessageService线程。

PullMessageService启动

从之前文章中对消息消费启动过程的分析中得知,在消费者启动过程defaultMQPushConsumerImpl.start()中,我们启动了MQClientInstance。

在启动MQClientInstance的过程中,对消息拉取线程进行了start()。消息拉取线程开始运行,看下代码实现

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // 启动消息拉取线程
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

通过 this.pullMessageService.start() 启动了消息拉取线程。

PullMessageService消息拉取流程分析

PullMessageService是ServiceThread的子类,ServiceThread是RocketMQ实现的具备启停能力的线程实现,它实现了Runnable接口。我们看一下PullMessageService的声明。

public class PullMessageService extends ServiceThread {

当PullMessageService启动后,开始运行run方法,我们看一下run方法逻辑。

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // step 1
            PullRequest pullRequest = this.pullRequestQueue.take();
            // step 2
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

while (!this.isStopped()) { 这个写法是一种通用的设计技巧,stopped是一个声明为volatile的boolean类型变量,保证多线程下的可见性;每次执行逻辑时判断stopped是否为false,如果是则执行循环体内逻辑。

其他线程能够通过设置stopped为true,导致此处判断结果为false从而终止拉取线程的运行。

  • [step 1] 从pullRequestQueue(LinkedBlockingQueue无界队列)中通过take()获取一个PullRequest消息拉取任务;如果队列为空,则线程阻塞,等待新的PullRequest被放入恢复运行。
  • [step 2] 执行pullMessage方法进行真正的消息拉取操作。

PullRequest添加流程

在阅读pullMessage逻辑之前,我们先看一下PullRequest是从何添加的。

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
    if (!isStopped()) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    } else {
        log.warn("PullMessageServiceScheduledThread has shutdown");
    }
}

public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

PullMessageService提供了即时添加与延时添加两种方式添加PullRequest,将其加入到pullRequestQueue阻塞队列中。PullRequest的创建过程是在RebalanceImpl中完成的,这个过程涉及到RocketMQ消息消费的重要过程 消息队列负载机制,这个过程我们会单独进行讲解。

PullRequest简介

我们简单看一下PullRequest的结构:

public class PullRequest {
    // 消费者组
    private String consumerGroup;
    // 待拉取到消息队列
    private MessageQueue messageQueue;
    // 消息处理队列,消息从broker中拉取以后会先存到该ProcessQueue中,然后再提交给消费者线程池进行消费
    private ProcessQueue processQueue;
    // 带拉取消息的偏移量
    private long nextOffset;
    // 是否锁定
    private boolean lockedFirst = false;

PullRequest消息拉取流程

我们继续回到PullRequest拉取流程中来,查看PullMessageService.pullMessage方法。

private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

这是一个私有方法,可以看到

  1. 首先从MQClientInstance中选择一个消费者,选取条件为当前拉取请求中的消费者组;
  2. 将该消费者实例强转为DefaultMQPushConsumerImpl
  3. 调用DefaultMQPushConsumerImpl的pullMessage方法进行消息拉取。

我们接着进入DefaultMQPushConsumerImpl.java中查看其pullMessage的具体实现。

DefaultMQPushConsumerImpl.pullMessage消息拉取逻辑

前方大段代码预警……我会将这个大方法拆分成一段一段的细分逻辑进行分析。

public void pullMessage(final PullRequest pullRequest) {
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }

    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

首先从pullRequest请求中获取到处理队列processQueue,如果processQueue已经被丢弃则结束拉取流程;

如果processQueue未被丢弃,则更新LastPullTimestamp属性未当前时间戳。

try {
    this.makeSureStateOK();
} catch (MQClientException e) {
    log.warn("pullMessage exception, consumer state not ok", e);
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    return;
}

判断当前线程状态是否为运行态,makeSureStateOK()方法会通过 this.serviceState != ServiceState.RUNNING 进行服务状态的判断; 如果不是RUNNING状态则抛出异常结束消息拉取流程。

if (this.isPause()) {
    ...省略warn日志...
    // long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;单位毫秒
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
    return;
}

对消费者是否挂起进行判断,如果消费者状态为已挂起,则将拉取请求延迟1s后再次放到PullMessageService的消息拉取任务队列中。

long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        ...省略warn日志...
    }
    return;
}

此处进行消息拉取流控校验:

如果processQueue当前处理的消息条数超过了PullThresholdForQueue(消息拉取阈值=1000)触发流控,结束本次拉取任务,50毫秒之后将该拉取任务再次加入到消息拉取任务队列中。每触发1000次流控,打印warn日志;

if (!this.consumeOrderly) {
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
            ...省略warn日志...
        }
        return;
    }

如果不是顺序消费,判断processQueue中队列最大偏移量与最小偏移量之间的间隔,如果大于ConsumeConcurrentlyMaxSpan(拉取偏移量阈值==2000)触发流控,结束本次拉取;50毫秒之后将该拉取任务再次加入到消息拉取任务队列中。

每触发1000次流程,打印warn日志。

} else {
    if (processQueue.isLocked()) {
        if (!pullRequest.isLockedFirst()) {
            final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                pullRequest, offset, brokerBusy);
            if (brokerBusy) {
                log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                    pullRequest, offset);
            }

            pullRequest.setLockedFirst(true);
            pullRequest.setNextOffset(offset);
        }
    } else {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        log.info("pull message later because not locked in broker, {}", pullRequest);
        return;
    }
}

如果消息处理队列锁定成功,判断消息拉取请求是否锁定,如果没有锁定则计算从何处开始拉取。

判断broker是否繁忙,如果当前拉取的进度小于拉取请求中要拉取到下一个进度,表明当前broker处理的拉取请求还没有执行完成,因此brokerBusy为true,表示broker处于繁忙状态。

更新拉取请求的锁定标记为已锁定,更新下一次拉取的offset为计算出的offset。

如果消息处理队列未锁定,则延迟3s之后将将该拉取任务再次加入到消息拉取任务队列。打印日志表明稍后再进行消息拉取,原因为broker未被锁定。结束本次拉取。

final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}

获取当前topic的订阅消息,如果订阅消息不存在,则结束当前消息拉取;延迟三秒之后将拉取任务再次加入到消息拉取任务队列中。

final long beginTimestamp = System.currentTimeMillis();

...省略pullCallback实现...

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    if (commitOffsetValue > 0) {
        commitOffsetEnable = true;
    }
}

String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
    if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
        subExpression = sd.getSubString();
    }

    classFilter = sd.isClassFilterMode();
}

获取消息订阅信息,如果订阅信息存在则获取tag标识。

int sysFlag = PullSysFlag.buildSysFlag(
    commitOffsetEnable, // commitOffset
    true, // suspend
    subExpression != null, // subscription
    classFilter // class filter
);

这里主要构造了消息拉取的系统标识;

try {
    this.pullAPIWrapper.pullKernelImpl(
        pullRequest.getMessageQueue(),
        subExpression,
        subscriptionData.getExpressionType(),
        subscriptionData.getSubVersion(),
        pullRequest.getNextOffset(),
        this.defaultMQPushConsumer.getPullBatchSize(),
        sysFlag,
        commitOffsetValue,
        BROKER_SUSPEND_MAX_TIME_MILLIS,
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC,
        pullCallback
    );

通过pullKernelImpl()方法发起真实的消息拉取请求,pullKernelImpl方法内部与服务端进行网络通信。底层调用了MQClientAPIImpl的pullMessage方法。此处涉及到网络通信,我们在后续的网络通讯代码部分进行分析。

注意此处的pullKernelImpl方法中的最后一个参数为PullCallback,PullCallback为从Broker拉取消息之后的回调方法,它的初始化代码如下,我们单独拿出来进行解析。

DefaultMQPushConsumerImpl.pullMessage.PullCallback实例化代码逻辑

通过匿名内部类的方式初始化了PullCallback回调接口,需要实现其OnSuccess、onException方法。

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), 
                            pullResult,
                            subscriptionData);

如果拉取结果不为空,表明拉取成功了,执行processPullResult对拉取结果进行解析。

// 判断拉取状态
switch (pullResult.getPullStatus()) {
    case FOUND:
        long prevRequestOffset = pullRequest.getNextOffset();
        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
        long pullRT = System.currentTimeMillis() - beginTimestamp;
        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
            pullRequest.getMessageQueue().getTopic(), pullRT);

        long firstMsgOffset = Long.MAX_VALUE;
        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

如果拉取结果响应中包含的消息列表为空,或者列表为空列表,则立即发起下一次拉取请求。以便唤醒PullMessageService再次执行拉取;之所以列表为空,是因为客户端通过TAG对消息进行了过滤,因此会出现过滤后列表为空的情况。

} else {
    // 取出第一个消息的offset
    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
        pullResult.getMsgFoundList(),
        processQueue,
        pullRequest.getMessageQueue(),
        dispatchToConsume);

将拉取到的消息保存到processQueue中,通过submitConsumeRequest方法将拉取到的消息提交给ConsumeMessageService进行消息消费,这里是一个异步方法。

PullCallBack将消息提交给consumeMessageService之后就直接返回了,不关心消息具体是如何消费的。

    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
    } else {
        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    }
}

这里的逻辑比较重要,实现了消息的持续拉取。具体的逻辑为:

将消息提交给消费者线程之后,PullCallBack立即返回,表明当前消息拉取已经完成。

判断PullInterval参数,如果PullInterval>0,等待PullInterval毫秒之后将PullRequest对象放到PullMessageService的pullRequestQueue消息拉取队列中。

pullRequestQueue的下次拉取被激活,从而达到消息持续拉取的目的,拉取的频率几乎是准实时的。

    if (pullResult.getNextBeginOffset() < prevRequestOffset
        || firstMsgOffset < prevRequestOffset) {
        log.warn(
            "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
            pullResult.getNextBeginOffset(),
            firstMsgOffset,
            prevRequestOffset);
    }

    break;
case NO_NEW_MSG:
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    break;
case NO_MATCHED_MSG:
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    break;

如果返回的拉取结果为NO_NEW_MSG、NO_MATCHED_MSG,则使用服务端校准的offset发起下一次拉取请求。

case OFFSET_ILLEGAL:
    log.warn("the pull request offset illegal, {} {}",
        pullRequest.toString(), pullResult.toString());
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

    pullRequest.getProcessQueue().setDropped(true);
    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

如果返回的拉取结果状态为OFFSET_ILLEGAL,即offset非法;首先设置ProcessQueue的Dropped为true,将该消息队列标记为丢弃。通过服务端下一次校准的offset尝试对当前消息的消费进度进行更新。

@Override
public void run() {
    try {
        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
            pullRequest.getNextOffset(), false);

持久化当前消息的消费进度

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

将当前消息队列从rebalanceImpl的ProcessQueue中移除,对当前队列的消息拉取进行暂停处理,等待下一次rebalance。

                            log.warn("fix the pull request offset, {}", pullRequest);
                        } catch (Throwable e) {
                            log.error("executeTaskLater Exception", e);
                        }
                    }
                }, 10000);
                break;
            default:
                break;
        }
    }
}

offset校准时,基本上使用原先的offset。

客户端进行消费进度,只有实际消费进度大于当前消费进度会会进行offset的覆盖操作,从而保证offset的准确性。

    @Override
    public void onException(Throwable e) {
        if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("execute the pull request exception", e);
        }

        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
};

如果拉取结果异常,则3s后将消息拉取请求重新将PullRequest对象放到PullMessageService的pullRequestQueue消息拉取队列中。

本文主要对PullMessageService消息拉取逻辑进行了分析,整个流程还是比较复杂的。其中的一些编码套路在实战中也是能够借鉴的,希望本文能够对读者理解消息拉取有所帮助。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK