Condition使用
一、Condition概述
1、什么是Condition?
我们知道synchronized 锁提供了wait() 和 notify() 方法为持有锁的线程提供了等待(挂起)和
唤醒的功能。
在juc的locks包下提供了一个接口 Condition,Condition提供了类似 wait 和 notify 的功能。
Condition 主要用来线程间的通信,可以很好的代替传统的、通过监视器对象调用的
wait()、notify()、notifyAll等线程间的通信方式。
Condition 的作用是主要用来控制锁并且判断某个条件是否满足,如果不满足,那么使用该锁的
线程将会被挂起,等待另外的线程将其唤醒,与其同时被挂起的线程将会进入阻塞队列中并且
释放对显式锁Lock的持有,这与wait() 很相似
2、Condition特点
1)Condition 不能直接创建,是由某个显式锁Lock 创建的,并需要跟Lock关联;
2) 一个显式锁Lock 可以创建多个Condition对象,并与之关联。
3、接口Condition 结构
接口 Condition 的结构如下
public interface Condition {/*** 进入等待状态,并等待其他线程执行 signal() 方法来唤醒*/void await() throws InterruptedException;/*** 不可中断的等待*/void awaitUninterruptibly();/*** 带超时时间的等待,单位纳秒*/long awaitNanos(long nanosTimeout) throws InterruptedException;/*** 带有超时时间的等待*/boolean await(long time, TimeUnit unit) throws InterruptedException;/***await 等待当前线程到指定的时间点(日期)*/boolean awaitUntil(Date deadline) throws InterruptedException;/*** 通知*/void signal();//通知,线程需要重新竞争锁/*** 通知所有*/void signalAll();
}
4、接口Condition的实现类
Condition 有2个实现类,分别是:AbstractQueuedSynchronizer 的内部类 ConditionObject 和
AbstractQueuedLongSynchronizer 的内部类 ConditionObject;这2个内部类实现差不多,后
边就看下内部类 AbstractQueuedSynchronizer.ConditionObject 就行了,因为
AbstractQueuedLongSynchronizer 在jdk中并没有被使用。
基于AQS实现的锁 ReetrantLock、ReentrantReadWriteLock 和线程池ThreadPoolExecutor 都
提供了Condition 功能;就以 ReetrantLock 为例学习下Condition的使用
二、Condition 使用示例
以 ReetrantLock 为例学习下Condition的使用
1、示例(一)
创建2个线程分别对数据读写
只有共享数据发生改变时,读数据的线程才会对数据进行读取和进一步处理,
当共享数据未发生改变时读取数据的线程将会等待数据先修改再读取,
当数据未被读取时,修改共享数据的线程将会进入等待状态,只有该数据被
使用过才会进一步产生新的数据
示例代码如下:
public class ConditionExample1 {// shareData 和 dataUsed 在该示例中是共享资源private static int shareData;//定义共享数据private static boolean dataUsed;//标识当前共享数据是否被使用//创建显式锁private static final Lock lock= new ReentrantLock();//创建Condition//Codition不能直接创建,需要通过Lock来创建,并与锁进行关联private static final Condition condition = lock.newCondition();//对数据的写操作,相当于生产者private static void change(){lock.lock();try{//若当前数据未被使用,则当前线程进入 wait 状态,并释放锁,其他线程先消费数据while (!dataUsed){condition.await();}TimeUnit.SECONDS.sleep(current().nextInt(5));shareData++;dataUsed = false;System.out.println("produce the new value: "+shareData);//通知并唤醒在 wait 队列中的其他线程condition.signal();}catch (InterruptedException e){e.printStackTrace();}finally {lock.unlock();}}//使用数据,相当于消费者private static void use(){lock.lock();try{//若当前数据已经被使用,则当前线程进入 wait 状态,并释放锁while (dataUsed){condition.await();}TimeUnit.SECONDS.sleep(current().nextInt(5));dataUsed = true;System.out.println("The Shared data changed: "+shareData);//通知并唤醒在 wait 队列中的其他线程condition.signal();}catch (InterruptedException e){e.printStackTrace();}finally {lock.unlock();}}public static void main(String[] args) {new Thread(() -> {for(;;){change();}},"Producer").start();new Thread(() -> {for(;;){use();}},"Consumer").start();}
}
2、示例(二)
基于Condition 实现一个 “生产者--消费者模式”
将生产的数据放入一个缓存种(集合或队列中),消费者去消费缓存中的数据;
若缓存为空,则消费者线程需要进入等待状态,等待生产者生成数据;
若缓存已满,则生产者进入等待状态,等待消费者消耗数据。
示例代码如下:
public class ConditionExample2 {//定义显式锁private static final ReentrantLock lock = new ReentrantLock();//定义与显式锁关联的 Conditionprivate static final Condition condition = lock.newCondition();//定义 Long 类型的链表,这里链表是共享的数据资源private static final LinkedList<Long> list = new LinkedList<>();private static final int CAPACITY = 100;//定义链表的最大长度private static long i=0;//定义数据的初始值为0//生产方法public static void produce(){lock.lock();try {//若链表中的数据大于 CAPACITY,则当前线程进入阻塞状态,并释放对锁 lock 的持有while (list.size() >=CAPACITY){condition.await();}//若list 的大小不足 CAPACITY,则执行下面的生产数据,生产数据后要通知其他线程消费数据i++;list.add(i);System.out.println(currentThread().getName()+" -> "+i);condition.signalAll();} catch (InterruptedException e) {e.printStackTrace();}finally {//释放锁lock.unlock();}}//消费方法public static void consume(){lock.lock();//链表中没有数据,消费线程需要进入阻塞状态,释放锁try {while (list.isEmpty()){condition.await();}//若链表不为空,则消费数据long n = list.removeFirst();System.out.println(currentThread().getName()+" -> "+n);condition.signalAll();} catch (InterruptedException e) {e.printStackTrace();}finally {lock.unlock();}}private static void sleep(){try {TimeUnit.SECONDS.sleep(current().nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {//启动10个生产者线程IntStream.range(0,10).forEach(i -> {new Thread(() -> {for(;;){produce();sleep();}},"Producer--"+i).start();});//启动5个消费者线程IntStream.range(0,5).forEach(i -> {new Thread(() -> {for(;;){consume();sleep();}},"Consumer--"+i).start();});}}
3、示例(三)
优化示例(二)的代码
给一个ReetrantLock 锁关联2个Condition(多个),生产者一个,消费者一个
示例代码如下:
/*** 基于 Condition 实现 生产者--消费者模式** 创建2个跟显式锁相关联的Condition,分别是生产者的Condition 和 消费者的 Condition*/
public class ConditionExample3 {//定义显式锁private static final ReentrantLock lock = new ReentrantLock();//生产者线程的 Conditionprivate static final Condition EMPTY_CONDITION = lock.newCondition();//消费者线程的 Conditionprivate static final Condition FULL_CONDITION = lock.newCondition();//定义 Long 类型的链表,这里链表是共享的数据资源private static final LinkedList<Long> list = new LinkedList<>();private static final int CAPACITY = 100;//定义链表的最大长度private static long i=0;//定义数据的初始值为0//生产方法public static void produce(){lock.lock();try {//若链表中的数据大于 CAPACITY,则当前线程进入阻塞状态,并释放对锁 lock 的持有while (list.size() >=CAPACITY){FULL_CONDITION.await();}//若list 的大小不足 CAPACITY,则执行下面的生产数据,生产数据后要通知其他线程消费数据i++;list.add(i);System.out.println(currentThread().getName()+" -> "+i);//唤醒消费者EMPTY_CONDITION.signalAll();} catch (InterruptedException e) {e.printStackTrace();}finally {//释放锁lock.unlock();}}//消费方法public static void consume(){lock.lock();//链表中没有数据,消费线程需要进入阻塞状态,释放锁try {while (list.isEmpty()){EMPTY_CONDITION.await();}//若链表不为空,则消费数据long n = list.removeFirst();System.out.println(currentThread().getName()+" -> "+n);//唤醒生产者FULL_CONDITION.signalAll();} catch (InterruptedException e) {e.printStackTrace();}finally {lock.unlock();}}private static void sleep(){try {TimeUnit.SECONDS.sleep(current().nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {//启动10个生产者线程IntStream.range(0,10).forEach(i -> {new Thread(() -> {for(;;){produce();sleep();}},"Producer--"+i).start();});//启动5个消费者线程IntStream.range(0,5).forEach(i -> {new Thread(() -> {for(;;){consume();sleep();}},"Consumer--"+i).start();});}}
三、Condition 原理之ConditionObject
以Condition在AQS 中的实现类 AbstractQueuedSynchronizer.ConditionObject 来看下
Condition的 “等待/通知” 的实现原理。
ConditionObject 维护了一个单项的条件等待队列,进入等待队列的线程节点会被放入
ConditionObject的条件等待队列中,当有其他线程唤醒等待的线程时,会把等待的线程
节点从条件等待队列中转移到AQS的阻塞队列中。
1、ConditionObject 核心属性
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/** First node of condition queue.* 等待队列第一个节点** todo 注意:* 虽然Node 中有 prev和next 这2个属性,但在ConditionObject 中是不会使用这2个属性的;* 也可以认为在Conditon 条件等待队列中,这2个属性的值都为null;* 在ConditionObject 中只会使用Node 的nextWaiter 属性实现单项列表的效果* */private transient Node firstWaiter;/** Last node of condition queue.* 等待队列最后一个节点* */private transient Node lastWaiter;/*** Creates a new {@code ConditionObject} instance.*/public ConditionObject() { }
}
2、await() 方法解析
await 是Condition 的核心方法,线程调用await()方法进入等待状态,
await方法代码:
public final void await() throws InterruptedException {//若线程被中断,则抛出异常if (Thread.interrupted())throw new InterruptedException();//将当前线程以条件等待的模式添加到等待队列中//将当前线程包装等等待节点,然后添加到条件等待队列中Node node = addConditionWaiter();//执行release操作当前线程释放锁,并唤醒竞争队列的节点// 释放当前线程所持有的锁,并返回同步状态// 注意:当前线程的state变量,也即控制可重入的变量需要保存,因为在后边唤醒后需要恢复状态int savedState = fullyRelease(node);//中断模式int interruptMode = 0;//节点node未放如AQS阻塞队列之前一直阻塞while (!isOnSyncQueue(node)) {//阻塞当前线程LockSupport.park(this);//相应中断//检查当前线程的中断状态,若当前线程被中断了,则退出自旋(即退出等待)if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// acquireQueued 开始竞争锁//若node 中的当前线程尝试获取锁失败,当前线程的已经被中断(线程被中断会退出阻塞),且//中断模式不是 THROW_IE,则设置当前线程的中断模式为 REINTERRUPTif (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;//REINTERRUPT 表示退出之后的中断状态//若node 的下一个等待节点不为null,则从等待队列中清除已取消的节点if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
3、addConditionWaiter() 方法解析
/*** 将当前线程包装成新的等待节点,并添加到到条件等待队列** @return its new wait node 返回一个新的等待节点*/private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.// 如果等待队列最后一个节点 lastWaiter 被取消了(即 不是等待状态,即waitStatus != -1),则把其从等待队列中删除if (t != null && t.waitStatus != Node.CONDITION) {//清除等待队列中已经取消的的等待节点unlinkCancelledWaiters();t = lastWaiter;}//创建新的等待节点,并添加到等待队列中Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)//等待条件队列为空firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
4、fullyRelease(Node node)
该方法功能是当前线程释放所持有的锁资源,若释放失败,则需取消ConditionObject
条件等待队列中的节点。
注意:fullyRelease 是AQS的方法
fullyRelease 方法解析:
final int fullyRelease(Node node) {boolean failed = true;try {//先获取当前线程节点状态值int savedState = getState();//释放所有状态(即释放节点node所持有的锁),并唤醒后继节点//todo 注意:此时当前线程节点一定是AQS阻塞队列的头节点,因为node线程持有了锁资源,// 若线程没有持有锁,则会抛出异常//release() 执行成功,表示等待的条件节点node 被唤醒了,被唤醒时节点同时会被取消等待,if (release(savedState)) {failed = false;return savedState;} else {/*** 释放锁失败,出现非法监视器状态异常* 一般如未加锁直接wait、unlock会出现这个问题*/throw new IllegalMonitorStateException();}} finally {//如果release()执行失败,则把节点node的状态置为取消if (failed)//try块执行异常,取消正在正在等待的节点node//todo 注意:这里的node是ConditionObject条件等待队列的节点node.waitStatus = Node.CANCELLED;}}
5、isOnSyncQueue(Node node)
该方法作用是判断条件等待队列中的节点node是否存在于AQS的阻塞队列中,若存在则
返回true,否则返回fasle
isOnSyncQueue 方法也是AQS的方法
isOnSyncQueue 方法解析:
final boolean isOnSyncQueue(Node node) {//若节点node的状态值waitStatus=CONDITION,那么节点node一定在ConditionObjectde 条件阻塞队列中//若节点node的prev=null,那么节点node一定不在AQS的竞争队列中,因为AQS竞争队列中的节点prev一定不为null,//且prev前置节点状态值可能是SIGNAL(AQS队列中头节点代表了假节点,不算是AQS队列中的节点,只是为了获取后继的节点而设立的// AQS中的头结点唯一的意义是“头结点表示获取到了锁的节点”)if (node.waitStatus == Node.CONDITION || node.prev == null)return false;//若node的next不为null,表示节点node一定在AQS竞争队列上//前边aqs竞争队列新增节点是先修改tail节点,再修改前置节点if (node.next != null)return true;//表示一瞬间没有及时修改node的next节点,需要从aqs竞争队列的队尾tail开始查找节点nodereturn findNodeFromTail(node);
}private boolean findNodeFromTail(Node node) {//从尾指针往前遍历,直到找到节点nodeNode t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}
6、unlinkCancelledWaiters()
该方法作用是从ConditionObject 条件等待队列中删除已经取消的等待节点。
仅在锁定时调用。当条件等待期间发生取消时,以及当lastWaiter被视为已取消时插入一个
新的waiter时,将调用此函数。这种方法是为了在没有信号的情况下避免垃圾的保留。因此,
即使它可能需要一个完整的遍历,它只有在超时或取消发生在没有信号时才会发挥作用。它
遍历所有节点,而不是停止在一个特定的目标,解除所有指向垃圾节点的指针,而不需要在
取消风暴期间多次重新遍历。
unlinkCancelledWaiters方法解析
private void unlinkCancelledWaiters() {//等待队列的第一个节点Node t = firstWaiter;Node trail = null;//遍历等待队列中状态waitStatus != CONDITION 的节点并删除while (t != null) {//下一个等待节点Node next = t.nextWaiter;//若节点t 的waitStatus 的值不等于 Node.CONDITION,则将t.nextWaiter 设置为null,即节点t 没有等待节点if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;//移动等待队列的头节点if (trail == null)firstWaiter = next;else/** 上一个状态waitStatus = CONDITION 的节点跳过指向当前节点在等待队列中的下一个节点,* 即从等待队列中删除当前状态waitStatus != CONDITION 的节点*/trail.nextWaiter = next;if (next == null)lastWaiter = trail;}else//当 节点t 的waitStatus 的值不等于 Node.CONDITION,则将 节点t 赋值给 trailtrail = t;//更新节点t ,向后移动t = next;}}
7、awaitUninterruptibly()方法解析
该方法作用是实现不可中断的等待
awaitUninterruptibly 方法如下:
public final void awaitUninterruptibly() {//将当前线程添加到条件等待队列中Node node = addConditionWaiter();//释放节点node 中线程持有的锁,并取消节点nodeint savedState = fullyRelease(node);boolean interrupted = false;//节点node 自旋,若节点node 不在AQS阻塞队列中(即节点node 没有处于同步等待中),则阻塞(挂起)当前线程while (!isOnSyncQueue(node)) {LockSupport.park(this);//记录中断状态if (Thread.interrupted())interrupted = true;}if (acquireQueued(node, savedState) || interrupted)//中断当前线程selfInterrupt();}
8、singnal() 方法解析
该方法的作用时唤醒ConditionObject 条件等待队列中等待最长时间的线程,即firstWaiter节点
的线程,并将 firstWaiter 节点移动到AQS的阻塞队列中。
singnal() 方法如下:
public final void signal() {//isHeldExclusively()判断当前线程释放持有锁,若不持有锁,调用该方法则抛出异常//signal被唤醒时当前线程一定持有互斥锁if (!isHeldExclusively())throw new IllegalMonitorStateException();//唤醒条件等待队列中的第一个线程,即等待时间最长的线程Node first = firstWaiter;if (first != null)doSignal(first);}
9、doSignal(Node first) 方法
该方法作用是唤醒节点first 中的线程
将condition等待节点从ConditionObject条件等待队列移动到AQS竞争队列,尽量
在AQS阻塞队列中给first找个好位置,如果没有找到,就唤醒自己
doSignal 方法如下:
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null) //第二次循环 firstWaiter 一定为nulllastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&//取消节点first失败(first = firstWaiter) != null);//firstWaiter 不等于null}
10、transferForSignal(Node node)
该方法功能是 将节点从ConditionObject条件队列移动到AQS同步队列。如果成功返回true。
注意:
1)transferForSignal 是AQS的方法
2)transferForSignal 中可以发现并不是立即唤醒node节点线程
transferForSignal 方法代码如下:
final boolean transferForSignal(Node node) {/** CAS避免中断唤醒竞争* 将节点node状态修改为0,0=初始化*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;//将节点插入AQS竞争队列中,并返回 node 的前置节点Node p = enq(node);//节点node添加到aqs竞争队列之后,前驱节点节点p的状态可能发生改变int ws = p.waitStatus;/*** 如果节点p 被取消(即p 关联的线程被取消)或* 将节点 p 的waitStatus 的值修改为SIGNAL(表示P的后继节点是等待唤醒)失败,* 则唤醒节点p的后继节点 node 节点中的线程*/if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//唤醒node 中的线程//唤醒等待条件变量的线程,让其调用acquireQueue()方法将自己调整位置,并再次阻塞或获取锁LockSupport.unpark(node.thread);return true;}
11、signalAll() 方法解析
signalAll 与 signal的区别是,signal只唤醒ConditionObiect 条件等待队列中一个节点,即
firstWaiter,而 firstWaiter 是唤醒 ConditionObiect 条件等待队列中 的所有节点。
signalAll 代码如下:
public final void signalAll() {//判断当前线程是否持有锁if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;//唤醒 first 之后的所有线程do {Node next = first.nextWaiter;first.nextWaiter = null;//唤醒 first 节点中的线程transferForSignal(first);first = next;} while (first != null);}