7

无界阻塞延迟队列DelayQueue基本原理与使用

 3 years ago
source link: https://my.oschina.net/u/2373935/blog/5082408
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

DelayQueue 类关系图

  • 从类关系图谱上看,本质上具有集合、队列、阻塞阻塞队列、延迟等特性

应用场景:

  • 延迟队列(类似RocketMQ中提供的机制)
  • 定时任务(定时触发某个任务)

核心原理:

  1. 队列中的元素按到期时间排好序;
  2. 假设存在3个消费者线程
  3. 线程1通过争抢成为了leader
  4. 线程1查看队列头部元素
  5. 发现需要2s后到期,则进入睡眠状态2s后唤醒
  6. 此时线程2、3处于待命状态,不会做任何事情
  7. 线程1唤醒后,拿到对象1后,向线程2、3发送signal
  8. 线程2、3收到信号后,争抢leader
  • 进一步状态
  1. 此处假设线程2抢到leader
  2. 线程2查看对象2状态,休眠3s后唤醒
  3. 后续逻辑与线程1逻辑类同
  4. 线程2被唤醒后,线程3成为leader进入等待状态
  5. 此时,若线程1已处理完毕,则继续处于待命状态
  6. 若线程1未处理完毕,则继续处理
  1. 一种不好的情况,3个线程因处理时间较长,目前都在处理中状态;
  2. 此时对象4快要到期了,没有消费者线程空下来消费
  3. 此时对象4的处理会延期
  4. 如果元素进入队列很快、且元素间到期时间相对集中,并且元素处理时间较长时,可能造成队列元素堆积情况
  1. 还有一种特殊情况,若目前处于左图现状
  2. 队列中的头元素突然发生变化
  3. 因为leader是取头元素的,此时的leader将没有意义
  4. 则将把当前leader = null
  5. 此时可能唤醒线程2、3中的某一个成为新的leader
  6. 新的leader将重新查看当前队列中最新的头元素
  7. 再后面的逻辑与上述一致;

核心方法offer()

核心方法take()

  • 重要方法解释
offfer() ->插入元素到队列中
peek() -> 窥视 查看
await() -> 待命
awaitNanos - > 等待
signal() -> 发出信号
poll() -> 从队列中弹出头部元素
lockInterruptibly() ->加了一把可中断锁

延迟队列实现代码

/**
 * @author qinchen
 * @date 2021/6/17 14:27
 * @description 延迟队列数据对象
 */
public class Order implements Delayed {

    /**
     * 延迟时间
     */
    private Long delayTime;

    private String name;

    public Order(Long delayTime, String name) {
        this.delayTime = System.currentTimeMillis() + delayTime;
        this.name = name;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {

        Order order = (Order) o;
        Long t = this.delayTime - order.delayTime;

        if( t > 0) {
            return 1;
        }

        if( t < 0) {
            return -1;
        }

        return 0;
    }

    public String getName() {
        return name;
    }
}
public class OrderConsumer implements Runnable{

    private DelayQueue<Order> queue;

    public OrderConsumer(DelayQueue<Order> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {
            try {
                Order take = queue.take();
                System.out.println("消费的订单名称:" + take.getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
Order order1 = new Order(5000L, "Order1");
Order order2 = new Order(12000L, "Order2");
Order order3 = new Order(3000L, "Order3");

DelayQueue<Order> queue = new DelayQueue<>();

queue.offer(order1);
queue.offer(order2);
queue.offer(order3);

ExecutorService exec = new ThreadPoolExecutor(4, 8,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
exec.execute(new OrderConsumer(queue));
exec.shutdown();

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK