25

JDK 源码分析:ReentrantReadWriteLock

 4 years ago
source link: https://mp.weixin.qq.com/s/OSyITbb51MBCV5ZHW-sfJA
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

概述

前面分析过 ReentrantLock「 JDK源码分析-ReentrantLock 」,它是一种互斥的可重入锁,可用于处理并发场景下的线程安全问题。而很多时候会出现“读多写少”的情况,若用 ReentrantLock 会降低并发量,此时 就比较适合   ReentrantReadWriteLock 出场了

ReentrantReadWriteLock 是读写锁,它 维护了一对锁:一个读锁,一个写锁。读锁之间是共享 的,写锁是互斥的。与  ReentrantLock 相比,读写锁在读多写少的场景下允许更高的并发量。它的 类签名如下:

public class ReentrantReadWriteLock

implements ReadWriteLock, java.io.Serializable {}

下面分析其代码实现。

代码分析

ReadWriteLock 接口

ReentrantReadWriteLock 实现了  ReadWriteLock 接口,其代码如下

public interface ReadWriteLock {

/**

* 返回读锁

*/

Lock readLock();


/**

* 返回写锁

*/

Lock writeLock();

}

该接口定义了两个方法,分别返回读锁和写锁, 有关 Lock 接口的分析 可参考前文「 JDK源码分析-Lock&Condition 」。

构造器

仍然先从构造器开始分析,如下:

// 无参构造器(默认非公平)

public ReentrantReadWriteLock() {

this(false);

}


// 以给定的公平策略创建一个 ReentrantReadWriteLock 对象

// true 为公平,false 为非公平

public ReentrantReadWriteLock(boolean fair) {

sync = fair ? new FairSync() : new NonfairSync();

readerLock = new ReadLock(this);

writerLock = new WriteLock(this);

}

与 ReentrantLock 类似,这里的构造器也传入了公平策略,且默认为非公平。构造器内部初始化了三个变量:sync、readerLock 和 writerLock,如下:

// 提供读锁的内部类

private final ReentrantReadWriteLock.ReadLock readerLock;

// 提供写锁的内部类

private final ReentrantReadWriteLock.WriteLock writerLock;

// 执行所有的同步机制

final Sync sync;

下面先分析这几个内部类的代码。

Sync 类

Sync 类继承自 AQS(与 ReentrantLock 中的 Sync 类似),如下:

abstract static class Sync extends AbstractQueuedSynchronizer {

// 使用 AQS 中的 state 变量(int 类型)来记录读写锁的占用情况

// 其中高 16 位记录读锁的持有次数;低 16 位记录写锁的重入次数

static final int SHARED_SHIFT = 16;

static final int SHARED_UNIT = (1 << SHARED_SHIFT);

static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;


// 共享锁(读锁)的持有次数

static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

// 互斥锁(写锁)的重入次数

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }


/**

* 每个线程持有读锁的计数器。

* 以 ThreadLocal 形式保存,缓存在 cachedHoldCounter

* 该类的主要作用是记录线程持有读锁的数量,可理解为 <tid,count> 的形式

*/

static final class HoldCounter {

int count = 0;

// Use id, not reference, to avoid garbage retention

final long tid = getThreadId(Thread.currentThread());

}

static final class ThreadLocalHoldCounter

extends ThreadLocal<HoldCounter> {

public HoldCounter initialValue() {

return new HoldCounter();

}

}


/**

* 当前线程持有的可重入读锁的数量(数量为0时删除)。

*/

private transient ThreadLocalHoldCounter readHolds;


private transient HoldCounter cachedHoldCounter;


/**

* firstReader:第一个获取读锁的线程;

* firstReaderHoldCount:firstReader 的持有计数。

*/

private transient Thread firstReader = null;

private transient int firstReaderHoldCount;


Sync() {

readHolds = new ThreadLocalHoldCounter();

setState(getState()); // ensures visibility of readHolds

}


/*

* 对于公平锁和非公平锁,获取和释放锁使用的代码相同;

* 但在队列非空时,它们是否或如何允许插入的方式不同。

*/

/**

* 当前线程在尝试(或有资格)获取读锁时,是否应该由于策略原因而阻塞。

*/

abstract boolean readerShouldBlock();


/**

* 当前线程在尝试(或有资格)获取写锁时,是否应该由于策略原因而阻塞。

*/

abstract boolean writerShouldBlock();


// 释放写锁

protected final boolean tryRelease(int releases) {

if (!isHeldExclusively())

throw new IllegalMonitorStateException();

int nextc = getState() - releases;

boolean free = exclusiveCount(nextc) == 0;

if (free)

setExclusiveOwnerThread(null);

setState(nextc);

return free;

}


// 获取写锁

protected final boolean tryAcquire(int acquires) {

/*

* 流程:

* 1. 若其他线程持有读锁或写锁(计数不为零),返回 false;

* 2. 若持有数量饱和(超出上限),返回 false;

* 3. 该线程有资格获取锁,更新 state 并设置为 owner。

*/

Thread current = Thread.currentThread();

int c = getState();

int w = exclusiveCount(c);

if (c != 0) {

// (Note: if c != 0 and w == 0 then shared count != 0)

if (w == 0 || current != getExclusiveOwnerThread())

return false;

if (w + exclusiveCount(acquires) > MAX_COUNT)

throw new Error("Maximum lock count exceeded");

// Reentrant acquire

setState(c + acquires);

return true;

}

// 若获取写锁时应该阻塞,或者更新 state 失败,返回 false

if (writerShouldBlock() ||

!compareAndSetState(c, c + acquires))

return false;

setExclusiveOwnerThread(current);

return true;

}


// 释放读锁

protected final boolean tryReleaseShared(int unused) {

Thread current = Thread.currentThread();

// 若当前线程是第一个持有读锁的线程

if (firstReader == current) {

// assert firstReaderHoldCount > 0;

if (firstReaderHoldCount == 1)

firstReader = null;

else

firstReaderHoldCount--;

} else {

// 更新缓存

HoldCounter rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current))

rh = readHolds.get();

int count = rh.count;

if (count <= 1) {

readHolds.remove();

if (count <= 0)

throw unmatchedUnlockException();

}

--rh.count;

}

// 更新 state

for (;;) {

int c = getState();

int nextc = c - SHARED_UNIT;

if (compareAndSetState(c, nextc))

// Releasing the read lock has no effect on readers,

// but it may allow waiting writers to proceed if

// both read and write locks are now free.

return nextc == 0;

}

}


// 获取读锁

protected final int tryAcquireShared(int unused) {

/*

* 流程:

* 1. 如果其他线程持有写锁,获取失败;

* 2. 否则,该线程有资格获取,因此请询问它是否由于队列策略而阻塞;

* 若不阻塞,尝试通过 CAS 更新状态计数。

* 注意:这一步没有检查可重入的获取,推迟到完整版本,

* 以避免在明显不可重入的情况下检查持有计数。

* 3. 如果第二步失败,要么是因为线程明显不符合条件、CAS 失败或计数饱和,

* 则进行完整重试版本。

*/

Thread current = Thread.currentThread();

int c = getState();

// step1. 若写锁被其他线程占用,则获取失败

// exclusiveCount(c) != 0表示写锁被占用

if (exclusiveCount(c) != 0 &&

getExclusiveOwnerThread() != current)

return -1;

// step2. 获取读锁数量

int r = sharedCount(c);

if (!readerShouldBlock() &&

r < MAX_COUNT &&

compareAndSetState(c, c + SHARED_UNIT)) {

// 读锁未被占用,设置该线程是第一个持有读锁的线程

if (r == 0) {

firstReader = current;

firstReaderHoldCount = 1;

// 该线程已持有读锁,计数加1

} else if (firstReader == current) {

firstReaderHoldCount++;

// 其他线程已持有读锁

} else {

// 取缓存

HoldCounter rh = cachedHoldCounter;

// 若未初始化,或者拿到的不是当前线程的计数,则从 ThreadLocal 中获取

if (rh == null || rh.tid != getThreadId(current))

cachedHoldCounter = rh = readHolds.get();

else if (rh.count == 0)

readHolds.set(rh);

// 增加计数

rh.count++;

}

// 获取成功

return 1;

}

// step3. 若step2获取失败,则执行该步骤

return fullTryAcquireShared(current);

}


/**

* 获取读锁的完整版,处理 tryAcquireShared 中未处理的 CAS 丢失和可重入读取。

*/

final int fullTryAcquireShared(Thread current) {

HoldCounter rh = null;

for (;;) {

int c = getState();

// 如果其他线程占用写锁,获取失败

if (exclusiveCount(c) != 0) {

if (getExclusiveOwnerThread() != current)

return -1;

// else we hold the exclusive lock; blocking here

// would cause deadlock.

} else if (readerShouldBlock()) {

// Make sure we're not acquiring read lock reentrantly

if (firstReader == current) {

// assert firstReaderHoldCount > 0;

} else {

if (rh == null) {

rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current)) {

rh = readHolds.get();

if (rh.count == 0)

readHolds.remove();

}

}

if (rh.count == 0)

return -1;

}

}

if (sharedCount(c) == MAX_COUNT)

throw new Error("Maximum lock count exceeded");

if (compareAndSetState(c, c + SHARED_UNIT)) {

if (sharedCount(c) == 0) {

firstReader = current;

firstReaderHoldCount = 1;

} else if (firstReader == current) {

firstReaderHoldCount++;

} else {

if (rh == null)

rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current))

rh = readHolds.get();

else if (rh.count == 0)

readHolds.set(rh);

rh.count++;

cachedHoldCounter = rh; // cache for release

}

return 1;

}

}

}


/**

* 执行写锁的 tryLock 方法

* 与 tryAcquire 相比,该方法未调用 writerShouldBlock

*/

final boolean tryWriteLock() {

Thread current = Thread.currentThread();

int c = getState();

if (c != 0) {

int w = exclusiveCount(c);

if (w == 0 || current != getExclusiveOwnerThread())

return false;

if (w == MAX_COUNT)

throw new Error("Maximum lock count exceeded");

}

if (!compareAndSetState(c, c + 1))

return false;

setExclusiveOwnerThread(current);

return true;

}


/**

* 执行读锁的 tryLock 方法

* 与 tryAcquireShared 相比,该方法未调用 readerShouldBlock

*/

final boolean tryReadLock() {

Thread current = Thread.currentThread();

for (;;) {

int c = getState();

if (exclusiveCount(c) != 0 &&

getExclusiveOwnerThread() != current)

return false;

int r = sharedCount(c);

if (r == MAX_COUNT)

throw new Error("Maximum lock count exceeded");

if (compareAndSetState(c, c + SHARED_UNIT)) {

if (r == 0) {

firstReader = current;

firstReaderHoldCount = 1;

} else if (firstReader == current) {

firstReaderHoldCount++;

} else {

HoldCounter rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current))

cachedHoldCounter = rh = readHolds.get();

else if (rh.count == 0)

readHolds.set(rh);

rh.count++;

}

return true;

}

}

}

}

Sync 类继承自 AQS,主要重写了 AQS 中独占模式(参考「 JDK源码分析-AbstractQueuedSynchronizer(2) 」)和共享模式(参考「 JDK源码分析-AbstractQueuedSynchronizer(3) 」)下获取和释放锁的方法。

Sync 类的继承结构如下:

jIJ3emQ.png!web

NonfairSync 类

NonfairSync 继承自 Sync 类,提供非公平策略的实现,如下:

static final class NonfairSync extends Sync {

final boolean writerShouldBlock() {

return false; // writers can always barge

}

final boolean readerShouldBlock() {

// 调用父类 AQS 中的方法实现

return apparentlyFirstQueuedIsExclusive();

}

}


// 若头节点的下一个节点是写线程,为了防止写线程饥饿等待,当前的读线程应该阻塞

final boolean apparentlyFirstQueuedIsExclusive() {

Node h, s;

return (h = head) != null &&

(s = h.next) != null &&

!s.isShared() &&

s.thread != null;

}

非公平策略中,writerShouldBlock 返回 false,说明写线程无需阻塞;

readerShouldBlock 是调用父类 AQS 中的 apparentlyFirstQueuedIsExclusive 方法实现的,该方法通过判断等待队列中的第一个线程是否为写线程,若是则返回 true,表示给写线程让道。

PS: 通过分析这两个方法,发现在非公平策略下,写线程的优先级还是高于读线程的(纯属个人理解)。

FairSync 类

FairSync 也继承自 Sync 类,提供公平策略的实现,如下:

static final class FairSync extends Sync {

final boolean writerShouldBlock() {

return hasQueuedPredecessors();

}

final boolean readerShouldBlock() {

return hasQueuedPredecessors();

}

}

在公平策略中,两个方法都通过调用父类 AQS 的 hasQueuedPredecessors 方法判别,二者都是根据 等待队列中是否有其他线程,若有其他线程,则当前线程等待。

这就是公平的体现吧:无论读写,都乖乖去排队,别插队。

ReadLock

ReadLock 是读锁的实现,代码如下:

public static class ReadLock implements Lock, java.io.Serializable {

private final Sync sync;


protected ReadLock(ReentrantReadWriteLock lock) {

sync = lock.sync;

}


/**

* 获取读锁(不可中断):

* 1. 若其他线程未持有写锁,则获取读锁并立即返回;

* 2. 若其他线程持有写锁,则由于线程调度,当前线程被禁用并休眠,直到获取读锁。

*/

public void lock() {

sync.acquireShared(1);

}


/**

* 以中断方式获取读锁:

* 1. 若其他线程未持有写锁,则获取读锁并立即返回;

* 2. 若其他线程持有写锁,则由于线程调度,当前线程被禁用并休眠,

* 直到当前获取到读锁,或者被其他线程中断。

*/

public void lockInterruptibly() throws InterruptedException {

sync.acquireSharedInterruptibly(1);

}


/**

* 仅当另一个线程未持有写锁时才能获取读锁。

* 若另一个线程持有写锁,则立即返回 false。

*/

public boolean tryLock() {

return sync.tryReadLock();

}


/**

* 获取读锁(与 tryLock 方法类似,多了超时等待)。

*/

public boolean tryLock(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

}


/**

* 尝试释放该锁。

* 若读线程的数量为零,则该锁可用于尝试获取写锁。

*/

public void unlock() {

sync.releaseShared(1);

}


public Condition newCondition() {

throw new UnsupportedOperationException();

}

}

ReadLock 类实现了 Lock 接口,它的主要方法就是 Lock 接口所定义的方法(获取和释放锁)。 读锁之间是共享的 ,ReadLock 的主要方法都通过 AQS 共享模式的方法实现的。

WriteLock

WriteLock 是写锁,代码如下:

public static class WriteLock implements Lock, java.io.Serializable {

private final Sync sync;


protected WriteLock(ReentrantReadWriteLock lock) {

sync = lock.sync;

}


/**

* 获取写锁。

* 1. 若无其他线程持有读锁或写锁,则获取写锁并立即放回,并将写锁计数设为1;

* 2. 若当前线程已经持有写锁,则将其计数加1,并立即返回(可重入);

* 3. 若锁被其他线程持有,当前线程被禁用并处于休眠状态,直到获取写锁(计数设为1)。

*/

public void lock() {

sync.acquire(1);

}


/**

* 获取写锁(可被中断)。

* 1. 若无其他线程持有读锁或写锁,则获取并立即返回写锁,并将计数设为1;

* 2. 若当前线程已经持有写锁,则将其计数加1,并立即返回(可重入);

* 3. 若锁被其他线程持有,当前线程被禁用并处于休眠状态,

* 直到当前线程获取写锁(计数设为1)或被其他线程中断。

*/

public void lockInterruptibly() throws InterruptedException {

sync.acquireInterruptibly(1);

}


/**

* 仅当调用时其他线程未持有该写锁时,才获取该写锁。

*/

public boolean tryLock( ) {

return sync.tryWriteLock();

}


/**

* 尝试获取写锁(响应中断,有超时等待)。

*/

public boolean tryLock(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireNanos(1, unit.toNanos(timeout));

}


/**

* 尝试释放锁。

* 若当前线程是锁的持有者,则持有计数将减少;若当前持有计数为零则释放锁。

* 若当前线程不是锁的持有者,则抛出异常IllegalMonitorStateException

*/

public void unlock() {

sync.release(1);

}

public Condition newCondition() {

return sync.newCondition();

}

/**

* 查询此写锁是否由当前线程持有。

*/

public boolean isHeldByCurrentThread() {

return sync.isHeldExclusively();

}


/**

* 查询当前线程对该写锁的持有计数。

*/

public int getHoldCount() {

return sync.getWriteHoldCount();

}

}

与 ReadLock 类似,WriteLock 类也实现了 Lock 接口,其主要方法也是 Lock 接口所定义的方法(获取和释放锁)。 而写锁是互斥的,WriteLock 的大部分方法都是通过 AQS 独占模式的方法实现的。

ReentrantReadWriteLock 的主要代码就分析到这里,下面简单分析其用法和使用场景。

典型用法

示例代码

为便于 理解读写锁的操作 ,下面举个 栗子验证(代码 仅供参考)

public class TestRDLock {

// 创建一个线程池

private static ExecutorService threadPoolExecutor = new ThreadPoolExecutor(10, 20,

60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

// 创建一个读写锁实例

private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();


public static void main(String[] args) {

for (int i = 0; i < 5; i++) {

// 这里可以尝试“读读“、“读写”和“写写”场景的代码测试(仅供参考)

threadPoolExecutor.execute(new ReadTask());

threadPoolExecutor.execute(new WriteTask());

}

}


// 写操作

private static class WriteTask implements Runnable {

@Override

public void run() {

readWriteLock.writeLock().lock();

try {

System.out.println(Thread.currentThread().getName() + " 获取写锁");

TimeUnit.SECONDS.sleep(5);

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

readWriteLock.writeLock().unlock();

System.out.println(Thread.currentThread().getName() + " 释放了写锁");

}

}

}


// 读操作

private static class ReadTask implements Runnable {

@Override

public void run() {

readWriteLock.readLock().lock();

try {

System.out.println(Thread.currentThread().getName() + " 获取读锁");

TimeUnit.SECONDS.sleep(5);

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

readWriteLock.readLock().unlock();

System.out.println(Thread.currentThread().getName() + " 释放了读锁");

}

}

}

}

Java API 文档中还提供了两个典型的使用场景,如下:

场景一:更新缓存后执行锁降级

class CachedData {

Object data;

volatile boolean cacheValid;

final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();


void processCachedData() {

// 先获取读锁

rwl.readLock().lock();

if (!cacheValid) {

// Must release read lock before acquiring write lock

rwl.readLock().unlock();

rwl.writeLock().lock();

try {

// Recheck state because another thread might have

// acquired write lock and changed state before we did.

// 更新缓存(持有写锁的情况下)

if (!cacheValid) {

data = ...

cacheValid = true;

}

// Downgrade by acquiring read lock before releasing write lock

rwl.readLock().lock();

} finally {

// 释放写锁(仍然持有读锁,即降级为读锁)

rwl.writeLock().unlock(); // Unlock write, still hold read

}

}


try {

use(data);

} finally {

// 释放读锁

rwl.readLock().unlock();

}

}

}

场景二:在较大的集合中,读多写少的情况

class RWDictionary {

private final Map<String, Data> m = new TreeMap<String, Data>();

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

private final Lock r = rwl.readLock();

private final Lock w = rwl.writeLock();

public Data get(String key) {

r.lock();

try { return m.get(key); }

finally { r.unlock(); }

}

public String[] allKeys() {

r.lock();

try { return m.keySet().toArray(); }

finally { r.unlock(); }

}

public Data put(String key, Data value) {

w.lock();

try { return m.put(key, value); }

finally { w.unlock(); }

}

public void clear() {

w.lock();

try { m.clear(); }

finally { w.unlock(); }

}

}

小结

1. ReentrantReadWriteLock 是一种读写锁,它持有一对锁:读锁和写锁。 其中读锁之间是共享的,写锁是互斥的。

2. 「读多写少」的场景下, ReentrantReadWriteLock 比 ReentrantLock 有更高的并发性。

3. 与 ReentrantLock 原理类似, ReentrantReadWriteLock  内部也基于 AQS:其中读锁基于「共享模式」实现,写锁基于「独占模式」实现。

参考:

1. https://docs.oracle.com/javase/8/docs/api/index.html

2. https://blog.csdn.net/fxkcsdn/article/details/82217760

RbUbmmb.png!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK