6

RabbitMQ实现订单超时案例

 2 years ago
source link: https://www.cnblogs.com/SimpleWu/p/16626595.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

RabbitMQ实现订单超时案例

前言#

业务场景#

用戶在购买商品的时候通常会预购然后没付款,没付款的订单通常会被设置一个自动超时时间如30分钟后超时,所以我们要在订单到30分钟后自动将超时的订单取消。

JUC(DelayQueue)方案#

DelayQueue简介
  • DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务。
  • DelayQueue是一个支持延时获取元素的无界阻塞队列。里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,
    如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期到时才能够从队列中取元素。
  • DelayQueue主要用于两个方面:- 缓存:清掉缓存中超时的缓存数据- 任务超时处理
  • DelayQueue实现了BlockingQueue,所以它是一个阻塞队列。
  • DelayQueue还组合了一个叫做Delayed的接口,DelayQueue中存储的所有元素必须实现Delayed接口。
JUC DelayQueue实现订单超时案例代码 案例代码

定义订单超时对象:

/**
 * juc 定义延迟对象信息
 * @author wuwentao
 */
@Data
public class Order implements Delayed {
    public Order(String orderId,long second){
        this.orderId = orderId;
        second = second * 1000;
        this.timeout = System.currentTimeMillis() + second;
    }
    private String orderId; // 订单号
    private long timeout; // 具体的超时时间

    /**
     *  延迟任务会自动调用该方法如果是负数则说明对象到了时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return this.timeout - System.currentTimeMillis();
    }

    /**
     * 定义排序规则
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

定义DelayQueue端作为消费者:

/**
 * 延迟队列消费者
 */
@Slf4j
@Component
public class JavaDelayQueueConsumer {
    // 订单超时对象存储的定时队列
    private final DelayQueue<OrderTimeoutDelayed> delayQueue = new DelayQueue<>();
    // 为true的时候启动线程,全局只启动一次
    private final AtomicBoolean start = new AtomicBoolean(false);
    // 任务处理线程
    private Thread thread;
    /**
     * 将需要自动过期的订单放到队列
     * @param orderTimeoutDelayed
     */
    public void monitor(OrderTimeoutDelayed orderTimeoutDelayed){
        delayQueue.add(orderTimeoutDelayed);
    }

    /**
     * 启动过期订单处理
     */
    @PostConstruct
    public void start(){
        if(!start.getAndSet(true)){
            this.thread = new Thread(()->{
                while (true) {
                    try {
                        // 获取已超时的订单
                        OrderTimeoutDelayed take = delayQueue.take();
                        if(take != null){
                            log.info("JUC延迟队列订单号:[{}]已超时当前时间为:[{}]",take.getOrderId(), DateUtil.getCuurentDateStr());
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            this.thread.start();
            log.info("已启动JUC延迟队列消费!");
        }
    }
}

定义入口请求用于向队列中添加需要自动超时的订单信息:

/**
 * JUC实现延迟队列
 * @author wuwentao
 */
@Slf4j
@RestController
@RequestMapping("java/dealy/queue")
@AllArgsConstructor
public class JavaDealyQueueProducer {
    private JavaDelayQueueConsumer javaDelayQueueConsumer;
    private final int defaultTimoutSecond = 10; // 过期秒数
    /**
     * 发送消息到JUC延迟队列中
     * @param message 消息内容
     */
    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam(value = "message") String message){
        // 监听订单10秒后过期
        javaDelayQueueConsumer.monitor(new OrderTimeoutDelayed(message,defaultTimoutSecond));
        log.info("当前时间为:[{}] 订单号为:[{}] 超时秒数:[{}]", DateUtil.getCuurentDateStr(),message,defaultTimoutSecond);
        return "OK";
    }
}

测试生成消息访问接口地址(每一秒访问次一个生成5个需要过期的订单):

http://localhost:8022/java/dealy/queue/sendMessage?message=100000001
http://localhost:8022/java/dealy/queue/sendMessage?message=100000002
http://localhost:8022/java/dealy/queue/sendMessage?message=100000003
http://localhost:8022/java/dealy/queue/sendMessage?message=100000004
http://localhost:8022/java/dealy/queue/sendMessage?message=100000005

控制台打印消费信息:

2022-08-24 15:55:14.435  INFO 18876 --- [nio-8022-exec-1] c.g.b.d.c.JavaDealyQueueProducer         : 当前时间为:[2022-08-24 15:55:14] 订单号为:[100000001] 超时秒数:[10]
2022-08-24 15:55:16.184  INFO 18876 --- [nio-8022-exec-2] c.g.b.d.c.JavaDealyQueueProducer         : 当前时间为:[2022-08-24 15:55:16] 订单号为:[100000002] 超时秒数:[10]
2022-08-24 15:55:17.626  INFO 18876 --- [nio-8022-exec-3] c.g.b.d.c.JavaDealyQueueProducer         : 当前时间为:[2022-08-24 15:55:17] 订单号为:[100000003] 超时秒数:[10]
2022-08-24 15:55:19.165  INFO 18876 --- [nio-8022-exec-4] c.g.b.d.c.JavaDealyQueueProducer         : 当前时间为:[2022-08-24 15:55:19] 订单号为:[100000004] 超时秒数:[10]
2022-08-24 15:55:20.811  INFO 18876 --- [nio-8022-exec-5] c.g.b.d.c.JavaDealyQueueProducer         : 当前时间为:[2022-08-24 15:55:20] 订单号为:[100000005] 超时秒数:[10]
2022-08-24 15:55:24.434  INFO 18876 --- [       Thread-8] c.g.b.d.java.JavaDelayQueueConsumer      : JUC延迟队列订单号:[100000001]已超时当前时间为:[2022-08-24 15:55:24]
2022-08-24 15:55:26.184  INFO 18876 --- [       Thread-8] c.g.b.d.java.JavaDelayQueueConsumer      : JUC延迟队列订单号:[100000002]已超时当前时间为:[2022-08-24 15:55:26]
2022-08-24 15:55:27.625  INFO 18876 --- [       Thread-8] c.g.b.d.java.JavaDelayQueueConsumer      : JUC延迟队列订单号:[100000003]已超时当前时间为:[2022-08-24 15:55:27]
2022-08-24 15:55:29.164  INFO 18876 --- [       Thread-8] c.g.b.d.java.JavaDelayQueueConsumer      : JUC延迟队列订单号:[100000004]已超时当前时间为:[2022-08-24 15:55:29]
2022-08-24 15:55:30.810  INFO 18876 --- [       Thread-8] c.g.b.d.java.JavaDelayQueueConsumer      : JUC延迟队列订单号:[100000005]已超时当前时间为:[2022-08-24 15:55:30]

Redis Key过期事件方案#

这里主要使用Redis Key过期事件来实现订单超时案例

Rabbit Key过期时间实现订单超时案例代码

Redis使用的时候将redis配置文件中的该属性从""修改为"Ex"

notify-keyspace-events "Ex"

定义Redis序列化与Key过期监听容器:

/**
 * Redis SpringBoot 配置
 * @author wuwentao
 */
@Configuration
@AllArgsConstructor
public class RedisConfiguration {
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * 模板方法序列化防止乱码
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        return redisTemplate;
    }

    /**
     * 配置Redis消息监听容器
     * @return {@link:org.springframework.data.redis.listener.RedisMessageListenerContainer}
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        return redisMessageListenerContainer;
    }
}

定义Key过期监听处理:

/**
 * Redis过期Key监听
 */
@Slf4j
@Component
public class RedisKeyExpiredListener extends KeyExpirationEventMessageListener {
    public RedisKeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String key = message.toString();
        if(key.startsWith(RedisKeyExpiredController.ORDER_TIMEOUT_PREFIX)){
            String orderId = key.replace(RedisKeyExpiredController.ORDER_TIMEOUT_PREFIX,"");
            log.info("过期时间:[{}] Redis-key:[{}] 订单号:[{}]", DateUtil.getCuurentDateStr(),key,orderId);
        }
    }
}

定义入口请求用于向Redis中保存需要过期的订单Key:

@Slf4j
@RestController
@RequestMapping("redis/expired/key")
@AllArgsConstructor
public class RedisKeyExpiredController {
    private RedisTemplate redisTemplate;
    // Redis需要需要处理的Key前缀
    public static final String ORDER_TIMEOUT_PREFIX = "ORDERTIMEOUT-";
    // 过期秒数
    private final int defaultTimoutSecond = 10;

    /**
     * Redis保存订单号与过期事件
     * @param orderId 订单编号
     */
    @GetMapping("/send")
    public String send(@RequestParam(value = "orderId") String orderId){
        ValueOperations valueOperations = redisTemplate.opsForValue();
        String key = ORDER_TIMEOUT_PREFIX + orderId;
        String value = orderId;
        long timeout = defaultTimoutSecond;
        TimeUnit seconds = TimeUnit.SECONDS;
        valueOperations.set(key,value,timeout, seconds);
        log.info("当前时间:[{}] 订单编号:[{}] Redis-Key:[{}] 超时秒数:[{}] ", DateUtil.getCuurentDateStr(),value,key,timeout);
        return "OK";
    }
}

测试生成消息访问接口地址(每一秒访问次一个生成5个需要过期的订单):

http://localhost:8022/redis/expired/key/send?orderId=100000001
http://localhost:8022/redis/expired/key/send?orderId=100000002
http://localhost:8022/redis/expired/key/send?orderId=100000003
http://localhost:8022/redis/expired/key/send?orderId=100000004
http://localhost:8022/redis/expired/key/send?orderId=100000005

控制台打印消费信息:

2022-08-24 16:26:49.626  INFO 20028 --- [nio-8022-exec-1] c.g.b.d.c.RedisKeyExpiredController      : 当前时间:[2022-08-24 16:26:49] 订单编号:[100000001] Redis-Key:[ORDERTIMEOUT-100000001] 超时秒数:[10] 
2022-08-24 16:26:53.124  INFO 20028 --- [nio-8022-exec-4] c.g.b.d.c.RedisKeyExpiredController      : 当前时间:[2022-08-24 16:26:53] 订单编号:[100000002] Redis-Key:[ORDERTIMEOUT-100000002] 超时秒数:[10] 
2022-08-24 16:26:55.468  INFO 20028 --- [nio-8022-exec-5] c.g.b.d.c.RedisKeyExpiredController      : 当前时间:[2022-08-24 16:26:55] 订单编号:[100000003] Redis-Key:[ORDERTIMEOUT-100000003] 超时秒数:[10] 
2022-08-24 16:26:57.717  INFO 20028 --- [nio-8022-exec-6] c.g.b.d.c.RedisKeyExpiredController      : 当前时间:[2022-08-24 16:26:57] 订单编号:[100000004] Redis-Key:[ORDERTIMEOUT-100000004] 超时秒数:[10] 
2022-08-24 16:26:59.703  INFO 20028 --- [nio-8022-exec-7] c.g.b.d.c.RedisKeyExpiredController      : 当前时间:[2022-08-24 16:26:59] 订单编号:[100000005] Redis-Key:[ORDERTIMEOUT-100000005] 超时秒数:[10] 
2022-08-24 16:26:59.885  INFO 20028 --- [enerContainer-2] c.g.b.d.redis.RedisKeyExpiredListener    : 过期时间:[2022-08-24 16:26:59] Redis-key:[ORDERTIMEOUT-100000001] 订单号:[100000001]
2022-08-24 16:27:03.210  INFO 20028 --- [enerContainer-3] c.g.b.d.redis.RedisKeyExpiredListener    : 过期时间:[2022-08-24 16:27:03] Redis-key:[ORDERTIMEOUT-100000002] 订单号:[100000002]
2022-08-24 16:27:05.537  INFO 20028 --- [enerContainer-4] c.g.b.d.redis.RedisKeyExpiredListener    : 过期时间:[2022-08-24 16:27:05] Redis-key:[ORDERTIMEOUT-100000003] 订单号:[100000003]
2022-08-24 16:27:07.771  INFO 20028 --- [enerContainer-5] c.g.b.d.redis.RedisKeyExpiredListener    : 过期时间:[2022-08-24 16:27:07] Redis-key:[ORDERTIMEOUT-100000004] 订单号:[100000004]
2022-08-24 16:27:09.780  INFO 20028 --- [enerContainer-6] c.g.b.d.redis.RedisKeyExpiredListener    : 过期时间:[2022-08-24 16:27:09] Redis-key:[ORDERTIMEOUT-100000005] 订单号:[100000005]

JUC与Redis的不足#

  1. JUC是存内存操作一旦系统宕机数据将全部丢失。
  2. JUC因为是纯内存操作所以不支持集群。
  3. Redis Key过期前程序突然宕机将造成数据丢失。
  4. 应用程序在集群环境是多个程序都能够监听到这个过期的Key,如果处理不好可能导致重复消费。
  5. Redis缓存所有的Key都会被监听需要自己处理Key去匹配不够灵活

为什么使用RabbitMQ来实现?#

单独看这点就能够解决前者的不足: 可靠性:支持持久化,传输确认,发布确认等保证了MQ的可靠性。

更多特性请参考: https://www.cnblogs.com/SimpleWu/p/16618662.html

RabbitMQ死信队列方案#

死信队列实现订单超时案例代码

DLX(dead-letter-exchange),死信队列也是一般的队列,当消息变成死信时,消息会投递到死信队列中,经过死信队列进行消费的一种形式,对应的交换机叫死信交换机DLX。

死信队列配置信息定义:

public interface DeadLetterQueueConfig {
    // 订单超时处理队列
    public static final String ORDER_TIMEOUT_QUEUE = "order.timeout.queue";
    // 订单超时处理队列交换机
    public static final String ORDER_TIMEOUT_EXCHANGE = "order.timeout.exchange";
    // 订单超时处理RoutingKey
    public static final String ORDER_TIMEOUT_ROUTING_KEY = "order.timeout.routing.key";
    // 死信队列
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    // 死信队列交换机
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    // 死信队列RoutingKey
    public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routing.key";
    // 死信队列超时时间(10秒)
    public static final String X_MESSAGE_TTL ="10000";
}

消费者定义,死信队列消费者定义与订单超时消费队列定义:

@Slf4j
@Component
public class DeadLetterConsumerAnnotatedEdition {

    /**
     * 该队列的消息为死信始终消费不会成功等到到了超时时间则会将消息投递到x-dead-letter-exchange交换机中由绑定的队列来进行处理
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = DeadLetterQueueConfig.DEAD_LETTER_QUEUE, arguments =
                    {@Argument(name = "x-dead-letter-exchange", value = DeadLetterQueueConfig.ORDER_TIMEOUT_EXCHANGE),
                            @Argument(name = "x-dead-letter-routing-key", value = DeadLetterQueueConfig.ORDER_TIMEOUT_ROUTING_KEY),
                            @Argument(name = "x-message-ttl", value = DeadLetterQueueConfig.X_MESSAGE_TTL, type = "java.lang.Long")
                            // ,@Argument(name = "x-max-length",value = "5",type = "java.lang.Integer")队列最大长度
                    }),//可以指定多种属性
                    exchange = @Exchange(value = DeadLetterQueueConfig.DEAD_LETTER_EXCHANGE),
                    key = {DeadLetterQueueConfig.DEAD_LETTER_ROUTING_KEY}
            )
    })
    @RabbitHandler
    public void deadLetterConsumer(Message message, Channel channel) throws Exception {
        /*
         * deliveryTag:该消息的index
         * multiple: ture确认本条消息以及之前没有确认的消息(批量),false仅确认本条消息
         * requeue: true该条消息重新返回MQ queue,MQ broker将会重新发送该条消息
         */
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }

    /**
     * 处理死信队列超时的订单
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = DeadLetterQueueConfig.ORDER_TIMEOUT_EXCHANGE, durable = "true", type = "direct"),
            value = @Queue(value = DeadLetterQueueConfig.ORDER_TIMEOUT_QUEUE, durable = "true"),
            key = DeadLetterQueueConfig.ORDER_TIMEOUT_ROUTING_KEY
    ))
    public void canleOrder(String context, Message message, Channel channel) throws IOException {
        log.info("当前时间:{} 订单取消订单号:{}", DateUtil.getCuurentDateStr(),context);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//仅确认本条消息
    }

}

定义入口请求用于向死信队列投递消息:

@Slf4j
@RestController
@RequestMapping("rabbit/deadletter/queue")
@AllArgsConstructor
public class RabbitDeadLetterQueueProducer {
    private RabbitTemplate rabbitTemplate;
    /**
     * 投递订单到死信队列中
     * @param orderId 订单ID
     */
    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam(value = "orderId") String orderId){
        rabbitTemplate.convertAndSend(DeadLetterQueueConfig.DEAD_LETTER_EXCHANGE,DeadLetterQueueConfig.DEAD_LETTER_ROUTING_KEY,orderId);
        log.info("当前时间:{} 订单号:{}", DateUtil.getCuurentDateStr(),orderId);
        return "OK";
    }
}

测试生成消息访问接口地址(每一秒访问次一个生成5个需要过期的订单):

http://localhost:8022/rabbit/deadletter/queue/sendMessage?orderId=100000001
http://localhost:8022/rabbit/deadletter/queue/sendMessage?orderId=100000002
http://localhost:8022/rabbit/deadletter/queue/sendMessage?orderId=100000003
http://localhost:8022/rabbit/deadletter/queue/sendMessage?orderId=100000004
http://localhost:8022/rabbit/deadletter/queue/sendMessage?orderId=100000005

控制台打印消费信息:

2022-08-25 16:41:25.150  INFO 19956 --- [nio-8022-exec-9] c.g.b.d.c.RabbitDeadLetterQueueProducer  : 当前时间:2022-08-25 16:41:25 订单号:100000001
2022-08-25 16:41:26.824  INFO 19956 --- [io-8022-exec-10] c.g.b.d.c.RabbitDeadLetterQueueProducer  : 当前时间:2022-08-25 16:41:26 订单号:100000002
2022-08-25 16:41:28.689  INFO 19956 --- [nio-8022-exec-1] c.g.b.d.c.RabbitDeadLetterQueueProducer  : 当前时间:2022-08-25 16:41:28 订单号:100000003
2022-08-25 16:41:30.453  INFO 19956 --- [nio-8022-exec-2] c.g.b.d.c.RabbitDeadLetterQueueProducer  : 当前时间:2022-08-25 16:41:30 订单号:100000004
2022-08-25 16:41:33.256  INFO 19956 --- [nio-8022-exec-3] c.g.b.d.c.RabbitDeadLetterQueueProducer  : 当前时间:2022-08-25 16:41:33 订单号:100000005
2022-08-25 16:41:35.153  INFO 19956 --- [ntContainer#1-5] r.t.c.DeadLetterConsumerAnnotatedEdition : 当前时间:2022-08-25 16:41:35 订单取消订单号:100000001
2022-08-25 16:41:36.825  INFO 19956 --- [ntContainer#1-4] r.t.c.DeadLetterConsumerAnnotatedEdition : 当前时间:2022-08-25 16:41:36 订单取消订单号:100000002
2022-08-25 16:41:38.699  INFO 19956 --- [ntContainer#1-6] r.t.c.DeadLetterConsumerAnnotatedEdition : 当前时间:2022-08-25 16:41:38 订单取消订单号:100000003
2022-08-25 16:41:40.458  INFO 19956 --- [ntContainer#1-7] r.t.c.DeadLetterConsumerAnnotatedEdition : 当前时间:2022-08-25 16:41:40 订单取消订单号:100000004
2022-08-25 16:41:43.267  INFO 19956 --- [ntContainer#1-3] r.t.c.DeadLetterConsumerAnnotatedEdition : 当前时间:2022-08-25 16:41:43 订单取消订单号:100000005

延迟消息插件方案#

延迟消息插件安装

我这里安装的RabbitMQ版本为3.8.8,这里我在发行版本中下载版本为: rabbitmq-delayed-message-exchange v3.8.x

github地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
同版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

将下载好的插件(rabbitmq_delayed_message_exchange-3.8.0.ez)复制到plugins目录下(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\plugins);
进入到sbin目录(C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin)打开cmd窗口执行命令开启插件:

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.8\sbin>rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@LX-P1DMPLUV:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@LX-P1DMPLUV...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.

继续输入命令重启RabbitMQ:rabbitmq-service restart

延迟消息实现订单超时案例代码

延迟消息配置信息定义:

public class DelayedQueueConfig {
    // 延迟队列
    public static final String DELAYED_QUEUE = "delayed.queue";
    // 延迟队列交换机
    public static final String DELAYED_EXCHANGE = "delayed.exchange";
    // 延迟队列路由KEY
    public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";
    // 订单过期时间
    public static final String ORDER_OUTIME ="10000";

}

消费者定义,延迟消息订单超时消费队列定义:

@Slf4j
@Component
public class DelayedConsumerAnnotatedEdition {
    /**
     * 延迟队列交换机类型必须为:x-delayed-message
     * x-delayed-type 必须设置否则将会报错
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = DelayedQueueConfig.DELAYED_QUEUE),
                    exchange = @Exchange(value = DelayedQueueConfig.DELAYED_EXCHANGE,type = "x-delayed-message",
                    arguments = {@Argument(name = "x-delayed-type", value = ExchangeTypes.DIRECT)}),
                    key = {DelayedQueueConfig.DELAYED_ROUTING_KEY}
            )
    })
    @RabbitHandler
    public void delayedConsumer(String context, Message message, Channel channel) throws Exception {
        log.info("当前时间:{} 订单取消订单号:{}", DateUtil.getCuurentDateStr(),context);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//仅确认本条消息
    }
}

定义入口请求用于向延迟消息队列投递消息:

@Slf4j
@RestController
@RequestMapping("rabbit/dealyed/queue")
@AllArgsConstructor
public class RabbitDelayedQueueProducer {
    private RabbitTemplate rabbitTemplate;
    /**
     * 投递订单到延迟消息队列中
     * @param orderId 订单ID
     */
    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam(value = "orderId") String orderId){
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE, DelayedQueueConfig.DELAYED_ROUTING_KEY, orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置消息超时时间
                message.getMessageProperties().setHeader("x-delay", DelayedQueueConfig.ORDER_OUTIME);
                return message;
            }
        });
        log.info("当前时间:{} 订单号:{}", DateUtil.getCuurentDateStr(),orderId);
        return "OK";
    }
}

测试生成消息访问接口地址(每一秒访问次一个生成5个需要过期的订单):

http://localhost:8022/rabbit/dealyed/queue/sendMessage?orderId=100000001
http://localhost:8022/rabbit/dealyed/queue/sendMessage?orderId=100000002
http://localhost:8022/rabbit/dealyed/queue/sendMessage?orderId=100000003
http://localhost:8022/rabbit/dealyed/queue/sendMessage?orderId=100000004
http://localhost:8022/rabbit/dealyed/queue/sendMessage?orderId=100000005

控制台打印消费信息:

2022-08-25 17:24:36.324  INFO 17212 --- [nio-8022-exec-1] c.g.b.d.c.RabbitDelayedQueueProducer     : 当前时间:2022-08-25 17:24:36 订单号:100000001
2022-08-25 17:24:38.011  INFO 17212 --- [nio-8022-exec-2] c.g.b.d.c.RabbitDelayedQueueProducer     : 当前时间:2022-08-25 17:24:38 订单号:100000002
2022-08-25 17:24:39.606  INFO 17212 --- [nio-8022-exec-3] c.g.b.d.c.RabbitDelayedQueueProducer     : 当前时间:2022-08-25 17:24:39 订单号:100000003
2022-08-25 17:24:41.109  INFO 17212 --- [nio-8022-exec-4] c.g.b.d.c.RabbitDelayedQueueProducer     : 当前时间:2022-08-25 17:24:41 订单号:100000004
2022-08-25 17:24:42.547  INFO 17212 --- [nio-8022-exec-5] c.g.b.d.c.RabbitDelayedQueueProducer     : 当前时间:2022-08-25 17:24:42 订单号:100000005
2022-08-25 17:24:46.395  INFO 17212 --- [ntContainer#2-1] .d.r.d.c.DelayedConsumerAnnotatedEdition : 当前时间:2022-08-25 17:24:46 订单取消订单号:100000001
2022-08-25 17:24:48.037  INFO 17212 --- [tContainer#2-10] .d.r.d.c.DelayedConsumerAnnotatedEdition : 当前时间:2022-08-25 17:24:48 订单取消订单号:100000002
2022-08-25 17:24:49.626  INFO 17212 --- [ntContainer#2-9] .d.r.d.c.DelayedConsumerAnnotatedEdition : 当前时间:2022-08-25 17:24:49 订单取消订单号:100000003
2022-08-25 17:24:51.127  INFO 17212 --- [ntContainer#2-7] .d.r.d.c.DelayedConsumerAnnotatedEdition : 当前时间:2022-08-25 17:24:51 订单取消订单号:100000004
2022-08-25 17:24:52.549  INFO 17212 --- [ntContainer#2-8] .d.r.d.c.DelayedConsumerAnnotatedEdition : 当前时间:2022-08-25 17:24:52 订单取消订单号:100000005

案例源代码#

https://gitee.com/SimpleWu/blogs-examples/tree/master/rabbitmq-delay-queue-case


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK