8

一张图进阶 RocketMQ - 消息发送 - 三此君

 2 years ago
source link: https://www.cnblogs.com/sancijun/p/16445085.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片链接,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。

【重要】视频在 B 站同步更新,欢迎围观,轻轻松松涨姿势。一张图进阶 RocketMQ-消息发送(视频版)

本文是“一张图进阶 RocketMQ” 系列第 3 篇,对 RocketMQ 不了解的同学可以先看看三此君的
一张图进阶 RocketMQ-整体架构一张图进阶 RocketMQ - NameServer
在了解了 RocketMQ 的整体架构之后,我们来深入的分析下生产者消息发送的设计与实现。本文从一个生产者示例开始,以两行代码为切入点,逐步剖析生产者启动流程以及同步消息发送流程。

生产者示例

消息发送分为同步消息、异步消息和单向消息,简单来说:

  • 同步消息:消息发送之后会等待 Broker 响应,并把响应结果传递给业务线程,整个过程业务线程在等待。
  • 异步消息:调用异步发送 API,Producer 把消息发送请求放进线程池就返回。逻辑处理,网络请求都在线程池中进行,等结果处理完之后回调业务定义好的回调函数。
  • 单向消息:只负责发送消息,不管发送结果。

我们先来回顾下同步消息发送的例子:

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        // 创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("sancijun","order", "orderId", "我一定会关注三此君".getBytes("UTF-8")); 
        // 发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
      	// 通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
  • 首先,实例化一个生产者 producer,并告诉它 NameServer 的地址,这样生产者才能从 NameServer 获取路由信息。
  • 然后 producer 得做一些初始化(这是很关键的步骤),它要和 NameServer 通信,要先初始化通信模块等。
  • producer 已经准备好了,那得准备好要发的内容,把 "我一定会关注三此君" 发送到 Topic=”sanicjun“。
  • 内容准备好,那 producer 就可以把消息发送出去了。producer 怎么知道 Broker 地址呢?他会去 NameServer 获取路由信息,得到 Broker 的地址是 localhost:10909,然后通过网络通信将消息发送给 Broker。
  • 生产者发送的消息通过网络传输给 Broker,Broker 需要对消息按照一定的结构进行存储。存储完成之后,把存储结果告知生产者。

其中有两个关键的地方:producer.start()producer.send(),也就是生产者初始化及消息发送。我们以这两行代码为切入点,看看 RocketMQ Producer 的设计与实现。

Tips:因为本文是RocketMQ 设计与实现分析,虽然不会粘贴任何源码,但是图文中会有大量的类名和方法名,看的时候不必执着于这些陌生的类名和方法名,三此君会解释这些类和方法的用途。

目标:将消息发送给 Broker 进行存储
关键点 1: 怎样根据 topic+路由信息 建立网络通道,进行消息的发送
关键点 2: 消息在发送过程中又经过了哪些处理?

生产者启动

我们实例化一个生产者 DefaultMQProducer,并调用 DefaultMQProducer.start() 方法进行初始化:

启动流程比较长,其实最重要的就是初始化了通信模块,并启动了多个定时任务,这些在后面的消息发送过程中都会用到:

  • 检查配置是否合法:生产者组名是否为空、是否满足命名规则、长度是否满足等。

  • 启动通信模块服务 Netty RemotingClient:RemotingClient 是一个接口,底层使用的通讯框架是Netty,提供了实现类 NettyRemotingClient,RemotingClient 在初始化的时候实例化 Bootstrap,方便后续用来创建 SocketChannel;后文会介绍 RocketMQ 的通信机制,大家稍安勿躁。

  • 启动 5 个后台定时任务:定时更新 NameServerAddr 信息,定时更新 topic 的路由信息,定时向 Broker 发送心跳及清理下线的 Broker,定时持久化 Consumer 的 Offset 信息,定时调整线程池;

    生产者每 30s 会从某台 NameServer 获取 Topic 和 Broker 的映射关系(路由信息)存在本地内存中,如果发现新的 Broker 就会和其建立长连接,每 30s 会发送心跳至 Broker 维护连接。

Tips:生产者为什么要启动消息拉取服务?重平衡服务是什么?简单来说,这两个服务都是用于消费者的,这里我们暂且不理会。消息拉取服务 pullMessageService 是从 Broker 拉取消息的服务 ,重平衡服务 rebalanceService 用于消费者的负载均衡,负责分配消费者可消费的消息队列。

总体上讲,消息发送可以划分为三个层级:

  • 业务层:准备需要发送的消息。
  • 消息处理层:获取业务发送的 Message,经过一系列的参数检查、消息发送准备、参数包装等操作。
  • 通信层:基于 Netty 封装的一个网络通信服务,将消息发送给 Broker。

我们通过前面的示例来看整个同步消息发送的处理流程,整个过程我们的主要目标就是把消息发送到 Broker:

  • 第一步:业务层构建待发送消息 Message msg = new Message("sancijun","order", "orderId", "我一定会关注三此君".getBytes("UTF-8"));

  • 第二步:然后我们调用 producer.send(msg) 发送消息,可是 producer 怎么知道发给谁呢?消息本身又需要经过哪些处理呢?我们进入调用链直到 sendDefaultImpl

    • 检查消息是否为空,消息的 Topic 的名字是否为空或者是否符合规范,消息体大小是否符合要求,最大值为4MB,可以通过 maxMessageSize 进行设置。

    • 执行 tryToFindTopicPublishInfo() 方法:获取 Topic 路由信息,如果不存在则抛出异常。如果本地缓存没有路由信息,就通过Namesrv 获取路由信息,更新到本地。消息构建的时候我们指定了消息所属 Topic,根据 Topic 路由信息我们可以找到对应的 Broker。

      Tips:从 NameServer 获取的路由信息 TopicRouteData 会包含指定 Topic 的 topicQueueTable、brokerAddrTable。在 NameServer 集群元数据管理部分我们讲过,通过 topicName 从 topicQueueTable 获取对应的 brokerName,再根据 brokerName 从 brokerAddrTable 中获取 Broker IP 地址。

    • 计算消息发送的重试次数,同步重试和异步重试的执行方式是不同的。在同步发送情况下如果发送失败会默认重投两次(默认retryTimesWhenSendFailed = 2),并且不会选择上次失败的 Broker,会向其他 Broker 投递。

    • 执行队列选择方法 selectOneMessageQueue()。根据 lastBrokerName(上次发送消息失败的 Broker 的名字)和 Topic 路由信息选一个 MessageQueue。
      首次发送时 lastBrokerName 为 null,采用轮询策略选择一个 MessageQueue。如果上次发送失败,也是采用轮询策略选择一个 MessageQueue,但是会跳过上次发送失败 Broker 的 MessageQueue,也就是换一个 Broker 发送。

      Tips:选择一个 MessageQueue,什么是 MessageQueue 呢?这和 Broker 的存储结构相关,我们会在存储部分详细介绍,这里先说结论,我们创建 Topic 时指定了这个 Topic 的读写队列数,每个 MessageQueue 有不同的 queueId(0-3)。

      我们也可以通过sendLatencyFaultEnable 来设置是否总是发送到延迟级别较低的Broker,默认值为False,我么这里就不展开讨论了。

    • 执行 sendKernelImpl() 方法。

  • 第三步:sendDefaultImpl 做了一系列逻辑处理,我们已经得到了待发送的 BrokerName,而我们的目标是把消息发送到 Broker。sendKernelImpl 方法是发送消息的核心方法,主要用于准备通信层的入参(比如Broker地址、请求体等),将请求传递给通信层。

    • 根据 MessageQueue.brokerName 获取 Broker IP 地址,给 message 添加全局唯一 ID。

      Tips:sendKernelImpl 也有很多的逻辑处理,我们暂时先略过这里的压缩、事务消息、钩子函数、重试消息:

      对大于4k的普通消息进行压缩,并设置消息的系统标记为MessageSysFlag.COMPRESSED_FLAG。

      如果是事务Prepared消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE

      如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑,通过DefaultMQProducerImpl#registerSendMessageHook注册钩子处理类,并且可以注册多个。

      构建发送消息请求头:生产者组、主题名称、默认创建主题Key、该主题在单个Broker默认队列数、队列ID(队列序号)、消息系统标记(MessageSysFlag)、消息发送时间、消息标记、消息扩展属性、消息重试次数、是否是批量消息等

      处理重试消息。

    • 调用 MQClientAPIImpl.sendMessage(),首先构建一个远程请求 RemotingCommand,根据发送类型(同步或异步)调用不同的通信层实现方法。我们这里是同步消息,则调用 RemotingClient.invokeSync()。

    • 处理返回结果,将通信层返回的结果封装成 SendResult 对象返回给业务层。

  • 第四步:RemotingClient 是基于 Netty 实现的,熟悉 Netty 的同学已经大概知道后面的流程,不熟悉的同学也没有关系,这里先混个眼熟,下面我们会对 Netty 做简单的介绍。

    • RemotingClient.invokeSync() 先是通过 Broker Addr 获取或者创建 Netty Channel。先从 channelTables Map 本地缓存中,以Broker Addr 为 key 获取 Channel,没有获取到则通过 Netty Bootstrap.connect( Broker Addr) 创建 Channel,并放入缓存。
    • 然后生成<opaque, ResponseFuture>的键值对放入 responseTable 缓存中,结果返回的时候根据 opaque 从缓存中获取结果。
    • 调用 channel.writeAndFlush() 将消息通过网络传输给指定 Broker。这里是 Netty 框架的 API,已经不在 RocketMQ 范畴。
    • 调用 ResponseFuture.waitResponse() 方法,直到 Netty 接收 Broker的返回结果。其实就是执行 countDownLatch.await()。
  • 第五步:结果处理及返回。

    • Broker 处理结果返回,Netty 产生可读事件,由 Channelhandler 处理可读事件,这里是 NettyClientHandler.channelRead0()接收写入数据,处理可读事件。
    • 然后处理返回结果,从 responseTable 取出 ResponseFuture,并执行 responseFuture.putResponse()。实际上就只执行 countDownLatch.countDown() 唤醒第四步中等待的调用线程,返回 Broker 的处理结果 RemotingCommand。
    • 结果层层返回,直到 MQClientAPIImpl.sendMessageSync() 出手了,这里调用 MQClientAPIImpl.processSendResponse() 处理返回结果,封装成 SendResult 对象返回给业务层。

到这里,生产者已经将消息发送到指定的 Broker 了,其中包括了消息的层层校验及封装;还有很重要的是如何选择一个 MessageQueue 进行发送(重试),重试是保证消息发送可靠的关键步骤;最后通过 Netty 将请求发送给 Broker。我们先不管 Broker 收到请求如何处理,但是要明白消息如何送到 Broker 进行存储,需要对 Netty 有简单的理解。

以上就是 RocketMQ 消息发送的主要内容,我们简单的总结下:

  • 生产者启动:主要是调用 NettyRemotingClient.start() 初始化 Netty 客户端,并启动 5 个后台线程;
  • 消息发送:业务层封装发送的消息,逻辑层进行层层校验及封装,轮询策略选择一个 MessageQueue 发送(重试),通信层基于 Netty 将消息发送给 Broker。
  • RocketMQ 官方文档

  • RocketMQ 源码

  • 丁威, 周继锋. RocketMQ技术内幕:RocketMQ架构设计与实现原理. 机械工业出版社, 2019-01.

  • 李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.

  • 杨开元. RocketMQ实战与原理解析. 机械工业出版社, 2018-06.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK