5

深入理解AQS--jdk层面管程实现【管程详解的补充】 - 忧愁的chafry

 1 year ago
source link: https://www.cnblogs.com/chafry/p/16756929.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

什么是AQS

  1.java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。

  2.JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的

    【1】一般是通过一个内部类Sync继承 AQS

    【2】将同步器所有调用都映射到Sync对应的方法

AQS具备的特性:

  1.阻塞等待队列  , 2.共享/独占  , 3.公平/非公平  , 4.可重入  , 5.允许中断 

AQS定义两种资源共享方式

  1.Exclusive-独占,只有一个线程能执行,如ReentrantLock(详情可查看 深入理解ReentrantLock类锁

  2.Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

AQS定义两种队列

  1.同步等待队列【主要用于维护获取锁失败时入队的线程】

    【1】AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。   

    【2】AQS 依赖CLH同步队列来完成同步状态的管理:

      1)当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程

      2)当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

      3)通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)

    【3】图示:

           

2168218-20221006050929334-781954454.png

  2.条件等待队列【调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁】

    【1】AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:

      1)调用await方法阻塞线程;

      2)当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)

  3.AQS 定义了5个队列中节点状态:

    1)值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。

    2)CANCELLED,值为1,表示当前的线程被取消;

    3)SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;

    4)CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;

    5)PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

源码详解(将源码拆分为三块,抽象同步器AbstractQueuedSynchronizer类,节点Node类,条件对象ConditionObject类)

  AbstractQueuedSynchronizer类解析

    1.属性值解析

//用链表来表示队列
private transient volatile Node head;
private transient volatile Node tail;

private volatile int state;  //可以表示锁的加锁状态【独占锁只为1,共享锁可以大于1】,又可以表示锁的重入次数,0为没有加锁

    2.方法解析

//定义了主体的大体逻辑,如入队,如尝试加锁
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

//模板方法的处理,如果子类没有实现,则子类中调用的话会报错
//提供给子类去实现的公平与非公平的逻辑
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
//释放锁的逻辑
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

  Node类详解

    1.代码展示

static final class Node {

    static final Node SHARED = new Node();  // 共享模式标记
    static final Node EXCLUSIVE = null;     // 独占模式标记

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    //值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
    //CANCELLED,值为1,表示当前的线程被取消;
    //SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
    //CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
    //PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
    volatile int waitStatus;

    
    volatile Node prev;//前驱结点
    volatile Node next;//后继结点
    volatile Thread thread; //与节点绑定的线程
    Node nextWaiter; // 存储condition队列中的后继节点

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {}

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

  Condition接口详解

    1.代码展示

//Condition用来替代synchronized锁的监视器的功能,而且更加灵活
//一个Condition实例需要与一个lock进行绑定
public interface Condition {
    //调用此方法的线程将加入等待队列,阻塞直到被通知或者线程中断
    void await() throws InterruptedException;

    //调用此方法的线程将加入等待队列,阻塞直到被通知(线程中断忽略)
    void awaitUninterruptibly();

    //调用此方法的线程将加入等待队列,阻塞直到被通知或者线程中断或等待超时
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    //调用此方法的线程将加入等待队列,阻塞直到被通知或者线程中断或等待超时
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    //调用此方法的线程将加入等待队列,阻塞直到被通知或者线程中断或超出指定日期
    boolean awaitUntil(Date deadline) throws InterruptedException;

    //唤醒一个等待中的线程
    void signal();

    //唤醒所以等待中的线程
    void signalAll();
}

    2.发现说明

      【1】在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。Condition的强大之处在于,对于一个锁,我们可以为多个线程间建立不同的Condition。如果采用Object类中的wait(), notify(), notifyAll()实现的话,当写入数据之后需要唤醒读线程时,不可能通过notify()或notifyAll()明确的指定唤醒读线程,而只能通过notifyAll唤醒所有线程,但是notifyAll无法区分唤醒的线程是读线程,还是写线程。所以,通过Condition能够更加精细的控制多线程的休眠与唤醒。

      【2】但,condition的使用必须依赖于lock对象,通过lock对象的newCondition()方法初始化一个condition对象。

  ConditionObject类详解【Condition接口的实现类】

    1.属性值解析

//由头尾两个节点指针形成的链表来达到队列的效果
private transient Node firstWaiter;
private transient Node lastWaiter;

    2.方法解析

      【1】核心await方法

public final void await() throws InterruptedException {
    //如果线程中断,直接抛出异常  
    if (Thread.interrupted())
        throw new InterruptedException();

    //进入等待队列中
    Node node = addConditionWaiter();
    //释放当前线程持有的锁,并获取当前同步器状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //如果不在同步队列中,那么直接阻塞当前线程;直到被唤醒时,加入到同步队列中
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //此时已经被唤醒,那么尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    //如果节点中断取消,那么清除节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

//addConditionWaiter将一个节点添加到condition队列中。在入队时,判断当前尾节点是不是CONDITION。如果不是则判断当前尾节点已经被取消,将当前节点出队。那么也就是说在队列中的节点状态,要么是CONDITION,要么是CANCELLED
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

//方法的作用是移除取消的节点。方法本身只有在持有锁的时候会被调用。方法会遍历当前condition队列,将所有非Condition状态的节点移除。
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

      【2】核心signal方法与signalAll方法

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

final boolean transferForSignal(Node node) {
    //如果不能更改waitStatus,则表示该节点已被取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK