2

手写源码-揭开BlockingQueue的面纱

 2 years ago
source link: https://jelly.jd.com/article/61c0687d95fcc834d4c67f2d
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
JELLY | 手写源码-揭开BlockingQueue的面纱
手写源码-揭开BlockingQueue的面纱
上传日期:2021.12.24
最近在爱心东东项目中,发现了异步设计上存在一些问题,对异步回调的处理稍显粗暴简单。后续优化为,设计了一套基于BlockingQueue的异步消费处理的优化方案。回忆2018年的这个时间上,在一次面试的机试环节上,被要求按照自己的想法,设计和实现一套线程安全的生成消费模式的queue模型,一股重学和重写BlockingQueue的冲动就此产生。本文将根据简单的图文和模仿ArrayBlockingQueue实现的源码,阐述基于数组如何实现线程安全的Queue的核心原理。

什么是BlockingQueue?

BlockingQueue的字面理解为阻塞队列,即对队列的操作可产生阻塞而保证了线程安全的出队和入队操作。它继承了java.util.Queue。常用实现分别有基于数组实现的ArrayBlockingQueue和基于链表实现的LinkedBlockingQueue。

二 ArrayBlockingQueue主要都有什么属性?

其主要属性可参考下图:

11d36b920f7b79f2.png

实现 java.util.Queue接口后,将实现queue的核心方法,包括了add(),offer(),take(),pull(),contains()等等。这些方法的实现都离不开以下属性:

a): 一个Object的数组items, 因为java是面向对象的语言,因此,Object类型的数组即可存放万物了!

b): 一个整数类型的cap属性,表示初始化时,设置的容量!使用时,应对根据实际业务场景,分配合理的容量,避免内存溢出。

c): 一个存放元素的下标putIndex,随着元素存放而后移的下标

d): 一个拉去元素的下标takeIndex,随着元素获取而后移的下标

e): 一个整数类型的count属性,表示当前存放了多少元素

f): 一把锁,lock,操作队列时通过其加锁,保证线程安全的操作

g): 两个信号站,一个负责存放信号,一个负责读取信号

三 它大概是怎么工作的呢?

11d36b920f7b79f2.png

参考上图, 可以理解到最核心的存方法和取方法的基本逻辑:

a): add数据时,首先要获取到lock,根据count(当前存放量)和 cap(容量)做大小比较,判断是否已满。如果已满,会有多种处理方式,可直接返回false的结果,可使用put信号挂起线程等待,又或者设置一定的等待时间,从而衍生出了 put(), offer(),offer(Time) 等多个存放方法

b): get数据时,也要先获取到lock,根据count判断是否有数据可取,如果没有,也有多个处理方式,可直接null返回,可使用take信号挂起线程等待,又或者设置一个等待时间,同样衍生出了take(),pull(),pull(Time)等多个拉取方法。

四 核心方法到底怎么写呢?

下面会手写出基于数组实现的BlockingQueue的几个核心方法,如 构造方法,存入方法,拉取方法,contains方法等,因为是学习代码,所以比起ArrayBlockingQueue的源码,省去了参数判断等部分,部分扩展情景使用了文字说明而未做具体实现,仅供学习参考!

1 一个最简单的构造方法,如下图:

11d36b920f7b79f2.png

注:根据传来的容量参数,初始化数组,锁,put信号,take信号等!

2 一个简单的add(Object o)方法,如下图:

11d36b920f7b79f2.png

1): 一个存放数据的方法实现,第一步是获取lock,使用lock加锁,保证阻塞,保证了线程安全,与之相呼应的是在finally模块中保证锁的释放。

2): 在获取到lock后,根据count 和 cap 大小比较判断有无存放空间。如果没有,可选择返回false,可选择挂起线程。如果有,根据putIndex下标向数组中存放数据,然后对putIndex下标+1

3): 如果上一步加完后的putIndex已经和容量一样大,说明刚刚已经是加在了数组最后一位了,这时候把putIndex做归0初始化!

4): 存放了元素,那表示存量的count当然要+1了

5): 最终,存了数据就可以向那些在take信号站内苦苦等待取数的线程们释放一个“有数可取啦”的信号。

3 一个简单的pull()方法实现,如下图:

11d36b920f7b79f2.png

1): 一个简单的拉取数据方法实现第一步,同样是获取lock,使用lock阻塞,保证线程安全,在对应的finally模块中保证锁释放。

2): 在获取到lock后,根据count判断有无可取数据,如果没有,可选择返回一个null,可选择挂起线程等待。如果有,使用 takeIndex 下标从数组 items 中取数

3): 取完后对 takeIndex 做+1,这时候如果 takeIndex 已经和容量一样大小,说明刚刚取数的位置,已经是数组最后一位啦,同样对 takeIndex 做归0的初始化。

4): 取走一个数,当然要对存量 count 做减1了

5): 同样,也代表了一个存放位置的空出,继而释放一个“有空位啦”的信号给那些在put信号站里苦苦等待空位的存数线程们!

6): 特别说明:这里可以看到,取数,只是返回了数组在对应下标位置的引用,并不是把该位置清空,只是释放了一个“这个位置已空,可以存放”信号,该位置在新的数据存入前,依然是存放着被取走的那个引用,只是因为takeIndex已经偏移走了,不能再取了!

4 一个判断是否已存在的contains(Object o)方法则简单的多,如下图:

11d36b920f7b79f2.png

注:根据takeIndex即取数据下标位,和总容量count,依次从数组中取出尚未被pull走的数据,调用equals方法和传入参数比较是否相等,由此判断是否包含。

五 总结和心得

以上即为一个数组阻塞队列的简单实现。如今再看ArrayBlockingQueue的源码,已经感觉到简单易懂,但是对于大神Doug.lea精妙的写法和设计,依然无比崇敬。回想起java的源码阅读之路,认识地最深刻的一个道理是:阅读一个面向对象的源码,不用过早过急的去追问方法的具体实现,如果能理解这个对象的具体属性的含义,那才是俘获了这个对象的芳心,后续的行为阅读,亦可是事半功倍的。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK