23

Java并发队列与容器

 3 years ago
source link: https://my.oschina.net/bigdatalearnshare/blog/4836622
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Java并发队列与容器 - 大数据学习与分享的个人空间 - OSCHINA - 中文开源技术交流社区

【前言:无论是大数据从业人员还是Java从业人员,掌握Java高并发和多线程是必备技能之一。本文主要阐述Java并发包下的阻塞队列和并发容器,其实研读过大数据相关技术如Spark、Storm等源码的,会发现它们底层大多用到了Java并发队列、同步类容器、ReentrantLock等。建议大家结合本篇文章,仔细分析一下相关源码】 BlockingQueue
阻塞队列,位于java.util.concurrent并发包下,它很好的解决了多线程中如何安全、高效的数据传输问题。所谓“阻塞”是指在某些情况下线程被挂起,当满足一定条件时会被自动唤醒,可以通过API进行控制。 常见的阻塞队列主要分为两种FIFO(先进先出)和LIFO(后进先出),当然通过不同的实现方式,还可以引申出多种不同类型的队列。首先了解一下BlockingQueue的几个核心API:put、take一对阻塞存取;add、poll一对非阻塞存取。 插入数据 put(anObj):把anObj加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞,直到BlockingQueue里面有空间再继续插入 add(anObj):把anObj加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常 offer(anObj):表示如果可能的话,将anObj加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则返回false。 读取数据 take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态,直到Blocking有新的对象被加入为止

poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

BlockingQueue核心成员介绍 ArrayBlockingQueue 基于数组实现的有界阻塞队列。因为基于数组实现,所以具有查找快,增删慢的特点。 生产者和消费者用的是同一把锁,不能并行执行效率低。它底层使用了一种标准互斥锁ReentrantLock,即读读、读写,写写都互斥,当然可以控制对象内部是否采用公平锁,默认是非公平锁。消费方式是FIFO。
生产和消费数据时,直接将枚举对象插入或删除,不会产生或销毁额外的对象实例。 应用:因为底层生产和消费用了同一把锁,定长数组不用频繁创建和销毁对象,适合于想按照队列顺序去执行任务,还不想出现频繁的GC的场景。 LinkedBlockingQueue 基于链表实现的阻塞队列,同样具有增删快,定位慢的特点。 需要注意一点:默认情况下创建的LinkedBlockingQueue容量是Integer.MAX_VALUE, 在这种情况下,如果生产者的速度一旦大于消费者的速度,可能还没有等到队列满阻塞产生,系统内存就有可能已被消耗尽。可以通过指定容量创建LinkedBlockingQueue避免这种极端情况的发生。 虽然底层使用的也是ReentrantLock但take和put是分离的(生产和消费的锁不是同一把锁),高并发场景下效率仍然高于ArrayBlockingQueue。put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。 DelayQueue

DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。DelayQueue中的元素,只有指定的延迟时间到了,才能够从队列中获取到该元素。

应用场景: 1.客户端长时间占用连接的问题,超过这个空闲时间了,可以移除的 2.处理长时间不用的缓存:如果队列里面的对象长时间不用,超过空闲时间,就移除

3.任务超时处理

PriorityBlockingQueue PriorityBlockingQueue不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此必须控制生产者生产数据的速度,避免消费者消费数据速度跟不上,否则时间一长,会最终耗尽所有的可用堆内存空间。 在向PriorityBlockingQueue中添加元素时,元素通过在实现实现Comparable接口,重写compareTo()来定义优先级的逻辑。它内部控制线程同步的锁采用的是公平锁。 SynchronousQueue 一种无缓冲的等待队列,来一个任务就执行这个任务,这期间不能添加任何的任务。也就是不用阻塞了,其实对于少量任务而言,这种做法更高效。

声明一个SynchronousQueue有两种不同的方式,公平模式和非公平模式:

公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体现整体的公平策略; 非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。 ConcurrentLinkedQueue 不上锁,高并发场景效率远高于ArrayBlockingQueue和LinkedBlockingQueue等 容器 同步类容器 第一类:Vector、Stack、HashTable都是同步类,线程安全的,但高并发场景下仍然可能出现问题如ConcurrentModificationException。

第二类:Collections提供的一些工厂类(静态),效率低

291b5f35-4979-41b0-960a-c2072542278d.png

并发类容器

CopyOnWrite容器 写时复制的容器:当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行copy,复制出一个新的容器,然后往新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器,非常适合读多写少的场景。但同时存在如下问题: 数据一致性问题:CopyOnWrite容器是弱一致性的,即只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据能够即时读到,不要使用CopyOnWrite容器。 内存占用问题:因为CopyOnWrite 的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象。如果这些对象占用的内存比较大,如果控制不好,比如写特别多的情景,很有可能造成频繁的Yong GC 和Full GC。针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。

有两种常见的CopyOnWrite容器:CopyOnWriteArrayList和CopyOnWriteArraySet,其中CopyOnWriteArrayList是ArrayList 的一个线程安全的变体。

ConcurrentHashMap
笔者分JDK1.7和JDK1.8两部分说明ConcurrentHashMap。 JDK1.7 ConcurrentHashMap JDK1.7采用"锁分段"技术来降低锁的粒度,它把整个map划分为一系列由segment组成的单元,一个segment相当于一个hashtable。通过这种方式,加锁的对象就从整个map变成了一个segment。ConcurrentHashMap线程安全并且提高性能原因就在于:对map中的读是并发的,无需加锁;只有在put、remove操作时才加锁,而加锁仅是对需要操作的segment加锁,不会影响其他segment的读写。因此不同的segment之间可以并发使用,极大地提高了性能。 根据源码又可得出查找、插入、删除的过程:通过key的hash确定segement(插入时如果segment大小达到扩容阈值则进行扩容) --> 确定链表数组HashEntry下标(插入/删除时,获取链表头) --> 遍历链表【查询:调用equals()进行比对,找到与所查找key相等的结点并读取;插入:如果找到相同的key的结点则更新value值,如果没有则插入新结点;删除:找到被删除结点后,以被删除结点的next结点开始建立新的链表,然后再把原链表头直到被删结点的前继结点依次复制、插入新链表,最后把新链表头设置为当前数组下标元素取代旧链表。
JDK1. 8ConcurrentHashMap JDK1.8中的ConcurrentHashMap在JDK1.7上做了很多优化: 1.取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,通过进一步降低锁粒度来减少并发冲突的概率 2.将原先table数组+链表的数据结构,变更为table数组+链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能 3.新增字段transient volatile CounterCell[] counterCells,可方便的计算集合中所有元素的个数,性能大大优于jdk1.7中的size()方法 相信通过这些介绍,大家对于诸如"为什么选择ConcurrentHashMap?"会有很好的思路了。

近期文章:

Redis从入门到精通

Hadoop Yarn

NameNode调优

a194430b-9961-40be-a8f4-b1e37a6de680.png

本文分享自微信公众号 - 大数据学习与分享(bigdatalearnshare)。
如有侵权,请联系 [email protected] 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK