8

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群

 2 years ago
source link: https://blog.51cto.com/u_15500707/5372519
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_docker

一、为什么要搭建RabbitMQ集群?

未部署集群的缺点

如果RabbitMQ集群只有一个broker节点,那么该节点的失效将导致整个服务临时性的不可用,并且可能会导致message的丢失(尤其是在非持久化message存储于非持久化queue中的时候)。可以将所有message都设置为持久化,并且使用持久化的queue,但是这样仍然无法避免由于缓存导致的问题:因为message在发送之后和被写入磁盘并执行fsync之间存在一个虽然短暂但是会产生问题的时间窗。通过publisher的confirm机制能够确保客户端知道哪些message已经存入磁盘,尽管如此,一般不希望遇到因单点故障导致服务不可用。

部署集群后

如果RabbitMQ集群是由多个broker节点构成的,那么从服务的整体可用性上来讲,该集群对于单点失效是有弹性的,但是同时也需要注意:尽管exchange和binding能够在单点失效问题上幸免于难,但是queue和其上持有的message却不行,这是因为queue及其内容仅仅存储于单个节点之上,所以一个节点的失效表现为其对应的queue不可用。

> RabbitMQ本身是基于Erlang编写Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的erlang.cookie来实现)。因此,RabbitMQ天然支持集群。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。

为了提高程序的吞吐量,保持消息的可靠性,一台机器挂了后,RabbitMQ能够正常生产,消费消息。

二、RabbitMQ集群的三种模式

rabbitmq有三种模式:单机模式,普通集群模式,镜像集群模式

☁️单机模式

Demo级别的,一般只是本机测试玩玩而已,生产环境下不会用的。

⛅普通集群模式

多台机器上启动多个rabbitmq实例每个机器启动一个。

但是你创建的queue,只会放在一个rabbtimq实例上,但是每个实例都同步queue的元数据(存放含queue数据的真正实例位置)。消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。

示意图

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_erlang_02

这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。普通集群的方式,确实达到了消息的高可用,但没办法保证可靠性,没做到分布式,简而言之,只是一个普通的集群。

缺点:

  • 可能会在RabbitMQ集群内部产生大量数据传输
  • 可用性没有达到保证,一台机器挂了就是挂了,无法恢复,只能手动恢复

⚡镜像队列

这种模式,才是所谓的rabbitmq的高可用模式,跟普通集群模式不一样的是,你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_spring_03

上图中每个节点有一个queue,生产者生产完毕数据后投递到指定交换机的队列,交换机的队列进行消息同步。

每个节点queue都有一个完整的rabbitmq节点,所以这种方式叫做镜像集群

> 镜像集群模式的好处与坏处好处: 任何一个节点宕机后,其它节点不受影响,正常使用

坏处:

  • 性能开销大,消息同步所有机器,导致网络带宽压力和消耗很重
  • 没有扩展性,如果某个queue负载很重,加机器,新增的机器也包含了这个queue的所有数据,没有办法扩展

对于以上方式,我们的镜像集群可以通过配置来解决这种扩展性的问题,配置同步的方式

三、阿里云服务器下Docker搭建RabbitMQ集群

♻️Docker安装RabbitMQ集群

确保机器中安装了Docker,若未安装,可看: ​【云原生】Docker入门 -- 阿里云服务器Linux环境下安装Docker

 使用Docker安装RabbitMQ镜像

# 拉取镜像,带有管理界面的版本
docker pull rabbitmq:management

查看拉取的镜像

docker images
RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_spring_04

运行Docker镜像

# 开启第一个RabbitMQ服务
docker run -d --hostname myRabbit1 --name rabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management

# 开启第二个
docker run -d --hostname myRabbit2 --name rabbit2 -p 15673:15672 -p 5673:5672 --link rabbit1:myRabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management

# 开启第三个
docker run -d --hostname myRabbit3 --name rabbit3 -p 15674:15672 -p 5674:5672 --link rabbit1:myRabbit1 --link rabbit2:myRabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management

查看正在运行的镜像

docker ps
RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_spring_05

成功运行

 配置RabbitMQ节点之间的关系设置节点1

[root@wanghuichen /]# docker exec -it rabbit1 bash
root@myRabbit1:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@myRabbit1 ...
root@myRabbit1:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@myRabbit1 ...
root@myRabbit1:/# rabbitmqctl
RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_docker_06

设置节点2

[root@wanghuichen /]# docker exec -it rabbit2 bash
root@myRabbit2:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@myRabbit2 ...
root@myRabbit2:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@myRabbit2 ...
root@myRabbit2:/# rabbitmqctl join_cluster --ram rabbit@myRabbit1
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Clustering node rabbit@myRabbit2 with rabbit@myRabbit1
root@myRabbit2:/# rabbitmqctl join_cluster --ram rabbit@myRabbit1
RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_docker_07

配置节点3

[root@wanghuichen /]# docker exec -it rabbit3 bash
root@myRabbit3:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@myRabbit3 ...
root@myRabbit3:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@myRabbit3 ...
root@myRabbit3:/# rabbitmqctl join_cluster --ram rabbit@myRabbit1
root@myRabbit3:/# rabbitmqctl start_app
RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_erlang_08

进入每个集群依次设置用户密码

rabbitmqctl add_user admin admin

rabbitmqctl set_user_tags admin administrator

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

配置镜像队列

[root@wanghuichen /]# docker exec -it rabbit1 bash
root@myRabbit1:/# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...
root@myRabbit1:/# rabbitmqctl cluster_status
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Cluster status of node rabbit@myRabbit1 ...
RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_docker_09

查看集群状态

rabbitmqctl cluster_status
RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_erlang_10

 常用命令

# 查看已经运行过但停止了的镜像
docker ps -a
# 停止镜像
docker stop 镜像id/镜像名称
# 开启镜像,恢复运行状态
dockers start 镜像id/镜像名称
# 删除镜像
docker rm 镜像id/镜像名称
# 删除所有镜像
docker rmi $(docker ps -a)

😄测试RabbitMQ集群

浏览器输入 您的ip地址:15673

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_docker_11

部署RabbitMQ镜像集群成功~

 如果出现无法访问的情况,可在阿里云服务器开启安全组,因为阿里云默认全部开启了安全组,配置允许访问的端口即可

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_docker_12

再次测试即可成功~

四、SpringBoot整合RabbitMQ集群

✅创建Maven聚合工程

File ---> New ---> Project ---> Maven ---> 直接Next 进入下一步创建普通的Maven工程即可

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_erlang_13

创建一个默认的Maven聚合工程,将src文件夹删除,该工程就是一个Maven聚合工程

😃引入共有依赖

引入依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.wanshi</groupId>
<artifactId>springboot-rabbitmq</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>rabbitmq-order-producer</module>
<module>rabbitmq-order-consumer</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</dependencyManagement>

</project>

⏳创建生产者

在项目内,新建一个Moudle,rabbitmq-order-producer 默认Maven工程,下一步即可

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rabbitmq-order-producer</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>

⌛创建消费者

在项目内,新建一个Moudle,rabbitmq-order-cousumer 默认Maven工程,下一步即可

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-rabbitmq</artifactId>
<groupId>com.wanshi</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rabbitmq-order-consumer</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>

</project>

Maven聚合工程创建完成图

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_spring_14

Maven依赖图

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_erlang_15

自行手写MainApplication即可创建完成!

♨️核心源码

生产者服务配置

# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
connection-timeout: 16000
addresses: 8.130.28.198:5672, 8.130.28.198:5673, 8.130.28.198:5674,
# 启用消息确认模式
publisher-confirm-type: correlated

# 启用 return 消息模式
publisher-returns: true
template:
mandatory: true

消费者服务配置

# 服务端口
server:
port: 8081
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
addresses: 8.130.28.198:5672, 8.130.28.198:5673, 8.130.28.198:5674,

 生产者

package com.wanshi.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
* @author whc
* @date 2022/5/23 18:50
*/

@Service
public class OrderService {

@Autowired
private RabbitTemplate rabbitTemplate;

public void makeOrder() {
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
String exchange_name = "fanout_order_exchange";
String routeingKey = "";
rabbitTemplate.convertAndSend(exchange_name, routeingKey, orderId);
}
}

 消费者交换机的声明与队列我们放在消费者端,因为消费者是先开启的,如果没有交换机和队列,则会报错!RabbitMQConfiguration

package com.wanshi.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author whc
* @date 2022/5/23 10:18
*/
@Configuration
public class RabbitMQConfiguration {

//1.声明注册fanout模式的交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order_exchange", true, false);
}

//2.声明队列,sms.fanout.queue email.fanout.queue msg.fanout.queue
@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue", true);
}

@Bean
public Queue emailQueue() {
return new Queue("email.fanout.queue", true);
}

@Bean
public Queue msgQueue() {
return new Queue("msg.fanout.queue", true);
}

//3.完成绑定关系(队列与交换机完成绑定关系)
@Bean
public Binding smsBind() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}

@Bean
public Binding emailBind() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}

@Bean
public Binding msgBind() {
return BindingBuilder.bind(msgQueue()).to(fanoutExchange());
}
}

编写具体业务消费类FanoutEmailConsumer

package com.wanshi.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author whc
* @date 2022/5/23 18:53
*/
@RabbitListener(queues = "email.fanout.queue")
@Component
public class FanoutEmailConsumer {

@RabbitHandler
public void messageService(String message) {
System.out.println("fanout email ==>" + message);
}
}

FanoutMsgConsumer

package com.wanshi.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author whc
* @date 2022/5/23 18:55
*/
@RabbitListener(queues = "msg.fanout.queue")
@Component
public class FanoutMsgConsumer {

@RabbitHandler
public void messageService(String message) {
System.out.println("fanout msg ==>" + message);
}
}

FanoutSmsConsumer

package com.wanshi.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author whc
* @date 2022/5/23 18:54
*/
@RabbitListener(queues = "sms.fanout.queue")
@Component
public class FanoutSmsConsumer {

@RabbitHandler
public void messageService(String message) {
System.out.println("fanout sms ==> " + message);
}
}

编写完成!

五、测试消息的生产与消费

 启动消费者,查看RabbitMQ队列的情况启动消费者

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_spring_16

查看RabbitMQweb管理界面绑定信息交换机

=

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_spring_17

查看队列Queue

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_docker_18

查看其它两台机器是否同步了数据15674

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_docker_19

15675

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_docker_20

 生产者投递消息,查看消费者消费情况

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_docker_21

成功消费数据!

 只生产消息,关闭消费者,查看消息同步情况

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群_spring_22

已成功同步消息~

以上就是【小王 Java】对 RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群简单的概述,搭建集群达到消息的高可用,高可靠,可以提高吞吐量,集群部署是一个很不错的选择,镜像队列集群,这个我们可以根据配置来进行同步规则,根据需求来定制化我们的规则!

如果这篇【文章】有帮助到你,希望可以给【小王 Java】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【小王 Java】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK