基础 | 并发编程 - [AQS]
INDEX
- §1 AbstractQueuedSynchronizer
- §2 AbstractQueuedSynchronizer 的体系
- §3 `AbstractQueuedSynchronizer` 的结构
- §3.1 字段
- §3.2 CLH lock queue 等待队列
- §3.3 `AbstractQueuedSynchronizer.Node`
- §4 流程 (以 `ReentrantLock` 为例)
§1 AbstractQueuedSynchronizer
- 可以直接翻译成:抽象的、=(依赖)队列(结构)的 同步器
- java.util.concurrent.locks 包下 抽象类,有一门三父子,注意
AbstractOwnableSynchronizer
才是相对父级的AbstractOwnableSynchronizer
AbstractQueuedLongSynchronizer
AbstractQueuedSynchronizer
- 用于构建 锁 或 其他同步器组件
锁和其他同步器的是基于它实现的 - 通过 队列(CLH) 对抢锁的线程进行排队
- 通过 int 表示持有锁的状态
§2 AbstractQueuedSynchronizer 的体系
§3 AbstractQueuedSynchronizer
的结构
§3.1 字段
首先是一些父类字段,在 AbstractOwnableSynchronizer
中
//The current owner of exclusive mode synchronization
//哪个线程,现在独占此同步器
private transient Thread exclusiveOwnerThread;
然后是 AbstractQueuedSynchronizer
自己的主要字段
private volatile int state;
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
exclusiveOwnerThread 独占线程
exclusiveOwnerThread
在 AQS 的父类 AbstractOwnableSynchronizer
中声明
若 线程 x 申请 AQS 成功,则 exclusiveOwnerThread==x
state 同步状态
state
表示当前 AbstractQueuedSynchronizer
的状态,即 AQS 是否立即可用
也可以理解为 新加入的线程是否需要排队
- 0
立即可用,新加入的线程可以尝试获取锁 - >0
非立即可用,新加入的线程需要排队
CLH lock queue
head
和tail
两个字段其实是对 AQS 中等候队列的声明,详见 §3.2
§3.2 CLH lock queue 等待队列
队列的声明
head
和tail
实际上是一个变形的 CLH lock queue 的头尾节点,CLH 是三个人名 Craig & Landin & Hagersten
CLH lock queue 本质上是一个双向链表,所以 head / tail 也可以理解成队列本尊
CLH lock queue 特性如下:
- 遵循 先进先出 ,因为这是个双向队列,仅从数据结构上其实可以做到随意
- tail 追加
新的需要获取锁的线程从队尾加入队列 - CLH 为空时,新加入的线程立即获得锁
- CLH 中线程监听前一个线程的状态
CLH 中的线程从加入时起开始 本地自旋
自旋中需要做的事是获取前面线程的状态 - 头结点会尝试获取锁
只是尝试获取,说明头结点具有获取锁的优先权
但不保证一定可以获取,获取失败则重新等待 - CLH 添加元素的操作,都是 CAS 并且 自旋 的
这是为了确保添加元素时,若遇得到并发情况
只会后一个 Node 成功的成为新的 head / tail,但所有并发的线程最终都会成功
§3.3 AbstractQueuedSynchronizer.Node
Node
是需求抢锁,但是没抢成功需要排队的线程的封装
AQS 等候队列中的 Node,相当于是一个 waiter(在等轮到自己好抢锁)
结构
AbstractQueuedSynchronizer.Node
的主要字段如下
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
waitStatus
通常是下一个 Node(waiter)的等候状态,
- = 0,默认值,通常说明 Node 后面没有 Node 或后面的 Node 刚刚加入队列
- > 0 ,取消,是 当前节点 被取消了
- = -1,下一个 Node 可以安全阻塞
Node 不需要尝试申请同步器或申请失败后,需要阻塞 Node
但阻塞前,需要先给 Node 一个申请释放的信号,已经收到信息的 Node 才能安全的阻塞
这个信号会标记在前面 Node 的waitStatus
上 - = -2,条件等待
- = -3,无条件传播
nextWaiter
通常 Node 是否独占有关
- 若锁实现独占,则 Node 独占,则
nextWaiter == null
,如ReentrantLock
- 若锁实现非独占,则 Node 非独占,则
nextWaiter == new Node()
,如ReentrantReadWriteLock
§4 流程 (以 ReentrantLock
为例)
ReentrantLock
中的 同步器 与 AQS、公平同步器、非公平同步器 的关系
需注意,同步器 Sync
即 AbstractQueuedSynchronizer
的子类
以 ReentrantLock.lock()
、ReentrantLock.unlock()
为入口的示例
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
new Thread(()->{
System.out.println("AAAAAAAAAAAAAAAAA");
lock.lock();
try {
try {
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) { e.printStackTrace(); }
}finally {
lock.unlock();
}
},"A").start();
new Thread(()->{
lock.lock();
try {
System.out.println("BBBBBBBBBBBBBBBBBBBBB");
}finally {
lock.unlock();
}
},"B").start();
new Thread(()->{
lock.lock();
try {
System.out.println("CCCCCCCCCCCCCCCCCCCC");
}finally {
lock.unlock();
}
},"C").start();
}
首先,A 线程申请上锁,ReentrantLock
.lock() 方法,调用的是同步器 Sync.lock()
这是一个抽象方法,被 公平同步器 FairSync
、非公平同步器 NonFairSync
实现
对比二者的实现,可见 非公平同步器 NonFairSync
只是多了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
从方法名风格上看,是一个 CAS 操作,查看其声明如下
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
可见是试图 CAS AbstractQueuedSynchronizer.state
字段
结合上文,此字段的作用是标记找到此 同步器 的线程是否 立即可用,
AbstractQueuedSynchronizer.state ==0
时,认为 AQS 立即可用,于是按 CAS 操作立即占用了此 同步器
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
exclusiveOwnerThread 字段记录当前独享此同步器的线程
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// ......
// The current owner of exclusive mode synchronization.
private transient Thread exclusiveOwnerThread;
此时,A 线程抢锁成功,state == 1
且 exclusiveOwnerThread == A 线程
随后,B、C 线程申请上锁,但很明显,下面的 if 条件明显是走不通的,因为 state == 1
了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
因此,执行 else 中的 acquire(1)
去申请,如下面代码
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上段代码的整体思路是
- 当前线程再申请一次同步器
万一之前持有同步器的线程跑了呢,如果这次成功了,就不用往等待队列里塞了 - 如果还是尝试申请 失败了,当前线程就把自己包装成一个 Node,作为 waiter,存入请求队列(CHL)) 中
- 申请队列成功后,尝试让自己进入阻塞状态
将自己等候状态记录在上一个 Node的nextWaiter
中
随后当前线程阻塞
若当前线程退出阻塞,则立即发起一次线程中断检查return Thread.interrupted();
- 队列中的线程每次唤醒都会尝试申请同步器,失败后继续阻塞,直到成功获取同步器
快速尝试
tryAcquire(arg)
是 AQS 中接口,用于尝试但 AQS 没有实现,只是给了个异常
模板方法模式,实现延迟到子类,并且强制子类复写此方法以实现功能(否则就抛异常)
// AQS 里的 tryAcquire(arg)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
当前案例中我们需要看 非公平同步器,因为 ReentrantLock lock = new ReentrantLock();
申请的非公平锁
尝试申请的逻辑中进行了两次判断
- 是不是可以 抢锁
通过state == 0
判断,false
时当前线程不能直接尝试抢锁 - 是不是可以 重入
虽然不能抢锁,但之前占用了锁的有可能是自己
需要用exclusiveOwnerThread == 当前线程
判断(此时,当前线程时 B),案例场景中明显不是
// non fair sync 里的 tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// non fair sync 里的 tryAcquire(arg) 的实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
加入队列
addWaiter()
是 AQS 中接口,有两个作用:
- 将线程包装成 Node
- 将 Node 放到 线程等候队列中
private Node addWaiter(Node mode) {
//封装 Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 这里的逻辑是下面 enq 的一部分,用于快速尝试
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq()
的功能是将封好的 Node 塞入等候队列,其逻辑如下面代码所示
- 首先,
enq()
整体上是个自旋,直达成功将 Node 塞进去才结束
这是因为可能出现多个线程来尝试申请同步器,失败后都需要进入队列
但为了保证每个节点下都只有一个 next ,所以每次只能有一个线程进入队列尾部
所以对于每个线程而言,可能需要尝试多次,直到成功加入队列为止 - 其次,AQS 的 等待队列 是需要初始化的,如下面代码注释
AQS 会使用一个空的 Node(傀儡节点 或 哨兵节点) 作为队列头,在刚刚初始化时,它同时也是队列尾
这个 Node 表示有一个线程正在占用同步器,即队列中不是所有 Node 都是等待同步器的 waiter- Node 通过
tryAcquire(arg)
抢到对同步器的占用权时,会将自己的线程赋值给exclusiveOwnerThread
- 接着在自己成为 head 后,将自己的 thread 置空
- Node 通过
- AQS 中此队列添加 Node 的操作,都是 CAS 的
AQS 向 等候队列 添加 Node 时,会先让这个 Node 指向队列尾,随后 CAS 的尝试让 tail 指向该 Node
CAS 成功的就是新的 tail,随后建立新老 tail 的联系(原 tail 的 next 指向 Node,相反的指向在之前已经完成了)
这是为了确保每次操作只会后一个 Node 成功的成为新的 head / tail
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
排队
acquireQueued()
也是 AQS 中接口,作用是 尝试让自己进入阻塞状态
- 尝试阻塞前,如果 Node 是 head 的下一个 Node,再尝试申请一次同步器,如果成功则直接替换 head 并返回
这是因为 head 约等于占用同步器的线程,此时 head 随时可能完成对同步器的占用,从而轮到当前 Node
申请成功时,Node 自己的线程已经给了 AQS 的exclusiveOwnerThread
,接着 Node 成为新的 head - Node 会尝试让自己阻塞,以在等候队列中等候
Node 会将自己没有申请到同步器这一结果(可能是失败或者不用尝试申请)记录在上一个 Node的waitStatus
中
随后当前线程阻塞
若当前线程退出阻塞,则立即发起一次线程中断检查return Thread.interrupted();
若线程中断,则依然会在队列中排队,直到获取了同步器后,会立即在acquire()
中进行自我中断 - 队列中的线程每次唤醒都会尝试申请同步器,失败后继续阻塞,直到成功获取同步器
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire()
用前一个 Node 的状态决定当前 Node 是否可以阻塞
- 需注意,
shouldParkAfterFailedAcquire()
是acquireQueued()
的一部分,而后者是自旋的 - 若前一个 Node 的状态是 0 或 -3 ,说明当前 Node 刚刚进入队列还没尝试申请过 同步器
当前 Node 在第一次申请失败时,将前一个 Node 的状态置为 -1 - 当且仅当 前一个 Node 的状态是 -1 时,当前 Node 才能安全的阻塞
- 若 前一个 Node 的状态 > 0 时,说明 前一个 Node 被取消,则当前 Node 的前指针指向更前面的 Node
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
用于阻塞当前线程
- 当前线程就是 当前 Node 中的线程
- 等待队列中的线程通常都是阻塞在
LockSupport.park(this);
- 等待队列中的线程每次唤醒,都会根据位置决定是否尝试申请同步器,申请失败的或不用申请的继续阻塞
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
再随后,A线程释放: ReentrantLock.lock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()
用于尝试释放锁,下面是 AQS 里的模板方法和 ReentrantLock
中的实现
- 获取 AQS 的 state,因为可以重入,所以 state 可能不仅仅是 1
- 从 state 扣除本次 release 的次数,即 -1
- 若 state == 0,可以释放锁
- AQS 的
state = 0
- AQS 的
exclusiveOwnerThread = null
- AQS 的
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor()
用于唤醒等待队列的 head 的下一个 Node
if (h != null && h.waitStatus != 0)
是为了排除等待队列中的傀儡/守护节点
头结点的waitStatus == 0
说明此节点后没有 Node,后面的 Node 才是等待申请同步器的线程- 除非线程被取消了,否则清空当前 head 的
waitStatus
因为 head 的waitStatus
通常记录的其实是当前head
的下一个 Node 的状态
而 下一个 Node 马上要再尝试获取同步器了,无论成功还是失败,都不能技术使用以前的 signal - 若 head 的下一个 Node 被取消了,就从等待队列的 tail 开始向前找最靠前的未取消的 Node 唤醒
注意 for 循环中没有找到一个就 break,而是一直往前找,直到找到最靠前的未取消 Node - Node 会在上文
parkAndCheckInterrupt()
的位置被唤醒,随后检查中断,若没被中断继续尝试申请 同步器
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}