当前位置: 首页 > news >正文

基础 | 并发编程 - [AQS]

INDEX

      • §1 AbstractQueuedSynchronizer
      • §2 AbstractQueuedSynchronizer 的体系
      • §3 `AbstractQueuedSynchronizer` 的结构
        • §3.1 字段
        • §3.2 CLH lock queue 等待队列
        • §3.3 `AbstractQueuedSynchronizer.Node`
      • §4 流程 (以 `ReentrantLock` 为例)

§1 AbstractQueuedSynchronizer

  • 可以直接翻译成:抽象的、=(依赖)队列(结构)的 同步器
  • java.util.concurrent.locks 包下 抽象类,有一门三父子,注意 AbstractOwnableSynchronizer 才是相对父级的
    • AbstractOwnableSynchronizer
      • AbstractQueuedLongSynchronizer
      • AbstractQueuedSynchronizer
  • 用于构建 或 其他同步器组件
    锁和其他同步器的是基于它实现的
  • 通过 队列(CLH) 对抢锁的线程进行排队
  • 通过 int 表示持有锁的状态

§2 AbstractQueuedSynchronizer 的体系

在这里插入图片描述

§3 AbstractQueuedSynchronizer 的结构

§3.1 字段

首先是一些父类字段,在 AbstractOwnableSynchronizer

//The current owner of exclusive mode synchronization
//哪个线程,现在独占此同步器
private transient Thread exclusiveOwnerThread;

然后是 AbstractQueuedSynchronizer 自己的主要字段

private volatile int state;
/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

exclusiveOwnerThread 独占线程
exclusiveOwnerThread 在 AQS 的父类 AbstractOwnableSynchronizer 中声明
线程 x 申请 AQS 成功,则 exclusiveOwnerThread==x

state 同步状态
state 表示当前 AbstractQueuedSynchronizer 的状态,即 AQS 是否立即可用
也可以理解为 新加入的线程是否需要排队

  • 0
    立即可用,新加入的线程可以尝试获取锁
  • >0
    非立即可用,新加入的线程需要排队

CLH lock queue
headtail 两个字段其实是对 AQS 中等候队列的声明,详见 §3.2

§3.2 CLH lock queue 等待队列

队列的声明
headtail 实际上是一个变形的 CLH lock queue 的头尾节点,CLH 是三个人名 Craig & Landin & Hagersten
CLH lock queue 本质上是一个双向链表,所以 head / tail 也可以理解成队列本尊

CLH lock queue 特性如下:

  • 遵循 先进先出 ,因为这是个双向队列,仅从数据结构上其实可以做到随意
  • tail 追加
    新的需要获取锁的线程从队尾加入队列
  • CLH 为空时,新加入的线程立即获得锁
  • CLH 中线程监听前一个线程的状态
    CLH 中的线程从加入时起开始 本地自旋
    自旋中需要做的事是获取前面线程的状态
  • 头结点会尝试获取锁
    只是尝试获取,说明头结点具有获取锁的优先权
    不保证一定可以获取,获取失败则重新等待
  • CLH 添加元素的操作,都是 CAS 并且 自旋
    这是为了确保添加元素时,若遇得到并发情况
    只会后一个 Node 成功的成为新的 head / tail,但所有并发的线程最终都会成功

§3.3 AbstractQueuedSynchronizer.Node

Node
是需求抢锁,但是没抢成功需要排队的线程的封装
AQS 等候队列中的 Node,相当于是一个 waiter(在等轮到自己好抢锁)

结构
AbstractQueuedSynchronizer.Node 的主要字段如下

volatile int waitStatus; 
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;

waitStatus 通常是下一个 Node(waiter)的等候状态,

  • = 0,默认值,通常说明 Node 后面没有 Node 或后面的 Node 刚刚加入队列
  • > 0 ,取消,是 当前节点 被取消了
  • = -1,下一个 Node 可以安全阻塞
    Node 不需要尝试申请同步器或申请失败后,需要阻塞 Node
    但阻塞前,需要先给 Node 一个申请释放的信号,已经收到信息的 Node 才能安全的阻塞
    这个信号会标记在前面 Node 的 waitStatus
  • = -2,条件等待
  • = -3,无条件传播

nextWaiter 通常 Node 是否独占有关

  • 若锁实现独占,则 Node 独占,则 nextWaiter == null,如 ReentrantLock
  • 若锁实现非独占,则 Node 非独占,则 nextWaiter == new Node(),如 ReentrantReadWriteLock

§4 流程 (以 ReentrantLock 为例)

ReentrantLock 中的 同步器AQS公平同步器非公平同步器 的关系
需注意,同步器 SyncAbstractQueuedSynchronizer 的子类

在这里插入图片描述

ReentrantLock.lock()ReentrantLock.unlock() 为入口的示例

public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    new Thread(()->{
        System.out.println("AAAAAAAAAAAAAAAAA");
        lock.lock();
        try {
            try {
                TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) { e.printStackTrace(); }
        }finally {
            lock.unlock();
        }

    },"A").start();

    new Thread(()->{
        lock.lock();
        try {
            System.out.println("BBBBBBBBBBBBBBBBBBBBB");
        }finally {
            lock.unlock();
        }
    },"B").start();

    new Thread(()->{
        lock.lock();
        try {
            System.out.println("CCCCCCCCCCCCCCCCCCCC");
        }finally {
            lock.unlock();
        }
    },"C").start();
}

首先,A 线程申请上锁ReentrantLock.lock() 方法,调用的是同步器 Sync.lock()
在这里插入图片描述
这是一个抽象方法,被 公平同步器 FairSync非公平同步器 NonFairSync 实现
在这里插入图片描述

在这里插入图片描述
对比二者的实现,可见 非公平同步器 NonFairSync 只是多了

if (compareAndSetState(0, 1))
	setExclusiveOwnerThread(Thread.currentThread());

从方法名风格上看,是一个 CAS 操作,查看其声明如下

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

可见是试图 CAS AbstractQueuedSynchronizer.state 字段
在这里插入图片描述
结合上文,此字段的作用是标记找到此 同步器 的线程是否 立即可用
AbstractQueuedSynchronizer.state ==0 时,认为 AQS 立即可用,于是按 CAS 操作立即占用了此 同步器

if (compareAndSetState(0, 1))
	setExclusiveOwnerThread(Thread.currentThread());

exclusiveOwnerThread 字段记录当前独享此同步器的线程

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
	// ......
    // The current owner of exclusive mode synchronization.
    private transient Thread exclusiveOwnerThread;

此时,A 线程抢锁成功state == 1exclusiveOwnerThread == A 线程

随后,B、C 线程申请上锁,但很明显,下面的 if 条件明显是走不通的,因为 state == 1

if (compareAndSetState(0, 1))
	setExclusiveOwnerThread(Thread.currentThread());
else
    acquire(1);

因此,执行 else 中的 acquire(1) 去申请,如下面代码

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上段代码的整体思路是

  • 当前线程再申请一次同步器
    万一之前持有同步器的线程跑了呢,如果这次成功了,就不用往等待队列里塞了
  • 如果还是尝试申请 失败了,当前线程就把自己包装成一个 Node,作为 waiter,存入请求队列(CHL)) 中
  • 申请队列成功后,尝试让自己进入阻塞状态
    将自己等候状态记录在上一个 Node的 nextWaiter
    随后当前线程阻塞
    若当前线程退出阻塞,则立即发起一次线程中断检查 return Thread.interrupted();
  • 队列中的线程每次唤醒都会尝试申请同步器,失败后继续阻塞,直到成功获取同步器

快速尝试
tryAcquire(arg) 是 AQS 中接口,用于尝试但 AQS 没有实现,只是给了个异常
模板方法模式,实现延迟到子类,并且强制子类复写此方法以实现功能(否则就抛异常)

// AQS 里的 tryAcquire(arg)
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

当前案例中我们需要看 非公平同步器,因为 ReentrantLock lock = new ReentrantLock(); 申请的非公平锁
尝试申请的逻辑中进行了两次判断

  • 是不是可以 抢锁
    通过 state == 0 判断,false 时当前线程不能直接尝试抢锁
  • 是不是可以 重入
    虽然不能抢锁,但之前占用了锁的有可能是自己
    需要用 exclusiveOwnerThread == 当前线程 判断(此时,当前线程时 B),案例场景中明显不是
// non fair sync 里的 tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
// non fair sync 里的 tryAcquire(arg) 的实现
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

加入队列
addWaiter() 是 AQS 中接口,有两个作用:

  • 将线程包装成 Node
  • 将 Node 放到 线程等候队列中
private Node addWaiter(Node mode) {
	//封装 Node
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 这里的逻辑是下面 enq 的一部分,用于快速尝试
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

enq() 的功能是将封好的 Node 塞入等候队列,其逻辑如下面代码所示

  • 首先,enq() 整体上是个自旋,直达成功将 Node 塞进去才结束
    这是因为可能出现多个线程来尝试申请同步器,失败后都需要进入队列
    但为了保证每个节点下都只有一个 next ,所以每次只能有一个线程进入队列尾部
    所以对于每个线程而言,可能需要尝试多次,直到成功加入队列为止
  • 其次,AQS 的 等待队列 是需要初始化的,如下面代码注释
    AQS 会使用一个空的 Node(傀儡节点 或 哨兵节点) 作为队列头,在刚刚初始化时,它同时也是队列尾
    这个 Node 表示有一个线程正在占用同步器,即队列中不是所有 Node 都是等待同步器的 waiter
    • Node 通过 tryAcquire(arg) 抢到对同步器的占用权时,会将自己的线程赋值给 exclusiveOwnerThread
    • 接着在自己成为 head 后,将自己的 thread 置空
  • AQS 中此队列添加 Node 的操作,都是 CAS 的
    AQS 向 等候队列 添加 Node 时,会先让这个 Node 指向队列尾,随后 CAS 的尝试让 tail 指向该 Node
    CAS 成功的就是新的 tail,随后建立新老 tail 的联系(原 tail 的 next 指向 Node,相反的指向在之前已经完成了)
    这是为了确保每次操作只会后一个 Node 成功的成为新的 head / tail
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

排队
acquireQueued() 也是 AQS 中接口,作用是 尝试让自己进入阻塞状态

  • 尝试阻塞前,如果 Node 是 head 的下一个 Node,再尝试申请一次同步器,如果成功则直接替换 head 并返回
    这是因为 head 约等于占用同步器的线程,此时 head 随时可能完成对同步器的占用,从而轮到当前 Node
    申请成功时,Node 自己的线程已经给了 AQS 的 exclusiveOwnerThread,接着 Node 成为新的 head
  • Node 会尝试让自己阻塞,以在等候队列中等候
    Node 会将自己没有申请到同步器这一结果(可能是失败或者不用尝试申请)记录在上一个 Node的 waitStatus
    随后当前线程阻塞
    若当前线程退出阻塞,则立即发起一次线程中断检查 return Thread.interrupted();
    若线程中断,则依然会在队列中排队,直到获取了同步器后,会立即在 acquire() 中进行自我中断
  • 队列中的线程每次唤醒都会尝试申请同步器,失败后继续阻塞,直到成功获取同步器
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire() 用前一个 Node 的状态决定当前 Node 是否可以阻塞

  • 需注意,shouldParkAfterFailedAcquire()acquireQueued() 的一部分,而后者是自旋的
  • 若前一个 Node 的状态是 0 或 -3 ,说明当前 Node 刚刚进入队列还没尝试申请过 同步器
    当前 Node 在第一次申请失败时,将前一个 Node 的状态置为 -1
  • 当且仅当 前一个 Node 的状态是 -1 时,当前 Node 才能安全的阻塞
  • 前一个 Node 的状态 > 0 时,说明 前一个 Node 被取消,则当前 Node 的前指针指向更前面的 Node
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt() 用于阻塞当前线程

  • 当前线程就是 当前 Node 中的线程
  • 等待队列中的线程通常都是阻塞在 LockSupport.park(this);
  • 等待队列中的线程每次唤醒,都会根据位置决定是否尝试申请同步器,申请失败的或不用申请的继续阻塞
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

再随后,A线程释放ReentrantLock.lock()

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease() 用于尝试释放锁,下面是 AQS 里的模板方法和 ReentrantLock 中的实现

  • 获取 AQS 的 state,因为可以重入,所以 state 可能不仅仅是 1
  • 从 state 扣除本次 release 的次数,即 -1
  • 若 state == 0,可以释放锁
    • AQS 的 state = 0
    • AQS 的 exclusiveOwnerThread = null
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
protected final boolean tryRelease(int releases) {
     int c = getState() - releases;
     if (Thread.currentThread() != getExclusiveOwnerThread())
         throw new IllegalMonitorStateException();
     boolean free = false;
     if (c == 0) {
         free = true;
         setExclusiveOwnerThread(null);
     }
     setState(c);
     return free;
 }

unparkSuccessor() 用于唤醒等待队列的 head 的下一个 Node

  • if (h != null && h.waitStatus != 0) 是为了排除等待队列中的傀儡/守护节点
    头结点的 waitStatus == 0 说明此节点后没有 Node,后面的 Node 才是等待申请同步器的线程
  • 除非线程被取消了,否则清空当前 head 的waitStatus
    因为 head 的 waitStatus 通常记录的其实是当前 head 的下一个 Node 的状态
    而 下一个 Node 马上要再尝试获取同步器了,无论成功还是失败,都不能技术使用以前的 signal
  • 若 head 的下一个 Node 被取消了,就从等待队列的 tail 开始向前找最靠前的未取消的 Node 唤醒
    注意 for 循环中没有找到一个就 break,而是一直往前找,直到找到最靠前的未取消 Node
  • Node 会在上文 parkAndCheckInterrupt() 的位置被唤醒,随后检查中断,若没被中断继续尝试申请 同步器
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

相关文章:

  • java8 新特性 stream
  • DAY45(DAY46拓展):SOCKS 代理技术
  • Handler消息传递机制
  • Django之路由匹配
  • Java面试(四)JVM基础
  • 新学期,新FLAG
  • 一文详解C语言文件
  • Java实现的一个编译器源代码(Win11)
  • 内核调试:crash工具与vmcore实践
  • C++ Qt / VS2019 +opencv + onnxruntime 部署语义分割模型【经验】
  • 如何查看线程死锁
  • 代码源每日一题div2 可重排列
  • 【原创】基于Jsp+Servlet的仓库管理系统
  • 堡垒机部署
  • linux的man命令
  • 【跃迁之路】【669天】程序员高效学习方法论探索系列(实验阶段426-2018.12.13)...
  • 5、React组件事件详解
  • LintCode 31. partitionArray 数组划分
  • MySQL几个简单SQL的优化
  • Spring-boot 启动时碰到的错误
  • SwizzleMethod 黑魔法
  • text-decoration与color属性
  • Windows Containers 大冒险: 容器网络
  • 关于使用markdown的方法(引自CSDN教程)
  • 官方解决所有 npm 全局安装权限问题
  • 漫谈开发设计中的一些“原则”及“设计哲学”
  • 面试总结JavaScript篇
  • 区块链技术特点之去中心化特性
  • 深度学习入门:10门免费线上课程推荐
  • 数据结构java版之冒泡排序及优化
  • 通信类
  • 王永庆:技术创新改变教育未来
  • 项目管理碎碎念系列之一:干系人管理
  • 新手搭建网站的主要流程
  • PostgreSQL 快速给指定表每个字段创建索引 - 1
  • 阿里云服务器如何修改远程端口?
  • 资深实践篇 | 基于Kubernetes 1.61的Kubernetes Scheduler 调度详解 ...
  • #微信小程序(布局、渲染层基础知识)
  • (Redis使用系列) SpringBoot中Redis的RedisConfig 二
  • (顺序)容器的好伴侣 --- 容器适配器
  • (原創) 系統分析和系統設計有什麼差別? (OO)
  • (轉)JSON.stringify 语法实例讲解
  • (轉貼) 資訊相關科系畢業的學生,未來會是什麼樣子?(Misc)
  • .NET(C#、VB)APP开发——Smobiler平台控件介绍:Bluetooth组件
  • .NET高级面试指南专题十一【 设计模式介绍,为什么要用设计模式】
  • ?.的用法
  • [20180312]进程管理其中的SQL Server进程占用内存远远大于SQL server内部统计出来的内存...
  • [8-23]知识梳理:文件系统、Bash基础特性、目录管理、文件管理、文本查看编辑处理...
  • [C#]winform部署yolov5-onnx模型
  • [CSS]文字旁边的竖线以及布局知识
  • [CTO札记]盛大文学公司名称对联
  • [UI5 常用控件] 03.Icon, Avatar,Image
  • [WeChall] Training: Encodings I (Training, Encoding)
  • [笔记] 四边形不等式
  • [笔记]_ELVE_正则表达式