当前位置: 首页 > news >正文

深入剖析 Java 中的 AbstractQueuedSynchronizer(AQS)

简介

AQS是抽象类AbstractQueueSynchronizer的简称,翻译为抽象的队列同步器,它定义了一套多线程访问共享资源的同步器框架,许多同步器的实现都依赖于它,如ReentrantLockCountDownLatchSemaphore等。

数据结构

在这里插入图片描述
AQS 实现锁和同步工具依赖的核心数据结构:

public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizer {private transient volatile Node head;private transient volatile Node tail;private volatile int state;
}public abstract class AbstractOwnableSynchronizer {private transient Thread exclusiveOwnerThread;
}

state

AQS中的state的作用类似于ObjectMonitor中的_owner字段。只不过_owner字段是一个指针,存储的是获取锁的线程,而state是一个int类型的变量,存储0、1等整型值。

  • 0:表示锁没有被占用;
  • 1:表示锁已经被占用;
  • 大于 1:表示重入的次数。

当多个线程竞争锁时,它们会通过compareAndSetState的CAS操作来更新state的值,即先检查state的值是否为0,如果state值为0的话,将state值设置为1。谁设置成功,谁就获取了这个锁。

protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

state的访问方式有三种:

  • getState();
  • setState();
  • compareAndSetState()

exclusiveOwnerThread

exclusiveOwnerThread成员变量存储持有锁的线程,它配合state成员变量,可以实现锁的重入机制。

head 和 tail

AQS 使用双向链表来实现等待队列,用来存储等待锁的线程和等待条件变量的线程。双向链表的节点如下所示:

static final class Node {static final Node SHARED = new Node();static final Node EXCLUSIVE = null;static final int CANCELLED =  1; // 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入此状态后的结点将不再变化。static final int SIGNAL    = -1; // 表示后继结点在等待当前结点唤醒。后继结点入队时,会将当前结点的状态更新为SIGNALstatic final int CONDITION = -2; // 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。static final int PROPAGATE = -3; // 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。volatile Thread thread;volatile Node prev;volatile Node next;volatile int waitStatus;Node nextWaiter;
}

Node结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,变量waitStatus则表示当前Node结点的等待状态,共五种取值CANCELLEDSIGNALCONDITIONPROPAGATE0(新节点入队时的默认状态)

负值表示结点处于有限等待状态,而正值表示结点已被取消。所以源码中很多地方用>0,<0来判断结点的状态是否正常。

AQS中的headtail两个成员变量分别为双向链表的头指针和尾指针。

原理

AQS 定义了 8 个模板方法,分别用于 AQS 的两种工作模式:独占模式共享模式
关于模板方法设计模式,可参考 【模板方法】设计模式:构建可扩展软件的基石
在这里插入图片描述

Lock锁为排它锁,因此,Lock锁的底层实现只会用到AQS的独占模式;

ReadWriteLock锁中的读锁为共享锁,写锁为排它锁,因此,ReadWriteLock锁的底层实现既会用到AQS的独占模式,又会用到AQS的共享模式;

Semaphore、CountdownLatch这些同步工具只会用到AQS的共享模式。

AQS又定义了 4 个抽象方法。
在这里插入图片描述

ReentrantLock

ReentrantLock为例,说明下AQS 是怎么用的。
在这里插入图片描述
ReentrantLock定义了两个继承自AQS的子类:NonfairSyncFairSync,分别用来实现非公平锁和公平锁。因为 NonfairSync 和 FairSync 释放锁的逻辑是一样的,所以,NonfairSync 和FairSync 又抽象出了一个公共的父类Sync

public class ReentrantLock implements Lock {private final Sync sync;abstract static class Sync extends AbstractQueuedSynchronizer { ... }static final class NonfairSync extends Sync { ... }static final class FairSync extends Sync { ... }public ReentrantLock() {sync = new NonfairSync();}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();} public void lock() {sync.acquire(1);}public void unlock() {sync.release(1);}//...省略其他方法...
}

ReentrantLock中的lock()函数调用AQS的acquire()模板方法来实现,unlock()函数调用AQS的release()模板方法来实现。接下来,我们就来看下acquire()release()的底层实现原理。

acquire()模板方法
 public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}

函数流程如下:

  • 先调用tryAcquire方法去竞争锁,如果获取锁成功,则acquire方法直接返回;
  • 竞争锁失败,则执行addWaiter方法,将线程包裹为Node节点放入等待队列的尾部;
  • 最后调用acquireQueued阻塞当前线程;
  • 如果线程在等待过程中被中断过,它是不响应的,只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

acquire()不可中断,因此,在acquire()接收到中断时,继续阻塞等待锁,直到获取到锁之后,才调用selfInterrupt()将中断标记恢复。

tryAcquire()

tryAcquire()是抽象方法,在NonfairSyncFairSync中被实现。

static final class NonfairSync extends Sync {// 尝试获取锁,成功返回true,失败返回false。AQS用于实现锁时,acquires=1protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState(); //获取state值if (c == 0) { //1、锁没有被其他线程占用if (compareAndSetState(0, acquires)) { //CAS设置state值为1。此处体现了“非公平”,上来就抢锁setExclusiveOwnerThread(current); //设置exclusiveOwnerThreadreturn true; //获取锁成功}} else if (current == getExclusiveOwnerThread()) { //2、锁可重入int nextc = c + acquires; // state+1if (nextc < 0) //重入次数太多,超过了int最大值,溢出为负数,此情况罕见throw new Error("Maximum lock count exceeded");setState(nextc); // state=state+1,state记录重入的次数,解锁的时候用return true; //获取锁成功}return false; //3、锁被其他线程占用}    
}static final class FairSync extends Sync {protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) { //1、锁没有被占用if (!hasQueuedPredecessors() &&  //此处体现了“公平”,先判断等待队列中没有线程时才获取锁compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}} else if (current == getExclusiveOwnerThread()) { //2、锁可重入int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
}
public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}

两个tryAcquire()方法的区别是在获取锁之前,FairSync会调用hasQueuedPredecessors()函数,查看等待队列中是否有线程在排队,如果有,那么tryAcquire()返回false,表示竞争锁失败,从而禁止“插队”行为。

addWaiter

在多线程环境下,往链表尾部添加节点会存在线程安全问题,因此,下面的代码采用自旋+CAS操作来解决这个问题。

private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) {node.prev = pred;//尝试快速模式直接放到队尾if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 再次尝试入队尾enq(node);return node;
}private Node enq(final Node node) {//自旋执行CAS操作(添加节点到链表尾部),直到成功为止for (;;) {Node t = tail;if (t == null) { //链表为空,添加虚拟头节点if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;//CAS操作解决了两个线程同时往链表尾部添加节点时的线程安全问题if (compareAndSetTail(t, node)) {t.next = node;return t;}}}
}
acquireQueued

acquireQueued()的代码实现如下所示,主要包含两部分逻辑:

  • 使用tryAcquire()函数来竞争锁;
  • 使用park()函数来阻塞线程。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor(); //拿到前驱节点//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。//如果线程是被中断唤醒的,那么p不一定等于head。只有p==head,线程才能去竞争锁if (p == head && tryAcquire(arg)) {setHead(node);//把node设置成虚拟头节点,也就相当于将它删除p.next = null; // help GCfailed = false;return interrupted;}//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true; //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true}} finally {//以上过程只要抛出异常,都要将这个节点标记为CANCELLED,等待被删除if (failed)cancelAcquire(node);}
}
//此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; //拿到前驱的状态if (ws == Node.SIGNAL)//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了return true;if (ws > 0) {/** 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就GC回收)!*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}// park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
// 1)被unpark();2)被interrupt()。
private final boolean parkAndCheckInterrupt() {//LockSupport.park(this)底层调用JVM提供的Unsafe.park()函数来实现LockSupport.park(this); //调用park()使线程进入waiting状态return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}

整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。

acquireQueued() 方法不可中断,但LockSupport.park()是可以被中断的。

整个acquire()模板方法的流程如下:
在这里插入图片描述

release()模板方法

主要包含两部分逻辑:使用tryRelease()函数释放锁和调用unpark()函数唤醒链首节点(即虚拟头节点的后继节点)对应的线程。

public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head; //找到头结点if (h != null && h.waitStatus != 0)unparkSuccessor(h);//唤醒等待队列里的下一个线程,内部调用unpark()函数,唤醒排在h后面的线程return true;}return false;
}

tryRelease()是抽象方法。不管是公平锁还是非公平锁,tryRelease()释放锁的逻辑相同。

// 释放锁,成功返回true,失败返回false。AQS用于实现锁时,releases=1
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c); //state-1 != 0,说明锁被重入多次,还不能解锁。return free;
}

总结

AQS 提供了一个强大而灵活的框架,用于构建锁和其他同步器。通过理解其内部原理和状态管理机制,我们可以创建出高效且线程安全的并发工具。

参考资料
《Java 编程之美》
《 Java并发之AQS详解》

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 苹果宣布iOS 18正式版9月17日推送:支持27款iPhone升级
  • C# WPF上位机与西门子PLC通信实现实例解析
  • Android 使用JSON动画:Lottie框架基本使用
  • 学生成绩操作
  • Leetcode面试经典150题-134.加油站
  • 关于Spring Cloud 表达式注入漏洞——分析复现
  • Pyspark下操作dataframe方法(1)
  • activiti学习第一步
  • MySQL数据库 — Explain命令
  • 我的IP地址经常变化对我的账号安全有影响吗?
  • [前端][HTML]<a>标签中href=“javascript:;“表示什么意思
  • git删除本地分支报错:error: the branch ‘xxx‘ is not fully merged
  • 现在有一台ubuntu22.04 的工作站机器,现在想通过RDP的方式进行远程开发
  • 从零开始一步一步搭建 Vue3 + Webpack5 项目脚手架指南
  • Java中实现消息告警推送的几种方式
  • Android Volley源码解析
  • AWS实战 - 利用IAM对S3做访问控制
  • Dubbo 整合 Pinpoint 做分布式服务请求跟踪
  • ECS应用管理最佳实践
  • go语言学习初探(一)
  • JS进阶 - JS 、JS-Web-API与DOM、BOM
  • k8s 面向应用开发者的基础命令
  • k8s如何管理Pod
  • Synchronized 关键字使用、底层原理、JDK1.6 之后的底层优化以及 和ReenTrantLock 的对比...
  • tensorflow学习笔记3——MNIST应用篇
  • WePY 在小程序性能调优上做出的探究
  • windows-nginx-https-本地配置
  • 工作中总结前端开发流程--vue项目
  • 记一次用 NodeJs 实现模拟登录的思路
  • 前嗅ForeSpider中数据浏览界面介绍
  • 强力优化Rancher k8s中国区的使用体验
  • 深度解析利用ES6进行Promise封装总结
  • 收藏好这篇,别再只说“数据劫持”了
  • 手机端车牌号码键盘的vue组件
  • 通信类
  • 因为阿里,他们成了“杭漂”
  • 原生Ajax
  • k8s使用glusterfs实现动态持久化存储
  • Linux权限管理(week1_day5)--技术流ken
  • ​力扣解法汇总1802. 有界数组中指定下标处的最大值
  • # 利刃出鞘_Tomcat 核心原理解析(二)
  • # 数论-逆元
  • #13 yum、编译安装与sed命令的使用
  • #define用法
  • #etcd#安装时出错
  • #Lua:Lua调用C++生成的DLL库
  • $GOPATH/go.mod exists but should not goland
  • (2022版)一套教程搞定k8s安装到实战 | RBAC
  • (3)选择元素——(17)练习(Exercises)
  • (4)STL算法之比较
  • (9)YOLO-Pose:使用对象关键点相似性损失增强多人姿态估计的增强版YOLO
  • (webRTC、RecordRTC):navigator.mediaDevices undefined
  • (ZT)薛涌:谈贫说富
  • (附源码)ssm跨平台教学系统 毕业设计 280843
  • (回溯) LeetCode 40. 组合总和II