
Learn RocketMQ with Me: A Source Code Discussion of Subscription Consistency

 3 years ago
source link: http://wuwenliang.net/2019/08/20/%E8%B7%9F%E6%88%91%E5%AD%A6RocketMQ%E4%B9%8B%E8%AE%A2%E9%98%85%E5%85%B3%E7%B3%BB%E4%B8%80%E8%87%B4%E6%80%A7%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/

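When consuming messages, a RocketMQ consumer must follow the principle of "subscription consistency". For its definition I quote the explanation from the Alibaba Cloud RocketMQ page, shown in the figure below: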

[Figure rmq0.png: Alibaba Cloud's explanation of subscription consistency]

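The key point of the figure can be summarized as:

All consumers in the same consumer group must subscribe to the same topics, and to the same tags within each topic; otherwise some messages may never be consumed.

For example, suppose a topic named DEMO_TOPIC carries two tags, tagA and tagB. If one consumer group, demo_group, is used to subscribe to tagA in one consumer and to tagB in another, the consumer for one of the tags will fail to receive its messages.

The fix is to use a different consumer group for each tag. In the example above, that means subscribing to tagA with demo_group_A and to tagB with demo_group_B.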
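The rule can be stated as a small check: every instance in a group must declare the same topic-to-tag map. The sketch below is only an illustration under that reading. The class `SubscriptionConsistencyCheck`, the method `isConsistent`, and the client names are hypothetical and are not part of RocketMQ.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class SubscriptionConsistencyCheck {

    /**
     * Returns true when every client instance of one consumer group
     * declares exactly the same topic -> tag-expression map.
     * (Hypothetical helper, not a RocketMQ API.)
     */
    public static boolean isConsistent(Map<String, Map<String, String>> subscriptionsByClient) {
        Map<String, String> reference = null;
        for (Map<String, String> subs : subscriptionsByClient.values()) {
            if (reference == null) {
                reference = subs;                       // first instance sets the baseline
            } else if (!Objects.equals(reference, subs)) {
                return false;                           // any difference breaks consistency
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Problem case: one group, two instances, different tags on the same topic.
        Map<String, Map<String, String>> demoGroup = new LinkedHashMap<>();
        demoGroup.put("client-1", Collections.singletonMap("DEMO_TOPIC", "tagA"));
        demoGroup.put("client-2", Collections.singletonMap("DEMO_TOPIC", "tagB"));
        System.out.println("demo_group consistent: " + isConsistent(demoGroup));

        // Fix from the text: a dedicated group whose instances all subscribe to tagA.
        Map<String, Map<String, String>> demoGroupA = new LinkedHashMap<>();
        demoGroupA.put("client-1", Collections.singletonMap("DEMO_TOPIC", "tagA"));
        demoGroupA.put("client-2", Collections.singletonMap("DEMO_TOPIC", "tagA"));
        System.out.println("demo_group_A consistent: " + isConsistent(demoGroupA));
    }
}
```

Running `main` reports `false` for the mixed group and `true` for the dedicated one.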

Knowing the fix is not quite satisfying on its own, so let's dig into the RocketMQ source code and see how subscription consistency actually works.

Heartbeat maintenance: MQClientInstance.java

In earlier source code walkthroughs we saw that the consumer client instance, MQClientInstance, starts a heartbeat thread. The relevant code is:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();

Now step into startScheduledTask():

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

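This code submits a periodic task to the scheduled thread pool. Each run does two things: it cleans up offline brokers, and it sends a heartbeat to every broker. Let's focus on the heartbeat method, sendHeartbeatToAllBrokerWithLock():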

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}
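The `tryLock()` guard in sendHeartbeatToAllBrokerWithLock() means a heartbeat round is skipped, not queued, when another thread is already sending. Here is a minimal sketch of that pattern using only the JDK. The class `HeartbeatLockSketch` and its counter are hypothetical stand-ins, and the real network call is replaced by an increment.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class HeartbeatLockSketch {
    private final ReentrantLock lockHeartbeat = new ReentrantLock();
    private final AtomicInteger sent = new AtomicInteger();

    /** Sends one heartbeat round if the lock is free; otherwise skips this round. */
    public boolean sendHeartbeatWithLock() {
        if (!lockHeartbeat.tryLock()) {
            return false; // another thread is sending; do not block the caller
        }
        try {
            sent.incrementAndGet(); // placeholder for the real heartbeat request
            return true;
        } finally {
            lockHeartbeat.unlock();
        }
    }

    public int sentCount() {
        return sent.get();
    }

    /** Tries a round on a second thread while the calling thread holds the lock. */
    public boolean attemptWhileLockHeld() {
        final boolean[] result = new boolean[1];
        lockHeartbeat.lock();
        try {
            Thread other = new Thread(() -> result[0] = sendHeartbeatWithLock());
            other.start();
            other.join(); // join makes the other thread's write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lockHeartbeat.unlock();
        }
        return result[0];
    }

    public static void main(String[] args) {
        HeartbeatLockSketch sketch = new HeartbeatLockSketch();
        System.out.println("lock free: " + sketch.sendHeartbeatWithLock());
        System.out.println("lock held: " + sketch.attemptWhileLockHeld());
        System.out.println("rounds sent: " + sketch.sentCount());
    }
}
```

The skipped round costs little here because the scheduler fires again after the next heartbeat interval.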

Next, step into sendHeartbeatToAllBroker() to see its logic.

private void sendHeartbeatToAllBroker() {

    // Build the heartbeat payload up front
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();

    // Neither consumers nor producers are registered on this client
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer");
        return;
    }

    if (!this.brokerAddrTable.isEmpty()) {
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        if (consumerEmpty) {
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }

                        try {
                            // This is where the heartbeat is actually sent
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                            }
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            // ... exception handling omitted ...
                        }
                    }
                }
            }
        }
    }
}
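One detail in the loop above is easy to miss: when the client has no consumers (`consumerEmpty`), only master brokers receive the heartbeat. The sketch below isolates that filter. The class `HeartbeatTargetSketch`, the method `selectTargets`, and the broker addresses are hypothetical. `MASTER_ID = 0` mirrors `MixAll.MASTER_ID`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeartbeatTargetSketch {
    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    /**
     * Picks the broker addresses that would receive a heartbeat.
     * brokerAddrTable maps brokerName -> (brokerId -> address).
     */
    public static List<String> selectTargets(Map<String, Map<Long, String>> brokerAddrTable,
                                             boolean consumerEmpty) {
        List<String> targets = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> entry : oneTable.entrySet()) {
                String addr = entry.getValue();
                if (addr == null) {
                    continue;
                }
                // A producer-only client skips slave brokers.
                if (consumerEmpty && entry.getKey() != MASTER_ID) {
                    continue;
                }
                targets.add(addr);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<Long, String> brokerA = new LinkedHashMap<>();
        brokerA.put(0L, "10.0.0.1:10911"); // master (made-up address)
        brokerA.put(1L, "10.0.0.2:10911"); // slave (made-up address)
        Map<String, Map<Long, String>> table = new LinkedHashMap<>();
        table.put("broker-a", brokerA);

        System.out.println("producer only: " + selectTargets(table, true));
        System.out.println("with consumers: " + selectTargets(table, false));
    }
}
```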

This method sends a heartbeat to every broker. The request is of type HEART_BEAT, which the broker handles in ClientManageProcessor, so let's go there and look at the heartbeat handling logic in the heartBeat() method.

Broker heartbeat handling

[ClientManageProcessor.java]
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Decode the client's heartbeat request body
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );

    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {

        // Subscription group configuration
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }

        // Register the consumer instance
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );

        if (changed) {
            log.info("registerConsumer info changed {} {}",
                data.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }

    for (ProducerData data : heartbeatData.getProducerDataSet()) {
        this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
            clientChannelInfo);
    }
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

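The call to focus on is registerConsumer(). Here the broker uses the heartbeat data sent by the consumer to look up the consumer subscription information it has on record. The lookup is done per consumer group. Let's step into the body of registerConsumer: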

[ConsumerManager.java]
public boolean registerConsumer(final String group,
    final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType,
    MessageModel messageModel,
    ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

    // Look up the consumer group info
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);

    // If absent, build new consumer group info from the heartbeat
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }

    // Update the ClientChannelInfo
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);

    // Update the subscription table
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }

    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    return r1 || r2;
}

Let's study this code closely. First:

ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);

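On the first heartbeat nothing is recorded for the group yet, so the newly built group info is put directly into consumerTable.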
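The get, putIfAbsent, then "prefer the previous value" sequence is a common thread-safe get-or-create idiom: if two heartbeats race, both end up using the single instance that won the insert. A minimal sketch follows, with a hypothetical `GroupInfo` class standing in for ConsumerGroupInfo.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class GetOrCreateSketch {

    /** Hypothetical stand-in for ConsumerGroupInfo. */
    public static final class GroupInfo {
        final String groupName;

        GroupInfo(String groupName) {
            this.groupName = groupName;
        }
    }

    private final ConcurrentMap<String, GroupInfo> consumerTable = new ConcurrentHashMap<>();

    /** Returns the one shared GroupInfo for the group, creating it on first use. */
    public GroupInfo getOrCreate(String group) {
        GroupInfo info = consumerTable.get(group);
        if (info == null) {
            GroupInfo tmp = new GroupInfo(group);
            // putIfAbsent returns the existing value if another thread inserted first.
            GroupInfo prev = consumerTable.putIfAbsent(group, tmp);
            info = prev != null ? prev : tmp;
        }
        return info;
    }

    public static void main(String[] args) {
        GetOrCreateSketch table = new GetOrCreateSketch();
        GroupInfo first = table.getOrCreate("demo_group");
        GroupInfo second = table.getOrCreate("demo_group");
        System.out.println("same instance: " + (first == second));
    }
}
```

Because every instance of a group shares this one object, whatever each heartbeat writes into it is visible to, and can be overwritten by, the others.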

Next, step into consumerGroupInfo.updateSubscription(subList):

[ConsumerGroupInfo.java]
public boolean updateSubscription(final Set<SubscriptionData> subList) {
    boolean updated = false;

    // Iterate over the subscriptions carried by the heartbeat
    for (SubscriptionData sub : subList) {
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        // No subscription was recorded for this topic before
        if (old == null) {
            // Record this subscription
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            if (null == prev) {
                updated = true;
                log.info("subscription changed, add new topic, group: {} {}",
                    this.groupName,
                    sub.toString());
            }
        // If the incoming version is greater than the recorded version, replace the subscription.
        // The version value is a system timestamp
        // (SubscriptionData.java)
        // private long subVersion = System.currentTimeMillis();
        } else if (sub.getSubVersion() > old.getSubVersion()) {
            if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                log.info("subscription changed, group: {} OLD: {} NEW: {}",
                    this.groupName,
                    old.toString(),
                    sub.toString()
                );
            }

            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }

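This part updates the subscription table. If no subscription exists for a topic, the new one is added directly. If one exists, the two versions are compared and the newer one overwrites the older one.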

Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
while (it.hasNext()) {
    Entry<String, SubscriptionData> next = it.next();
    String oldTopic = next.getKey();

    boolean exist = false;
    for (SubscriptionData sub : subList) {
        if (sub.getTopic().equals(oldTopic)) {
            exist = true;
            break;
        }
    }

    if (!exist) {
        log.warn("subscription changed, group: {} remove topic {} {}",
            this.groupName,
            oldTopic,
            next.getValue().toString()
        );

        it.remove();
        updated = true;
    }
}

this.lastUpdateTimestamp = System.currentTimeMillis();

return updated;

Reading on, the rest of the method iterates over the subscription table.

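For each topic already in the table, exist (the "topic still present" flag) is set to true only when the current heartbeat's subscription list also contains that topic. If no match is found, exist stays false, the if block runs, and the old topic is removed. The table therefore ends up holding only the topics from the latest heartbeat.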

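consumerTable stores consumer information keyed by consumer group. If the members of one group report different subscriptions, they collide in that shared entry. Suppose one consumer's heartbeat first tells the broker that the group subscribes to topicA/tagA; the broker records that subscription and updates its local subscription table. When another heartbeat arrives saying the group subscribes to topicB/tagB, the earlier entry is replaced: a topic missing from the new heartbeat is removed by the loop above, and for the same topic the subscription with the larger version timestamp overwrites the other. Either way the recorded subscription changes.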

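As a result the subscriptions overwrite each other, and when messages are pulled, one of the consumers cannot get any, because the broker can no longer find its subscription.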
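The overwrite can be reproduced with a stripped-down model of the subscription table. This is a sketch under simplifying assumptions, not the broker's code. The class `SubscriptionTableSketch` and its nested `Sub` (a stand-in for SubscriptionData with only topic, tag expression, and version) are hypothetical, and logging and timestamps are left out.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SubscriptionTableSketch {

    /** Simplified stand-in for SubscriptionData. */
    public static final class Sub {
        final String topic;
        final String tags;
        final long version;

        public Sub(String topic, String tags, long version) {
            this.topic = topic;
            this.tags = tags;
            this.version = version;
        }
    }

    // One table per consumer group, keyed by topic only.
    private final Map<String, Sub> subscriptionTable = new ConcurrentHashMap<>();

    /** Mirrors the two phases of updateSubscription: add or overwrite, then prune. */
    public boolean updateSubscription(List<Sub> subList) {
        boolean updated = false;
        for (Sub sub : subList) {
            Sub old = subscriptionTable.get(sub.topic);
            if (old == null) {
                subscriptionTable.put(sub.topic, sub);
                updated = true;
            } else if (sub.version > old.version) {
                subscriptionTable.put(sub.topic, sub); // newer version replaces the entry
            }
        }
        // Remove every recorded topic that this heartbeat does not mention.
        Iterator<Map.Entry<String, Sub>> it = subscriptionTable.entrySet().iterator();
        while (it.hasNext()) {
            String oldTopic = it.next().getKey();
            boolean exist = false;
            for (Sub sub : subList) {
                if (sub.topic.equals(oldTopic)) {
                    exist = true;
                    break;
                }
            }
            if (!exist) {
                it.remove();
                updated = true;
            }
        }
        return updated;
    }

    /** Tag expression currently recorded for the topic, or null if none. */
    public String tagsFor(String topic) {
        Sub sub = subscriptionTable.get(topic);
        return sub == null ? null : sub.tags;
    }

    public static void main(String[] args) {
        SubscriptionTableSketch group = new SubscriptionTableSketch();
        group.updateSubscription(Collections.singletonList(new Sub("DEMO_TOPIC", "tagA", 1L)));
        group.updateSubscription(Collections.singletonList(new Sub("DEMO_TOPIC", "tagB", 2L)));
        System.out.println("after tagB heartbeat: " + group.tagsFor("DEMO_TOPIC"));

        group.updateSubscription(Collections.singletonList(new Sub("OTHER_TOPIC", "*", 3L)));
        System.out.println("after OTHER_TOPIC heartbeat: " + group.tagsFor("DEMO_TOPIC"));
    }
}
```

In this model the table holds one entry per topic for the whole group, so the tagA subscription is lost as soon as the newer tagB heartbeat arrives, and DEMO_TOPIC disappears entirely once a heartbeat for a different topic comes in.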

Beyond the cause above, an even more important one lies in the message Rebalance process. Let's look at RebalanceImpl.java:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            // ... omitted ...
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

This rebalances the message queues of a given topic. Let's step into the line this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

[MQClientInstance.java]
public List<String> findConsumerIdList(final String topic, final String group) {
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) {
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    }

    if (null != brokerAddr) {
        try {
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
        } catch (Exception e) {
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        }
    }

    return null;
}

This looks up a broker address by topic and, if one is found, fetches the list of consumer IDs from that broker.

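The list is selected by consumerGroup, not by topic. If consumers in the same group subscribe to two or more different topics, or to different tags, the consumers of the other topic are included in the list as well, and Rebalance divides the queues among all of them. With two different topics, each topic ends up with roughly half of its data unconsumed.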
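The "half of the data" effect can be seen with a toy allocation. The sketch assumes a simple contiguous, even split of queue IDs across all consumer IDs in the group. It is a simplified stand-in written for this illustration, not RocketMQ's actual allocation strategy class, and `RebalanceSketch`, `allocate`, and the consumer IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RebalanceSketch {

    /**
     * Splits queue IDs 0..queueCount-1 evenly and contiguously among cidAll
     * and returns the share of currentCid (empty if it is not in the list).
     */
    public static List<Integer> allocate(int queueCount, List<String> cidAll, String currentCid) {
        List<Integer> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return result;
        }
        int size = cidAll.size();
        int base = queueCount / size;
        int mod = queueCount % size;
        int start = index * base + Math.min(index, mod); // earlier consumers absorb the remainder
        int count = base + (index < mod ? 1 : 0);
        for (int i = 0; i < count; i++) {
            result.add(start + i);
        }
        return result;
    }

    public static void main(String[] args) {
        // Both consumers are in demo_group, but only c1 subscribes to TOPIC_A.
        List<String> cidAll = Arrays.asList("c1", "c2");
        int topicAQueues = 4;

        List<Integer> c1Share = allocate(topicAQueues, cidAll, "c1");
        List<Integer> c2Share = allocate(topicAQueues, cidAll, "c2");

        System.out.println("TOPIC_A queues pulled by c1: " + c1Share);
        // c2 never rebalances TOPIC_A, so this share has no consumer at all.
        System.out.println("TOPIC_A queues left without a consumer: " + c2Share);
    }
}
```

Since c2 appears in the group's consumer ID list but does not subscribe to TOPIC_A, the queues set aside for it are never pulled.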

We will analyze the message Rebalance process separately in a later article.



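Copyright notice:

Original work takes effort, and republishing reworded copies is shameful. Unless otherwise noted, all articles on this blog are original. When reposting, please cite this article's address as a link.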


