关于AbstractQueuedSynchronizer(JDK1.8)的一点理解.
目录
- 前言
- 1.AbstractQueuedSynchronizer中的独占模式
- acquire获取锁
- 1.1 tryAcquire尝试获取锁
- 1.2 addWaiter加入队列
- 1.3 acquireQueued唤醒或阻塞线程
- 1.3.1 shouldParkAfterFailedAcquire
- 1.3.2 parkAndCheckInterrupt
- 1.3.3 cancelAcquire
- release 释放锁
前言
在多线程环境下多个线程同时竞争一把锁,结果是获得锁的线程去执行代码,其余没有获得锁的线程只能等待获得锁的线程释放锁之后才能去竞争获取锁;在AbstractQueuedSynchronizer(AQS)中没有使用synchronized关键字,那AQS具体是如何实现锁机制的呢?它如何解决下面几个问题:
- 如何处理没有获得锁的线程?
- 在没有使用synchronized关键字的时候,如何实现线程的阻塞?
- 如何释放锁?
- 释放锁之后如何唤醒等待锁的线程?
- 等待锁的线程,如果在等待锁的过程中发生异常该怎么处理?
- 如何判断哪些是正常等待的线程,哪些是发生异常的线程?
在AQS中有两种模式,在Node节点中用nextWaiter来标识是哪种模式,只有2种取值:
- SHARED(共享模式):同一个对象锁可以同时被多个线程共同持有;
- EXCLUSIVE(独占模式):同一个对象锁在一个时刻只能被一个线程持有;
1.AbstractQueuedSynchronizer中的独占模式
在AQS中,没有获取到锁的线程会被包装成Node节点添加到同步队列中(双向链表);当持有锁的线程释放锁之后会唤醒队列中下一个线程获取锁。
AbstractQueuedSynchronizer是一个基于CAS(compareAndSwap)来构建的同步队列,如果对CAS还有疑惑可以看这篇文章的CAS部分;
队列中每个元素都是Node的数据类型,下面是Node的数据结构:
Node对象中,属性值的含义:
thread:等待锁的线程;
waitState: 是等待节点的状态值;独占模式中waitState 的取值有3种:
- 0:初始值;新加入队列的节点,waitState的值为 0;
- SIGNAL:表示该节点等待被唤醒;
- CANCELLED : 取消等待;线程获取锁时发生异常,可能导致等待状态为CANCELLED;
nextWaiter:获取锁的类型,在独占模式中值为:EXCLUSIVE ;
prev,next:分别表示该节点的上一个节点和下一个节点;
在AQS中获取独占锁有3个方法:
- acquire(int arg);获取锁;
- acquireInterruptibly(int arg);该方法在获取锁时,如果线程有中断标记,会抛出异常。
- tryAcquireNanos(int arg, long nanosTimeout);该方法有超时时间,超过时间不会阻塞在队列中等待获取锁,会直接返回false,表示没有获取到锁;该方法也同样会检测线程是否有中断标志;如果有也会抛出异常;
acquire获取锁
上面3个方法实现基本差不多,后2个方法只是多添加了对中断标志和超时的判断;以acquire为例分析独占锁的实现过程:
/**
*tryAcquire --》尝试获取锁(由子类实现)
*addWaiter(Node.EXCLUSIVE) ==》添加节点到队列中
*acquireQueued() ===>阻塞,唤醒线程
*selfInterrupt==》设置中断标志;
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();//acquireQueued返回值为true,线程设置中断标志;
}
获取锁到同步队列的过程:
- 首先调用tryAcquire获取锁;(tryAcquire是子类实现)
- 没有获取到锁就调用addWaiter,将线程包装成Node添加到同步队列表尾;
- 在线程添加队列之后,调用acquireQueued,根据情尝试获取锁(当该节点的prev节点是head时),或者况阻塞线程 ;
1.1 tryAcquire尝试获取锁
AQS只是提供一个框架,定义了线程获取锁的流程,以及出现异常的处理;具体的实现还要看子类的实现,比如基于AQS实现的就有:ReentrantLock,ReentrantReadWriteLock,CountDownLatch,Semaphore等;这些实现类的tryAcquire各有不同;
1.2 addWaiter加入队列
在AQS中,竞争锁失败的线程要加入到同步队列中;AQS的同步队列(CLH)是一个双向链表,head 指向链表头节点, tail 指向链表尾节点;每一个加入队列的节点都是添加到队列尾部
private Node addWaiter(Node mode) {
//mode:表示锁的类型:互斥锁,共享锁
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {//判断尾节点tail是不是为空;
node.prev = pred;
if (compareAndSetTail(pred, node)) {//将新插入的节点放到链表尾部;新插入的node作为新的tail
pred.next = node;
return node;
}
}
//上面插入失败,说明有多个线程同时竞争锁失败,加入到同步队列中;
//同时竞争要添加到链表的末尾,添加失败的进入到enq方法中继续尝试加入队列
enq(node);
return node;
}
1. 当线程竞争锁失败时会将线程包装成Node节点加入到同步队列,包装成Node对象时会标识该线程要竞争锁的类型:SHARED 或者 EXCLUSIVE ,在独占模式中,mode = EXCLUSIVE;
2. 判断链表的尾节点 tail 是否为null。
- 如果 tail != null 说明同步队列中已经有线程在排队等待获取锁了,这个时候将新加入队列的node节点,放到链表表尾;
- 如果 tail == null 说明此时同步队列还是一个空队列,需要先初始化head,再添加新插入的节点;(初始化head在enq方法中)
3. 同步队列是空队列或者新node插入失败的,都会进入到enq方法中继续尝试将节点插入到队列中;
enq源码:
private Node enq(final Node node) {
for (;;) {//死循环,保证能将node节点添加到链表中;
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//初始化head
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {//CAS,竞争将node添加到表尾;
t.next = node;
return t;
}
}
}
}
在enq方法中,如果检测到tail == null,就会用一个node初始化head和tail,这个node没有有用信息,各个属性值都是初始值就是一个占位用的,没有实际意义;
在addWaiter和enq中,有一个有意思的问题:为什么链表添加新node的时候都是先让node的prev指向旧tail,CAS竞争修改表尾tail成功之后再修改旧tail的next指针?为什么不在CAS竞争成功之后,一起修改 prev 和 next 呢?
- 新节点先设置prev,CAS竞争成功之后再设置旧tail的next:
- CAS成功之后再一起修改next 和 prev
后一种写法会让新节点成为队列的 tail节点的这一刻:tail脱离队列,队列的尾节点tail不在队列中;这种写法会在同步队列释放锁时可能导致部分节点线程不能被唤醒;
1.3 acquireQueued唤醒或阻塞线程
在acquireQueued方法中关于什么时候阻塞线程什么时候唤醒线程的处理流程的大框架并不是特别复杂:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前一个节点:prev
final Node p = node.predecessor();
//prev节点:prev节点是头节点head ====》 调用tryAcquire尝试获取锁;
if (p == head && tryAcquire(arg)) {
setHead(node);//设置该节点node为头节点
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire:根据prev的waitState状态值,判断是否应该阻塞当前线程;
//parkAndCheckInterrupt调用LockSupport.park,阻塞当前线程;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) //线程发生异常时,failed = true;
cancelAcquire(node);//将node的waitState值,改为 CANCELLED;
}
}
- 在新节点刚加入队列的时候:首先会判断该节点node的前一个节点prev是否是head节点;
- 如果是head节点会尝试获取锁;获取锁成功结束流程;获取锁失败,继续下面的流程;
- 不是head,继续下面的流程;
- 调用shouldParkAfterFailedAcquire方法,判断是否应该阻塞当前线程;(这个方法中涉及到等待状态( waitState)的改变: 0 -> SIGNAL;也可能会涉及到,将该节点之前的:waitState = CANCELLED的节点清除出队列)waitState = SIGNAL;就阻塞,waitState = 0,就将其设置成SIGNAL;
- 阻塞线程:调用parkAndCheckInterrupt方法阻塞线程;
- 不阻塞线程:回到第一步,继续尝试获取锁;
- 如果线程发生异常,则会将节点的状态改为 CANCELLED;
刚看完源码一定会有疑惑:关于failed: 它的初始值为 true,但在整个流程中没有看到关于fail值如何改变,只有在获取锁成功之后才会将值修改为false;而fail放在了finally块中;要执行这一块代码的时候,已经是成功获取锁结束循环要返回时才会执行finally块的代码;那这个变量有什么意义呢?这个时候:fail = false,finally块里面的逻辑也不会执行。
也就是说正常情况下,是不会执行finally块的代码;那么就只有一种情况了,就是在线程执行for循环块的代码时抛出了异常;这个时候没有获取到锁也不会修改fail的值,这种情况下进入到finally块中,fail = true 会执行cancelAcquire方法,修改node的waitState,将值改为waitState = CANCELLED;(当然这个方法不止是修改waitState,里面还有其他 的处理。)
1.3.1 shouldParkAfterFailedAcquire
流程:
源码:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 获取prev的状态值:waitStatus
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {// CANCELLED = 1
//从node节点开始往前找到最近一个waitState值不为 1 的节点,将其设置为 该节点的prev节点;
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//在EXCLUSIVE(独占)模式下waitStatus 的取值只有: 0 ,CANCELLED ,SIGNAL;
//因此在waitStatus 的值排除了 CANCELLED ,SIGNAL;到这个分支waitStatus 只能是 0;
//将prev节点的状态值从 0 -> 1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这段代码的逻辑比较简单就是根据该节点的prev节点的状态值waitStatus,来处理并返回结果;从返回结果来看:如果prev的waitStatus =SIGNAL就返回true,否则返回false ;
流程:
- 获取prev的waitStatus
- waitStatus = SIGNAL;返回true;会继续执行parkAndCheckInterrupt(),来阻塞当前线程;
- waitStatus > 0;当waitStatus = CENCELLED时,值才会 大于 0 ;在这种情况下,表示prev线程已经取消等待,因此当前节点node会继续往前寻找到最近一个waitStatus <= 0 的节点当作该节点的prev节点;把 waitStatus = CENCELLED 的节点 清除出队列;返回false;回到循环体中,执行获取锁的流程;
- waitStatus = 0;刚加入队列的node,waitStatus = 0;将waitStatus 的值设置为:waitStatus = SIGNAL;返回false;回到循环体中,执行获取锁的流程;
上节简单提到,在acquireQueued方法中获取锁发生异常时会进入到finally块执行cancelAcquire方法;进入到这个方法可以将node的状态waitStatus 值改为 CANCELLED;因此在这里才会判断prev节点的状态是否为CANCELLED;在发现prev节点的状态为CANCELLED时,会往前找到一个状态值正常(0,SIGNAL)的节点作为prev节点,将状态值为CANCELLED的节点清除出队列;
每一个新加入队列的节点,都会检查自己的prev节点的状态是否是CANCELLED,如果是这个状态就会将这个节点清除出队列;在cancelAcquire方法中,除了将自己所在节点的状态标记为CANCELLED之外,该线程还会将自己所在节点删除,并且往前检查,将不合法(CANCELLED)的状态节点删除。
shouldParkAfterFailedAcquire(pred, node) && parkAndCheckInterrupt(),当该方法返回true时,会执行parkAndCheckInterrupt方法阻塞当前线程;
1.3.2 parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//阻塞当前线程
return Thread.interrupted();
}
在这个方法中,调用LockSupport.park(this);实现了对当前线程的阻塞;在当前线程被唤醒之后调用:Thread.interrupted() 清除中断标记;如果线程设置了中断标记返回true,反之返回false;
1.3.3 cancelAcquire
流程图:
源码:
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;//获取prev节点
// 检查当前节点的prev节点是否存在状态为CANCELLED的节点;
//如果prev异常,就找到离最近node最近的正常节点,当作node节点的prev
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
//如果node是tail尾节点,就将pred节点设置为tail;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);//修改pred节点的next指针
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {//pred不是head节点,且状态正常
Node next = node.next;
//pred的next指向node的next指向的节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {//pred是head节点,或者node节点的状态是CANCELLED;
unparkSuccessor(node);//释放node
}
node.next = node; // help GC
}
}
在删除node节点时有一个细节,就是没有彻底将node节点从队列中删除,只是将node的next指针删除,而保留了prev指针;
在队列中节点是以prev,next指针来遍历获取队列中的节点(head开始,tail结束),而无论从队列中的哪个节点开始遍历都获取不到node节点,从这个角度看node 已经从队列中删除。而以node的视角来看它仍然在队列中,因为在删除node时保留了node的prev指针;node节点可以用prev指针获取到队列中节点,然后用这个节点可以遍历整个队列的节点;那为什么不将其prev指针删除呢?
可以考虑下面的场景:当新增的node节点,添加到队列的末尾,成为了队列的tail节点;新节点成为tail有2个步骤:
- newNode.repv = oldTail
- tail = newNode
队列是双向链表,因此在成为当newNode成为队列的tail节点之后还需要将oldTail的next指针更新为 newNode :oldTail.next = newNode 这才算newNode完整的加入到了队列中;如果在这一步操作之前,因为操作系统的线程调度导致该线被挂起,暂停运行。这个时候:oldTail.next =null ;在该线程被挂起的期间,oldTail节点发生异常,需要被清理出队列;可以分别看:清除prev指针和不清除prev的结果:
- 清除prev指针
- 不清除prev指针
release 释放锁
源码:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(arg)方法由子类实现,unparkSuccessor方法是关键,由这个方法将head的下一个节点持有的线程唤醒获取锁;
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
释放next节点;
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)//找到距离node最近的下一个正常节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒节点持有的线程;
}
释放next节点时首先会判断next是否为空或者是状态是否异常(CANCELLED )?如果是,则会从队列的tail开始向前遍历,找到距离head节点最近的状态正常的节点将其释放;
在添加新节点到队列时,每个新节点的prev指针都是先指向旧tail节点,当新节点成为tail时新节点就已经在队列中而不是和整个队列脱节;并且在删除异常节点时,也是保留了异常节点的prev指针,无论在什么情况下都保证了队列能够从tail节点开始从后向前能够遍历整个队列;
在队列加入新节点时,提到过如果不先将prev指向旧tail导致新节点成为tail时会与整个队列脱节,造成部分节点不能被唤醒;就是因为释放锁时next节点如果是CANCELLED状态,就会导致需要从后往前遍历,找下一个被唤醒的节点;这个时候如果tail与队列脱节,就会直接将tail节点释放;而之前的节点不会有被唤醒的机会;