3

AQS 源码详解

 1 year ago
source link: https://www.extlight.com/2023/03/01/AQS-%E6%BA%90%E7%A0%81%E8%AF%A6%E8%A7%A3/;jsessionid=E22824904138BE5C0EF8AA1A045EC31C
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

AQS 是抽象的队列同步器,是用来构建锁或其他同步组件的重量级基础框架及整个 JUC 体系的基石。

二、相关组件

下边的组件都是基于 AQS 框架扩展实现的:

  • ReentrantLock:可重入锁,避免多线程竞争资源的安全问题
  • Semaphore:信号量,限制多线程的访问数量
  • CountDownLatch:计数器,用于线程之间的等待场景(如线程A等待其他多个线程完成任务后,线程A才能执行自己的任务)
  • CyclicBarrier:回环栅栏,用于线程之间的等待场景(如在一组线程中,如果线程A执行到代码段S点就会停下等待,等到组内其他线程都执行到S点时它们才会立刻一起执行剩余的任务)

虽然这些组件在多线程场景下有不同的作用,但代码中也有相似之处,如都需要管理锁状态,维护阻塞线程,维护唤醒线程。而 AQS 的作用就是将这些相似的、公共的代码封装在一起。

三、运行原理

AQS 使用一个 volatile 的 int 类型的 state 变量来表示锁竞争状态,将每条要去抢占资源的线程封装成一个 Node 节点放入到内置的 CLH 同步队列(FIFO 双向队列)来维护排队工作,通过 CASstate 值进行修改。

我们常说的 AQS 指的是 java.util.concurrent.locks.AbstractQueuedSynchronizer 类,其源码的核心如下:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

static final class Node {
// 等待状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 工作线程
volatile Thread thread;

...
}

// 头结点
private transient volatile Node head;

// 尾节点
private transient volatile Node tail;

// 同步状态,默认值 0,说明 0:资源未抢占 1:资源已抢占
private volatile int state;

...
}

注意:源码中有个状态:一个是 state,针对资源抢占的状态;另一个是 waitStatus,针对 node 节点的状态。

将上文的源码转成图形,可便于我们理解,加深记忆,其运行模型图如下:

mark

四、源码分析

文中涉及到 CAS 和 LockSupport 相关内容,不清楚的读者可以先跳至末尾,浏览相关的参考资料。

由于 AQS 并非单独使用,为了完整的讲解 AQS 的源码,本篇章以 ReentrantLock 组件为例,一步一步揭开 AQS 如何作为基石使用的。

public class LockTest {

public static void main(String[] args) {

Lock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
lock.lock();
// 业务代码
System.out.println(Thread.currentThread().getName() + " 开始工作");
lock.unlock();
}, "t1");

Thread t2 = new Thread(() -> {
lock.lock();
// 业务代码
System.out.println(Thread.currentThread().getName() + " 开始工作");
lock.unlock();
}, "t2");

Thread t3 = new Thread(() -> {
lock.lock();
// 业务代码
System.out.println(Thread.currentThread().getName() + " 开始工作");
lock.unlock();
}, "t3");

t1.start();
t2.start();
t3.start();

}
}

创建一把非公平锁,3 个线程通过抢占锁来执行任务。

此时,AQS 的模型如下:

mark

其中,state 表示锁的抢占状态,ownerThread 表示抢占锁的线程。

进入 lock() 方法,来到 ReentrantLock 源码中:

public class ReentrantLock implements Lock, java.io.Serializable {

...

public void lock() {
sync.lock();
}

abstract static class Sync extends AbstractQueuedSynchronizer {
// 锁定
abstract void lock();

// 尝试获取非公平锁
final boolean nonfairTryAcquire(int acquires) {
...
}

// 尝试释放锁
protected final boolean tryRelease(int releases) {
...
}

...
}

...
}

可以看出 lock() 方法是通过 Sync 类来实现的。而 Sync 是一个抽象的静态内部类,它继承了 AbstractQueuedSynchronizer 类,因此它具备了 AQS 的“能力”。

Q1: Sync 定义了抽象的 lock() 方法,需要通过其子类来实现具体的锁方式(模板方法模式)。为何要这要设计的? 当然是为了方便扩展。

我们都知道通过 ReentrantLock 类能创建公平锁和非公平锁,其原因是 Sync 有两个实现类: FairSyncNonfairSync,它们都实现了 lock() 的具体细节。案例中,我们创建出的 lock 实例底层使用到的就是 NonfairSync 的实例(多态特性),即非公平锁。假设哪天 ReentrantLock 需要新增第三种锁,只需新增个子类继承 Sync ,实现 lock() 方法即可。

演示案例中我们创建的是非公平锁,我们来看看 Sync 对应非公平锁的子类 NonfairSync ,它同样是定义在 ReentrantLock 的静态内部类:

public class ReentrantLock implements Lock, java.io.Serializable {

...

// 非公平锁实现
static final class NonfairSync extends Sync {
final void lock() {
// (1)
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// (2)
acquire(1);
}

// (3)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

...
}

我们开始沿着演示案例讲解:

假设 t1 线程率先启动获取到 CPU 资源调用了 lock() 方法,执行到 (1) 处,即 compareAndSetState(0, 1),该方法来自 AQS。通过 CAS 方式将 AQS 中的 state 值变成 1,执行成功返回 true。

由于 t1 是第一条执行的线程,结果肯定返回 true,然后将 ownerThread 设置为当前线程 t1。最终整个 lock() 方法也成功返回,获取锁成功,执行自己的业务代码。

此时,AQS 的模型如下:

mark

这时 t2 和 t3 线程也启动,“轮流” 执行到了 (1) 处,判断肯定失败,然后执行 (2),即 acquire(1) 方法,该方法来自 AQS

我们进到 acquire(1) 方法中:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

...

public final void acquire(int arg) {
// (4)
if (!tryAcquire(arg) &&
// (5):addWaiter
// (6): acquireQueued
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// (7)
selfInterrupt();
}

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

...
}

假设当前线程为 t2,执行到 (4) 处,即 tryAcquire(arg) 方法,用于尝试获取锁。它是个抽象方法,由子类 NonfairSync 实现,NonfairSynctryAcquire 方法最终调用其父类 SyncnonfairTryAcquire() 方法来实现,可以返回至 (3) 处查看。

abstract static class Sync extends AbstractQueuedSynchronizer {

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// (8)
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

t2 线程进入到 nonfairTryAcquire() 方法,先获取当前线程(t2 线程),然后执行到 (8) 处,即 getState(),此方法来自 AQS,用于返回 AQSstate 状态。

由于 t1 线程没有释放锁,因此 state = 1OwnerThread = t1 ,下边的 if 判断都不成立,最终方法返回 false。

回到 acquire() 方法中,由于返回 false,!tryAcquire(arg) 判断成立,t2 线程会继续执行到 (5) 处,即 addWaiter(Node.EXCLUSIVE),该方法用于将线程封装到 Node 节点中。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// (9)
if (pred != null) {
node.prev = pred;
// (10)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// (11)
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// (12)
if (compareAndSetHead(new Node()))
tail = head;
} else {
// (13)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

...
}

t2 线程进入到 addWaiter 方法中,先被封装到 node1 节点中,由于是首个线程被封装成 Node,因此 tail 和 pred 必定为 null,(9) 处的判断不成立,执行到 (11) 处,即 enq() 方法。该方法中开启无限循环,通过 CAS 方式设置 CLH 队列的头结点/尾节点。

第一次循环,由于 tail 为空,因此线程执行到 (12) 处,创建一个傀儡节点(无数据,用于占位)设置为头结点和尾节点。

此时,AQS 的模型图如下:

mark

由于没有遇到循环终止的指令,将执行下一次循环。

在第二次循环中,尾节点不为空,因此进入(13) 处,将 node1 节点设置成尾节点,同时前后两个 node 节点建立关系,最终返回 node1 节点。

此时,AQS 的模型图如下:

mark

返回的 node1 节点后,t2 线程开始执行 (6) ,node1 节点被当作参数传入到 acquireQueued() 方法中,该方法用于将节点放入到 CLH 队列中,将其线程挂起等待。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// (14)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// (15): shouldParkAfterFailedAcquire
// (16): parkAndCheckInterrupt
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

...
}

acquireQueued() 方法中也开启无限循环。在循环中,node1 节点先获取它的前驱节点(傀儡节点),然后判断是否为头结点,是则调用 tryAcquire()尝试获取锁,该方法在 (4) 处出现过一次,此处不再赘述。

由于 t1 线程还没有释放锁,(14) 处的最终判断肯定为 false。t2 执行到 (15) 处,即 shouldParkAfterFailedAcquire(),该方法用于修改 node1 节点的前驱节点的 waitStatus 状态。

Node 节点的 waitStatus 有以下 5 种状态:

状态说明
0初始状态
1线程已取消
-1当前节点封装的线程释放锁后,会唤醒后继节点的线程
-2线程处在等待状态,在等待队列中
-3表示下一次的共享状态会被无条件的传播下去
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的 waitStatus 值
int ws = pred.waitStatus;
// (17) 状态为 -1
if (ws == Node.SIGNAL)

return true;
if (ws > 0) {
// (18) 状态值 > 0,即线程被取消,从后往前遍历节点,删除已取消状态的线程的节点

do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// (19) 将前驱节点的 waitStatus 值改成 -1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

...
}

t2 线程进入 shouldParkAfterFailedAcquire() 方法中, 获取 node1 前驱节点的 waitStatus 状态,值为 0,直接跳到 (19) 处,通过 CAS 方式将 waitStatus 值改成 -1,最终的方法是返回 false 的。

此时,AQS 的模型图如下:

mark

Q2: 为何在创建 node 节点封装线程时,不直接将 waitStatus 的值设置成 -1,而是专门定义这个方法进行修改? 答案我们留在下文解答。

shouldParkAfterFailedAcquire() 方法返回 false,t2 线程回到 acquireQueued() 方法中,由于之前是在无限循环中进行的,没有遇到终止指定,因此 t2 线程将执行第二次循环的操作。

毫无疑问,t2 线程又会再次执行 shouldParkAfterFailedAcquire() 方法,此时 node1 的前驱节点的 waitStatus = -1,最终方法返回 true。随后 t2 开始执行 (16),即 parkAndCheckInterrupt() 方法,用于线程等待。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// (20)
return Thread.interrupted();
}

...
}

方法里边调用了 LockSupport.park(this),t2 线程立即被挂起,变成 WAIT 状态(此处的状态是线程的状态,与 AQS 中的 state 状态,Node 节点的 waitStatus 无关)。

假设 t1 线程仍未释放锁,轮到 t3 线程运行,不出意外其运行的最终结果也是被封装到 node2 节点,放到 CLH 队列中被挂起等待。

此时, AQS 的模型图如下:

loading.gif

现在只有 t1 线程处在运行状态,当它运行完业务代码,随后执行 unlock() 方法:

public class ReentrantLock implements Lock, java.io.Serializable {

...

public void unlock() {
sync.release(1);
}

...

}

方法通过调用 Syncrelease() 方法来实现,而 release() 方法来自 AQS

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

...

public final boolean release(int arg) {
// (21)
if (tryRelease(arg)) {
Node h = head;
// (22)
if (h != null && h.waitStatus != 0)
// (23)
unparkSuccessor(h);
return true;
}
return false;
}

...
}

当 t1 线程执行到 (21) 处,即 tryRelease(arg) 方法,该方法都抽象方法,由 Sync 子类实现:

public class ReentrantLock implements Lock, java.io.Serializable {

...

abstract static class Sync extends AbstractQueuedSynchronizer {

...

// 尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

}

...
}

进入该方法:

  1. 先获取 AQS 状态 state = 1,减去入参(值为 1),结果 c = 0。
  2. 判断当前线程(t1) 是否不等于 OwnerThread 的线程(t1),显然是相等的。
  3. 判断 c 是否为 0,判断成立设置返回值 free 为 true, 将 OwnerThread 线程设置为 null。
  4. 修改 AQS 状态为 c 的值,即 AQSstate = 0
  5. 最终返回 free。

经过上述的操作,此时 AQS 模型图为:

loading.gif

方法执行完回到 (21),条件判断成立,进入 if 方法体中:

  1. 获取头结点(值为 dummy 节点)
  2. 头节点进行非空判断 和 waitStatus 的非零判断(值为 -1),判断成立进入到 (23) 处,即 unparkSuccessor() 方法,用于唤醒下个节点的线程。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

...

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 还原 waitStatus 状态为 0
compareAndSetWaitStatus(node, ws, 0);
// (24)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从后往前找 waitStatus <= 0 的节点(剔除取消状态的线程节点)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}

// (25)
if (s != null)
// 唤醒后继节点封装的线程
LockSupport.unpark(s.thread);
}

...

}

头结点被当作参数传入到 unparkSuccessor() 方法中,该方法执行 3 个操作:

  1. 将头结点的 waitStatus 状态恢复成 0。
  2. 获取头结点的后继节点,如果后继节点为空或 waitStatus 状态为已取消,则从后往前遍历 CLH 队列,获取非取消状态的节点。
  3. 唤醒后继节点中封装的线程。

回到案例中,线程执行到 (24) 处,获取的后继节点为 node1(封装 t2 线程)。来到 (25) 处,唤醒 node1 中的 t2 线程。

讲解到此处,大家是否还记得 Q2:为何在创建 Node 节点封装线程时,不直接将 waitStatus 的值设置成 -1。

我们以创建的 Node 节点时, waitStatus 值设置为 -1 为前提,进行案例推演:

  1. 依然是 3 个线程在运行,t1 线程先拿到锁执行业务代码。
  2. CPU 切换到 t2 线程,执行到 (5) 处,即 addWaiter() ,被封装到 node1 节点中(waitStatus = -1),此时 node1 节点已经和头结点建立关系。
  3. 在 t2 线程执行 (16) 处,即在执行 parkAndCheckInterrupt() 方法,t2 线程要被挂起之前,CPU 又切换至 t1 线程。
  4. t1 线程执行完业务代码,要释放锁时,会执行 (25) 处代码 LockSupport.unpark(s.thread); ,唤醒 t2 。
  5. 但实际上 t2 线程并没有被挂起等待,如果某个线程被提前 unpark(thread),那么当该 thread 线程调用 park() 时是不会被挂起等待的,这样锁的机制就乱套了(线程未获取到锁,但又不挂起等待)。

因此,Node 节点的 waitStatus 值不能一开始被设置的成 -1。

回到正常案例中,t1 线程唤醒 t2 线程结束任务。t2 线程被唤醒并拿到 CPU 资源,执行到 (20) 处,即 Thread.interrupted() 方法,检测当前线程是否被中断,t2 线程并未中断,因此 parkAndCheckInterrupt() 方法返回 false。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...

final boolean (final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// (14)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// (15): shouldParkAfterFailedAcquire
// (16): parkAndCheckInterrupt
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

...
}

t2 线程将在 acquireQueued() 方法中进行第三次循环操作:

  1. 获取前驱节点 oldHead (dummy)
  2. 前驱节点是否为头结点,是则尝试获取锁。因为 AQSstate 已恢复成 0,因此 t2 可以成功获取到锁(修改 state = 1ownerThread = t2)。
  3. 将 node1 节点设置为头结点(将前驱节点设置为 null,封装的线程设置为 null)
  4. 将 oldHead 节点的后继节点设置为 null。

此时,AQS 的模型图如下:

loading.gif

t2 线程获取到锁执行任务,之后释放锁。。。

t3 线程之后的执行流程与 t2 类似,此处也不在赘述。

五、执行流程

最后附加代码执行流程图:

loading.gif

流程图并未充分展示执行的细节,最终还得需要读者自行阅读源码加深理解。

六、参考资料

CAS 原理新讲

LockSupport 工具介绍


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK