5

并发队列:ArrayBlockingQueue实际运用场景和原理

 3 years ago
source link: https://segmentfault.com/a/1190000039159377
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

ArrayBlockingQueue实际应用场景

之前在某公司做过一款情绪识别的系统,这套系统通过调用摄像头接口采集人脸信息,将采集的人脸信息做人脸识别和情绪分析,最终经过一定的算法将个人情绪数据转化具体行为指标值。其中采集图片的部分就用到了并发队列ArrayBlockingQueue。

m6Zfyuz.png!mobile

如上图所示:摄像头有n个,单线程采集的效率会比较慢,所以在采集摄像头的过程中是多线程的,另外采集到的图片需要存储到图片服务器,对图片服务器写也有很高的要求,图片服务器是集群的,也需要用到也多线程的。将图片入库后需要将图片数据打到 人脸分析服务器 上去处理,这部分涉及到了分布式消息,所以是 黑色虚线 部分用kafka来传递消息。其中 红色虚线部分 多线程图片采集将信息传递到多线程图片存储用到了 ArrayBlockingQueue ,它是 并发安全队列

ArrayBlockingQueue简化类图结构

3aq26nJ.png!mobile

从类图可以看出Queue接口提供了add,offer入队列的方法,提供 poll出队列的方法!

BlockingQueue接口增加了put 入队列的方法,提供 take出队列的方法!

补充说明:UML类图结构:

  • 继承:实线空箭头。
  • 实现:虚线虚箭头。

并发队列阻塞和非阻塞概念

从上面类图名字可以看到 Queue 提供的方法是非阻塞的!而 BlockingQueue 提供的 put, take 方法是阻塞的!下面按老思路,我们用代码说明 阻塞非阻塞 下!

非阻塞

import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author :jiaolian
 * @date :Created in 2021-02-02 20:16
 * @description:ArrayBlockingQueue阻塞非阻塞测试
 * @modified By:
 * 公众号:叫练
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        arrayBlockingQueue.offer("叫练");
        arrayBlockingQueue.offer("叫练");
        //输出arrayBlockingQueue的长度
        System.out.println(arrayBlockingQueue.size());
    }
}

如上代码:设置ArrayBlockingQueue长度为1,通过offer方法向队列添加2个元素,最后打印arrayBlockingQueue的长度?答案是1, 不会阻塞 ,因为offer方法 丢弃 了第二个元素“叫练”,我们说 出队和入队能够让其继续执行的队列我们称为非阻塞 。如果换成add方法呢?就会报错队列溢出,如下图所示!但是还不是阻塞的。下面我们看看什么阻塞!

BB7bi2m.png!mobile

阻塞

import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author :jiaolian
 * @date :Created in 2021-02-02 20:16
 * @description:ArrayBlockingQueue阻塞非阻塞测试
 * @modified By:
 * 公众号:叫练
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        arrayBlockingQueue.put("叫练");
        arrayBlockingQueue.put("叫练");
        //输出arrayBlockingQueue的长度
        System.out.println(arrayBlockingQueue.size());
    }
}

如上代码:ArrayBlockingQueue长度为1,通过put方法向队列添加2个元素,最后输出arrayBlockingQueue的长度是多少?答案是控制台一直运行,因为在添加第二个“叫练”时程序阻塞了。我们说 出队和入队不能够让其继续执行的队列我们称为阻塞, add方法,poll方法,take方法我们就不一一举例了,大家可以写代码做下最简单的测试!

好啦,我们对几个方法做个总结吧!

  • 入队:

offer:队列满了丢弃。

add :队列满了报错。

put :阻塞。

  • 出队:

poll :如果队列为空则返回null。

take :阻塞。

ArrayBlockingQueue实现原理浅析

QfYvum.png!mobile

如上图, ArrayBlockingQueue 是用数组实现的,ReentrantLock独占锁控制数组的入队和出队。notEmpty,notFull是ReentrantLock的两个条件队列,用来控制队列是否进入阻塞状态,是生产者和消费者模型。下面我们看看 take,put方法 流程,其他的方法同理。

  • *take方法:多个线程 竞争 独占锁获取 items[ taskIndex ]队首元素 ,其中A线程 成功 获取锁,其他线程阻塞等待A线程执行完成释放锁,如果队列不为空,A线程获取items[ taskIndex ]元素返回移除并释放锁让其他阻塞线程继续竞争;如果队列为空,A线程调用notEmpty.await方法进入条件队列并释放锁 让其他阻塞线程继续竞争, *其他线程发现队列为空也会进入notEmpty条件队列,等待put线程入队通知notEmpty阻塞线程。
    **
  • put方法: 多个线程 竞争 独占锁设置 items[putIndex ]队尾元素, 其中A线程 成功 获取锁,其他线程阻塞等待A线程执行完成释放锁,如果队列不满【队列长度】,A线程添加items[ putIndex ]元素返回并释放锁让其他阻塞线程继续竞争;如果队列满了,A线程调用notFull.await方法进入条件队列并释放锁 让其他阻塞线程继续竞争, 其他线程发现队列为空也会进入 notFull 条件队列,等待take线程出队通知 notFull 阻塞线程

完全非阻塞队列ConcurrentLinkedQueue

ConcurrentLinkedQueue也实现了Queue接口,提供offer,add,poll方法都是非阻塞的,另外从名字可以看出,底层是链表结构,入队和出队用的是自旋的cas。

List 多线程安全方案:LinkedBlockingQueue

ymyia2q.png!mobile

LinkedBlockingQueue和ArrayBlockingQueue 类似,LinkedBlockingQueue是有界的,长度是 Integer.MAX_VALUE 实现上,LinkedBlockingQueue是链表,而且是双锁,如上图所示,takeLock独占锁控制队列头部,putLock控制队列尾部,互不影响,目的是提高LinkedBlockingQueue的并发度。

总结

今天我们介绍了并发队列重要的几个概念,整理出来希望能对你有帮助,写的比不全,同时还有许多需要修正的地方,希望亲们加以指正和点评,年前这段时间会继续输出线程池这些概念等。最后喜欢的请点赞加关注哦。 点关注,不迷路, 我是 叫练【公众号】 ,边叫边练。

yaqIbe.jpg!mobile

参考书籍:《Java并发编程之美》


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK