3

14. Springboot集成RabbitMQ

 1 month ago
source link: https://blog.51cto.com/u_15423953/10681228
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

目录

2、什么是RabbitMQ

3、安装RabbitMQ

4、Springboot集成RabbitMQ

4.1、添加依赖

4.2、添加配置

4.3、添加controller,作为生产者

4.4、设置生产者消息确认CallBack

4.5、添加Consumer,作为消费者

4.6、启动程序,访问


消息队列(Message Queue,简称 MQ)是一种异步的消息传递中间件,它解耦了应用程序之间的通信。应用程序可以将消息发送到队列,而无需知道谁会接收这些消息。接收应用程序可以从队列中检索消息,而无需知道谁发送了这些消息。消息队列是一种重要的中间件,它可以帮助应用程序之间进行异步、可靠、可扩展的通信。常见的消息队列中间件有ActiveMQ,RabbitMQ,Kafka......今天我们就来介绍RabbitMQ。

2、什么是RabbitMQ

RabbitMQ 是一个开源的消息队列服务器,它实现了 AMQP (高级消息队列协议) 标准。AMQP 是一种应用层协议,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 的主要特点包括:

  • 高性能:RabbitMQ 能够处理大量的消息,并提供低延迟的性能。
  • 可靠性:RabbitMQ 提供持久化消息存储,确保消息不会丢失。
  • 可扩展性:RabbitMQ 可以轻松扩展以满足不断增长的需求。
  • 灵活性:RabbitMQ 支持多种编程语言和客户端,并提供丰富的功能和配置选项。

RabbitMQ 的常见应用场景包括:

  • 分布式系统:RabbitMQ 可以用于在分布式系统中进行异步通信。
  • 异步处理:RabbitMQ 可以用于异步处理任务,提高系统的性能和效率。
  • 消息队列:RabbitMQ 可以用于实现消息队列,例如任务队列、发布/订阅队列等。
  • 消息通知:RabbitMQ 可以用于发送消息通知,例如电子邮件或短信。

3、安装RabbitMQ

由于RabbitMQ是一个由 Erlang 语言开发的 AMQP 的开源实现。所以在安装RabbitMQ前需要先安装Erlang环境。

Erlang下载地址: Downloads - Erlang/OTP

RabbitMQ下载地址: Installing RabbitMQ | RabbitMQ

先安装Erlang,在安装RabbitMQ。安装工程相对简单,无脑下一步即可。

安装完RabbitMQ后,打开cmd窗口,进入RabbitMQ的安装目录的sbin下,我的目录是:

D:\RabbitMQ Server\rabbitmq_server-3.13.0\sbin

然后输入以下命令安装一下插件:

rabbitmq-plugins enable rabbitmq_management

提示以下这个就是安装成功。

14. Springboot集成RabbitMQ_spring boot

验证RabbitMQ是否安装成功,输入以下命令:

rabbitmqctl status
14. Springboot集成RabbitMQ_java-rabbitmq_02

这时候,直接访问http://127.0.0.1:15672就可以看到RabbitMQ的管理页面了,RabbitMQ默认端口为15672,默认的管理页面账号密码均为guest。

登录后,就可以看到一个初始的管理界面:

14. Springboot集成RabbitMQ_消息队列_03

4、Springboot集成RabbitMQ

4.1、添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-parent</artifactId>
       <version>3.2.3</version>
       <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq</name>
    <description>springboot-rabbitmq</description>
    <properties>
       <java.version>17</java.version>
    </properties>
    <dependencies>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-devtools</artifactId>
          <scope>runtime</scope>
          <optional>true</optional>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-configuration-processor</artifactId>
          <optional>true</optional>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>
       <dependency>
          <groupId>cn.hutool</groupId>
          <artifactId>hutool-all</artifactId>
          <version>5.8.24</version>
       </dependency>
    </dependencies>

    <build>
       <plugins>
          <plugin>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-maven-plugin</artifactId>
          </plugin>
       </plugins>
    </build>
</project>

4.2、添加配置

# rabbitmq连接配置信息
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 确保消息在未被队列接收时返回
spring.rabbitmq.publisher-returns=true
# 发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated

4.3、添加controller,作为生产者

新建controller,用于发送消息。

package com.example.springbootrabbitmq.controller;

import com.example.springbootrabbitmq.config.MqProducerCallBack;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("push/message")
public class PushMessageController {

    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private MqProducerCallBack mqProducerCallBack;


    @GetMapping("test")
    public String sendMessage() {
        // correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
        CorrelationData correlationData = new CorrelationData("id_" + System.currentTimeMillis());
        // 消息确认和返回回调
        rabbitTemplate.setConfirmCallback(mqProducerCallBack);
        rabbitTemplate.setReturnsCallback(mqProducerCallBack);
        // 消息发送
        rabbitTemplate.convertAndSend("my-queue", "hello world", message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }, correlationData);
        return "publisher success...";
    }
}

4.4、设置生产者消息确认CallBack

package com.example.springbootrabbitmq.config;

import cn.hutool.json.JSONUtil;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class MqProducerCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    /**
     * correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
     * ack:消息投递到broker 的状态,true成功,false失败。
     * cause:投递失败的原因。
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.err.println("消息ID=" + correlationData.getId() + "投递失败,失败原因:" + cause);
        } else {
            System.out.println("消息投递收到确认,correlationData=" + correlationData.getId());
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("返回消息结果:" + JSONUtil.toJsonStr(returnedMessage));
    }

}

4.5、添加Consumer,作为消费者

package com.example.springbootrabbitmq.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class PushMessageConsumer {


    /**
     * basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
     * void basicAck(long deliveryTag, boolean multiple)
     * deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
     * multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
     * */
    @RabbitListener(queuesToDeclare = @Queue(value = "my-queue"))
    @RabbitHandler
    public void consume(String msg, Channel channel, Message message) throws IOException {
        try {
            System.out.println("消费者收到消息:" + msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("deliveryTag:" + message.getMessageProperties().getDeliveryTag());
            System.out.println("redelivered:" + message.getMessageProperties().getRedelivered());
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                System.err.println("消息已重复处理失败,拒绝再次接收!");
                /**
                 * 拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列
                 * basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
                 * deliveryTag:表示消息投递序号。
                 * requeue:值为 true 消息将重新入队列。
                 */
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("消息即将再次返回队列处理!");
                /**
                 * requeue为是否重新回到队列,true重新入队
                 * deliveryTag:表示消息投递序号。
                 * multiple:是否批量确认。
                 * requeue:值为 true 消息将重新入队列。
                 */
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }

}

4.6、启动程序,访问

浏览器访问:http://localhost:8080/push/message/test 模拟消息进行推送。

14. Springboot集成RabbitMQ_消息队列_04

查看控制台,发现消费者正常打印出了消费信息。

14. Springboot集成RabbitMQ_spring_05

打开RabbitMQ管理控制台,可以发现我们的消息队列my-queue信息。

14. Springboot集成RabbitMQ_消息队列_06

既可以查看消息队列的装填,消息投递情况等。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK