4

SpringBoot使用Redis实现延时队列

 8 months ago
source link: https://blog.51cto.com/u_13312531/9191630
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

SpringBoot使用Redis实现延时队列

精选 原创

我们在日常开发中有可能会使用到延时队列,比如商城下单如果超过30分钟没有支付的话就关闭订单等情况。Redis做为一款高性能的NoSQL数据库,具备快速读写、高并发、数据持久化等特点,非常适合用于实现延迟队列。Redis提供了丰富的数据结构,其中利用Redis的ZSet数据结构就可以实现一个简单的延迟队列。

二、SpringBoot集成

1.添加依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

2.Redis配置

spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0

3.订单实体类

package com.example.nettydemo.bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author qx
 * @date 2023/12/25
 * @des 订单实体类
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {

    private Long id;

    /**
     * 购买的产品
     */
    private String product;

    /**
     * 订单金额
     */
    private Double money;

    /**
     * 下单时间
     */
    private Long orderTime;

    /**
     * 过期时间
     */
    private Long expireTime;


}

4.创建延时队列类

package com.example.nettydemo.redis;

import com.example.nettydemo.bean.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/**
 * @author qx
 * @date 2023/12/25
 * @des 延时队列
 */
@Component
@Slf4j
public class DelayQueue {

    private static final String KEY = "delay_queue";

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 添加消息到延时队列
     *
     * @param order
     */
    public void put(Order order) {
        redisTemplate.opsForZSet().add(KEY, order, order.getExpireTime());
    }

    /**
     * 删除延时队列中的消息
     *
     * @param order
     */
    public void remove(Order order) {
        redisTemplate.opsForZSet().remove(KEY, order);
    }

    /**
     * 获取延时队列中过期的订单
     *
     * @return
     */
    public List<Order> getExpiredOrders() {
        long currentTime = System.currentTimeMillis();
        Set orders = redisTemplate.opsForZSet().rangeByScore(KEY, 0, currentTime);
        if (CollectionUtils.isEmpty(orders)) {
            return Collections.emptyList();
        }
        List<Order> orderList = new ArrayList<>();
        for (Object order : orders) {
            orderList.add((Order) order);
        }
        return orderList;
    }
}

5.创建定时消息处理器

我们创建一个定时消息处理器,间隔1秒查询延时队列是否有到期的任务,如果有到期的任务就进行消息处理,反之继续轮询。

package com.example.nettydemo.redis;

import cn.hutool.core.date.DateUtil;
import com.example.nettydemo.bean.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Date;
import java.util.List;

/**
 * @author qx
 * @date 2023/12/25
 * @des 消息处理器
 */
@Component
@Slf4j
@EnableScheduling
public class DelayMessageHandler {

    @Autowired
    private DelayQueue delayQueue;

    @Scheduled(fixedDelay = 1000)
    public void handlerExpiredOrder() {
        String currentTime = DateUtil.formatTime(new Date());
        // 读取延时队列中过期的数据
        List<Order> expiredOrders = delayQueue.getExpiredOrders();
        if (!CollectionUtils.isEmpty(expiredOrders)) {
            for (Order expiredOrder : expiredOrders) {
                log.info(currentTime + "处理订单:{}", expiredOrder);
                // 删除延时队列中的消息
                delayQueue.remove(expiredOrder);
            }
        }
    }
}

6.创建控制层测试

package com.example.nettydemo.controller;

import com.example.nettydemo.bean.Order;
import com.example.nettydemo.redis.DelayQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qx
 * @date 2023/12/25
 * @des
 */
@RestController
@Slf4j
public class DelayMessageController {

    @Autowired
    private DelayQueue delayQueue;


    @GetMapping("/delayStart")
    public void delayMessage() {
        long startTime = System.currentTimeMillis();
        log.info("开始时间:{}", startTime);

        // 订单1 10秒后过期
        Order order1 = new Order(1L, "华为", 2000.0, startTime, startTime + 10000);
        // 订单2 20秒后过期
        Order order2 = new Order(2L, "三星", 1000.0, startTime, startTime + 20000);
        // 订单3 30秒后过期
        Order order3 = new Order(3L, "oppo", 1500.0, startTime, startTime + 30000);

        delayQueue.put(order1);
        delayQueue.put(order2);
        delayQueue.put(order3);
    }
}

7.测试

浏览器访问开启延时队列的请求。

SpringBoot使用Redis实现延时队列_SpringBoot

我们可以从控制台的日志可以看出延时队列的执行成功了。

2023-12-25 15:39:06.086  INFO 14164 --- [nio-8090-exec-2] c.e.n.controller.DelayMessageController  : 开始时间:1703489946086
2023-12-25 15:39:16.779  INFO 14164 --- [pool-3-thread-1] c.e.nettydemo.redis.DelayMessageHandler  : 15:39:16处理订单:Order(id=1, product=华为, money=2000.0, orderTime=1703489946086, expireTime=1703489956086)
2023-12-25 15:39:26.875  INFO 14164 --- [pool-3-thread-1] c.e.nettydemo.redis.DelayMessageHandler  : 15:39:26处理订单:Order(id=2, product=三星, money=1000.0, orderTime=1703489946086, expireTime=1703489966086)
2023-12-25 15:39:36.968  INFO 14164 --- [pool-3-thread-1] c.e.nettydemo.redis.DelayMessageHandler  : 15:39:36处理订单:Order(id=3, product=oppo, money=1500.0, orderTime=1703489946086, expireTime=1703489976086)

这样我们就利用Redis的数据结构实现了一个延时消息队列的处理。

  • 收藏
  • 评论
  • 分享
  • 举报

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK