9

跟我学RocketMQ之消息消费源码解析(1)

 3 years ago
source link: http://wuwenliang.net/2019/08/14/%E8%B7%9F%E6%88%91%E5%AD%A6RocketMQ%E4%B9%8B%E6%B6%88%E6%81%AF%E6%B6%88%E8%B4%B9%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90-1/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文我们接着分析一下RocektMQ实现消息消费的源码细节,这部分的内容较多,因此拆分为几个章节分别进行讲解。

本章节重点讲解DefaultMQPushConsumer的代码逻辑。

DefaultMQPushConsumer使用样例

按照惯例还是先看一下DefaultMQPushConsumer的使用样例。

@PostConstruct
public void init() {
    defaultMQPushConsumer = new DefaultMQPushConsumer("ORDER_RESULT_NOTIFY_GROUP");
    defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
    // 从头开始消费
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 消费模式:集群模式
    defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册监听器
    defaultMQPushConsumer.registerMessageListener(messageListener);
    // 订阅所有消息
    try {
        defaultMQPushConsumer.subscribe("ORDER_RESULT_NOTIFY_TOPIC", "*");
        defaultMQPushConsumer.start();
    } catch (MQClientException e) {
        throw new RuntimeException("[订单结果通知消息消费者]--NotifySendConsumer加载异常!", e);
    }
    LOGGER.info("[订单结果通知消息消费者]--NotifySendConsumer加载完成!");
}

初始化过程中需要调用registerMessageListener将具体的消费实现Listener注入。

@Component(value = "notifySendListenerImpl")
public class NotifySendListenerImpl implements MessageListenerConcurrently {
...省略部分代码...

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

    try {
        for (MessageExt msg : msgs) {
            // 消息解码
            String message = new String(msg.getBody());
            // 消费次数
            int reconsumeTimes = msg.getReconsumeTimes();
            String msgId = msg.getMsgId();
            String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;

            LOGGER.info("[通知发送消息消费者]-OrderNotifySendProducer-接收到消息,message={},{}", message, logSuffix);
            // 请求组装
            OrderResultNofityProtocol protocol = new OrderResultNofityProtocol();
            protocol.decode(message);
            // 参数加签,获取用户privatekey
            String privateKey = protocol.getPrivateKey();
            String notifyUrl = protocol.getMerchantNotifyUrl();
            String purseId = protocol.getPurseId();
            ChargeNotifyRequest chargeNotifyRequest = new ChargeNotifyRequest();
            chargeNotifyRequest.setChannel_orderid(protocol.getChannelOrderId())
                    .setFinish_time(DateUtil.formatDate(new Date(System.currentTimeMillis())))
                    .setOrder_status(NotifyConstant.NOTIFY_SUCCESS)
                    .setPlat_orderid(protocol.getOrderId())
                    .setSign(chargeNotifyRequest.sign(privateKey));
            LOGGER.info("[通知发送消息消费者]-OrderNotifySendProducer-订单结果通知入参:{},{}", chargeNotifyRequest.toString(), logSuffix);
            // 通知发送
            return sendNotifyByPost(reconsumeTimes, logSuffix, protocol, notifyUrl, purseId, chargeNotifyRequest);
        }
    } catch (Exception e) {
        LOGGER.error("[通知发送消息消费者]消费异常,e={}", LogExceptionWapper.getStackTrace(e));
    }
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

上面就是一个较为标准的在spring框架中使用RocektMQ的DefaultMQPushConsumer进行消费的主流程。

接下来我们重点分析一下源码实现。

初始化DefaultMQPushConsumer

首先看一下DefaultMQPushConsumer的初始化过程。

进入DefaultMQPushConsumer.java类,查看构造方法:

public DefaultMQPushConsumer(final String consumerGroup) {
    this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
}

调用了它的同名构造,采用AllocateMessageQueueAveragely策略(平均散列队列算法

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

可以看到实际初始化是通过DefaultMQPushConsumerImpl实现的,DefaultMQPushConsumer持有一个defaultMQPushConsumerImpl的引用。

[DefaultMQPushConsumerImpl.java]
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
    // 初始化DefaultMQPushConsumerImpl,将defaultMQPushConsumer的实际引用传入
    this.defaultMQPushConsumer = defaultMQPushConsumer;
    // 传入rpcHook并指向本类的引用
    this.rpcHook = rpcHook;
}

注册消费监听MessageListener

我们接着看一下注册消费监听器的流程。

消费监听接口MessageListener有两个具体的实现,分别为

MessageListenerConcurrently     -- 并行消费监听
MessageListenerOrderly          -- 顺序消费监听

本文以MessageListenerConcurrently为主要讲解的对象。

查看MessageListenerConcurrently的注册过程。

@Override
public void registerMessageListener(
            MessageListenerConcurrently messageListener) {
    // 将实现指向本类引用
    this.messageListener = messageListener;
    // 进行真实注册
    this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}

接着看defaultMQPushConsumerImpl.registerMessageListener

DefaultMQPushConsumerImpl.java
public void registerMessageListener(MessageListener messageListener) {
    this.messageListenerInner = messageListener;
}

可以看到DefaultMQPushConsumerImpl将真实的messageListener实现指向它本类的messageListener引用。

订阅topic

接着看一下订阅topic的主流程。

topic订阅主要通过方法subscribe实现,首先看一下DefaultMQPushConsumer的subscribe实现

@Override
public void subscribe(String topic, String subExpression) 
                                    throws MQClientException {
    this.defaultMQPushConsumerImpl
        .subscribe(withNamespace(topic), subExpression);
}

可以看到是调用了DefaultMQPushConsumerImpl的subscribe方法。

public void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        // 构建主题的订阅数据,默认为集群消费
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
            topic, subExpression);
        // 将topic的订阅数据进行保存
        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
        if (this.mQClientFactory != null) {
            // 如果MQClientInstance不为空,则向所有的broker发送心跳包,加锁
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

看一下buildSubscriptionData代码逻辑

[FilterAPI.java]
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
    String subString) throws Exception {
    // 构造一个SubscriptionData实体,设置topic、表达式(tag)
    SubscriptionData subscriptionData = new SubscriptionData();
    subscriptionData.setTopic(topic);
    subscriptionData.setSubString(subString);

    // 如果tag为空或者为"*",统一设置为"*",即订阅所有消息
    if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
    } else {
        // tag不为空,则先按照‘|’进行分割
        String[] tags = subString.split("\\|\\|");
        if (tags.length > 0) {
            // 遍历tag表达式数组
            for (String tag : tags) {
                if (tag.length() > 0) {
                    String trimString = tag.trim();
                    if (trimString.length() > 0) {
                        // 将每个tag的值设置到tagSet中
                        subscriptionData.getTagsSet().add(trimString);
                        subscriptionData.getCodeSet().add(trimString.hashCode());
                    }
                }
            }
        } else {
            // tag解析异常
            throw new Exception("subString split error");
        }
    }
    return subscriptionData;
}

看一下sendHeartbeatToAllBrokerWithLock代码逻辑

[MQClientInstance.java]
public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            // 发送心跳包
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}

可以看到,同步发送心跳包给所有的broker,而该过程是通过RemotingClient统一实现的,通过调用RemotingClient.invokeSync实现心跳包的发送,底层是通过Netty实现的。具体细节本文不进行展开。

启动消费客户端

上述初始化流程执行完毕之后,通过start()方法启动消费客户端。

@Override
public void start() throws MQClientException {
    // 设置消费者组
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    // 启动消费客户端
    this.defaultMQPushConsumerImpl.start();
    // trace处理逻辑
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

关于trace的处理逻辑,本文不再展开,感兴趣的同学可以移步 跟我学RocketMQ之消息轨迹实战与源码分析

接着看defaultMQPushConsumerImpl.start()方法逻辑

[DefaultMQPushConsumerImpl.java]
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={},
             isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

首次启动后,执行配置检查,该方法为前置校验方法,主要进行消费属性校验。

this.checkConfig();

将订阅关系配置信息进行复制

this.copySubscription();

如果当前为集群消费模式,修改实例名为pid

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}

创建一个新的MQClientInstance实例,如果已经存在直接使用该存在的MQClientInstance

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

为消费者负载均衡实现rebalanceImpl设置属性

// 设置消费者组
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 设置消费模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 设置队列分配策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 设置当前的MQClientInstance实例
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);


this.pullAPIWrapper = new PullAPIWrapper(
    mQClientFactory,
    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 注册消息过滤钩子
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

处理offset存储方式

// offsetStore不为空则使用当前的offsetStore方式
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    // 否则根据消费方式选择具体的offsetStore方式存储offset
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        // 如果是广播方式,则使用本地存储方式
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        // 如果是集群方式,则使用远端broker存储方式存储offset
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
 // 加载当前的offset
this.offsetStore.load();

根据MessageListener的具体实现方式选取具体的消息拉取线程实现。

// 如果是MessageListenerOrderly顺序消费接口实现
// 消息消费服务选择:ConsumeMessageOrderlyService(顺序消息消费服务)
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} 
// 如果是MessageListenerConcurrently并行消息消费接口实现
// 消息消费服务选择:ConsumeMessageConcurrentlyService(并行消息消费服务)
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

选择并初始化完成具体的消息消费服务之后,启动消息消费服务。consumeMessageService主要负责对消息进行消费,它的内部维护了一个线程池。

// 启动消息消费服务
this.consumeMessageService.start();

接着向MQClientInstance注册消费者,并启动MQClientInstance。这里再次强调

一个JVM中所有消费者、生产者持有同一个MQClientInstance,且MQClientInstance只会启动一次

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}

mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;

如果MQClientInstance已经启动,或者已经关闭,或者启动失败,重复调用start会报错。这里也能直观的反映出:MQClientInstance的启动只有一次

    case RUNNING:
    case START_FAILED:
    case SHUTDOWN_ALREADY:
        throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    default:
        break;
}

启动完成执行后续收尾工作

    // 订阅关系改变,更新Nameserver的订阅关系表
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    // 检查客户端状态
    this.mQClientFactory.checkClientInBroker();
    // 发送心跳包
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // 唤醒执行消费者负载均衡
    this.mQClientFactory.rebalanceImmediately();
}

copySubscription(),消息重试topic处理逻辑

消费者启动流程较为重要,我们接着对其中的重点方法展开讲解。这部分内容可以暂时跳过,不影响对主流程的把控。

我们研究一下copySubscription方法的实现细节。

[DefaultMQPushConsumerImpl.java]
private void copySubscription() throws MQClientException {
    try {

        // 首先获取订阅信息
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    topic, subString);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }

        // 为defaultMQPushConsumer设置具体的MessageListener实现
        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }

根据消费类型选择是否进行重试topic订阅

        switch (this.defaultMQPushConsumer.getMessageModel()) {

            // 如果是广播消费模式,则不进行任何处理,即无重试
            case BROADCASTING:
                break;

            // 如果是集群消费模式,订阅重试主题消息
            case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                    retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

如果是集群消费模式,会订阅重试主题消息

获取重试topic,规则为 RETRY_GROUP_TOPIC_PREFIX + consumerGroup,即:“%RETRY%”+消费组名

为重试topic设置订阅关系,订阅所有的消息;

消费者启动的时候会自动订阅该重试主题,并参与该topic的消息队列负载过程。

到此,我们就DefaultMQPushConsumer的初始化、启动、校验以及topic订阅、重试等代码实现
细节进行了较为详细的讲解。

下一章节,我将带领读者对消息消费线程 consumeMessageService 的实现进行分析,我们下篇文章见。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK