当前位置: 首页 > news >正文

FutureTask源码分析

        Thread类的run方法返回值类型是void,因此我们无法直接通过Thread类获取线程执行结果。如果要获取线程执行结果就需要使用FutureTask。用法如下:

class CallableImpl implements Callable{@Overridepublic Object call() throws Exception {//do somethings//return result;}
}
FutureTask futureTask = new FutureTask(new CallableImpl());
new Thread(futureTask).start();
try {//获取线程执行结果Object result = futureTask.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}

        在实例化FutureTask时构造函数传入了实现Callable接口的实例。而在实例化Thread类时,构造函数传入FutureTask实例。因此,我们可以猜测线程在执行run方法时必定会调用call方法,并且保存call方法返回的结果。

总览

    通过类图,可以看到FutureTask主要实现了Runnable和Future。实现Runnable的run方法作为线程的执行体。正因为实现了Runnable,我们才可以使用FutureTask来创建线程。Future接口定义了如下几个方法
public interface Future<V> {
/***取消线程执行的任务*/
boolean cancel(boolean mayInterruptIfRunning);
/***判断任务是否被取消*如果任务在正常完成前因调用cancel方法而被取消,返回true*/
boolean isCancelled();
/*** 判断任务是否完成,如果任务已经完成,返回true* 完成可能是由于正常终止、异常或取消——在这些情况下,此方法都将返回true。*/
boolean isDone();
/*** 获取任务执行的结果,调用该方法时,如果任务还没有执行完成,将会阻塞当前线程* 直到任务完成或者被中断*/
V get() throws InterruptedException, ExecutionException;
/*** 获取任务执行的结果,调用该方法时,如果任务还没有执行完成,将会阻塞当前线程* 直到任务完成或者被中断或者超时*/
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

      可以看出FutureTask具备获取线程任务执行结果、取消线程任务的能力。

成员变量

public class FutureTask<V> implements RunnableFuture<V> {//state标识任务的运行状态private volatile int state;//新建状态,这是任务的最初状态private static final int NEW          = 0;//正在完成,任务执行体已经完成,正在保存执行结果private static final int COMPLETING   = 1;//任务正常完成private static final int NORMAL       = 2;//任务执行过程中发生异常private static final int EXCEPTIONAL  = 3;//任务被取消private static final int CANCELLED    = 4;//正在中断运行任务的线程private static final int INTERRUPTING = 5;//任务被中断private static final int INTERRUPTED  = 6;//任务的执行体,任务完成后,将会设置成nullprivate Callable<V> callable;//任务执行体的返回结果private Object outcome; //运行callable的线程private volatile Thread runner;//等待任务执行结果的线程队列private volatile WaitNode waiters;static final class WaitNode {//当前节点代表的线程volatile Thread thread;//下一个节点volatile WaitNode next;WaitNode() { thread = Thread.currentThread(); }}}

         成员变量的含义只有在分析具体的方法代码和作者的注释时才能知晓。接下来具体分析FutureTask是如何实现保存任务执行结果和获取结果的。

源码分析

        在分析FutureTask源码前,需要对其中使用到jdk的方法做个简单的介绍。其中Unsafe类提供的cas操作的相关方法。

public final native boolean compareAndSwapObject(Object obj, 
long offset, Object expect, Object update);
  • obj :要修改字段的对象;
  • offset :要修改的字段在对象内的偏移量;
  • expect : 字段的期望值;
  • update :如果该字段的值等于字段的期望值,用于更新字段的新值;

   LockSupport的park和unpark提供了阻塞和解除阻塞线程的有效方法,park会使当前线程阻塞,unpark可以唤醒指定的线程。

public static void park() {UNSAFE.park(false, 0L);
}
public static void unpark(Thread thread) {if (thread != null)UNSAFE.unpark(thread);
}

构造函数

public FutureTask(Callable<V> callable) {if (callable == null)       throw new NullPointerException();    this.callable = callable;    this.state = NEW;       
}

接收callable实例并赋值给成员变量callable,将任务状态初始化为NEW。

run方法

public void run() {//先检查任务的状态,如果任务状态是NEW。利用cas操作设置runner为当前执行任务的线程//这里是为了确保在多线程的情况下任务执行和结果设置的安全性及一致性//比如下面的代码,会导致一个任务在多个线程中运行。//  FutureTask futureTask = new FutureTask(task);//  futureTask.run();//  Thread thread = new Thread(futureTask);//  Thread thread1 = new Thread(futureTask);//  thread.start();//  thread1.start();if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//调用callable方法,执行真正的任务逻辑result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;//执行异常处理,将任务状态修改为EXCEPTIONALsetException(ex);}if (ran)//任务执行体运行成功,保存任务结果set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;//handlePossibleCancellationInterrupt需要结合cancel方法分析if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}protected void set(V v) {//在outcome还没有保存返回结果前,先将任务状态设置为COMPLETING(正在完成)if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//保存任务运行结果outcome = v;//将任务状态设置为正常完成UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state//结束任务:唤醒所有因调用get方法而阻塞的线程,并清空等待队列finishCompletion();}
}protected void setException(Throwable t) {//在outcome还没有保存返回结果前,先将任务状态设置为COMPLETING(正在完成)if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//将任务的结果设置为异常信息outcome = t;//将任务状态设置为EXCEPTIONAL(异常中断)UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); //结束任务:唤醒所有因调用get方法而阻塞的线程,并清空等待队列finishCompletion();}
}/***唤醒所有因调用get方法而阻塞的线程,并清空等待队列*/
private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {//先将成员变量waiters设置为nullif (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//从头开始遍历等待队列,唤醒其每个节点对应的线程for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}//获取下一个节点WaitNode next = q.next;if (next == null)break;//当前节点next指向null,当前节点从等待队列中断开,之后被GC回收q.next = null; // unlink to help gcq = next;}break;}}done();callable = null;        // to reduce footprint
}

从run方法中,FutureTask的生命周期和线程的生命周期有一定的关联:

        当FutureTask的state为NEW时,执行任务的线程可能处于New状态、Runable状态(线程在操作系统中被创建,处于等待CPU时间或运行中)、Blocked状态(线程在等待锁)。

        当程序调用call方法后,在将call的执行结果保存到FutureTask的成员变量outcome前,会将FutureTask设置为COMPLETING。此时FutureTask的COMPLETING 对应线程的Runable状态。

        如果程序调用call发生异常,FutureTask最终被设置为EXCEPTIONAL,正常执行则被设置为NORMAL,此时线程即将进入Terminated状态。

get方法

/***无限时长的等待获取执行结果*/
public V get() throws InterruptedException, ExecutionException {int s = state;//COMPLETING代表任务正在保存执行结果。<=这个状态,说明任务执行还没有保存执行结果//则会调用awaitDone方法等待执行结果。if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}
/***有限时长的等待获取执行结果*/
public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);
}/***等待任务完成,在被中断或超时时终止*/
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;//当前节点是否在等待队列中boolean queued = false;for (;;) {//检查当前获取结果的线程是否被中断,如果被中断,从等待队列中移除,并抛出中断异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)//1、创建一个新的节点q = new WaitNode();else if (!queued)//如果新的节点没有在排队,将这个节点加入到队列的头部queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {//判断是否超时nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}//阻塞当前线程LockSupport.parkNanos(this, nanos);}else阻塞当前线程LockSupport.park(this);}
}
/**
*遍历等待队列移除节点
*/
private void removeWaiter(WaitNode node) {if (node != null) {//首先将节点thread设置为null,防止节点被意外地再次使用或唤醒//同时thread =null的节点是作为需要被移除节点的标记node.thread = null;retry:for (;;) {          // restart on removeWaiter race//声明pre q s 三个WaitNode变量for (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next;//如果节点的thread !=null说明当前节点不需要被移除,遍历下一个if (q.thread != null)pred = q;else if (pred != null) {//上一个节点pred != null,说明当前节点不是队列的第一个节点//则将pred.next指向当前节点的下一个节点s,即跳过了当前节点pred.next = s;if (pred.thread == null) // check for race//队列在遍历过程中发生了变化,从头开始遍历continue retry;}//如果当前节点是头节点,将头节点设置为当前节点的下一个节点else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))//队列在遍历过程中发生了变化,从头开始遍历continue retry;}//完成对等待队列的遍历,成功移除了节点(无论是通过更新队列头部还是通过跳过内部节点)//退出break;}}
}

cancel方法

    public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {    // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion();}return true;}

cancel返回值是boolean类型,任务取消成功,返回true,任务取消失败返回false;参数mayInterruptIfRunning表示是否尝试取消正在执行中的任务。

1、如果任务状态不是NEW状态,直接返回false,通过对run方法的分析,可以知道当FutureTask状态不为NEW时,任务已经被取消或者已经执行了call方法,无法取消任务。

2、任务状态是NEW

        2.1 参数mayInterruptIfRunning为false,设置任务状态为CANCELLED,从run方法中可以得到,正在执行的任务不会被取消,还未开始的任务会被取消。

        2.2 参数mayInterruptIfRunning为true,尝试调用正在执行任务的线程的interrupt()方法(在一个线程内部存在着名为interrupt flag的标识,如果一个线程被interrupt,那么它的flag将被设置,但是如果当前线程正在执行可中断方法被阻塞时,如Object的wait方法,Thread的sleep、join方法等,调用interrupt方法将其中断,反而会导致flag被清除)

再回过头看看run方法最后的handlePossibleCancellationInterrupt

public void run() {......finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;//handlePossibleCancellationInterrupt需要结合cancel方法分析if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}private void handlePossibleCancellationInterrupt(int s) {// It is possible for our interrupter to stall before getting a// chance to interrupt us.  Let's spin-wait patiently.if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield(); // wait out pending interrupt// assert state == INTERRUPTED;// We want to clear any interrupt we may have received from// cancel(true).  However, it is permissible to use interrupts// as an independent mechanism for a task to communicate with// its caller, and there is no way to clear only the// cancellation interrupt.//// Thread.interrupted();}

        这个方法到底有什么作用呢,作者通过源代码注释告诉我们这里的自旋目的是如果其他线程调用了cancel(true)方法,确保中断只能传递给当前执行任务的线程runner,并且state在runner线程执行期间最终能够被设置为INTERRUPTED。当线程调用cancel(true)方法方法时,先将任务状态设置为INTERRUPTING,再执行运行任务线程的中断方法,最后将任务状态设置为INTERRUPTED。执行任务的线程检测到有其他线程正在中断任务时,会等待完成中断操作后再退出。

        通过对源码的阅读,我们大致了解到了:任务是如何执行并且保存执行结果,完成任务后,如何唤醒等待获取执行结果的线程。在获取执行结果时,如果任务还未完成,如何进入等待队列,如果等待超时或者被中断,如何从等待队列中移除。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 江科大笔记—STM32课程简介
  • 杭州电子科技大学《2020年+2021年861自动控制原理真题》 (完整版)
  • nginx基础篇(一)
  • docker-compose up 报错:KeyError: ‘ContainerConfig‘
  • python学习第十节:爬虫基于requests库的方法
  • 什么是区块链,以及应用场景
  • 钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句
  • STM32——输入捕获
  • 工业一体机在汽车零部件工厂ESOP系统中的关键作用
  • mysql事务的隔离级别学习
  • 基于SpringBoot+Vue+MySQL的家乡特色推荐系统
  • 视觉SLAM ch5——相机与图像
  • C++:内部类,匿名对象,操作符new与delete
  • Linux环境使用Git同步教程
  • 大模型学习起步的经验分享
  • 【Leetcode】101. 对称二叉树
  • 4个实用的微服务测试策略
  • Akka系列(七):Actor持久化之Akka persistence
  • Git的一些常用操作
  • GraphQL学习过程应该是这样的
  • HTTP中GET与POST的区别 99%的错误认识
  • in typeof instanceof ===这些运算符有什么作用
  • js ES6 求数组的交集,并集,还有差集
  • mysql 数据库四种事务隔离级别
  • NSTimer学习笔记
  • PHP CLI应用的调试原理
  • React 快速上手 - 07 前端路由 react-router
  • SSH 免密登录
  • 从setTimeout-setInterval看JS线程
  • 排序算法学习笔记
  • 如何用vue打造一个移动端音乐播放器
  • 事件委托的小应用
  • 算法---两个栈实现一个队列
  • 腾讯优测优分享 | 你是否体验过Android手机插入耳机后仍外放的尴尬?
  • 我感觉这是史上最牛的防sql注入方法类
  • 与 ConTeXt MkIV 官方文档的接驳
  • HanLP分词命名实体提取详解
  • 分布式关系型数据库服务 DRDS 支持显示的 Prepare 及逻辑库锁功能等多项能力 ...
  • ​用户画像从0到100的构建思路
  • #if和#ifdef区别
  • #Linux(帮助手册)
  • $jQuery 重写Alert样式方法
  • (04)odoo视图操作
  • (k8s)Kubernetes 从0到1容器编排之旅
  • (Matlab)基于蝙蝠算法实现电力系统经济调度
  • (pojstep1.3.1)1017(构造法模拟)
  • (附源码)springboot掌上博客系统 毕业设计063131
  • (附源码)计算机毕业设计ssm本地美食推荐平台
  • (六)c52学习之旅-独立按键
  • (图)IntelliTrace Tools 跟踪云端程序
  • (万字长文)Spring的核心知识尽揽其中
  • (学习总结16)C++模版2
  • (转) SpringBoot:使用spring-boot-devtools进行热部署以及不生效的问题解决
  • (转)真正的中国天气api接口xml,json(求加精) ...
  • (转载)利用webkit抓取动态网页和链接