当前位置: 首页 > news >正文

completablefuture 线程池_线程池ForkJoinPool简介

ForkJoinPool线程池最大的特点就是分叉(fork)合并(join),将一个大任务拆分成多个小任务,并行执行,再结合工作窃取模式(worksteal)提高整体的执行效率,充分利用CPU资源。

一. 应用场景

ForkJoinPool使用分治算法,用相对少的线程处理大量的任务,将一个大任务一拆为二,以此类推,每个子任务再拆分一半,直到达到最细颗粒度为止,即设置的阈值停止拆分,然后从最底层的任务开始计算,往上一层一层合并结果,简单的流程如下图:

61ebe4cd69bf37fe5d9671714802c4f0.png

从图中可以看出ForkJoinPool要先执行完子任务才能执行上一层任务,所以ForkJoinPool适合在有限的线程数下完成有父子关系的任务场景,比如:快速排序,二分查找,矩阵乘法,线性时间选择等场景,以及数组和集合的运算。

下面是个简单的代码示例计算从1到1亿之间所有数字之和:

package com.javakk;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;import java.util.stream.LongStream;/** * ForkJoinPool求和 * @author 老K */public class ForkJoinPoolTest {    private static ForkJoinPool forkJoinPool;    /**     * 求和任务类继承RecursiveTask     * ForkJoinTask一共有3个实现:     * RecursiveTask:有返回值     * RecursiveAction:无返回值     * CountedCompleter:无返回值任务,完成任务后可以触发回调     */    private static class SumTask extends RecursiveTask {        private long[] numbers;        private int from;        private int to;        public SumTask(long[] numbers, int from, int to) {            this.numbers = numbers;            this.from = from;            this.to = to;        }        /**         * ForkJoin执行任务的核心方法         * @return         */        @Override        protected Long compute() {            if (to - from < 10) { // 设置拆分的最细粒度,即阈值,如果满足条件就不再拆分,执行计算任务                long total = 0;                for (int i = from; i <= to; i++) {                    total += numbers[i];                }                return total;            } else { // 否则继续拆分,递归调用                int middle = (from + to) / 2;                SumTask taskLeft = new SumTask(numbers, from, middle);                SumTask taskRight = new SumTask(numbers, middle + 1, to);                taskLeft.fork();                taskRight.fork();                return taskLeft.join() + taskRight.join();            }        }    }    public static void main(String[] args) {        // 也可以jdk8提供的通用线程池ForkJoinPool.commonPool        // 可以在构造函数内指定线程数        forkJoinPool = new ForkJoinPool();        long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();        // 这里可以调用submit方法返回的future,通过future.get获取结果        Long result = forkJoinPool.invoke(new SumTask(numbers, 0, numbers.length - 1));        forkJoinPool.shutdown();        System.out.println("最终结果:"+result);        System.out.println("活跃线程数:"+forkJoinPool.getActiveThreadCount());        System.out.println("窃取任务数:"+forkJoinPool.getStealCount());    }}

输出结果(活跃线程数和窃取任务会根据本地环境和任务执行情况变化):

最终结果:5000000050000000活跃线程数:4窃取任务数:12

上例中在compute方法里拆分的最小粒度是10个元素,大家可以改成其他的值试下,会发现执行的效率差别很大,所以要注意拆分粒度对性能的影响。

ForkJoinPool内部的队列能够保证执行任务的顺序,至于为什么它能够在有限的线程数量下完成非常多的任务,后面会讲到。

二. 与ThreadPoolExecutor原生线程池的区别

ForkJoinPool和ThreadPoolExecutor都实现了Executor和ExecutorService接口,都可以通过构造函数设置线程数,threadFactory,可以查看ForkJoinPool.makeCommonPool()方法的源码查看通用线程池的构造细节。

在内部结构上我觉得两个线程池最大的区别是在工作队列的设计上,如下图

ThreadPoolExecutor:

f701214f616c165e5cbdac56865d308c.png

ForkJoinPool:

2a9c2a198298f10a691cf134137a0b7d.png

图上细节画得不严谨,但大致能看出区别:

  • ForkJoinPool每个线程都有自己的队列
  • ThreadPoolExecutor共用一个队列

通过上面的代码示例可以看到使用ForkJoinPool可以在有限的线程数下来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过2000万个任务。但是使用ThreadPoolExecutor是不可能的,因为ThreadPoolExecutor中的线程无法选择优先执行子任务,要完成2000万个具有父子关系的任务时,就需要2000万个线程,这样会导致ThreadPoolExecutor的任务队列撑满或创建的最大线程数把内存撑爆直接gg。

ForkJoinPool最适合计算密集型任务,而且最好是非阻塞任务,之前的一篇文章:Java踩坑记系列之线程池 也说了线程池的不同使用场景和注意事项。

所以ForkJoinPool是ThreadPoolExecutor线程池的一种补充,是对计算密集型场景的加强。

三. 工作窃取的实现原理

第一节的代码示例输出结果显示活跃线程是4个,但却完成了2000万个子任务,窃取任务是12个(窃取数跟拆分层级和计算复杂度有关),这是work steal工作窃取的作用。

ForkJoinPool类中的WorkQueue正是实现工作窃取的队列,javadoc中的注释如下:

3b63ec4fc62f7c6c82a82ed6021c5048.png

大意是大多数操作都发生在工作窃取队列中(在嵌套类工作队列中)。这些是特殊形式的Deques,主要有push,pop,poll操作。

Deque是双端队列(double ended queue缩写),头部和尾部任何一端都可以进行插入,删除,获取的操作,即支持FIFO(队列)也支持LIFO(栈)顺序。

Deque接口的实现最常见的是LinkedList,除此还有ArrayDeque,ConcurrentLinkedDeque等

工作窃取模式主要分以下几个步骤:

  1. 每个线程都有自己的双端队列
  2. 当调用fork方法时,将任务放进队列头部,线程以LIFO顺序,使用push/pop方式处理队列中的任务
  3. 如果自己队列里的任务处理完后,会从其他线程维护的队列尾部使用poll的方式窃取任务,以达到充分利用CPU资源的目的
  4. 从尾部窃取可以减少同原线程的竞争
  5. 当队列中剩最后一个任务时,通过cas解决原线程和窃取线程的竞争

流程大致如下所示:

23fa6f9f99d4a3ec12984cd5496eea30.png

工作窃取便是ForkJoinPool线程池的优势所在,在一般的线程池比如ThreadPoolExecutor中,如果一个线程正在执行的任务由于某种原因无法继续运行,那么该线程会处于等待状态,包括singleThreadPool,fixedThreadPool,cachedThreadPool这几种线程池。

而在ForkJoinPool中,那么线程会主动寻找其他尚未被执行的任务然后窃取过来执行,减少线程等待时间。

JDK8中的并行流(parallelStream)功能是基于ForkJoinPool实现的,另外还有java.util.concurrent.CompletableFuture异步回调future,内部使用的线程池也是ForkJoinPool。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 判断多个数据是否是两个素数相乘_循环结构程序举例2素数的判断
  • chrome 验证码识别 扩展_chrome插件市场国内无法打开,我来教你如何安装插件
  • python深度神经网络_PyTorch进阶:深度神经网络中的前馈过程(图解分析)
  • python判断能否组成三角形_如何用PYTHON判断三角形的类型 需要技巧
  • 查看python安装成功的命令_Python sqlite3模块 安装查询命令等使用讲解
  • python输入半径计算公式_python练习:取输入半径的周长和面积
  • python重命名文件或目录_命令和Python重命名文件
  • python 列表_Python 算法 09 -- 散列表
  • python粘性拓展_拓展Python Markdown
  • ping不通服务器_服务器远程桌面不上不要急,这就告诉你原因
  • 钢笔墨水能否代替打印机墨水_喷墨打印机该用染料墨水还是颜料墨水,区别在哪里?...
  • 零基础学python靠谱吗_零基础学python的我(开篇扯淡)
  • linux安装ssh_如何为Ubuntu服务器建立无密码SSH认证
  • responseentity 返回异常_Spring Boot异常处理
  • sklearn逻辑回归 极大似然 损失_算法 | 一个硬币与逻辑回归的故事
  • JavaScript-如何实现克隆(clone)函数
  • 《用数据讲故事》作者Cole N. Knaflic:消除一切无效的图表
  • 【347天】每日项目总结系列085(2018.01.18)
  • 【5+】跨webview多页面 触发事件(二)
  • 【笔记】你不知道的JS读书笔记——Promise
  • ES学习笔记(12)--Symbol
  • Java反射-动态类加载和重新加载
  • leetcode46 Permutation 排列组合
  • Python3爬取英雄联盟英雄皮肤大图
  • SQLServer插入数据
  • Vue UI框架库开发介绍
  • webpack4 一点通
  • 复习Javascript专题(四):js中的深浅拷贝
  • 技术胖1-4季视频复习— (看视频笔记)
  • 今年的LC3大会没了?
  • 如何解决微信端直接跳WAP端
  • 深度解析利用ES6进行Promise封装总结
  • 手写双向链表LinkedList的几个常用功能
  • 数据可视化之 Sankey 桑基图的实现
  • 网页视频流m3u8/ts视频下载
  • ​Base64转换成图片,android studio build乱码,找不到okio.ByteString接腾讯人脸识别
  • ​queue --- 一个同步的队列类​
  • # Redis 入门到精通(一)数据类型(4)
  • $Django python中使用redis, django中使用(封装了),redis开启事务(管道)
  • (ibm)Java 语言的 XPath API
  • (二)hibernate配置管理
  • (附源码)小程序 交通违法举报系统 毕业设计 242045
  • (黑客游戏)HackTheGame1.21 过关攻略
  • (切换多语言)vantUI+vue-i18n进行国际化配置及新增没有的语言包
  • (学习日记)2024.03.25:UCOSIII第二十二节:系统启动流程详解
  • (轉貼) 資訊相關科系畢業的學生,未來會是什麼樣子?(Misc)
  • .NET Compact Framework 多线程环境下的UI异步刷新
  • .NET Core 成都线下面基会拉开序幕
  • .NET WPF 抖动动画
  • .Net6支持的操作系统版本(.net8已来,你还在用.netframework4.5吗)
  • .NET上SQLite的连接
  • []C/C++读取串口接收到的数据程序
  • []指针
  • [AAuto]给百宝箱增加娱乐功能
  • [Bug]使用gradio创建应用提示AttributeError: module ‘gradio‘ has no attribute ‘inputs‘