4

RocketMQ—RocketMQ消费重试和死信消息 - 随机的未知

 7 months ago
source link: https://www.cnblogs.com/nicaicai/p/18018420
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

RocketMQ—RocketMQ消费重试和死信消息

生产者重试

设置重试的代码如下

// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

一般情况下,我们不会在生产者方进行重试。

消费者重试

消费者在消费消息的过程中,下方三种情况会进行重试:

  • 业务报错了
  • 返回null 返回
  • 返回RECONSUME_LATER

代码如下:

/**
     * 重试的时间间隔
     * 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 默认重试16次
     * --------------
     * 重试的次数一般 5次
     * @throws Exception
     */
@Test
public void retryConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    consumer.subscribe("retryTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println(new Date());
            System.out.println(messageExt.getReconsumeTimes());
            System.out.println(new String(messageExt.getBody()));
            // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    consumer.start();
    System.in.read();
}

消息默认重试16次:

能否自定义重试次数

设置重试次数的代码如下:

// 设定重试次数
consumer.setMaxReconsumeTimes(2);

消息的构成如下:

消息构成

如果使用了上述代码,就会为消息头设置重试次数。

如果消息重试了最大次数还是失败怎么办

最大次数:如果没有设置最大次数,默认情况下,并发模式是16次,顺序模式是int的最大值。

如果重试了最大次数还是失败,就会变成死信消息,会被放进一个死信主题中去,这个死信主题的名字是有规律的,这个主题是

%DLQ%消费者组的名称

当消息处理失败的时候该如何正确的处理

  1. 可以监听死信消息,给管理员发送邮件或者短信通知,但是如果有多个死信消息,就要写多个监听器;
  2. 可以手动判断重试次数,如果大于某个次数,就记录下来,就不重试了,发送邮件或者短信通知。
try {
    handleDb();
} catch (Exception e) {
    // 重试
    int reconsumeTimes = messageExt.getReconsumeTimes();
    if (reconsumeTimes >= MAX_TIMES) {
        // 不要重试了
        System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK