当前位置: 首页 > news >正文

科普文:JUC系列之ForkJoinPool源码解读ForkJoinWorkerThread

科普文:JUC系列之ForkJoinPool基本使用及原理解读-CSDN博客

科普文:JUC系列之ForkJoinPool源码解读概叙-CSDN博客

科普文:JUC系列之ForkJoinPool源码解读WorkQueue-CSDN博客

科普文:JUC系列之ForkJoinPool源码解读ForkJoinTask-CSDN博客

ForkJoinWorkerThread实际上非常简单,就是结合ForkJoinPool,然后根据其需要,创建合适的线程的过程。这里面值得我们借鉴的是,如果需要创建无其他访问权限的线程,实际上这两种线程大部分内容都是相同的,因此可以通过继承来复用大部分代码。之后定义两个factory,让最终的用户根据需要选择factory。

一、类结构及其成员变量

1.1 类结构和注释

类结构代码如下:

public class ForkJoinWorkerThread extends Thread {}

ForkJoinWorkerThread继承了Thread类,ForkJoinWorkerThread是由ForkJoinPool管理的线程,该线程执行ForkJoinTask。此类仅可做为扩展功能的需要而被集成,因为没有提供可以调度或者可重新的方法。但是,你可以覆盖主任务处理循环周围初始化和终止方法。如果确实创建了这个类的子类,还需要在ForkJoinPool中提供自定义的ForkJoinWorkerThreadFactory来使用。

1.2 常量

主要有两个:

final ForkJoinPool pool;                // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

ForkJoinWorkerThreads由ForkJoinPools管理,并执行ForkJoinTasks。请参见ForkJoinPool的内部文档。此类仅仅维护了指向pool和WorkQueue的链接。pool字段在构造的时候直接设置。但是直到对registerWorker调用完成之后,才设置workQueue字段。这将导致可见性竞争,可以通过要求workQueue字段仅由其所属线程访问来规避这个问题。对于InnocuousForkJoinWorkerThread子类的支持,要求我们在此处和子类中破坏很多封装,通过Unsafe以访问和设置Thread字段。

这是两个final修饰的常量,智能初始化一次。

二、构造函数

2.1 ForkJoinWorkerThread(ForkJoinPool pool)

protected ForkJoinWorkerThread(ForkJoinPool pool) {// Use a placeholder until a useful name can be set in registerWorkersuper("aForkJoinWorkerThread");this.pool = pool;this.workQueue = pool.registerWorker(this);
}

其构造函数主要是创建一个在给定pool中的ForkJoinWorkerThread。构造函数中最主要的方法就是registerWorker。

2.2 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)

ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,AccessControlContext acc) {super(threadGroup, null, "aForkJoinWorkerThread");U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);eraseThreadLocals(); // clear before registeringthis.pool = pool;this.workQueue = pool.registerWorker(this);
}

这个方法需要注意的是,采用unSafe方法,在这个类的INHERITEDACCESSCONTROLCONTEXT位置处,设置传入的AccessControlContext对象。
之后调用方法eraseThreadLocals将threadLocals清除。
之后与同用的构造函数一致。
擦除ThreadLocal也是采用UnSafe来完成。通过putObject将ThreadLocals的位置设置为null。

final void eraseThreadLocals() {U.putObject(this, THREADLOCALS, null);U.putObject(this, INHERITABLETHREADLOCALS, null);
}

实际上这个方法将会提供给InnocuousForkJoinWorkerThread继承的时候使用。

三、核心方法

3.1 run

做为Thread,最重要的就是run方法,我们来看看ForkJoinWorkerThread的实现:

public void run() {//如果workQueue为空,则抛出异常if (workQueue.array == null) { // only run onceThrowable exception = null;try {//调用onStart方法onStart();//调用pool的runWorker方法,运行workQueuepool.runWorker(workQueue);} catch (Throwable ex) {exception = ex;} finally {//操作完之后处理try {//调用onTermination方法onTermination(exception);} catch (Throwable ex) {//如果出现异常,则进行异常处理if (exception == null)exception = ex;} finally {//最终需要指向deregister方法pool.deregisterWorker(this, exception);}}}
}

runWorker方法实际上是对workQueue结合随机魔数,选择一个workQueue进行遍历,调用scan方法,如果不为空则执行,反之则wait。
其中registerWorker与deregisterWorker方法 ,我们可以参考前面的ForkJoinPool源码解读。

3.2 定义的可扩展方法

由于ForkJoinWorkerThread还支持继承扩展,因此在此定义了两个扩展的方法:

protected void onStart() {
}protected void onTermination(Throwable exception) {
}

这两个方法用于执行之前和之后,onStart用于run实际执行之前,执行一些初始化操作。onTermination用于run实际执行之后,执行一些清理操作。

四、内部类InnocuousForkJoinWorkerThread

这个类就是继承了ForkJoinWorkerThread的一个实现类。此类定义了一个没有任何权限、也非用户定义的任何线程组的线程。这个线程在运行完每个top的task之后,会擦除所有的ThreadLocals。

源码如下:

static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {/** The ThreadGroup for all InnocuousForkJoinWorkerThreads */private static final ThreadGroup innocuousThreadGroup =createThreadGroup();/** An AccessControlContext supporting no privileges */private static final AccessControlContext INNOCUOUS_ACC =new AccessControlContext(new ProtectionDomain[] {new ProtectionDomain(null, null)});InnocuousForkJoinWorkerThread(ForkJoinPool pool) {super(pool, innocuousThreadGroup, INNOCUOUS_ACC);}@Override // to erase ThreadLocalsvoid afterTopLevelExec() {eraseThreadLocals();}@Override // to always report system loaderpublic ClassLoader getContextClassLoader() {return ClassLoader.getSystemClassLoader();}@Override // to silently failpublic void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }@Override // paranoicallypublic void setContextClassLoader(ClassLoader cl) {throw new SecurityException("setContextClassLoader");}private static ThreadGroup createThreadGroup() {try {sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();Class<?> tk = Thread.class;Class<?> gk = ThreadGroup.class;long tg = u.objectFieldOffset(tk.getDeclaredField("group"));long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));ThreadGroup group = (ThreadGroup)u.getObject(Thread.currentThread(), tg);while (group != null) {ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);if (parent == null)return new ThreadGroup(group,"InnocuousForkJoinWorkerThreadGroup");group = parent;}} catch (Exception e) {throw new Error(e);}// fall through if null as cannot-happen safeguardthrow new Error("Cannot create ThreadGroup");}
}

AccessControlContext INNOCUOUS_ACC 定义了一个不支持任何特权访问的AccessControlContext。这个类会创建一个单独的threadGroup,以确保其不属于任何一个用户创建的ThreadGroup。

五、ForkJoinPool中创建工作线程的过程

此时再来结合ForkJoinPool中的ForkJoinWorkerThreadFactory,就能明白ForkJoinThread的创建意义了。ForkJoinPool根据访问权限的需要,定义了采用默认的创建方法,还是创建InnocuousForkJoinWorkerThread。

5.1 makeCommonPool创建过程

再ForkJoinPool重的makeCommPool,有如下代码:

if (factory == null) {if (System.getSecurityManager() == null)factory = defaultForkJoinWorkerThreadFactory;else // use security-managed defaultfactory = new InnocuousForkJoinWorkerThreadFactory();
}

这里也就是说,如果System.getSecurityManager()为null,则返回默认的ThreadFactory,而不为null,则说, 使用了默认的安全管理级别,因此将创建InnocuousForkJoinWorkerThreadFactory。

5.2 createWorker创建过程

ForkJoinWorkerThreadFactory fac = factory;
try {if (fac != null && (wt = fac.newThread(this)) != null) {wt.start();return true;}
} catch (Throwable rex) {ex = rex;
}

也就是说,createWorker根据ForkJoinWorkerThreadFactory的实现类来创建。

5.3 InnocuousForkJoinWorkerThreadFactory

static final class InnocuousForkJoinWorkerThreadFactoryimplements ForkJoinWorkerThreadFactory {private static final AccessControlContext innocuousAcc;static {Permissions innocuousPerms = new Permissions();innocuousPerms.add(modifyThreadPermission);innocuousPerms.add(new RuntimePermission("enableContextClassLoaderOverride"));innocuousPerms.add(new RuntimePermission("modifyThreadGroup"));innocuousAcc = new AccessControlContext(new ProtectionDomain[] {new ProtectionDomain(null, innocuousPerms)});}public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)java.security.AccessController.doPrivileged(new java.security.PrivilegedAction<ForkJoinWorkerThread>() {public ForkJoinWorkerThread run() {return new ForkJoinWorkerThread.InnocuousForkJoinWorkerThread(pool);}}, innocuousAcc);}
}

5.4 DefaultForkJoinWorkerThreadFactory

static final class DefaultForkJoinWorkerThreadFactoryimplements ForkJoinWorkerThreadFactory {public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {return new ForkJoinWorkerThread(pool);}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 悠易科技周文彪:创始人专注度很重要,一旦战略分散无法形成合力 | 中国广告营销行业资本报告深访④
  • LeetCode | 441 | 排列硬币 | 二分查找
  • 计算机组成原理 —— 指令流水线影响因素分类
  • 提示词工程
  • 微信小程序--实现地图定位---获取经纬度
  • 打造智能障碍物检测系统:从零开始的深度学习项目
  • 【启明智显方案分享】6.86寸高清显示屏音频效果器解决方案
  • 基于NSGAII的的柔性作业调度优化算法MATLAB仿真,仿真输出甘特图
  • shell常用命令
  • 超越标注:合成数据引领下的文本嵌入技术革新
  • 科普文:JUC系列之多线程门闩同步器Semaphore的使用和源码解读
  • FreeRTOS实现低功耗管理
  • 如何将PostgreSQL的数据实时迁移到SelectDB?
  • 10. Docker 使用案例
  • Casper 安全事件已解决;区块链网络恢复运行
  • [译] 理解数组在 PHP 内部的实现(给PHP开发者的PHP源码-第四部分)
  • [译] 怎样写一个基础的编译器
  • 30天自制操作系统-2
  • Apache Spark Streaming 使用实例
  • C++类的相互关联
  • codis proxy处理流程
  • Golang-长连接-状态推送
  • JDK9: 集成 Jshell 和 Maven 项目.
  • leetcode98. Validate Binary Search Tree
  • nginx 配置多 域名 + 多 https
  • PAT A1120
  • Python 使用 Tornado 框架实现 WebHook 自动部署 Git 项目
  • Three.js 再探 - 写一个跳一跳极简版游戏
  • Unix命令
  • Vue 动态创建 component
  • webpack项目中使用grunt监听文件变动自动打包编译
  • -- 查询加强-- 使用如何where子句进行筛选,% _ like的使用
  • 初探 Vue 生命周期和钩子函数
  • 多线程 start 和 run 方法到底有什么区别?
  • 工作手记之html2canvas使用概述
  • 简单基于spring的redis配置(单机和集群模式)
  • 使用 QuickBI 搭建酷炫可视化分析
  • 微信小程序开发问题汇总
  • 以太坊客户端Geth命令参数详解
  • 原生JS动态加载JS、CSS文件及代码脚本
  • 在Docker Swarm上部署Apache Storm:第1部分
  • 在weex里面使用chart图表
  • 终端用户监控:真实用户监控还是模拟监控?
  • ​一些不规范的GTID使用场景
  • #13 yum、编译安装与sed命令的使用
  • #if #elif #endif
  • (C语言)球球大作战
  • (delphi11最新学习资料) Object Pascal 学习笔记---第13章第6节 (嵌套的Finally代码块)
  • (Matlab)使用竞争神经网络实现数据聚类
  • (Redis使用系列) SpringBoot中Redis的RedisConfig 二
  • (非本人原创)史记·柴静列传(r4笔记第65天)
  • (九)c52学习之旅-定时器
  • (三)Hyperledger Fabric 1.1安装部署-chaincode测试
  • (十三)Flask之特殊装饰器详解
  • (四)库存超卖案例实战——优化redis分布式锁