4

【RocketMQ】消息拉模式分析 - shanml

 1 year ago
source link: https://www.cnblogs.com/shanml/p/17062238.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

【RocketMQ】消息拉模式分析

RocketMQ有两种获取消息的方式,分别为推模式和拉模式。

推模式
推模式在【RocketMQ】消息的拉取一文中已经讲过,虽然从名字上看起来是消息到达Broker后推送给消费者,实际上还是需要消费向Broker发送拉取请求获取消息内容,推模式对应的消息消费实现类为DefaultMQPushConsumerImpl,回顾一下推模式下的消息消费过程:

  1. 消费者在启动的时候做一些初始化工作,它会创建MQClientInstance并进行启动;
  2. MQClientInstance中引用了消息拉取服务PullMessageService和负载均衡服务RebalanceService,它们都继承了ServiceThread,MQClientInstance在启动后也会对它们进行启动,所以消息拉取线程和负载均衡线程也就启动了;
  3. 负载均衡服务启动后,会对该消费者订阅的主题进行负载均衡,为消费者分配消息队列,并创建PullRequest拉取请求,用于拉取消息;
  4. PullMessageService中等待阻塞队列中PullRequest拉取请求的到来,接着会调用DefaultMQPushConsumerImplpullMessage方法进行消息拉取;
  5. 消费者向Broker发送拉取消息的请求,从Broker拉取消息;
  6. 消费者对Broker返回的响应数据进行处理,解析消息进行消费;

推模式下进行消息消费的例子:

@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {
    private String consumerGroup;
    private String topic = "FooBar";
    private String brokerName = "BrokerA";
    private MQClientInstance mQClientFactory;

    @Mock
    private MQClientAPIImpl mQClientAPIImpl;
    private static DefaultMQPushConsumer pushConsumer;

    @Before
    public void init() throws Exception {
        // ...
        // 消费者组
        consumerGroup = "FooBarGroup" + System.currentTimeMillis();
        // 实例化DefaultMQPushConsumer
        pushConsumer = new DefaultMQPushConsumer(consumerGroup);
        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
        // 设置拉取间隔
        pushConsumer.setPullInterval(60 * 1000);
        // 注册消息监听器
        pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                Optional.ofNullable(result).orElse(new ArrayList<MessageExt>()).stream().forEach(x-> {
                    // 处理消息
                    System.out.println(new String(x.getBody()));
                });
                return null;
            }
        });
        // ...
        // 设置订阅的主题
        pushConsumer.subscribe(topic, "*");
        // 启动消费者
        pushConsumer.start();
    }
}

消息推模式的详细过程可参考【RocketMQ】消息的拉取,接下来我们看一下拉模式。

拉模式
首先来看一下拉模式下进行消息消费的例子,拉模式下需要消费者不断调用poll方法获取消息,底层是一个阻塞队列,如果队列中没有数据,会进入等待直到队列中增加了数据:

 private void testPull() {
        // 创建DefaultLitePullConsumer
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumerGroup");;
        try {
            litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
            litePullConsumer.subscribe("LitePullConsumerTest", "*");
            litePullConsumer.start();
            litePullConsumer.setPollTimeoutMillis(20 * 1000);
            while(true) {
                // 获取消息
                List<MessageExt> result = litePullConsumer.poll();
                Optional.ofNullable(result).orElse(new ArrayList<MessageExt>()).stream().forEach(x-> {
                    // 处理消息
                    System.out.println(new String(x.getBody()));
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            litePullConsumer.shutdown();
        }
    }

推模式与拉模式的区别
对比上面推模式进行消费的例子,从使用方式上来讲,推模式不需要消费者主动去拉取消息,只需要注册消息监听器,当有消息到达时,触发consumeMessage方法进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,尽管底层还是需要消费者发起拉取请求向Broker拉取消息

拉模式在使用方式上,需要消费者主动调用poll方法获取消息,从表面上看消费者需要不断主动进行消息拉取,所以叫做拉模式。

拉模式实现原理

拉模式下对应的消息拉取实现类为DefaultLitePullConsumerImpl,在DefaultLitePullConsumerDefaultMQPullConsumer被标注了@Deprecated,已不推荐使用)的构造函数中,可以看到对其进行了实例化,并在start方进行了启动:

public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
    // 拉模式下默认的消息拉取实现类
    private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;

    public DefaultLitePullConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.consumerGroup = consumerGroup;
        // 创建DefaultLitePullConsumerImpl
        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
    }

    @Override
    public void start() throws MQClientException {
        setTraceDispatcher();
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        // 启动DefaultLitePullConsumerImpl
        this.defaultLitePullConsumerImpl.start();
        // ...
    }
}

与消息推模式类似,DefaultLitePullConsumerImpl的start的方法主要做一些初始化的工作:

  1. 初始化客户端实例对象mQClientFactory,对应实现类为MQClientInstance,拉取服务线程、负载均衡线程都是通过MQClientInstance启动的;
  2. 初始化负载均衡类,拉模式对应的负载均衡类为RebalanceLitePullImpl
  3. 创建消息拉取API对象PullAPIWrapper,用于向Broker发送拉取消息的请求;
  4. 初始化消息拉取偏移量;
  5. 启动一些定时任务;
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultLitePullConsumer.changeInstanceNameToPID();
                }
                // 初始化MQClientInstance
                initMQClientFactory();
                // 初始化负载均衡
                initRebalanceImpl();
                // 初始化消息拉取API对象
                initPullAPIWrapper();
                // 初始化拉取偏移量
                initOffsetStore();
                // 启动MQClientInstance
                mQClientFactory.start();
                // 启动一些定时任务
                startScheduleTask();
                this.serviceState = ServiceState.RUNNING;
                log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());
                operateAfterRunning();
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }
}

拉取模式对应的负载均衡类为RebalanceLitePullImpl(推模式使用的是RebalanceService),在initRebalanceImpl方法中设置了消费者组、消费模式、分配策略等信息:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
    // 实例化,拉模式使用的是RebalanceLitePullImpl
    private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);

    private void initRebalanceImpl() {
        // 设置消费者组
        this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
        // 设置消费模式
        this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
        // 设置分配策略
        this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
        // 设置mQClientFactory
        this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    }
}

【RocketMQ】消息的拉取一文中已经讲到过,消费者启动后会进行负载均衡,对每个主题进行负载均衡,拉模式下处理逻辑也是如此,所以这里跳过中间的过程,进入到rebalanceByTopic方法,可以负载均衡之后如果消费者负载的ProcessQueue发生了变化,会调用messageQueueChanged方法触发变更事件:

public abstract class RebalanceImpl {
     private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // ...
            }
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                // ...
                if (mqSet != null && cidAll != null) {
                    // ...
                    try {
                        // 分配消息队列
                        allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    // 更新处理队列
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        // 触发变更事件
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }
}

触发消息队列变更事件

RebalanceLitePullImplmessageQueueChanged方法中又调用了MessageQueueListenermessageQueueChanged方法触发消息队列改变事件:

public class RebalanceLitePullImpl extends RebalanceImpl {
    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
        if (messageQueueListener != null) {
            try {
                // 触发改变事件
                messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
            } catch (Throwable e) {
                log.error("messageQueueChanged exception", e);
            }
        }
    }
}

MessageQueueListenerImplDefaultLitePullConsumerImpl的内部类,在messageQueueChanged方法中,不管是广播模式还是集群模式,都会调用updatePullTask更新拉取任务:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    class MessageQueueListenerImpl implements MessageQueueListener {
        @Override
        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
            MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
            switch (messageModel) {
                case BROADCASTING:
                    updateAssignedMessageQueue(topic, mqAll);
                    updatePullTask(topic, mqAll); // 更新拉取任务
                    break;
                case CLUSTERING:
                    updateAssignedMessageQueue(topic, mqDivided);
                    updatePullTask(topic, mqDivided); // 更新拉取任务
                    break;
                default:
                    break;
            }
        }
    }
}

更新拉取任务

在updatePullTask方法中,从拉取任务表taskTable中取出了所有的拉取任务进行遍历,taskTable中记录了之前分配的拉取任务,负载均衡之后可能发生变化,所以需要对其进行更新,这一步主要是处理原先分配给当前消费者的消息队列,在负载均衡之后不再由当前消费者负责,所以需要从taskTable中删除,之后调用startPullTask启动拉取任务:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();

    private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
        // 从拉取任务表中获取之前分配的消息队列进行遍历
        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
            // 如果与重新进行负载均衡的主题一致
            if (next.getKey().getTopic().equals(topic)) {
                // 如果重新分配的消息队列集合中不包含此消息独立
                if (!mqNewSet.contains(next.getKey())) {
                    next.getValue().setCancelled(true);
                    // 从任务表移除
                    it.remove();
                }
            }
        }
        // 启动拉取任务
        startPullTask(mqNewSet);
    }
}

提交拉取任务

startPullTask方法入参中传入的是负载均衡后重新分配的消息队列集合,在startPullTask中会对重新分配的集合进行遍历,如果taskTable中不包含某个消息队列,就构建PullTaskImpl对象,加入taskTable,这一步主要是处理负载均衡后新增的消息队列,为其构建PullTaskImpl加入到taskTable,之后将拉取消息的任务PullTaskImpl提交到线程池周期性的执行:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {

    private void startPullTask(Collection<MessageQueue> mqSet) {
        // 遍历最新分配的消息队列集合
        for (MessageQueue messageQueue : mqSet) {
            // 如果任务表中不包含
            if (!this.taskTable.containsKey(messageQueue)) {
                // 创建拉取任务
                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
                // 加入到任务表
                this.taskTable.put(messageQueue, pullTask);
                // 将任务提交到线程池定时执行
                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
            }
        }
    }
}

PullTaskImpl继承了Runnable,在run方法中的处理逻辑如下:

  1. 获取消息队列对应处理队列ProcessQueue;
  2. 获取消息拉取偏移量,也就是从何处开始拉取消息;
  3. 调用pull方法进行消息拉取;
  4. 判断拉取结果,如果拉取到了消息,将拉取到的结果封装为ConsumeRequest进行提交,也就是放到了阻塞队列中,后续消费者从队列中获取数据进行消费;
   public class PullTaskImpl implements Runnable {
        private final MessageQueue messageQueue;
        private volatile boolean cancelled = false;
        private Thread currentThread;

        @Override
        public void run() {
            // 如果未取消
            if (!this.isCancelled()) {
                this.currentThread = Thread.currentThread();
                // ...
                // 获取消息队列对应的ProcessQueue
                ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
                // ...  跳过一系列校验
                long offset = 0L;
                try {
                    // 获取拉取偏移量
                    offset = nextPullOffset(messageQueue);
                } catch (Exception e) {
                    log.error("Failed to get next pull offset", e);
                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
                    return;
                }

                if (this.isCancelled() || processQueue.isDropped()) {
                    return;
                }
                long pullDelayTimeMills = 0;
                try {
                    SubscriptionData subscriptionData;
                    // 获取主题
                    String topic = this.messageQueue.getTopic();
                    // 获取主题对应的订阅信息SubscriptionData
                    if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                        subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
                    } else {
                        subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
                    }
                    // 拉取消息
                    PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
                    if (this.isCancelled() || processQueue.isDropped()) {
                        return;
                    }
                    // 判断拉取结果
                    switch (pullResult.getPullStatus()) {
                        case FOUND: // 如果获取到了数据
                            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
                            synchronized (objLock) { // 加锁
                                if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                                    processQueue.putMessage(pullResult.getMsgFoundList());
                                    // 将拉取结果封装为ConsumeRequest,提交消费请求
                                    submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
                                }
                            }
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("The pull request offset illegal, {}", pullResult.toString());
                            break;
                        default:
                            break;
                    }
                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
                } catch (InterruptedException interruptedException) {
                    log.warn("Polling thread was interrupted.", interruptedException);
                } catch (Throwable e) {
                    pullDelayTimeMills = pullTimeDelayMillsWhenException;
                    log.error("An error occurred in pull message process.", e);
                }
                // ...
            }
        }
    }

submitConsumeRequest方法中可以看到将创建的ConsumeRequest对象放入了阻塞队列consumeRequestCache中:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    // 阻塞队列
    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();

    private void submitConsumeRequest(ConsumeRequest consumeRequest) {
        try {
            // 放入阻塞队列consumeRequestCache中
            consumeRequestCache.put(consumeRequest);
        } catch (InterruptedException e) {
            log.error("Submit consumeRequest error", e);
        }
    }
}

在前面的例子中,可以看到消费者是调用poll方法获取数据的,进入到poll方法中,可以看到是从consumeRequestCache中获取消费请求的,然后从中解析出消息内容返回:

public class DefaultLitePullConsumerImpl implements MQConsumerInner {
    
    public synchronized List<MessageExt> poll(long timeout) {
        try {
            // ...
            long endTime = System.currentTimeMillis() + timeout;
            // 从consumeRequestCache中获取数据进行处理
            ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            // ...
            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                // 获取消息内容
                List<MessageExt> messages = consumeRequest.getMessageExts();
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
                assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                this.resetTopic(messages);
                // 返回消息内容
                return messages;
            }
        } catch (InterruptedException ignore) {
        }
        return Collections.emptyList();
    }
}

参考

RocketMQ源码分析之pull模式consumer

RocketMQ版本:4.9.3


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK