2

【一知半解】AQS - Hitechr

 2 years ago
source link: https://www.cnblogs.com/hitechr/p/16474402.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

什么是AbstractQueuedSynchronizer(AQS)

字面意思是抽象队列同步器,使用一个voliate修饰的int类型同步状态,通过一个FIFO队列完成资源获取的排队工作,把每个参与资源竞争的线程封装成一个Node节点来实现锁的分配。

AbstractQueuedSynchronizer源码

public abstract class AbstractQueuedSynchronizer 
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private transient volatile Node head;//链表头
    private transient volatile Node tail;//链表尾
    private transient Thread exclusiveOwnerThread;//持有锁的线程
    private volatile int state;//同步状态,0表示当前没有线程获取到锁
    static final class Node {//链表的Node节点类
      volatile int waitStatus;//当前节点在队列中的状态
      volatile Node prev;//前置节点
      volatile Node next;//后置节点
      volatile Thread thread;//当前线程
    }
}

AQS同步队列的基本结构

1336600-20220713163322354-449426838.png

Node.waitStatus的说明

状态值 描述
0 节点默认的初始值
SIGNAL=-1 线程已经准备好,等待释放资源
CANCELLED=1 获取锁的请求线程被取消
CONDITION=-2 节点在队列中,等待唤醒

state为什么要用volatile修饰?

  1. 可见性,一个线程对变量的修改可以立即被别的线程感知到
  2. 有序性,禁止指令重排

AQS获取锁步骤

  public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
  }
  1. 当一个线程获取锁时,首先判断state状态值是否为0
  2. 如果state==0,则通过CAS的方式修改为非0状态
  3. 修改成功,则表明获取锁成功,执行业务代码
  4. 修改失败,则把当前线程封装为一个Node节点,加入到队列中并挂起当前线程
  5. 如果state!=0,则把当前线程封装为一个Node节点,加入到队列中并挂起当前线程

AQS获取锁过程

首先调用tryAcquire去修state的状态值,成功就获取当前锁;失败则加入当前等待队列中,然后挂起线程。

tryAcquire

AQS的源码中tryAcquire是一空实现,需要它的子类去实现这个空方法。因为在AQS中虽然公平锁非公平锁的都是基于一个CLH去实现,但是在获取锁的过程中略有不同。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

公平锁FairSync#tryAcquire

protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            int c = getState();//获取同步器的状态
            if (c == 0) {//当前没有线程获取到锁
                //首先判断祖宗节点的线程是否当前线程一样
                if (!hasQueuedPredecessors() &&
                    //更改state的状态值为非0
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果锁持有者的线程是当前线程,则可放行,锁的重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
/**
* 判断祖宗节点的线程是否当前线程一样
* 傀儡节点的下个节点
*/
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    //头节点的下个节点所持有的线程是否与当前线程相同
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

非公平锁NonfairSync#tryAcquire

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //通过CAS更改state的状态值
        if (compareAndSetState(0, acquires)) {
            //把当前线程设置为锁的持有者
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果锁持有者的线程是当前线程,则可放行,锁的重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
} 
1336600-20220713163322314-138546421.png

对比后发现,公平锁先判断是否有老祖宗节点,如果有则返回false;如果当前线程对应的node就是老祖宗节点,则直接去修改state状态,把state改为非0。

addWaiter

获取锁成功的线程去执行业务逻辑了,获取锁失败的线程则会在队列中排队等候,每个等候的线程也都不安分的。

private Node addWaiter(Node mode) {
    //把当前线程封装为一个Node节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //加入到队列的尾部
    enq(node);
    return node;
}
  1. 把当前线程封装为一个Node节点
  2. 当第一次执行这个方法时,由于head和tail都还没有赋值,则pred指向的tail也是空,所以直接直到enq(node)
  3. 当pred指向的tail不为空时,则通过CAS的方式加入到尾部,如果成功直接返回;如果失败,则进入enq(node)通过自旋的方式加入。
//通过自旋的方式将节点加入到节点的尾部
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

为了操作链表的方便,一般都要在链表的头前加入一个傀儡节点,AQS的链表也不例外。
先创建一个傀儡节点,并把head、tail均指向它,然后再把node节点加入到尾部后面,移动tail的指向。

acquireQueued

当节点成功加入到链表的尾部后,等待被唤醒,然后通过自旋的方式去获取锁

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //当前节点的前置节点
            final Node p = node.predecessor();
            //如果前置节点是傀儡节点(head指向傀儡节点),则再次尝试去获取锁
            if (p == head && tryAcquire(arg)) {
                //获取成功后,则移除之前的傀儡节点,head指向当前node,
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //获取锁失败后,设置node节点的状态,并挂起当前节点
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 获取node节点的前置节点,如果前置节点是head,则再次尝试去获取锁
  2. 设置当前node节点的前置节点状态为-1(表示后续节点正在等待状态,默认是0),然后通过自旋的后会进行到parkAndCheckInterrupt挂起当前节点
  3. LockSupport.park(this)执行完事,当前线程会一直阻塞到这个地方
  4. 当前唤醒时再次从1开始执行
1336600-20220713163322270-639899469.png

AQS释放锁过程

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

主要是恢复state的值、重置锁持有都线程,然后唤醒挂起的线程。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //当前线程与锁持有者线程不一样会报错
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {//重入的次数为0时,则当前线程已经没有重入了,可以清空锁的持有者
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

恢复state状态的值,如果重入次数为0时,则清空锁的持有都为null

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)//唤醒下个node对应的线程
            LockSupport.unpark(s.thread);
    }

设置head指向的node节点的watiStatus的状态值,然后找到下个节点对应的线程并唤醒。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK