5

不允许还有Java程序员不了解BlockingQueue阻塞队列的实现原理 - 一灯架构

 1 year ago
source link: https://www.cnblogs.com/yidengjiagou/p/16799082.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

我们平时开发中好像很少使用到BlockingQueue(阻塞队列),比如我们想要存储一组数据的时候会使用ArrayList,想要存储键值对数据会使用HashMap,在什么场景下需要用到BlockingQueue呢?

1. BlockingQueue的应用场景

当我们处理完一批数据之后,需要把这批数据发给下游方法接着处理,但是下游方法的处理速率不受控制,可能时快时慢。如果下游方法的处理速率较慢,会拖慢当前方法的处理速率,这时候该怎么办呢?

你可能想到使用线程池,是个办法,不过需要创建很多线程,还要考虑下游方法支不支持并发,如果是CPU密集任务,可能多线程比单线程处理速度更慢,因为需要频繁上下文切换。

这时候就可以考虑使用BlockingQueue,BlockingQueue最典型的应用场景就是上面这种生产者-消费者模型。生产者往队列中放数据,消费者从队列中取数据,中间使用BlockingQueue做缓冲队列,也就解决了生产者和消费者速率不同步的问题。

image

你可能联想到了消息队列(MessageQueue),消息队列相当于分布式阻塞队列,而BlockingQueue相当于本地阻塞队列,只作用于本机器。对应的是分布式缓存(比如:Redis、Memcache)和本地缓存(比如:Guava、Caffeine)。

另外很多框架中都有BlockingQueue的影子,比如线程池中就用到BlockingQueue做任务的缓冲。消息队列中发消息、拉取消息的方法也都借鉴了BlockingQueue,使用起来很相似。

今天就一块深入剖析一下Queue的底层源码。

2. BlockingQueue的用法

BlockingQueue的用法非常简单,就是放数据和取数据。

/**
 * @apiNote BlockingQueue示例
 * @author 一灯架构
 */
public class Demo {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建队列,设置容量是10
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        // 2. 往队列中放数据
        queue.put(1);
        // 3. 从队列中取数据
        Integer result = queue.take();
    }
}

为了满足不同的使用场景,BlockingQueue设计了很多的放数据和取数据的方法。

操作 抛出异常 返回特定值 阻塞 阻塞一段时间
放数据 add offer put offer(e, time, unit)
取数据 remove poll take poll(time, unit)
取数据(不删除) element() peek() 不支持 不支持

这几组方法的不同之处就是:

  1. 当队列满了,再往队列中放数据,add方法抛异常,offer方法返回false,put方法会一直阻塞(直到有其他线程从队列中取走数据),offer方法阻塞指定时间然后返回false。
  2. 当队列是空,再从队列中取数据,remove方法抛异常,poll方法返回null,take方法会一直阻塞(直到有其他线程往队列中放数据),poll方法阻塞指定时间然后返回null。
  3. 当队列是空,再去队列中查看数据(并不删除数据),element方法抛异常,peek方法返回null。

工作中使用最多的就是offer、poll阻塞指定时间的方法。

3. BlockingQueue实现类

BlockingQueue常见的有下面5个实现类,主要是应用场景不同。

  • ArrayBlockingQueue

    基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。

  • LinkedBlockingQueue

    基于链表实现的阻塞队列,默认是无界队列,创建可以指定容量大小

  • SynchronousQueue

    一种没有缓冲的阻塞队列,生产出的数据需要立刻被消费

  • PriorityBlockingQueue

    实现了优先级的阻塞队列,基于数据显示,是无界队列

  • DelayQueue

    实现了延迟功能的阻塞队列,基于PriorityQueue实现的,是无界队列

4. BlockingQueue源码解析

BlockingQueue的5种子类实现方式大同小异,这次就以最常用的ArrayBlockingQueue做源码解析。

4.1 ArrayBlockingQueue类属性

先看一下ArrayBlockingQueue类里面有哪些属性:

// 用来存放数据的数组
final Object[] items;

// 下次取数据的数组下标位置
int takeIndex;

// 下次放数据的数组下标位置
int putIndex;

// 当前已有元素的个数
int count;

// 独占锁,用来保证存取数据安全
final ReentrantLock lock;

// 取数据的条件
private final Condition notEmpty;

// 放数据的条件
private final Condition notFull;

ArrayBlockingQueue中4组存取数据的方法实现也是大同小异,本次以put和take方法进行解析。

4.2 put方法源码解析

image

无论是放数据还是取数据都是从队头开始,逐渐往队尾移动。

// 放数据,如果队列已满,就一直阻塞,直到有其他线程从队列中取走数据
public void put(E e) throws InterruptedException {
    // 校验元素不能为空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
  	// 加锁,加可中断的锁
    lock.lockInterruptibly();
    try {
        // 如果队列已满,就一直阻塞,直到被唤醒
        while (count == items.length)
            notFull.await();
      	// 如果队列未满,就往队列添加元素
        enqueue(e);
    } finally {
      	// 结束后,别忘了释放锁
        lock.unlock();
    }
}

// 实际往队列添加数据的方法
private void enqueue(E x) {
    // 获取数组
    final Object[] items = this.items;
    // putIndex 表示本次插入的位置
    items[putIndex] = x;
    // ++putIndex 计算下次插入的位置
    // 如果本次插入的位置,正好是队尾,下次插入就从 0 开始
    if (++putIndex == items.length)
        putIndex = 0;
  	// 元素数量加一
    count++;
    // 唤醒因为队列空等待的线程
    notEmpty.signal();
}

源码中有个有意思的设计,添加元素的时候如果已经到了队尾,下次就从队头开始添加,相当于做成了一个循环队列。

像下面这样:

image

4.3 take方法源码

// 取数据,如果队列为空,就一直阻塞,直到有其他线程往队列中放数据
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
  	// 加锁,加可中断的锁
    lock.lockInterruptibly();
    try {
        // 如果队列为空,就一直阻塞,直到被唤醒
        while (count == 0)
            notEmpty.await();
        // 如果队列不为空,就从队列取数据
        return dequeue();
    } finally {
      	// 结束后,别忘了释放锁
        lock.unlock();
    }
}

// 实际从队列取数据的方法
private E dequeue() {
  	// 获取数组
    final Object[] items = this.items;
    // takeIndex 表示本次取数据的位置,是上一次取数据时计算好的
    E x = (E) items[takeIndex];
    // 取完之后,就把队列该位置的元素删除
    items[takeIndex] = null;
    // ++takeIndex 计算下次取数据的位置
    // 如果本次取数据的位置,正好是队尾,下次就从 0 开始取数据
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 元素数量减一
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒被队列满所阻塞的线程
    notFull.signal();
    return x;
}

4.4 总结

  1. ArrayBlockingQueue基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。
  2. ArrayBlockingQueue底层采用循环队列的形式,保证数组位置可以重复使用。
  3. ArrayBlockingQueue存取都采用ReentrantLock加锁,保证线程安全,在多线程环境下也可以放心使用。
  4. 使用ArrayBlockingQueue的时候,预估好队列长度,保证生产者和消费者速率相匹配。

我是「一灯架构」,如果本文对你有帮助,欢迎各位小伙伴点赞、评论和关注,感谢各位老铁,我们下期见

image

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK