4

搭了一个RocketMQ高可用集群,同事直呼哇塞!

 1 year ago
source link: https://www.51cto.com/article/742674.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

搭了一个RocketMQ高可用集群,同事直呼哇塞!

作者:不才陈某 2022-12-20 08:32:02
本节内容主要介绍了MQ的基本知识以及RocketMQ集群搭建过程,有兴趣的可以按照笔者的整个搭建过程尝试一遍,至于其中一些配置属性以及生产、消费消息将会在后文介绍。

RocketMQ作为阿里系的一款开源的MQ中间件,经历了双十一的高并发场景的消息流转,能够处理万亿级别的消息。

这篇文章将作为《RocketMQ 进阶》专栏的第一篇文章,介绍一下实际生产中如何搭建一个高可用的RocketMQ集群。集群整体架构图如下:

图片

为什么要用MQ?

消息队列是一种“先进先出”的数据结构

图片

其应用场景主要包含以下3个方面

1、应用解耦

系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

图片

使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

图片

2、流量削峰

图片

应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。

图片

一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。

处于经济考量目的:

业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰

3、数据分发

图片

通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可

图片

各种MQ产品的比较

常见的MQ产品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ。

图片

关于MQ技术选型详细可以看笔者之前的文章:聊聊 MQ 技术选型

RocketMQ中的几个重要角色

从上述的集群架构图中可以知道RocketMQ中涉及到的几个重要的角色:

  • NameServer:相当于微服务中的注册中心,提供broker的服务发现和注册功能,各个节点之间无相互通信,一旦broker节点启动,将会主动上报信息给NameServer。
  • producer:消息生产者,发送消息给broker;拥有同一个的groupId的producer为一个集群
  • broker:消息暂存和传输,接收producer发送的消息,采用push/pull模式传递给consumer
  • consumer:消息消费者,消费broker传递的消息,拥有同一个groupId的consumer为一个集群

以上四个是RocketMQ对外四种角色,另外内部还有一些重要角色,如下:

  • Topic:消息主题,通过Topic对不同的业务消息进行分类。
  • Tag:消息标签,用来进一步区分某个Topic下的消息分类,消息从生产者发出即带上的属性。
  • Message Queue:队列,相当于Topic的分区,用于并行发送和消费消息,一个Topic中对应多个Queue

关于Topic和Tag的区别:比如电商中的下单、支付流程,为了提高并发量通常都会使用消息队列进行异步处理,那么可以定义消息的Topic为Topic_order,但是其中还涉及了创建订单、付款、完成订单这三类消息,如何去区分?

此时就该用到Tag去细分了,此时的对应关系如下图:

图片

Topic和Message Queue的关系如下图:

图片

一个Topic中包含多个Message Queue(队列)

RocketMQ下载

阿里将RocketMQ贡献给了Apache,所以要去Apache的官网去下载对应的版本;

地址:https://rocketmq.apache.org/dowloading/releases/

我的《RocketMQ 进阶》这个专栏选用的版本是4.9.4

下载地址:https://rocketmq.apache.org/download

针对RocketMQ对外的四种角色,集群部署有以下几点需要注意的地方:

  • NameServer之间是不相互通信的,因此NameServer集群部署非常简单,直接启动多个服务
  • broker部署分为master和slave节点,一个master对应多个slave节点,主从节点通过brokerId区分,主节点为0,从节点为1;每个broker与NameServer建立长连接,定时注册Topic信息到NameServer中。
  • producer与NameServer中的任意一个节点建立长连接,定期获取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

注意这里说的集群模式是针对broker,因为涉及到broker的节点之间的数据同步问题。

NameServer各个节点间不互相通信,只需要启动多个服务便可实现一个集群

RocketMQ支持四种集群模式,如下:

1. 单Master模式

不建议使用,一旦服务重启或者宕机将导致整个服务不可用

2. 多Master模式

这个集群模式无slave节点,全部都是master节点,该模式如下图:

图片

该模式的优缺点如下:

  • 优点:该模式性能最高
  • 缺点:一旦一台服务宕机了,那么在这台服务上的消息不能被订阅消费,消息实时性会受到影响

3. 多Master多Slave(同步)

每个master对应一个slave节点,有多对master-slave,主从之间的数据复制采用同步双写的形式,如下图:

图片

主从同步双写是什么意思?

producer发送一条消息给broker的主节点,只有主节点将数据同步到从节点才会返回结果

此时的发送消息流程如下:

图片

需要经过以上4步才能实现消息发送成功,此时如果主从数据复制阻塞,那么producer必须等待直到成功。

这种模式的优缺点如下:

  • 优点:无单点故障,数据不会丢失,即使master宕机了,salve节点依然能够对外提供服务
  • 缺点:由于是同步复制,性能比异步复制的模式低

4. 多Master多Slave(异步)

每个Master配置一个Slave,有多对master-slave,采用异步复制的方式,如下:

图片

消息发送到的master后直接返回,不必等待主从复制,而是内部通过异步的方式进行复制。

该种模式的优缺点如下:

  • 优点:无单点故障,消息无延迟,即使master宕机了,salve节点依然能够对外提供服务
  • 性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高。

主从同步集群搭建

根据上面的介绍,主从同步集群模式使用4个节点,分别是两个主节点、两个从节点。

笔者这里是使用两台机器将节点均摊,如下图:

图片

在安装之前需要做些准备工作,如下:

  • 准备两台服务器/虚拟机
  • 安装好JDK1.8的环境
  • 下载好rocketmq-all-4.9.4-bin-release

笔者使用的是Centos7的虚拟机进行演示,如下:

192.168.47.146

nameserver、brokerserver

Master1、Slave2

192.168.47.145

nameserver、brokerserver

Master2、Slave1

1. 添加环境变量

RocketMQ的启动需要依赖的一个环境变量:ROCKETMQ_HOME(RocketMQ的根目录)

export PATH=$JAVA_HOME/bin:$PATH
export ROCKETMQ_HOME=/usr/local/rocketmq-all-4.9.4-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin

除了以上RocketMQ的环境变量配置,还需添加JDK的配置,省略...

配置保存之后,执行下述命令:

source /etc/profile

2. 创建消息存储路径

RocketMQ是将消息存储在磁盘,因此需要创建存储路径,如下:

mkdir -p /usr/local/rocketmq/store/master
mkdir -p /usr/local/rocketmq/store/master/commitlog
mkdir -p /usr/local/rocketmq/store/master/consumequeue
mkdir -p /usr/local/rocketmq/store/master/index

mkdir -p /usr/local/rocketmq/store/slave
mkdir -p /usr/local/rocketmq/store/slave/commitlog
mkdir -p /usr/local/rocketmq/store/slave/consumequeue
mkdir -p /usr/local/rocketmq/store/slave/index

3. broker配置文件

总共四个节点,分别配置如下:

(1)master1

这个配置文件是broker-a.properties,如下:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptinotallow=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeCnotallow=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathCnotallow=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushCnotallow=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactinotallow=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

(2)slave2

修改配置文件broker-b-s.properties,如下:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptinotallow=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeCnotallow=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/slave
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/slave/commitlog
#消费队列存储路径存储路径
storePathCnotallow=/usr/local/rocketmq/store/slave/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/slave/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/slave/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/slave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushCnotallow=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactinotallow=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

(3)master2

修改broker-b.properties,如下:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptinotallow=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeCnotallow=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathCnotallow=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushCnotallow=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactinotallow=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

​(4)slave1

修改broker-a-s.properties,如下:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptinotallow=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeCnotallow=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/slave
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/slave/commitlog
#消费队列存储路径存储路径
storePathCnotallow=/usr/local/rocketmq/store/slave/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/slave/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/slave/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/slave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushCnotallow=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactinotallow=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

关于上面的各个配置有什么用后面章节会详细介绍

4、开放端口

宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单粗暴的方式是直接关闭防火墙

# 关闭防火墙
systemctl stop firewalld.service 
# 查看防火墙的状态
firewall-cmd --state 
# 禁止firewall开机启动
systemctl disable firewalld.service

或者为了安全,只开放特定的端口号,RocketMQ默认使用3个端口:9876 、10911 、11011 。如果防火墙没有关闭的话,那么防火墙就必须开放这些端口:

  • nameserver 默认使用 9876 端口
  • master 默认使用 10911 端口
  • slave 默认使用11011 端口

执行以下命令:

# 开放name server默认端口
firewall-cmd --remove-port=9876/tcp --permanent
# 开放master默认端口
firewall-cmd --remove-port=10911/tcp --permanent
# 开放slave默认端口 (当前集群模式可不开启)
firewall-cmd --remove-port=11011/tcp --permanent 
# 重启防火墙
firewall-cmd --reload

5. Host添加信息

需要在hosts中添加信息,这样后面的配置就不用通过ip指定了。

执行如下命令进入hosts文件:

vim /etc/hosts

配置信息如下:

# nameserver
192.168.47.146 rocketmq-nameserver1
192.168.47.145 rocketmq-nameserver2
# broker
192.168.47.146 rocketmq-master1
192.168.47.146 rocketmq-slave2
192.168.47.145 rocketmq-master2
192.168.47.145 rocketmq-slave1

配置完成后, 重启网卡:

systemctl restart network

6. 修改启动脚本

内置RocketMQ启动对服务器内存要求较高,由于笔者本地测试的配置较低,因此需要修改JVM启动参数,以下两个脚本都在bin目录下。

(1)runbroker.sh脚本修改:

图片

根据自己服务器的配置进行修改

(2) runserver.sh 脚本修改:

图片

7. 服务启动

RocketMQ启动分为两步:

  • 启动NameServer
  • 启动borker集群

(1)启动NameServer

分别在两台服务器上启动,命令如下:

cd /usr/local/rocketmq-all-4.9.4-bin-release/bin
nohup sh mqnamesrv &

(2)启动broker集群

这里master和slave总计四个,均摊在两个服务器上,下面分别启动

master1启动,命令如下:

cd /usr/local/rocketmq-all-4.9.4-bin-release/bin
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &

slave2启动,命令如下:

cd /usr/local/rocketmq-all-4.9.4-bin-release/bin
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &

master1和slave2在同一台服务器上(192.168.47.146)

master2启动,命令如下:

cd /usr/local/rocketmq-all-4.9.4-bin-release/bin
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties &

slave1启动,命令如下:

cd /usr/local/rocketmq-all-4.9.4-bin-release/bin
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &

master2和slave1在同一台服务器上(192.168.47.145)

8. 查看进程状态

第7步启动成功后,查询进程状态观察RocketMQ是否启动成功,命令如下:

图片

9. 查看日志

同时也可以观察RocketMQ的日志看下是否异常,命令如下:

# 查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log

集群监控平台

RocketMQ有一个对其扩展的开源项目rocketmq-dashboard,直接将该项目拉到本地,修改其中的几个参数编译打包即可

修改application.yml中的NameServer的配置,改成自己搭建的地址,如下:

图片

然后打包运行,命令如下:

//打包
mvn clean package -Dmaven.test.skip=true

//运行
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

运行成功之后,浏览器访问:http://ip:8080

图片

进入集群这一栏,看下自己搭建的集群信息,如下图:

图片

本节内容主要介绍了MQ的基本知识以及RocketMQ集群搭建过程,有兴趣的可以按照笔者的整个搭建过程尝试一遍,至于其中一些配置属性以及生产、消费消息将会在后文介绍。

责任编辑:武晓燕 来源: 码猿技术专栏

Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK