3

多线程并发:AQS源码分析(2)——共享锁的实现原理 - 一只烤鸭朝北走

 1 year ago
source link: https://www.cnblogs.com/wha6239/p/17131708.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

多线程并发:AQS源码分析(2)——共享锁的实现原理

  在上一篇文章多线程并发(一)中我们通过acquire()详细地分析了AQS中的独占锁的获取流程,提到独占锁,自然少不了共享锁,所以我们这边文章就以AQS中的acquireShared()方法为例,来分析下并发编程中共享锁的获取与释放吧,获取共享锁的大体流程和获取独占锁一样,但是因为共享锁可以被多个线程同时持有,所以共享锁比起独占锁来可能更复杂,文章有点长,静下心来,慢慢读,读完之后可能会使你收获颇多。

   通过上篇文章的分析,我们发现AQS中主要做三件事:1、同步状态的state的获取和释放,即同步状态的管理;2、同步队列的维护;3、线程的阻塞和唤醒,即线程间的协作;AQS中定义了大量的同步状态管理的模板方法,比如acquireShared()就是一个线程获取共享锁的入口方法,我们就从这个方法开始我们的共享锁之旅吧!

  1、acquireShared(int arg)方法:

1 public final void acquireShared(int arg) {
2         //获取共享资源成功直接返回
3         if (tryAcquireShared(arg) < 0)
4             //获取资源不成功执行此方法阻塞
5             doAcquireShared(arg);
6     }

  这个方法是AQS中定义的一个模版方法,也是获取共享锁的入口,调用tryAcquireShared()尝试获取共享锁,如果获取共享锁成功,则此方法直接返回;获取共享锁不成功,则执行doAcquireShared()方法,将当前节点包装成Node节点,加入到同步队列中,进行阻塞,直到被其它线程唤醒了,成功获取到了共享锁再返回。是不是和获取独占锁的流程很类似呢?是的,大致流程基本一致,但是两者最大的区别共享锁在同一时刻只能被一个线程持有,而共享锁在同一时刻可能会被多个线程同时持有,所以共享锁比独占锁更复杂。其中这个tryAcquireShared()方法我们这个模版类中只提供了定义,并没有提供实现,具体实现还需要自定义的同步器去实现。我们接着往下看doAcquireShared()方法:

  2、doAcquireShared方法:

 1     private void doAcquireShared(int arg) {
 2         //将当前线程加入到同步队列中,并标记为共享模式
 3         final Node node = addWaiter(Node.SHARED);
 4         //线程阻塞等待获取共享资源的过程中是否发生了异常
 5         boolean failed = true;
 6         try {
 7             //现在在阻塞等待获取资源的过程中,其它线程对此线程是否发生了中断请求
 8             boolean interrupted = false;
 9            
10             /*自旋,找到合适点的点将当前线程挂起,再寻找合适点的过程中也不断尝试重新获取共享锁,因为可能再这个尝试的过程中,其它线程释放了共享锁*/
11             for (;;) {
12                 //找到当前节点的前继节点
13                 final Node p = node.predecessor();
14                 
15                 //当前节点的前继节点是同步队列的头节点
16                 if (p == head) {
17                     //尝试获取指定量的共享资源
18                     int r = tryAcquireShared(arg);
19                     //当前Node节点的线程成功获取到了共享资源
20                     if (r >= 0) {
21                         //将当前线程Node节点设置为head头节点,并尝试唤醒后面的阻塞节点
22                         setHeadAndPropagate(node, r);
23                         p.next = null; // help GC
24                         //等待获取资源的过程中发生了线程中断的请求是不响应线程中断的,所以这里要将线程中断补上。
25                         if (interrupted)
26                             //获取独占锁是放在acquire()方法中处理的,不过作用都一样。
27                             selfInterrupt();
28                         //表示获取阻塞获取资源的过程中没有发生异常,就不用执行finally中的取消方法了。
29                         failed = false;
30                         return;
31                     }
32                 }
33                 
34                 //找到当前被阻塞线程节点的前继有效节点,将它的状态设置为Node.SIGNAL
35                 if (shouldParkAfterFailedAcquire(p, node) &&
36                         /**
37                          * 找到了有效前继节点并它的状态设置为Node.SIGNAL,那么我们就可以将当前节点park(),
38                          * 等待前继节点释放资源后唤醒它,唤醒之后在进行一次线程中断检测,进入下次"自旋"。
39                          */
40                     parkAndCheckInterrupt())
41                     interrupted = true;
42             }
43         } finally {
44             //阻塞等待获取共享资源的时候发生了异常,需要将当前Node节点出队,上一篇文章中讲过,这里就不再赘述了。
45             if (failed)
46                 cancelAcquire(node);
47         }
48     }

  上面已经说过这个方法主要干两件事情:1、将阻塞的线程包装成Node节点,加入到同步队列中;2、通过一定次数的“自旋”操作,当前线程找到合适的点,将自己挂起,等待其它线程唤醒。

  在寻找这个“合适点”(这个合适点的选择,上篇文章多线程并发(一):以AQS中acquire()方法为例来分析多线程间的同步与协作中提到过,有不清楚的可以在这里找到答案)的过程中,有可能其它线程释放了共享锁,那么当前线程应该检查下有没有资格获取,有资格获取,并且获取成功,那么就将它自己设置为头节点,然后唤醒后继节点之后再返回,至此获取锁的整个流程就完了。

  细心的读者可能发现,这个“自旋”中,将尝试获取锁放在前面,将阻塞判定放在后面执行,现在想想这是不是一个类型do{}while()模型呢,要是第一次直接获取锁成功了,是不是线程就少了一次阻塞----》唤醒的状态转化呢?

  上面方法中也提到过,“如果当前线程节点的前继节点是队列head节点时,我们就可以尝试获取一次共享资源”。为什么当前节点的前继节点不是head节点的时候,就不能尝试获取共享锁呢?这是因为当前这个LCH同步队列是严格按照FIFO出队的,当前节点前继节点不是head, 说明在当前节点之前还有线程被阻塞等待获取共享锁,所以当前线程节点就应该老老实实地等待,等待它的前继节点获取成功共享锁或者释放了共享锁之后,再唤醒它去尝试获取共享锁吧。

  在独占锁模式中,因为锁只能被一个线程持有,所以当同步队列中的一个线程获取了独占锁之后,只需要将它自身设置为头节点,让原来的头节点“出队”就可以了。但是,在共享锁模式下,因为共享锁可以被多个线程同时持有,当前线程获取共享锁成功,并将自身设置为头节点之后,还需判断同步队列中是否有满足唤醒条件的后继节点,如果有则继续唤醒后继节点去竞争共享锁,这个是通过 setHeadAndPropagate()来实现的。

  3、setHeadAndPropagate()方法分析:

private void setHeadAndPropagate(Node node, int propagate) {
        //后继节点成功获取了共享锁,队列的"旧head"还没有改变,将其保存下来,锁定到方法的局部变量做后序的判断使用;
        Node h = head; // Record old head for check below
        /**
         * 将这个获取共享锁成功的后继节点设置为同步队列的“新head”,此时同步队列的head发生变化, 此线程还未唤起任何线程。
         */
        setHead(node);
        /**
         * 1、h == null这个条件什么时候成立呢?仔细翻了下AQS中的源码发现:
         * 这个setHeadAndPropagate()方法只在共享锁模式下,同步队列head的后继节点成功获取了共享锁才会调用。
         * 获取到共享锁的当前线程是同步队列的头结点的后继节点,"旧head"有后继节点,说明同步队列不为空,那么"旧head"也必定不为空,
         * 此方法中第一行通过h == head,在执行setHead(node)方法之前将"旧head"保存了下来,所以h == null必定不会成立,
         * 至于为什么这么写呢? 查阅了下资料网上说"发现这个是防止空指针异常发生的标准写法(既如果要取一个对象的某个属性进行判断的时候,首先对这个对象进行null判断)。"
         * 这说的过去吧?
         * 
         * 2、(h = head) == null这个条件什么时候成立呢?
         * 这个条件也是不可能成立的,下面这种情况应该是最常见的:
         *  (1)、例如有个Semaphore实例s初始化了2个许可,线程A首先调用s.acquire(2)申请了两个许可,成功申请到了许可;
         *  (2)、线程B调用了s.acquire()方法申请一个许可,申请失败,加入到同步队列;
         *  (3)、线程C调用了s.acquire()方法申请一个许可,申请失败,加入到同步队列;
         *  (4)、线程A调用了s.releaseShared(2)方法释放了两个许可,再调用doReleaseShared()方法,进行同步队列唤醒;
         *  (6)、首先唤醒了同步队列中的线程B,B线程获取到共享锁:
         *      a)、如果此时线程B还未setHead(Node)方法,还未改变同步队列的head头结点,那么线程A的唤醒工作就结束,也仅仅只是唤醒了同步队列中的线程B,
         *              则必定有(h = head) == Node(C) != null成立,线程C的唤醒工作仍然需要线程B去执行;
         *      b)、如果此时线程B执行了SetHead(Node)方法,改变了同步队列的head头结点,那么线程A同时也会唤醒线程C,相当于线程A同时唤醒了线程B和线程C:
         *         1)、如果线程C中的setHeadAndPropagate()在线程B前调用完毕(即线程C执行了setHead()方法改变了同步队列的head),那么 (h = head) == Node(C);
         *         2)、如果线程C中的setHeadAndPropagate()在线程B之后才调用(即线程C此时还未执行setHead()方法,未改变同步队列的head),那么 (h = head) == Node(B)
         *  所以综上所述,只要执行过addWaiter()方法,向同步队列中添加过线程,那么(h = head)== null必定不成立。只能理解为“防止空指针的标准写法”。 
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            /**
             * s == null这种情况是可能存在的,如果当前唤醒的这个node节点是同步队列的尾节点就可能出现node.next == null;
             * s.isShared()指定是共享锁模式,当前线程获取共享锁之后,是需要尝试唤醒同步队列中的其它线程的。
             */
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

  上面提到的这个方法其实就做了两件事情:1、将当前获取共享锁的线程设置为同步队列的头节点;2、根据同步队列头节点head的状态,来决定是否需要唤醒后续节点,符合条件就调用doReleaseShared()方法执行唤醒后续节点的操作。存在的各种情况,也在上面的代码中上面代码中分析过了,下面我们接着往下走来分析doReleaseShared()方法吧! 

  4、doReleaseShared()方法:

 1  private void doReleaseShared() {
 2         for (;;) {
 3             Node h = head;
 4             /**
 5              * h != null保证了队列不为空,h != tail保证了队列中有需要唤醒的节点,
 6              * 如果这不能同时满足说明队列中没有需要唤醒的节点,此时h == head这个条件是成立的,
 7              * 直接跳转到h == head判断中break,此方法结束执行。
 8              */
 9             if(h != null && h != tail) {
10                 int ws = h.waitStatus;
11                 //如果头节点的状态是Node.SIGNAL说明后续有节点是需要唤醒的,
12                 if (ws == Node.SIGNAL) {
13                     /**
14                      * 考虑到共享锁可以被多线程并发持有,可以采用CAS操作,将设置头节点的状态为的0的compareAndSetWaitStatus(h,Node.SIGNAL,0)的操作
15                      * 和unparkSuccessor(h)唤醒后节点的操作绑定在一起,这个CAS操作成功,说明头节点之前肯定是Node.SIGNAL状态,那么后继结点肯定能被唤醒。
16                      */
17                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
18                         continue;            // loop to recheck cases
19                     unparkSuccessor(h);
20                 }
21                 /**
22                  * ws == 0说明头节点的后继节点已经被唤醒或者即将被唤醒。
23                  */
24                 else if (ws == 0 &&
25                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
26                     continue;                // loop on failed CAS
27             }
28             //同步队列的头节点未发生变化,跳出唤醒的动作
29             if (h == head)                   // loop if head changed
30                 break;
31         }
32     }

   这个方法可能是共享锁中最难理解的一个方法了,粗略地读完上面的注释,你是否会有以下几个问题呢?

  Q1、什么时候会调用doReleaseShared()方法呢?

翻了下AQS源码我们发现有两个地方调用:

  1、在获取共享锁acquireShared()方法中,满足一定的条件下(共享锁还可以被同步队列中的其他线程获取情况下),可以调用;

  2、在释放共享锁releaseShared()方法中,释放成功一定会被调用;

  Q2、谁会调用调用doReleaseShared()方法呢?

 在通过上一篇文章我们了解到,独占锁中,只有获取了锁的线程才能调用release释放锁,因此调用unparkSuccessor(h)唤醒后继节点的必然是持有锁的线程,该线程可看做是当前的头节点(虽然在setHead方法中已经将头节点的thread属性设为了null,但是这个头节点曾经代表的就是这个线程);

  而共享锁中,持有锁的线程可以有多个,这些线程都可以调用releaseShared()方法释放锁;假如这些线程都是从同步队列出队获取共享锁的,那么他们必然曾经成为过head或者现在就是head,如果是reReaseShared()中的方法调用doReleaseShared()方法,那么可能现在调用此方法的线程,已经不是同步队列头节点所代表的线程了,头节点可能被易主好多次了。

  Q3、调用该doReleaseShared()方法的目的是什么呢,何时结束这个唤醒操作呢?

无论是在acquireShared()调用,还是在releaseShared()方法中调用,其目的就是在共享锁是可用的状态,唤醒头节点的后继有效节点,竞争共享锁。但是共享锁和独占锁的一个重要区别是:共享锁在头节点发生变化时(说明后继节点已经成功获取了共享锁,并执行了setHead()方法,将其设置为head),会再执行一次自旋唤醒新的头节点的后继节点,去竞争共享锁。

  上面的话是什么意思呢?换句话说:就是当前线程完成后继节点唤醒任务,需要退出的时候,检查了一下头节点,唤醒的这个节点已经是新的头节点了(这个唤醒的节点也成功获取到了共享锁),那么它的后继节点是有资格竞争共享锁的,所以需要继续唤醒它的后续节点,周而复始,直到h == head不再执行后续节点唤醒。

  Q4、什么时候才会发生满足h == head这个条件呢?

 经过分析大概有以下这么几种情况(欢迎大家补充,有不对的地方还请大家指出):

  1、阻塞队列为空,即阻塞队列中没有需要唤醒的节点,满足h == head这个条件。

  2、线程A唤醒了后继线程B,但是线程B并没有获取到共享资源(线程B当然也就不会执行setHead()方法改变同步队列的head了),又发生了线程阻塞,不需要再唤醒后续的线程了,也满足h == head这个条件,那么B的后续线程的唤醒工作应该交给线程B获取资源时候在负责去唤醒吧。

  3、线程A唤醒了线程B,线程B成功获取了资源,还是还未执行到setHead()这个方法;当前线程A,此时判断h == head也成立了,其调用的doReleaseShared()方法结束了,那么将唤醒线程B后续节点的工作,就应该交给刚刚被唤醒的线程B去执行了。

  Q5、ws == 0这个状态怎么理解?什么情况下才会出现ws == 0这个状态呢?

其实我们仔细分析之前的代码我们可以得出以下结论:head的后继节点已经被唤醒或者即将被唤醒,分以下几种情况:

1、有线程A刚释放了锁,刚执行了unparkSuccessor里的if (ws < 0) compareAndSetWaitStatus(node, ws, 0);把head的状态设置为了0,然后尝试唤醒head后继线程B,这里也分3种情况:

              (1)、执行了if (ws < 0) compareAndSetWaitStatus(node, ws, 0),还没有执行LockSupport(this)方法(后继节点中的线程即将被唤醒);

    (2)、head后继线程B获取锁成功,直到head后继线程将自己设置为AQS的新head的这段时间里,head的状态为0(后继节点中的线程已经被唤醒);

    (3)、head后继线程B获取锁失败,直到将head重置为Node.SIGNAL这段时间里,这个head的状态也是为0的(后继节点中的线程已经被唤醒);

  2、同步队列中只有一个head == tail 的dummy node节点,它的状态为0;

  3、在第2中情况上更进一步,同步队列中只有一个head == tail 的dummy node节点,它的状态为0,此时有个线程A获取共享锁失败了,但是只进行了入队操作,还未执行shouldParkAfterFailedAcquire()方法,未将head节点设置未Node.SIGNAL状态,这段时间head的状态也0(后继节点中的线程即将被唤醒);

   综上所述,我们不难看出,其实head.waitStatus == 0 这个状态是一个中间状态,可能会很快改变。后继节点获取共享锁失败了,head节点不会发生变化,只不过很快会将head.waitStatus 设置未Node.SIGNAL;后继结点获取共享锁成功,后继节点会被设置为新的head,假如后继节点不是尾节点,那么必定新的head.waitSatus == Node.SIGNAL,如果后继节点是尾节点,那么必定head.waitStatus == 0,因为没有后续入队节点将它的状态置为Node.SIGNAL。

  Q6、什么时候会出现 ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)为true的情形呢?

读过上面的Q5中的分析,我们可以发现这个条件成立可能有两种状况:

  1)、第一种可能的情况是:有个线程A执行到doReleaseShared()方法,线程B获取共享锁的时候在同步队列中阻塞的,此时有个线程C也执行了doReleaseShared()方法,doReleaseShared()方法调用unparkSuccessor()方法,设置head的ws == 0,此时线程A正好执行到了这个ws == 0的位置,而此时线程B获取共享锁失败,执行shouldParkAfterFailedAcquire()方法,又设置head的ws = Node.SIGNAL,恰好线程A执行到compareAndSetWaitStatus(h,0,Node.PROPAGETE)为false;

  2)、第二种可能的情况是:同步队列中head头节点是刚刚成为头节点的,它的waitStatus值还为0,尾节点是在这之后刚刚加进来的。这种怎么理解呢?同步队列中的“旧的尾节点”状态是0,用线程A表示它,此时线程A刚刚获取到共享锁,将自己设置为头结点head节点,此时有个线程B获取共享锁失败,将自己加入到同步队列中,此时线程B还未执行shouldParkAfterFailedAcquire()方法,改变同步队列头结点head的状态;此时线程A执行了doReleaseShared()中的方法,发现ws == 0,但是恰好就在此时,线程B执行了

shouldParkAfterFailedAcquire()方法,设置head的waitStatus == Node.SIGNAL,紧接着线程A执行执行到compareAndSetWaitStatus(h,0,Node.PROPAGATE)失败了,继续continue进入下次“自旋”。

  由此可见,doReleaseShared()方法中else if 这个分支的 && 连接了两个不一致的状态,分别对应了shouldParkAfterFailedAcquirecompareAndSetWaitStatus(pred, ws, Node.SIGNAL)执行成功前和执行成功后,因为doReleaseSharedshouldParkAfterFailedAcquire是可以并发执行的,所以这一条件是有可能满足的,只是满足的条件非常严苛,可能只是一瞬间的事。

  至于共享锁的释放逻辑,相信看完上面的分析,再去看也不是什么难事,这里就不再赘述了。

  总结:

  • 共享锁的调用框架和独占锁和实现原理非常相似,两者最大不同在于获取锁的逻辑——共享锁可以被多个线程同时持有,而独占锁同一时刻只能被一个线程持有。
  • 由于共享锁同一时刻可以被多个线程持有,因此当头节点获取到共享锁时,可以立即唤醒后继节点来争锁,而不必等到释放锁的时候。因此,共享锁触发唤醒后继节点的行为可能有两处,一处在当前节点成功获得共享锁后,一处在当前节点释放共享锁后。

   鉴于水平有限就只能分析到如此了,如有说的不对的地方,还请大家批评指正,共同交流,共同进步。

  参考文章地址:

  1、https://www.cnblogs.com/waterystone/p/4920797.html

  2、https://blog.csdn.net/anlian523/article/details/106319294

  3、https://segmentfault.com/a/1190000016447307

  4、https://www.cnblogs.com/micrari/p/6937995.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK