JUC同步锁原理源码解析二--ReentrantReadWriteLock - bug的自我救赎
source link: https://www.cnblogs.com/selfsalvation/p/17484555.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
JUC同步锁原理源码解析二----ReentrantReadWriteLock
1.读写锁的来源
在开发场景下,对于写操作我们为了保证原子性所以需要上锁,但是对于读操作,由于其不改变数据,只是单纯对数据进行读取,但是每次都上一把互斥锁,阻塞所有请求。这个明显不符合读多写少的场景。所以将锁分为两把:读锁和写锁。
2.读写锁的底层实现
读写锁的底层实现依旧依赖于AQS。具体底层实现,请查看上一篇文章的介绍
2.AQS源码
Node节点
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
AbstractQueuedSynchronizer类
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;//最重要的一个变量
}
ConditionObject类
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
accquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&//尝试获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果获取锁失败,添加到队列中,由于ReentrantLock是独占锁所以节点必须是EXCLUSIVE类型
selfInterrupt();//添加中断标识位
}
addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//新建节点
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;//获取到尾指针
if (pred != null) {//尾指针不等于空,将当前节点替换为尾指针
node.prev = pred;
if (compareAndSetTail(pred, node)) {//采用尾插法,充分利用时间局部性和空间局部性。尾插的节点一般不容易被取消。
pred.next = node;
return node;
}
}
enq(node);//cas失败后执行入队操作,继续尝试
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;//获取尾指针
if (t == null) { //代表当前队列没有节点
if (compareAndSetHead(new Node()))//将当前节点置为头结点
tail = head;
} else {//当前队列有节点
node.prev = t;//
if (compareAndSetTail(t, node)) {//将当前节点置为尾结点
t.next = node;
return t;
}
}
}
}
acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//找到当前节点的前驱节点
if (p == head && tryAcquire(arg)) {//前驱节点等于头节点尝试cas抢锁。
setHead(node);//抢锁成功将当前节点设置为头节点
p.next = null; // help GC 当头结点置空
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&//当队列中有节点在等待,判断是否应该阻塞
parkAndCheckInterrupt())//阻塞等待,检查中断标识位
interrupted = true;//将中断标识位置为true
}
} finally {
if (failed)//
cancelAcquire(node);//取消当前节点
}
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)//当前节点为空直接返回
return;
node.thread = null;//要取消了将当前节点的线程置为空
// Skip cancelled predecessors
Node pred = node.prev;//获取到当前节点的前驱节点
while (pred.waitStatus > 0)//如果当前节点的前驱节点的状态大于0,代表是取消状态,一直找到不是取消状态的节点
node.prev = pred = pred.prev;
Node predNext = pred.next;//将当前要取消的节点断链
node.waitStatus = Node.CANCELLED;//将当前节点的等待状态置为CANCELLED
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {//如果当前节点是尾结点,将尾结点替换为浅语节点
compareAndSetNext(pred, predNext, null);//将当前节点的下一个节点置为空,因为当前节点是最后一个节点没有next指针
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&//前驱节点不等于头结点
((ws = pred.waitStatus) == Node.SIGNAL ||//前驱节点的状态不等于SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驱节点的状态小于0,并且cas将前驱节点的等待置为SIGNAL
pred.thread != null) {//前驱节点的线程补位空
Node next = node.next;//获取当前节点的next指针
if (next != null && next.waitStatus <= 0)//如果next指针不等于空并且等待状态小于等于0,标识节点有效
compareAndSetNext(pred, predNext, next);//将前驱节点的next指针指向下一个有效节点
} else {
unparkSuccessor(node);//唤醒后续节点 条件:1.前驱节点是头结点 2.当前节点不是signal,在ReentransLock中基本不会出现,在读写锁时就会出现
}
node.next = node; // help GC 将引用指向自身
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//获取当前节点状态
if (ws < 0)//如果节点为负数也即不是取消节点
compareAndSetWaitStatus(node, ws, 0);//cas将当前节点置为0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//获取到下一个节点
if (s == null || s.waitStatus > 0) {//下一个节点等于空或者下一个节点是取消节点
s = null;//将s置为空
for (Node t = tail; t != null && t != node; t = t.prev)//从尾结点遍历找到一个不是取消状态的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//如果s不等于空
LockSupport.unpark(s.thread);//唤醒当前节点s
}
shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//获取上一个节点的等待状态
if (ws == Node.SIGNAL)//如果状态为SIGNAL,代表后续节点有节点可以唤醒,可以安心阻塞去
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {//如果当前状态大于0,代表节点为CANCELLED状态
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;//从尾节点开始遍历,找到下一个状态不是CANCELLED的节点。将取消节点断链移除
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//这里需要注意ws>0时,已经找到了一个不是取消状态的前驱节点。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//将找到的不是CANCELLED节点的前驱节点,将其等待状态置为SIGNAL
}
return false;
}
cancelAcquire方法
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)//当前节点为空直接返回
return;
node.thread = null;//要取消了将当前节点的线程置为空
// Skip cancelled predecessors
Node pred = node.prev;//获取到当前节点的前驱节点
while (pred.waitStatus > 0)//如果当前节点的前驱节点的状态大于0,代表是取消状态,一直找到不是取消状态的节点
node.prev = pred = pred.prev;
Node predNext = pred.next;//将当前要取消的节点断链
node.waitStatus = Node.CANCELLED;//将当前节点的等待状态置为CANCELLED
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {//如果当前节点是尾结点,将尾结点替换为浅语节点
compareAndSetNext(pred, predNext, null);//将当前节点的下一个节点置为空,因为当前节点是最后一个节点没有next指针
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&//前驱节点不等于头结点
((ws = pred.waitStatus) == Node.SIGNAL ||//前驱节点的状态不等于SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驱节点的状态小于0,并且cas将前驱节点的等待置为SIGNAL
pred.thread != null) {//前驱节点的线程补位空
Node next = node.next;//获取当前节点的next指针
if (next != null && next.waitStatus <= 0)//如果next指针不等于空并且等待状态小于等于0,标识节点有效
compareAndSetNext(pred, predNext, next);//将前驱节点的next指针指向下一个有效节点
} else {
unparkSuccessor(node);//唤醒后续节点 条件:1.前驱节点是头结点 2.当前节点不是signal,在ReentransLock中基本不会出现,在读写锁时就会出现
}
node.next = node; // help GC 将引用指向自身
}
}
unparkSuccessor方法
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//获取当前节点状态
if (ws < 0)//如果节点为负数也即不是取消节点
compareAndSetWaitStatus(node, ws, 0);//cas将当前节点置为0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//获取到下一个节点
if (s == null || s.waitStatus > 0) {//下一个节点等于空或者下一个节点是取消节点
s = null;//将s置为空
for (Node t = tail; t != null && t != node; t = t.prev)//从尾结点遍历找到一个不是取消状态的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//如果s不等于空
LockSupport.unpark(s.thread);//唤醒当前节点s
}
release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {//子类实现如何释放锁
Node h = head;//获取到头结点
if (h != null && h.waitStatus != 0)//获取到头结点,如果头结点不为空,等待状态不为0,唤醒后续节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//获取节点的等待状态
if (ws < 0)//如果等待状态小于0,标识节点属于有效节点
compareAndSetWaitStatus(node, ws, 0);//将当前节点的等待状态置为0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//获取到下一个节点
if (s == null || s.waitStatus > 0) {//如果节点是空,或者是取消状态的节点,就找到一个非取消状态的节点,将取消状态的节点断链后由垃圾回收器进行回收
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//节点不用空
LockSupport.unpark(s.thread);//唤醒当前等待的有效节点S
}
acquireShared方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//由子类实现
doAcquireShared(arg);
}
doAcquireShared方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//将共享节点也即读线程入队并返回
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//找到节点的前驱节点
if (p == head) {//如果前驱节点等于头结点
int r = tryAcquireShared(arg);//尝试获取共享锁数量
if (r >= 0) {//如果锁的数量大于0,表示还有多余的共享锁。这里等于0也需要进一步判断。由于如果当执行到这里时,有另外的线程释放了共享锁,如果不进行判断,将会导致释放锁的线程没办法唤醒其他线程。所以这里会伪唤醒一个节点,唤醒的节点后续如果没有锁释放,依旧阻塞在当前parkAndCheckInterrupt方法中
setHeadAndPropagate(node, r);//将当前节点的等待状态设置为Propagate。
p.next = null; // help GC
if (interrupted)//判断是会否中断过
selfInterrupt();//设置中断标识位
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&//判断是否应该阻塞等待
parkAndCheckInterrupt方法中())//阻塞并检查中断标识
interrupted = true;//重置中断标识位
}
} finally {
if (failed)//如果失败
cancelAcquire(node);//取消节点
}
}
setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//将当前节点置为头结点
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 //可获取的共享锁也即读锁的数量,对于ReentrantReadWriteLock而言,永远都是1,所以会继续唤醒下一个读线程
|| h == null //如果旧的头结点为空
|| h.waitStatus < 0 ||//头结点的等待状态不为0
(h = head) == null || h.waitStatus < 0) {//旧头节点不为空并且等待状态小于0也即是有效节点
Node s = node.next;//获取到node的下一个节点
if (s == null || s.isShared())//如果node的下一个节点为空或者是共享节点
doReleaseShared();//唤醒下一个线程
}
}
releaseShared方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//子类实现释放锁
doReleaseShared();//唤醒后续线程
return true;//释放成功
}
return false;//释放是吧
}
doReleaseShared方法
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;//获取到当前头结点
if (h != null && h != tail) {//如果头结点不为空并且不等于尾结点
int ws = h.waitStatus;//获取当前节点的等待状态
if (ws == Node.SIGNAL) {//如果状态为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//cas将SIGNAL状态置为0。SIGNAL标识后续有线程需要唤醒
continue; // loop to recheck cases
unparkSuccessor(h);//唤醒后续线程
}
else if (ws == 0 &&//如果当前状态为0。表示有线程将其置为0
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//cas将0状态置为PROPAGATE。在多个共享锁同时释放时,方便继续进行读传播,唤醒后续节点
continue; // loop on failed CAS
}
if (h == head)//如果头结点没有改变,证明没有必要继续循环等待了,直接退出吧,如果头结点放生变化,可能有其他线程释放了锁。
break;
}
}
总结:AQS提供了统一的模板,对于如何入队出队以及线程的唤醒都由AQS提供默认的实现,只需要子类实现自己上锁和解锁的逻辑。
3.ReentrantReadWriteLock
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class RWDictionary {
private final Map<String, String> m = new TreeMap<String, String>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public String get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try {
return (String[])m.keySet().toArray();
}
finally { r.unlock(); }
}
public String put(String key, String value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}
Sync类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }//高16位代表共享锁也即读锁
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//低16位代表独占锁也即写锁
}
公平锁的FairSync类实现
//读写锁中公平锁的实现:如果后续队列中有等待的节点,那么进行排队。但是这样的性能并不高,可能造成写饥饿。
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
//如果队列中有节点在排队,那么就进行等待
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
非公平锁的NonfairSync类实现
//非公平锁的实现相对来说性能较高,不阻塞,可以直接抢锁,防止写饥饿。同时有写线程排队,那么后续的所有读操作都需要进行排队
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge //非公平锁写锁永远不阻塞,防止写饥饿
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
//如果队列中有任务并且不是共享节点也即不是读操作,那么返回true,进行阻塞
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&//头结点不等于空
(s = h.next) != null &&//头结点的下一个节点补位空
!s.isShared() &&//并且当前节点不是共享节点,也即读线程
s.thread != null;//线程不为空
}
写锁的lock方法:
public void lock() {
sync.acquire(1);
}
//同样调用acquire的方法,请参考上文AQS的acquire方法的实现。同样都是获取失败进行排队
//子类获取锁的过程
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();//获取当前线程
int c = getState();//获取当前state状态值
int w = exclusiveCount(c);//获取读锁的数量,这里doug lea一直都是使用二进制的风格
if (c != 0) {//代表持有锁
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())//如果写锁为0,或则当前线程不是写锁的持有线程
return false;//返回false
if (w + exclusiveCount(acquires) > MAX_COUNT)//当前锁重入次数大于最大值,抛出异常
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);//直接设置状态,代表锁重入次数加1
return true;//返回获取锁成功
}
if (writerShouldBlock() ||//写锁是否应该阻塞
!compareAndSetState(c, c + acquires))//cas尝试获取锁
return false;
setExclusiveOwnerThread(current);//cas抢锁成功,直接将写锁的持有线程设置为当前线程
return true;//返回true代表获取锁成功
}
writerShouldBlock的实现
//writerShouldBlock的实现请参考上面的公平锁和非公平锁的实现
写锁的unlock方法:
public void unlock() {
sync.release(1);
}
//这里是AQS的模板方法
public final boolean release(int arg) {
if (tryRelease(arg)) {//子类实现如何释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//如果不是独占锁也即写锁,直接抛异常
throw new IllegalMonitorStateException();
int nextc = getState() - releases;//将锁的数量减少
boolean free = exclusiveCount(nextc) == 0;//如果独占锁也即写锁数量为0,标识已经释放。大于1是因为锁重入的问题
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
读锁的lock方法:
public void lock() {
sync.acquireShared(1);
}
//acquireShared方法请看上面AQS的实现
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();//获取当前线程
int c = getState();//获取当前state状态
if (exclusiveCount(c) != 0 &&//如果独占锁也即写锁的数量不等于0
getExclusiveOwnerThread() != current)//独占锁的拥有线程不是当前线程
return -1;//直接返回-1,获取失败
int r = sharedCount(c);//获取共享锁的数量
if (!readerShouldBlock() &&//读锁是否应该被阻塞
r < MAX_COUNT &&//读锁数量是否小于最大数量
compareAndSetState(c, c + SHARED_UNIT)) {//cas将读锁+1
if (r == 0) {//当前共享锁也即读锁数量为0
firstReader = current;//标记为第一个读线程
firstReaderHoldCount = 1;//将第一个读线程持有锁数量置为1,主要为了判断读锁的重入
} else if (firstReader == current) {//第一个线程读等于当前线程
firstReaderHoldCount++;//第一个读线程持有锁数量
} else {
HoldCounter rh = cachedHoldCounter;//先读取到最后一个线程持有共享锁的数量
if (rh == null || rh.tid != getThreadId(current))//如果为空或者最后一个线程不是当前线程
cachedHoldCounter = rh = readHolds.get();//从缓存中获取当前线程持有读锁的数量
else if (rh.count == 0)//如果读锁持有数量为0
readHolds.set(rh);//将值设置并保存
rh.count++;//持有线程数+1
}
return 1;//返回1标识获取锁成功
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;//
for (;;) {
int c = getState();//获取当前状态值
if (exclusiveCount(c) != 0) {//如果持有写锁的线程数补位0
if (getExclusiveOwnerThread() != current)//持有写锁的线程不是当前线程
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {//读锁是否应该被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)//如果读线程的数量等于最大数量,抛出异常
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {//cas尝试抢锁
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
读锁的unlock方法
public void unlock() {
sync.releaseShared(1);
}
//详情请查看上面AQS的实现
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {//如果当前线程是第一个读线程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)//当前第一个读线程持有的读锁的数量是1
firstReader = null;//直接置位空
else
firstReaderHoldCount--;//否则将持有读锁的数量减1
} else {
HoldCounter rh = cachedHoldCounter;//先获取最后一个读线程的持有信息
if (rh == null || rh.tid != getThreadId(current))//如果rh为空或者最后一个读线程不是当前线程
rh = readHolds.get();//从缓存中获取当前线程的持有锁的信息
int count = rh.count;//获取到数量
if (count <= 1) {//如果持有读锁的数量小于等于1
readHolds.remove();//直接清空缓存
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;//减去持有读锁的数量
}
for (;;) {
int c = getState();//获取当前状态
int nextc = c - SHARED_UNIT;//将读锁的数量减1
if (compareAndSetState(c, nextc))//cas原子性赋值
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;//如果为0表示当前线程已经释放了所有读锁。
}
}
本文章只是JUC 中AQS的一部分,后续的文章会对基于AQS锁实现的子类进行拓展讲解,以上文章内容基于个人以及结合别人文章的理解,如果有问题或者不当之处欢迎大家留言交流。由于为了保证观看流畅性,其中一部分源码有重复的地方。请见谅
Recommend
-
37
-
47
刚刚过去的5月对 唯品会 来说是一个值得纪念的月份: 度过了十周岁的生日、发布了2018Q1财报、上线了 唯品会 男士版...
-
28
0. 前言 作为一个不喜欢写样式的前端,遇到了直接对外的活动页面的需求,一下炸出一堆问题: 单位乱用,rem、vh、vw、px乱用甚至混在一起用 html冗余,有时候一个div只是为了取margin 一个页面用多种布局方案,flex、float、relati
-
19
引子 早在 2019 年底的 KubeConNA 中,Google API 基础设施的架构师 Louis Ryan 就透露了 Istio 控制平面架构将要进行调整的消息。从即将发布的 1.5 版本开始,原本多个独立的组件将会整合在一起,成为一个单体结构。相信每个...
-
10
【罗小布问道】自我救赎系列小讨论(7)——初一说梦给有线拜年:《有线扶贫:看电视换大病保险》罗小布导语:上述观点...
-
6
【罗小布问道】自我救赎系列小讨论(6)——有线5G移动的模仿,不一定是全等三角形式模仿,完全可以是相似三角形式模仿罗小布...
-
6
【罗小布问道】自我救赎系列小讨论(8)——说梦给有线拜晚年《广电“军委”决定组建训练有线“红军”的“蓝军”》罗小布导语...
-
8
【罗小布问道】自我救赎系列小讨论(5)——《有线在5G移动对标中不仅可以模仿,也可以创新》罗小布导语: 上述观点,...
-
3
一文带你了解线程池原理 1.使用线程池的意义何在? 项目开发中,为了统一管理线程,并有效精准地进行排错,我们经常要求项目人员统一使用线程池去创建线程。因为我们是在受不了有些人...
-
3
JUC同步锁原理源码解析四----Semaphore Semaphore 1.Semaphore的来源 A counting semaphore. Conceptually, a semaphore maintains a set of...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK