3

JUC同步锁原理源码解析二--ReentrantReadWriteLock - bug的自我救赎

 1 year ago
source link: https://www.cnblogs.com/selfsalvation/p/17484555.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

JUC同步锁原理源码解析二----ReentrantReadWriteLock

1.读写锁的来源

​ 在开发场景下,对于写操作我们为了保证原子性所以需要上锁,但是对于读操作,由于其不改变数据,只是单纯对数据进行读取,但是每次都上一把互斥锁,阻塞所有请求。这个明显不符合读多写少的场景。所以将锁分为两把:读锁和写锁。

2.读写锁的底层实现

​ 读写锁的底层实现依旧依赖于AQS。具体底层实现,请查看上一篇文章的介绍

2.AQS源码

Node节点

 static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
 
        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;
       
        volatile Thread thread;

        Node nextWaiter;
}

AbstractQueuedSynchronizer类

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
 	private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;//最重要的一个变量
       
}

ConditionObject类

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}

accquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&//尝试获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果获取锁失败,添加到队列中,由于ReentrantLock是独占锁所以节点必须是EXCLUSIVE类型
        selfInterrupt();//添加中断标识位
}

addWaiter方法

private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);//新建节点
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;//获取到尾指针
     if (pred != null) {//尾指针不等于空,将当前节点替换为尾指针
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {//采用尾插法,充分利用时间局部性和空间局部性。尾插的节点一般不容易被取消。
             pred.next = node;
             return node;
         }
     }
     enq(node);//cas失败后执行入队操作,继续尝试
     return node;
 }

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;//获取尾指针
        if (t == null) { //代表当前队列没有节点
            if (compareAndSetHead(new Node()))//将当前节点置为头结点
                tail = head;
        } else {//当前队列有节点
            node.prev = t;//
            if (compareAndSetTail(t, node)) {//将当前节点置为尾结点
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//找到当前节点的前驱节点
            if (p == head && tryAcquire(arg)) {//前驱节点等于头节点尝试cas抢锁。
                setHead(node);//抢锁成功将当前节点设置为头节点
                p.next = null; // help GC  当头结点置空
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&//当队列中有节点在等待,判断是否应该阻塞
                parkAndCheckInterrupt())//阻塞等待,检查中断标识位
                interrupted = true;//将中断标识位置为true
        }
    } finally {
        if (failed)//
            cancelAcquire(node);//取消当前节点
    }
}


 private void cancelAcquire(Node node) {
     // Ignore if node doesn't exist
     if (node == null)//当前节点为空直接返回
         return;

     node.thread = null;//要取消了将当前节点的线程置为空
     // Skip cancelled predecessors
     Node pred = node.prev;//获取到当前节点的前驱节点
     while (pred.waitStatus > 0)//如果当前节点的前驱节点的状态大于0,代表是取消状态,一直找到不是取消状态的节点
         node.prev = pred = pred.prev;
     Node predNext = pred.next;//将当前要取消的节点断链

     node.waitStatus = Node.CANCELLED;//将当前节点的等待状态置为CANCELLED
     // If we are the tail, remove ourselves.
     if (node == tail && compareAndSetTail(node, pred)) {//如果当前节点是尾结点,将尾结点替换为浅语节点
         compareAndSetNext(pred, predNext, null);//将当前节点的下一个节点置为空,因为当前节点是最后一个节点没有next指针
     } else {
         // If successor needs signal, try to set pred's next-link
         // so it will get one. Otherwise wake it up to propagate.
         int ws;
         if (pred != head &&//前驱节点不等于头结点
             ((ws = pred.waitStatus) == Node.SIGNAL ||//前驱节点的状态不等于SIGNAL
              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驱节点的状态小于0,并且cas将前驱节点的等待置为SIGNAL
             pred.thread != null) {//前驱节点的线程补位空
             Node next = node.next;//获取当前节点的next指针
             if (next != null && next.waitStatus <= 0)//如果next指针不等于空并且等待状态小于等于0,标识节点有效
                 compareAndSetNext(pred, predNext, next);//将前驱节点的next指针指向下一个有效节点
         } else {
             unparkSuccessor(node);//唤醒后续节点 条件:1.前驱节点是头结点 2.当前节点不是signal,在ReentransLock中基本不会出现,在读写锁时就会出现
         }

         node.next = node; // help GC 将引用指向自身
     }
 }

 private void unparkSuccessor(Node node) {
     /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
     int ws = node.waitStatus;//获取当前节点状态
     if (ws < 0)//如果节点为负数也即不是取消节点
         compareAndSetWaitStatus(node, ws, 0);//cas将当前节点置为0

     /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
     Node s = node.next;//获取到下一个节点
     if (s == null || s.waitStatus > 0) {//下一个节点等于空或者下一个节点是取消节点
         s = null;//将s置为空
         for (Node t = tail; t != null && t != node; t = t.prev)//从尾结点遍历找到一个不是取消状态的节点
             if (t.waitStatus <= 0)
                 s = t;
     }
     if (s != null)//如果s不等于空
         LockSupport.unpark(s.thread);//唤醒当前节点s
 }

shouldParkAfterFailedAcquire方法


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//获取上一个节点的等待状态
    if (ws == Node.SIGNAL)//如果状态为SIGNAL,代表后续节点有节点可以唤醒,可以安心阻塞去
        /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
        return true;
    if (ws > 0) {//如果当前状态大于0,代表节点为CANCELLED状态
        /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
        do {
            node.prev = pred = pred.prev;//从尾节点开始遍历,找到下一个状态不是CANCELLED的节点。将取消节点断链移除
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        //这里需要注意ws>0时,已经找到了一个不是取消状态的前驱节点。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//将找到的不是CANCELLED节点的前驱节点,将其等待状态置为SIGNAL
    }
    return false;
}

cancelAcquire方法

 private void cancelAcquire(Node node) {
     // Ignore if node doesn't exist
     if (node == null)//当前节点为空直接返回
         return;

     node.thread = null;//要取消了将当前节点的线程置为空
     // Skip cancelled predecessors
     Node pred = node.prev;//获取到当前节点的前驱节点
     while (pred.waitStatus > 0)//如果当前节点的前驱节点的状态大于0,代表是取消状态,一直找到不是取消状态的节点
         node.prev = pred = pred.prev;
     Node predNext = pred.next;//将当前要取消的节点断链

     node.waitStatus = Node.CANCELLED;//将当前节点的等待状态置为CANCELLED
     // If we are the tail, remove ourselves.
     if (node == tail && compareAndSetTail(node, pred)) {//如果当前节点是尾结点,将尾结点替换为浅语节点
         compareAndSetNext(pred, predNext, null);//将当前节点的下一个节点置为空,因为当前节点是最后一个节点没有next指针
     } else {
         // If successor needs signal, try to set pred's next-link
         // so it will get one. Otherwise wake it up to propagate.
         int ws;
         if (pred != head &&//前驱节点不等于头结点
             ((ws = pred.waitStatus) == Node.SIGNAL ||//前驱节点的状态不等于SIGNAL
              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驱节点的状态小于0,并且cas将前驱节点的等待置为SIGNAL
             pred.thread != null) {//前驱节点的线程补位空
             Node next = node.next;//获取当前节点的next指针
             if (next != null && next.waitStatus <= 0)//如果next指针不等于空并且等待状态小于等于0,标识节点有效
                 compareAndSetNext(pred, predNext, next);//将前驱节点的next指针指向下一个有效节点
         } else {
             unparkSuccessor(node);//唤醒后续节点 条件:1.前驱节点是头结点 2.当前节点不是signal,在ReentransLock中基本不会出现,在读写锁时就会出现
         }

         node.next = node; // help GC 将引用指向自身
     }
 }

unparkSuccessor方法

 private void unparkSuccessor(Node node) {
     /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
     int ws = node.waitStatus;//获取当前节点状态
     if (ws < 0)//如果节点为负数也即不是取消节点
         compareAndSetWaitStatus(node, ws, 0);//cas将当前节点置为0

     /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
     Node s = node.next;//获取到下一个节点
     if (s == null || s.waitStatus > 0) {//下一个节点等于空或者下一个节点是取消节点
         s = null;//将s置为空
         for (Node t = tail; t != null && t != node; t = t.prev)//从尾结点遍历找到一个不是取消状态的节点
             if (t.waitStatus <= 0)
                 s = t;
     }
     if (s != null)//如果s不等于空
         LockSupport.unpark(s.thread);//唤醒当前节点s
 }

release方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {//子类实现如何释放锁
        Node h = head;//获取到头结点
        if (h != null && h.waitStatus != 0)//获取到头结点,如果头结点不为空,等待状态不为0,唤醒后续节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
    int ws = node.waitStatus;//获取节点的等待状态
    if (ws < 0)//如果等待状态小于0,标识节点属于有效节点
        compareAndSetWaitStatus(node, ws, 0);//将当前节点的等待状态置为0

    /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    Node s = node.next;//获取到下一个节点
    if (s == null || s.waitStatus > 0) {//如果节点是空,或者是取消状态的节点,就找到一个非取消状态的节点,将取消状态的节点断链后由垃圾回收器进行回收
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)//节点不用空
        LockSupport.unpark(s.thread);//唤醒当前等待的有效节点S
}

acquireShared方法

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)//由子类实现
        doAcquireShared(arg);
}

doAcquireShared方法

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//将共享节点也即读线程入队并返回
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//找到节点的前驱节点
            if (p == head) {//如果前驱节点等于头结点
                int r = tryAcquireShared(arg);//尝试获取共享锁数量
                if (r >= 0) {//如果锁的数量大于0,表示还有多余的共享锁。这里等于0也需要进一步判断。由于如果当执行到这里时,有另外的线程释放了共享锁,如果不进行判断,将会导致释放锁的线程没办法唤醒其他线程。所以这里会伪唤醒一个节点,唤醒的节点后续如果没有锁释放,依旧阻塞在当前parkAndCheckInterrupt方法中
                    setHeadAndPropagate(node, r);//将当前节点的等待状态设置为Propagate。
                    p.next = null; // help GC
                    if (interrupted)//判断是会否中断过
                        selfInterrupt();//设置中断标识位
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&//判断是否应该阻塞等待
                parkAndCheckInterrupt方法中())//阻塞并检查中断标识
                interrupted = true;//重置中断标识位
        }
    } finally {
        if (failed)//如果失败
            cancelAcquire(node);//取消节点
    }
}

setHeadAndPropagate方法

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//将当前节点置为头结点
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 //可获取的共享锁也即读锁的数量,对于ReentrantReadWriteLock而言,永远都是1,所以会继续唤醒下一个读线程
            || h == null //如果旧的头结点为空
            || h.waitStatus < 0 ||//头结点的等待状态不为0
            (h = head) == null || h.waitStatus < 0) {//旧头节点不为空并且等待状态小于0也即是有效节点
            Node s = node.next;//获取到node的下一个节点
            if (s == null || s.isShared())//如果node的下一个节点为空或者是共享节点
                doReleaseShared();//唤醒下一个线程
        }
    }

releaseShared方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//子类实现释放锁
        doReleaseShared();//唤醒后续线程
        return true;//释放成功
    }
    return false;//释放是吧
}

doReleaseShared方法

private void doReleaseShared() {
    /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
    for (;;) {
        Node h = head;//获取到当前头结点
        if (h != null && h != tail) {//如果头结点不为空并且不等于尾结点
            int ws = h.waitStatus;//获取当前节点的等待状态
            if (ws == Node.SIGNAL) {//如果状态为SIGNAL
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//cas将SIGNAL状态置为0。SIGNAL标识后续有线程需要唤醒
                    continue;            // loop to recheck cases
                unparkSuccessor(h);//唤醒后续线程
            }
            else if (ws == 0 &&//如果当前状态为0。表示有线程将其置为0
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//cas将0状态置为PROPAGATE。在多个共享锁同时释放时,方便继续进行读传播,唤醒后续节点
                continue;                // loop on failed CAS
        }
        if (h == head)//如果头结点没有改变,证明没有必要继续循环等待了,直接退出吧,如果头结点放生变化,可能有其他线程释放了锁。
            break;
    }
}

总结:AQS提供了统一的模板,对于如何入队出队以及线程的唤醒都由AQS提供默认的实现,只需要子类实现自己上锁和解锁的逻辑。

3.ReentrantReadWriteLock


import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RWDictionary {
    private final Map<String, String> m = new TreeMap<String, String>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();
 
    public String get(String key) {
      r.lock();
      try { return m.get(key); }
      finally { r.unlock(); }
    }
    public String[] allKeys() {
      r.lock();
      try {
          return (String[])m.keySet().toArray();
      }
      finally { r.unlock(); }
    }
    public String put(String key, String value) {
      w.lock();
      try { return m.put(key, value); }
      finally { w.unlock(); }
    }
    public void clear() {
      w.lock();
      try { m.clear(); }
      finally { w.unlock(); }
    }
}

Sync类

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;

        /*
         * Read vs write count extraction constants and functions.
         * Lock state is logically divided into two unsigned shorts:
         * The lower one representing the exclusive (writer) lock hold count,
         * and the upper the shared (reader) hold count.
         */

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }//高16位代表共享锁也即读锁
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//低16位代表独占锁也即写锁
}

公平锁的FairSync类实现

//读写锁中公平锁的实现:如果后续队列中有等待的节点,那么进行排队。但是这样的性能并不高,可能造成写饥饿。 
static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

//如果队列中有节点在排队,那么就进行等待
 public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

非公平锁的NonfairSync类实现

//非公平锁的实现相对来说性能较高,不阻塞,可以直接抢锁,防止写饥饿。同时有写线程排队,那么后续的所有读操作都需要进行排队
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge //非公平锁写锁永远不阻塞,防止写饥饿
        }
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
}

//如果队列中有任务并且不是共享节点也即不是读操作,那么返回true,进行阻塞
 final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&//头结点不等于空
            (s = h.next)  != null &&//头结点的下一个节点补位空
            !s.isShared()         &&//并且当前节点不是共享节点,也即读线程
            s.thread != null;//线程不为空
    }

写锁的lock方法:

public void lock() {
    sync.acquire(1);
}

//同样调用acquire的方法,请参考上文AQS的acquire方法的实现。同样都是获取失败进行排队

//子类获取锁的过程
protected final boolean tryAcquire(int acquires) {
    /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
    Thread current = Thread.currentThread();//获取当前线程
    int c = getState();//获取当前state状态值
    int w = exclusiveCount(c);//获取读锁的数量,这里doug lea一直都是使用二进制的风格
    if (c != 0) {//代表持有锁
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())//如果写锁为0,或则当前线程不是写锁的持有线程
            return false;//返回false
        if (w + exclusiveCount(acquires) > MAX_COUNT)//当前锁重入次数大于最大值,抛出异常
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);//直接设置状态,代表锁重入次数加1
        return true;//返回获取锁成功
    }
    if (writerShouldBlock() ||//写锁是否应该阻塞
        !compareAndSetState(c, c + acquires))//cas尝试获取锁
        return false;
    setExclusiveOwnerThread(current);//cas抢锁成功,直接将写锁的持有线程设置为当前线程
    return true;//返回true代表获取锁成功
}

writerShouldBlock的实现

//writerShouldBlock的实现请参考上面的公平锁和非公平锁的实现

写锁的unlock方法:

public void unlock() {
    sync.release(1);
}

//这里是AQS的模板方法
public final boolean release(int arg) {
    if (tryRelease(arg)) {//子类实现如何释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())//如果不是独占锁也即写锁,直接抛异常
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;//将锁的数量减少
    boolean free = exclusiveCount(nextc) == 0;//如果独占锁也即写锁数量为0,标识已经释放。大于1是因为锁重入的问题
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

读锁的lock方法:

public void lock() {
    sync.acquireShared(1);
}

//acquireShared方法请看上面AQS的实现
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}


protected final int tryAcquireShared(int unused) {
    /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
    Thread current = Thread.currentThread();//获取当前线程
    int c = getState();//获取当前state状态
    if (exclusiveCount(c) != 0 &&//如果独占锁也即写锁的数量不等于0
        getExclusiveOwnerThread() != current)//独占锁的拥有线程不是当前线程
        return -1;//直接返回-1,获取失败
    int r = sharedCount(c);//获取共享锁的数量
    if (!readerShouldBlock() &&//读锁是否应该被阻塞
        r < MAX_COUNT &&//读锁数量是否小于最大数量
        compareAndSetState(c, c + SHARED_UNIT)) {//cas将读锁+1
        if (r == 0) {//当前共享锁也即读锁数量为0
            firstReader = current;//标记为第一个读线程
            firstReaderHoldCount = 1;//将第一个读线程持有锁数量置为1,主要为了判断读锁的重入
        } else if (firstReader == current) {//第一个线程读等于当前线程
            firstReaderHoldCount++;//第一个读线程持有锁数量
        } else {
            HoldCounter rh = cachedHoldCounter;//先读取到最后一个线程持有共享锁的数量
            if (rh == null || rh.tid != getThreadId(current))//如果为空或者最后一个线程不是当前线程
                cachedHoldCounter = rh = readHolds.get();//从缓存中获取当前线程持有读锁的数量
            else if (rh.count == 0)//如果读锁持有数量为0
                readHolds.set(rh);//将值设置并保存
            rh.count++;//持有线程数+1
        }
        return 1;//返回1标识获取锁成功
    }
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
    HoldCounter rh = null;//
    for (;;) {
        int c = getState();//获取当前状态值
        if (exclusiveCount(c) != 0) {//如果持有写锁的线程数补位0
            if (getExclusiveOwnerThread() != current)//持有写锁的线程不是当前线程
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {//读锁是否应该被阻塞
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)//如果读线程的数量等于最大数量,抛出异常
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {//cas尝试抢锁
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

读锁的unlock方法

public void unlock() {
    sync.releaseShared(1);
}

//详情请查看上面AQS的实现
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {//如果当前线程是第一个读线程
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)//当前第一个读线程持有的读锁的数量是1
            firstReader = null;//直接置位空
        else
            firstReaderHoldCount--;//否则将持有读锁的数量减1
    } else {
        HoldCounter rh = cachedHoldCounter;//先获取最后一个读线程的持有信息
        if (rh == null || rh.tid != getThreadId(current))//如果rh为空或者最后一个读线程不是当前线程
            rh = readHolds.get();//从缓存中获取当前线程的持有锁的信息
        int count = rh.count;//获取到数量
        if (count <= 1) {//如果持有读锁的数量小于等于1
            readHolds.remove();//直接清空缓存
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;//减去持有读锁的数量
    }
    for (;;) {
        int c = getState();//获取当前状态
        int nextc = c - SHARED_UNIT;//将读锁的数量减1
        if (compareAndSetState(c, nextc))//cas原子性赋值
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;//如果为0表示当前线程已经释放了所有读锁。
    }
}

本文章只是JUC 中AQS的一部分,后续的文章会对基于AQS锁实现的子类进行拓展讲解,以上文章内容基于个人以及结合别人文章的理解,如果有问题或者不当之处欢迎大家留言交流。由于为了保证观看流畅性,其中一部分源码有重复的地方。请见谅


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK