18

使用SpringCloud Stream结合rabbitMQ实现消息消费失败重发机制

 4 years ago
source link: http://www.cnblogs.com/mingsay/p/13171586.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

前言:实际项目中经常遇到消息消费失败了,要进行消息的重发。比如支付消息消费失败后,要分不同时间段进行N次的消息重发提醒。

本文模拟场景

  1. 当金额少于100时,消息消费成功
  2. 当金额大于100,小于200时,会进行3次重发,第一次1秒;第二次2秒;第三次3秒。
  3. 当金额大于200时,消息消费失败,会进行5次重发,第一次1秒;第二次2秒;第三次3秒;第四次4秒;第五次5秒。重试五次后,消息自动进入死信队列,在死信队列存活60秒后消失。

代码实例

特别注意代码与配置文件中的注释,各个使用说明都已经详细写在配置文件中

pom包引入

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.cloudstream</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR5</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- ①关键配置:引入stream-rabbit 依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!-- ②关键配置:由于stream是基于spring-cloud的,所以这里要引入 -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>

配置application.yml文件

注意各个配置的缩进格式,别搞错了

server:
  port: 8081
spring:
  application:
    name: stream-demo
  #rabbitmq连接配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: 123456
  cloud:
    stream:
      bindings:
        #消息生产者,与DelayDemoTopic接口中的DELAY_DEMO_PRODUCER变量值一致
        delay-demo-producer:
          #①定义交换机名
          destination: demo-delay-queue
        #消息消费者,与DelayDemoTopic接口中的DELAY_DEMO_CONSUMER变量值一致
        delay-demo-consumer:
          #定义交换机名,与①一致,就可以使发送和消费都指向一个队列
          destination: demo-delay-queue
          #分组,这个配置可以开启消息持久化、可以解决在集群环境下重复消费的问题。
          #比如A、B两台服务器集群,如果没有这个配置,则A、B都能收到同样的消息,如果有该配置则只有其中一台会收到消息
          group: delay-consumer-group
          consumer:
            #最大重试次数,默认为3。不使用默认的,这里定义为1,由我们程序控制发送时间和次数
            maxAttempts: 1
      rabbit:
        bindings:
          #消息生产者,与DelayDemoTopic接口中的DELAY_DEMO_PRODUCER变量值一致
          delay-demo-producer:
            producer:
              #②申明为延迟队列
              delayedExchange: true
          #消息消费者,与DelayDemoTopic接口中的DELAY_DEMO_CONSUMER变量值一致
          delay-demo-consumer:
            consumer:
              #申明为延迟队列,与②的配置的成对出现的
              delayedExchange: true
              #开启死信队列
              autoBindDlq: true
              #死信队列中消息的存活时间
              dlqTtl: 60000

定义队列通道

  1. 定义通道
/**
 * 定义延迟消息通道
 */
public interface DelayDemoTopic {
    /**
     * 生产者,与yml文件配置对应
     */
    String DELAY_DEMO_PRODUCER = "delay-demo-producer";
    /**
     * 消费者,与yml文件配置对应
     */
    String DELAY_DEMO_CONSUMER = "delay-demo-consumer";

    /**
     * 定义消息消费者,在@StreamListener监听消息的时候用到
     * @return
     */
    @Input(DELAY_DEMO_CONSUMER)
    SubscribableChannel delayDemoConsumer();

    /**
     * 定义消息发送者,在发送消息的时候用到
     * @return
     */
    @Output(DELAY_DEMO_PRODUCER)
    MessageChannel delayDemoProducer();
}
  1. 绑定通道
/**
 * 配置消息的binding
 *
 */
@EnableBinding(value = {DelayDemoTopic.class})
@Component
public class MessageConfig {

}

消息发送模拟

/**
 * 发送消息
 */
@RestController
public class SendMessageController {
    @Autowired
    DelayDemoTopic delayDemoTopic;

    @GetMapping("send")
    public Boolean sendMessage(BigDecimal money) throws JsonProcessingException {

        Message<BigDecimal> message = MessageBuilder.withPayload(money)
                //设置消息的延迟时间,首次发送,不设置延迟时间,直接发送
                .setHeader(DelayConstant.X_DELAY_HEADER,0)
                //设置消息已经重试的次数,首次发送,设置为0
                .setHeader(DelayConstant.X_RETRIES_HEADER,0)
                .build();
        return delayDemoTopic.delayDemoProducer().send(message);
    }
}

消息监听处理

@Component
@Slf4j
public class DelayDemoTopicListener {
    @Autowired
    DelayDemoTopic delayDemoTopic;

    /**
     * 监听延迟消息通道中的消息
     * @param message
     */
    @StreamListener(value = DelayDemoTopic.DELAY_DEMO_CONSUMER)
    public void listener(Message<BigDecimal> message) {
        //获取重试次数
        int retries = (int)message.getHeaders().get(DelayConstant.X_RETRIES_HEADER);
        //获取消息内容
        BigDecimal money = message.getPayload();
        try {
            String now = DateUtils.formatDate(new Date(),"yyyy-MM-dd HH:mm:ss");
            //模拟:如果金额大于200,则消息无法消费成功;金额如果大于100,则重试3次;如果金额小于100,直接消费成功
            if (money.compareTo(new BigDecimal(200)) == 1){
                throw new RuntimeException(now+":金额超出200,无法交易。");
            }else if (money.compareTo(new BigDecimal(100)) == 1 && retries <= 3) {
                if (retries == 0) {
                    throw new RuntimeException(now+":金额超出100,消费失败,将进入重试。");
                }else {
                    throw new RuntimeException(now+":金额超出100,当前第" + retries + "次重试。");
                }
            }else {
                log.info("消息消费成功!");
            }
        }catch (Exception e) {
            log.error(e.getMessage());
            if (retries < DelayConstant.X_RETRIES_TOTAL){
                //将消息重新塞入队列
                MessageBuilder<BigDecimal> messageBuilder = MessageBuilder.fromMessage(message)
                        //设置消息的延迟时间
                        .setHeader(DelayConstant.X_DELAY_HEADER,DelayConstant.ruleMap.get(retries + 1))
                        //设置消息已经重试的次数
                        .setHeader(DelayConstant.X_RETRIES_HEADER,retries + 1);
                Message<BigDecimal> reMessage = messageBuilder.build();
                //将消息重新发送到延迟队列中
                delayDemoTopic.delayDemoProducer().send(reMessage);
            }else {
                //超过重试次数,做相关处理(比如保存数据库等操作),如果抛出异常,则会自动进入死信队列
                throw new RuntimeException("超过最大重试次数:" + DelayConstant.X_RETRIES_TOTAL);
            }
        }
    }
}

规则定义

目前写在一个常量类里,实际项目中,通常会配置在配置文件中

public class DelayConstant {
    /**
     * 定义当前重试次数
     */
    public static final String X_RETRIES_HEADER = "x-retries";
    /**
     * 定义延迟消息,固定值,该配置放到消息的header中,会开启延迟队列
     */
    public static final String X_DELAY_HEADER = "x-delay";

    /**
     * 定义最多重试次数
     */
    public static final Integer X_RETRIES_TOTAL = 5;

    /**
     * 定义重试规则,毫秒为单位
     */
    public static final Map<Integer,Integer> ruleMap = new HashMap(){{
        put(1,1000);
        put(2,2000);
        put(3,3000);
        put(4,4000);
        put(5,5000);
    }};
}

测试

经过以上配置和实现就可完成模拟的重发场景。

  • 浏览器中输入 http://127.0.0.1:8081/send?money=10 ,可以看到控制台中输出:
消息消费成功!
  • 浏览器中输入 http://127.0.0.1:8081/send?money=110 ,可以看到控制台中输出:
2020-06-20 10:59:42:金额超出100,消费失败,将进入重试。
2020-06-20 10:59:43:金额超出100,当前第1次重试。
2020-06-20 10:59:45:金额超出100,当前第2次重试。
2020-06-20 10:59:48:金额超出100,当前第3次重试。
消息消费成功!
  • http://127.0.0.1:8081/send?money=110

    浏览器中输入 http://127.0.0.1:8081/send?money=110 ,可以看到控制台中输出:

    fiyUN3z.png!web

注意事项

由于本文用到了延迟队列,需要在rabbitMQ中安装延迟插件,具体安装方式,可以查看:延迟队列安装参考

源码获取

以上示例都可以通过 我的GitHub 获取完整的代码.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK