4

跟我学RocektMQ之理解长轮询机制

 3 years ago
source link: http://wuwenliang.net/2019/09/22/%E8%B7%9F%E6%88%91%E5%AD%A6RocektMQ%E4%B9%8B%E7%90%86%E8%A7%A3%E9%95%BF%E8%BD%AE%E8%AF%A2%E6%9C%BA%E5%88%B6/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
跟我学RocektMQ之理解长轮询机制 | 朝·闻·道

RoceketMQ提供了两种消息消费者,DefaultMQPushConsumer、DefaultMQPullConsumer。我们都知道DefaultMQPullConsumer是基于拉模式的消费,而DefaultMQPushConsumer是基于推模式的消费。我们先简单复习一下推拉模式的概念。

推模式:当服务端有数据立即通知客户端,这个策略依赖服务端与客户端之间的长连接,它具有高实时性、客户端开发简单等优点;同时缺点也很明显,比如:服务端需要感知与它建立链接的客户端,要实现客户端节点的发现,服务端本身主动推送,需要服务端对消息做额外的处理,以便能够及时将消息分发给客户端。

拉模式:客户端主动对服务端的数据进行拉取。客户端拉取数据,拉取成功后处理数据,处理完成再次进行拉取,循环执行。缺点是如果不能很好的设置拉取的频率,时间间隔,过多的空轮询会对服务端造成较大的访问压力,数据的实时性也不能得到很好的保证。

基于对上述两个策略的优缺点的综合考虑,RocketMQ的DefaultMQPushConsumer采用了结合了推拉模式两者优点的长轮询机制,对消息进行消费。这样,既能保证主动权在客户端,还能保证数据拉取的实时性。

本文我们就对RocketMQ的长轮询机制进行分析讲解,从而更好的理解RocketMQ的设计精巧之处。

首先了解一下什么是 长轮询 机制:

什么是“长轮询”机制

长轮询机制,顾名思义,它不同于常规轮询方式。常规的轮询方式为客户端发起请求,服务端接收后该请求后立即进行相应的方式。

长轮询本质上仍旧是轮询,它与轮询不同之处在于,当服务端接收到客户端的请求后,服务端不会立即将数据返回给客户端,而是会先将这个请求hold住,判断服务器端数据是否有更新。如果有更新,则对客户端进行响应,如果一直没有数据,则它会在长轮询超时时间之前一直hold住请求并检测是否有数据更新,直到有数据或者超时后才返回。

RocketMQ如何实现长轮询–客户端实现

了解了长轮询机制的概念,我们就容易理解RocketMQ对长轮询机制的应用了。请跟随笔者的思路,进入到源码中一探究竟。

首先复习一下客户端如何进行消息拉取:

从上文中,我们已经得知,DefaultMQPushConsumer应用了长轮询机制,从之前的源码分析文章中,我们知道RocketMQ消息拉取是通过消息拉取线程PullMessageService实现的,关于这部分的逻辑可以移步 跟我学RocketMQ之消息拉取源码解析

我们进入PullMessageService类,重点看它的 run() 方法。

[PullMessageService.java]
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

当broker启动后,会在启动MQClientInstance过程中启动PullMessageService,当PullMessageService启动后一直执行run方法进行消息拉取(只要stopped == false)。

回顾一下PullRequest的结构:

public class PullRequest {
    // 消费者组
    private String consumerGroup;
    // 待拉取到消息队列
    private MessageQueue messageQueue;
    // 消息处理队列,消息从broker中拉取以后会先存到该ProcessQueue中,然后再提交给消费者线程池进行消费
    private ProcessQueue processQueue;
    // 带拉取消息的偏移量
    private long nextOffset;
    // 是否锁定
    private boolean lockedFirst = false;

对于每个MessageQueue,都有对应的一个pullRequest,每个MessageQueue还对应一个processQueue,保存该MessageQueue消息处理的快照;通过nextOffset来标识当前读取的位置。

消息拉取最终是由PullAPIWrapper.java执行的,在它的pullKernelImpl()方法中,真正的消息拉取逻辑如下:

[PullAPIWrapper.java.pullKernelImpl()]

// 组装消息拉取请求头
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
// 设置broker最大阻塞时间,默认为15秒,BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);

// 获取拉取broker地址
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
    brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}

// 执行消息拉取
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
    brokerAddr,
    requestHeader,
    timeoutMillis,
    communicationMode,
    pullCallback);
return pullResult;

这里的参数brokerSuspendMaxTimeMillis(默认值为15s)代表进行消息拉取时,broker的最长阻塞时间。

当进行消息拉取时,如果broker端没有消息,则进行阻塞,否则会对消息体进行打包并直接返回。

RocketMQ如何实现长轮询–服务端实现

RocketMQ的长轮询是在broker上实现的,具体的代码实现在PullMessageProcessor中。我们进入代码中一窥芳容。

它的启动链路如下:

BrokerStartup
    |-start()
        |-createBrokerController(String[] args) 
            |-BrokerController() // BrokerController构造方法
            |-new PullMessageProcessor(this);

当broker启动完成之后,PullMessageProcessor便能够被远程的消费者访问到,通过网络进行消息拉取调用操作。

我们重点看方法processRequest,它是消息拉取网络交互的核心方法。

processRequest()

processRequest为broker对外提供消息拉取的服务方法,它提供针对不同拉取结果的处理逻辑。

[PullMessageProcessor.java.processRequest]
// 根据客户端发送的拉取消息头,构建拉取结果响应体
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
...各种前置校验...
// 从请求头中取出消费者组、主题、队列id、offset、消息最大拉取条数、过滤条件等,去commitLog中查找对应的消息
switch (getMessageResult.getStatus()) {
        case FOUND:
            response.setCode(ResponseCode.SUCCESS);
            break;
        case MESSAGE_WAS_REMOVING:
            response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
            break;
...省略其他case分支...

// 根据上面拉取结果中设置的code进行处理           
switch (response.getCode()) {
    ...省略其他case分支...
    case ResponseCode.PULL_NOT_FOUND:

            if (brokerAllowSuspend && hasSuspendFlag) {
                long pollingTimeMills = suspendTimeoutMillisLong;
                if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                }

                String topic = requestHeader.getTopic();
                long offset = requestHeader.getQueueOffset();
                int queueId = requestHeader.getQueueId();
                PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                    this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                response = null;
                break;
            }

对于ResponseCode.SUCCESS的拉取响应码,RocektMQ将消息拉取结果以byte数组形式设置到拉取响应中,并会返回给客户端;我们重点关注 ResponseCode.PULL_NOT_FOUND 类型,即 当前未拉取到消息

对于ResponseCode.PULL_NOT_FOUND类型,RocketMQ会调用PullRequestHoldService将请求holkd住,不会返回客户端响应,这里就是长轮询的核心逻辑,代码如下:

case ResponseCode.PULL_NOT_FOUND:
    // 判断broker是否允许被挂起
    if (brokerAllowSuspend && hasSuspendFlag) {
        // 获取长轮询超时时长
        long pollingTimeMills = suspendTimeoutMillisLong;
        // 如果长轮询支持未开启,则pollingTimeMills为短轮询时间,ShortPollingTimeMills默认为1秒
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        // 根据入参request,Nio的channel,轮询时间,当前消息存储时间戳,消息拉取offset,订阅信息,消息过滤表达式等信息构建长轮询拉取请求
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        // 通过PullRequestHoldService对拉取请求进行hold,使用pullRequest对指定topic、queueId的队列进行长轮询消息拉取
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        // 设置拉取返回为null,不对客户端进行返回
        response = null;
        break;
    }

我们总结一下这里的逻辑:

  • 首先判断broker是否允许被hold,如果允许则执行长轮询业务逻辑
  • 获取长轮询超时时长,该参数可配置,如果长轮询支持未开启则改用短轮询时间,默认为1s
  • 从消息拉取请求头中获取topic、队列offset、队列id
  • 构造长轮询消息拉取请求对象PullRequest
  • 调用PullRequestHoldService进行长轮询操作
  • 拉取返回为空,在超时之前不对客户端进行返回

PullRequestHoldService核心逻辑

从上面的分析我们得知,长轮询真正的执行者为PullRequestHoldService,我们看下这个类的代码,PullRequestHoldService继承了ServiceThread,我们重点关注其run方法。

@Override
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            // 如果支持长轮询,则等待5秒
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                // 短轮询则默认等待1s
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
            // 检测hold请求
            this.checkHoldRequest();
            // 如果检测花费时间超过5s打印日志
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}

run方法不断检测被hold住的请求,它不断检查是否有消息获取成功。检测方法通过执行方法suspendPullRequest实现

private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
    new ConcurrentHashMap<String, ManyPullRequest>(1024);

public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
    String key = this.buildKey(topic, queueId);
    // 从pullRequestTable中获取对应topic+queueId下的拉取请求ManyPullRequest
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (null == mpr) {
        mpr = new ManyPullRequest();
        ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
        if (prev != null) {
            mpr = prev;
        }
    }
    // 将等待检测的pullRequest添加到ManyPullRequest中
    mpr.addPullRequest(pullRequest);
}

注意,这里的ManyPullRequest对象实际上是一组PullRequest的集合,它封装了一个topic+queueId下的一批消息。

具体的检测逻辑通过方法checkHoldRequest()实现。

private void checkHoldRequest() {
    // 迭代PullRequest Map,key=topic@queueId
    for (String key : this.pullRequestTable.keySet()) {
        // 解析出topic  queueId
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            // 获取当前获取的数据的最大offset
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            try {
                // 通知消息到达
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}

checkHoldRequest()方法解析pullRequestTable的keySet,对key进行解析,取出topic及queueId,获取topic+queueId对应的当前MessageQueue的最大offset,并与当前的offset对比从而确定是否有新消息到达,具体逻辑在notifyMessageArriving(topic, queueId, offset);方法中实现

这里的检测逻辑整体是异步的,后台检测线程PullRequestHoldService一直在运行;在PullMessageProcessor中提交待检测的PullRequest到PullRequestHoldService,将其放入pullRequestTable,等待被PullRequestHoldService进行处理。

notifyMessageArriving(topic, queueId, offset)

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        // 根据key=topic@queueId从pullRequestTable获取ManyPullRequest
        // 如果ManyPullRequest不为空,拷贝ManyPullRequest中的List<PullRequest>
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            // 构造响应list
            List<PullRequest> replayList = new ArrayList<PullRequest>();
            // 迭代请求list
            for (PullRequest request : requestList) {
                long newestOffset = maxOffset;
                // 如果当前最新的offset小于等于请求的offset
                if (newestOffset <= request.getPullFromThisOffset()) {
                    // 当前最新的offset就是队列的最大offset
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                }
                // 如果当前最新offset大于请求offset,也就是有新消息到来
                if (newestOffset > request.getPullFromThisOffset()) {
                    // 判断消息是否满足过滤表达式
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    // match by bit map, need eval again when properties is not null.
                    if (match && properties != null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }
                    if (match) {
                        try {
                            // 消息匹配,则将消息返回客户端
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }
                // 判断是否超时
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        // 如果当前时间 >= 请求超时时间+hold时间,则返回客户端消息未找到
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }
                replayList.add(request);
            }
            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}

总结一下,notifyMessageArriving主要作用为判断消息是否到来,并根据判断结果对客户端进行相应。

  • 比较maxOffset与当前的offset,如果当前最新offset大于请求offset,也就是有新消息到来,则将新消息返回给客户端
  • 校验是否超时,如果当前时间 >= 请求超时时间+hold阻塞时间,则返回客户端消息未找到

该方法会在PullRequestHoldService中循环调用进行检查,也会在DefaultMessageStore中消息被存储的时候调用。这里体现了主动检查与被动通知共同作用的思路。

当服务端处理完成之后,相应客户端,客户端会在消息处理完成之后再次将拉取请求pullRequest放到PullMessageService中,等待下次轮询。这样就能够一直进行消息拉取操作。

本文对RocketMQ消息拉取的长轮询机制进行了分支,我们得知:

RocektMQ并没有使用推模式或者拉模式,而是使用了结合两者优点的长轮询机制,它本质上还是拉模式,但服务端能够通过hold住请求的方式减少客户端对服务端的频繁访问,从而提高资源利用率及消息响应实时性。这种策略在服务端开发的其他方向如:IM等领域都有广泛的实践,因此了解它的原理是有必要的。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK