10

聊聊 RocketMQ 主从复制 - 勇哥编程游记

 1 year ago
source link: https://www.cnblogs.com/makemylife/p/17517723.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

提到主从复制,我们可能立马会联想到 MySQL 的主从复制。

MySQL 主从复制是 MySQL 高可用机制之一,数据可以从数据库服务器主节点复制到一个或多个从节点。

这篇文章,我们聊聊 RocketMQ 的主从复制,希望你读完之后,能够理解主从复制的精髓。

2487169-20230630195413140-1830976675.png

1 同步与异步

在 RocketMQ 的集群模式中,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。

每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。

2487169-20230630195412074-744632871.webp

Master 节点负责接收客户端的写入请求,并将消息持久化到磁盘上。而 Slave 节点则负责从 Master 节点复制消息数据,并保持与 Master 节点的同步。

  • 同步复制
2487169-20230630195412054-1935122053.webp

生产者发送消息后,Master 接收到存储消息请求,将消息数据同步给 Slave 后,才将存储结果返回给生产者。同步复制模式下,发送消息会有一定延迟,系统吞吐量也会降低。

  • 异步复制
2487169-20230630195410125-495638183.webp

生产者发送消息后,Master 接收到存储消息请求,将消息存储后,直接将存储结果返回给生产者。 Master 和 Slave 再通过异步的方式同步数据,这种复制模式具有较小的延迟,可以实现比较高的吞吐量。

若 Master 出现故障,有些数据可能未写入 Slave ,未同步的数据可能丢失。

复制流程分为两个部分:元数据复制消息数据复制

  • 主从服务器同步主题,消费者进度,延迟消费进度,消费者配置数据
  • 主从服务器同步消息数据

2 元数据复制

Slave Broker 定时任务每隔 10 秒会同步元数据,包括主题消费进度延迟消费进度消费者配置

2487169-20230630195411735-1819543430.webp

同步主题时, Slave Broker 向 Master Broker 发送 RPC 请求,返回数据后,首先加入本地缓存里,然后持久化到本地。

%E5%90%8C%E6%AD%A5rpc.webp

3 消息数据复制

下图是 Master 和 Slave 消息数据同步的流程图。

%E6%B6%88%E6%81%AF%E6%95%B0%E6%8D%AE%E5%A4%8D%E5%88%B6.webp

1、Master 启动后监听指定端口;

Master 启动后创建 AcceptSocketService 服务 , 用来创建客户端到服务端的 TCP 链接。

master%E7%9B%91%E5%90%AC%E7%AB%AF%E5%8F%A3.webp

RocketMQ 抽象了链接对象 HAConnection , HAConnection 会启动两个线程,分别用于读服务和写服务:

  • 读服务:处理 Slave 发送的请求
  • 写服务:用于向 Slave 传输数据
2487169-20230630195411968-1605421507.png

2、Slave 启动后,尝试连接 Master ,建立 TCP 连接;

HAClient 是客户端 Slave 的核心类 ,负责和 Master 创建连接和数据交互。

2487169-20230630195414863-315932994.webp

客户端在启动后,首先尝试连接 Master , 查询当前消息存储中最大的物理偏移量 ,并存储在变量 currentReportedOffset 里。

3、Slave 判定拉取间隔是否大于 5 秒,则向 Master 汇报已拉取消息偏移量;

2487169-20230630195412964-1416757524.webp

上报进度的数据格式是一个 Long 类型的 Offset , 8个字节 , 非常简洁 。

2487169-20230630195412712-714224648.webp

发送到 Socket 缓冲区后 , 修改最后一次的写时间 lastWriteTimestamp 。

4、Master 解析请求偏移量,从消息文件中检索该偏移量后的所有消息;

当 Slave 上报数据到 Master 时,触发 SelectionKey.OP_READ 事件,Master 将请求交由 ReadSocketService 服务处理:

2487169-20230630195414297-576373161.webp

当 Slave Broker 传递了自身 commitlog 的 maxPhyOffset 时,Master 会马上中断 selector.select(1000) ,执行 processReadEvent 方法。

2487169-20230630195414114-2025326101.webp

processReadEvent 方法的核心逻辑是设置 Slave 的当前进度 offset ,然后通知复制线程当前的复制进度。

写服务 WriteSocketService 从消息文件中检索该偏移量后的所有消息,并将消息数据发送给 Slave。

2487169-20230630195458206-1078965869.webp

5、Slave 接收到数据,将消息数据 append 到消息文件 commitlog 里 。

2487169-20230630195450699-955536039.webp

首先 HAClient 类中调用 dispatchReadRequest 方法 , 解析出消息数据 ;

2487169-20230630195412301-1218255518.webp

然后将消息数据 append 到本地的消息存储。

2487169-20230630195414731-1178243900.webp

4 同步的实现

从数据复制流程图,我们发觉数据复制本身就是一个异步执行的,但是同步是如何实现的呢?

Master Broker 接收到写入消息的请求后 ,调用 Commitlog 的 aysncPutMessage 方法写入消息。

2487169-20230630195414502-618061499.webp

这段代码中,当 commitLog 执行完 appendMessage 后, 需要执行刷盘任务同步复制两个任务。

但这两个任务并不是同步执行,而是异步的方式,使用了 CompletableFuture 这个异步神器

当 HAConnection 读服务接收到 Slave 的进度反馈,发现消息数据复制成功,则唤醒 future 。

2487169-20230630195415358-402816567.webp

最后 Broker 组装响应命令 ,并将响应命令返回给客户端。

1、主从复制包含元数据复制和消息数据复制两个部分;

2、元数据复制

​ Slave Broker 定时任务每隔 10 秒向 Master Broker 发送 RPC 请求,将元数据同步到缓存后,然后持久化到磁盘里;

3、消息数据复制

  • Master 启动监听指定端口
  • Slave 启动 HaClient 服务,和 Master 创建 TCP 链接
  • Slave 向 Master 上报存储进度
  • Master 接收进度,消息文件中检索该偏移量后的所有消息,并传输给 Slave
  • Slave 接收到数据后,将消息数据 append 到本地的消息存储。

4、同步的实现

​ 当 commitLog 执行完 appendMessage 后, 需要执行刷盘任务同步复制两个任务,这里用到了 CompletableFuture 这个异步神器。

​ 当 HAConnection 读服务接收到 Slave 的进度反馈,发现消息数据复制成功,则唤醒 future 。最后 Broker 组装响应命令 ,并将响应命令 返回给客户端 。


如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

2487169-20230630195413152-1937942857.webp

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK