当前位置: 首页 > news >正文

并发编程之----线程池ThreadPoolExecutor,Excutors的使用及其工作原理

当前:并发编程之----线程池ThreadPoolExecutor,Excutors的使用及其工作原理

Redis高级----主从、哨兵、分片、脑裂原理-CSDN博客

计算机网络--面试知识总结一

计算机网络-----面试知识总结二

计算机网络--面试总结三(Http与Https)

计算机网络--面试总结四(HTTP、RPC、WebSocket、SSE)-CSDN博客

​​​​​​知识积累之ThreadLocal---InheritableThreadLocal总结

ThreadPoolExecutor

存放线程的容器:

private final HashSet<Worker> workers = new HashSet<Worker>();

构造方法:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

参数介绍:

  • corePoolSize:核心线程数,定义了最小可以同时运行的线程数量
  • maximumPoolSize:最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的数量变为最大线程数,创建线程并立即执行最新的任务,与核心线程数之间的差值又叫救急线程数
  • keepAliveTime:救急线程最大存活时间,当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等到 keepAliveTime 时间超过销毁
  • unit:keepAliveTime 参数的时间单位
  • workQueue:阻塞队列,存放被提交但尚未被执行的任务
  • threadFactory:线程工厂,创建新线程时用到,可以为线程创建时起名字
  • handler:拒绝策略,线程到达最大线程数仍有新任务时会执行拒绝策略
    RejectedExecutionHandler 下有 4 个实现类:
    • AbortPolicy:让调用者抛出 RejectedExecutionException 异常,默认策略
    • CallerRunsPolicy:让调用者运行的调节机制,将某些任务回退到调用者,从而降低新任务的流量即只要线程池没有关闭,就由提交任务的当前线程处理。(比如主线程中调用的线程池处理任务,然后当最大线程都在工作,工作队列也满了,那么执行该拒绝策略就是,被拒绝的任务回到调用线程池的主线程来进行处理
      • 应用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。
    • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常
    • DiscardOldestPolicy:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交当前任务
      • 使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。基于这个特性,我能想到的场景就是,发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较。

补充:其他框架拒绝策略

    • Dubbo:在抛出 RejectedExecutionException 异常前记录日志,并 dump 线程栈信息,方便定位问题
    • Netty:创建一个新线程来执行任务
    • ActiveMQ:带超时等待(60s)尝试放入队列
    • PinPoint:它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

工作原理:
  1. 创建线程池,这时没有创建线程(懒惰),等待提交过来的任务请求,调用 execute 方法才会创建线程
  1. 当调用 execute() 方法添加一个请求任务时,线程池会做如下判断:
    • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务
    • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列
    • 如果这时队列满了且正在运行的线程数量还小于 maximumPoolSize,那么会创建非核心线程立刻运行这个任务,对于阻塞队列中的任务不公平。这是因为创建每个 Worker(线程)对象会绑定一个初始任务,启动 Worker 时会优先执行(可以理解为将其理解为核心线程和救急线程,核心线程一旦调用了execute方法,那么就会被创建并且不被销毁,而救急线程只有当阻塞队列满了之后还有任务来才创建,并且当创建出来的线程后面没有任务执行时,超过存活时间,就会被销毁)
    • 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
  1. 当一个线程完成任务时,会从队列中取下一个任务来执行
  1. 当一个线程空闲超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉,所以线程池的所有任务完成后最终会收缩到 corePoolSize 大小

注意:救急线程的前提是有界队列的,如果使用的队列不是有界队列那么创建的就都是核心线程

注意:当有新任务来了,此时有现成处于空闲状态,那么新任务一定是有限被空闲线程所处理的,而不是将新任务放入队列中,然后再将队首任务进行消费,相当于不管队列满不满,只要有新任务来,如果核心线程有空闲,那么有限将新任务由核心线程来处理,而不是放入队列中

自定义拒绝策略

因为在自定义线程池的时候最后一个参数就是拒绝策略,其是一个接口,RejectedExcutionHandler ,那么我们自定义异常就可以直接实现该接口就可以了

实现

public class Pool {public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 50,TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new MyThread(), new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println(r+"被拒绝");}});for (int i = 0; i < 100; i++) {int j=i;pool.execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName()+"--->"+j);});}pool.shutdown();}
}
//自定义线程工厂,用于命令线程
class MyThread implements ThreadFactory{AtomicInteger cnt=new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, ("线程" + cnt.incrementAndGet()));return thread;}
}

Executors

Executors 提供了四种线程池的创建:

  • newCachedThreadPool
  • newFixedThreadPool
  • newSingleThreadExecutor
  • newScheduledThreadPool

newFixedThreadPool:创建一个拥有 n 个线程的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

    • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
    • LinkedBlockingQueue 是一个单向链表实现的阻塞队列,默认大小为 Integer.MAX_VALUE,也就是无界队列,可以放任意数量的任务,在任务比较多的时候会导致 OOM(内存溢出)
    • 适用于任务量已知,相对耗时的长期任务
    • 注意这里就算线程执行完任务之后,线程任然会处于执行状态,并不会停止

newCachedThreadPool:创建一个可扩容的线程池
  • 其构造方法如下:
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

    • 核心线程数是 0,相当于全部是救急线程, 最大线程数是 29 个 1,全部都是救急线程(60s 后可以回收),可能会创建大量线程,从而导致 OOM
    • SynchronousQueue 作为阻塞队列,没有容量,对于每一个 take 的线程会阻塞直到有一个 put 的线程放入元素为止(类似一手交钱、一手交货)
    • 使用场景:适合任务数比较密集,但每个任务执行时间较短的情况
    • 整体来说:整个线程池的线程数会根据任务量不断增长,没有上限,当前任务执行完毕,空闲1分钟后就会释放线程

newSingleThreadExecutor:创建一个只有 1 个线程的单线程池
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

创建只有一个核心线程的线程池,任务队列使用的是无界队列

    • 保证所有任务按照指定顺序执行,线程数固定为 1,任务数多于 1 时会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放

对比:

  • 创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,线程池会新建一个线程,保证池的正常工作 (即使用保持一个线程进行工作)而如果我们自己定义一个线程进行任务,当遇到异常时就会停止,后面的任务就被抛弃了
  • Executors.newSingleThreadExecutor() 线程个数始终为 1,不能修改。FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
    原因:父类不能直接调用子类中的方法,需要反射或者创建对象的方式,可以调用子类静态方法
  • Executors.newFixedThreadPool(1) 初始时为 1,可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
public ScheduledThreadPoolExecutor(int corePoolSize) {// 最大线程数固定为 Integer.MAX_VALUE,保活时间 keepAliveTime 固定为 0super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,// 阻塞队列是 DelayedWorkQueuenew DelayedWorkQueue());
}
newScheduledThreadPool

任务调度线程池 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor:

  • 使用内部类 ScheduledFutureTask 封装任务
  • 使用内部类 DelayedWorkQueue 作为线程池队列,这个队列也可看作误解队列,他是会自动扩容的
  • 重写 onShutdown 方法去处理 shutdown 后的任务
  • 提供 decorateTask 方法作为 ScheduledFutureTask 的修饰方法,以便开发者进行扩展

构造方法:Executors.newScheduledThreadPool(int corePoolSize)

public ScheduledThreadPoolExecutor(int corePoolSize) {// 最大线程数固定为 Integer.MAX_VALUE,保活时间 keepAliveTime 固定为 0super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,// 阻塞队列是 DelayedWorkQueuenew DelayedWorkQueue());
}
定时任务提交方法

  • ScheduledFuture<?> schedule(Runnable/Callable<V>, long delay, TimeUnit u):延迟执行任务
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable/Callable<V>, long initialDelay, long period, TimeUnit unit)定时执行周期任务,不考虑执行的耗时,参数为初始延迟时间、间隔时间、单位(相当于起点到起点之间的定时)

如果,执行任务耗时>设置的时间间隔,那么每个任务的执行间隔时间就为执行任务时间

  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable/Callable<V>, long initialDelay, long delay, TimeUnit unit):定时执行周期任务,在启动线程后延时initialDelay这么长时间,即一次任务的结束到下一次任务的开始

基本使用:

  • 注意 延迟任务,但是出现异常并不会在控制台打印,也不会影响其他线程的执行
public static void main(String[] args){// 线程池大小为1时也是串行执行ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);// 添加两个任务,都在 1s 后同时执行executor.schedule(() -> {System.out.println("任务1,执行时间:" + new Date());//int i = 1 / 0;try { Thread.sleep(2000); } catch (InterruptedException e) { }}, 1000, TimeUnit.MILLISECONDS);executor.schedule(() -> {System.out.println("任务2,执行时间:" + new Date());}, 1000, TimeUnit.MILLISECONDS);
}

  • 定时任务 scheduleAtFixedRate:一次任务的启动到下一次任务的启动之间只要大于等于间隔时间,抢占到 CPU 就会立即执行 ,需要注意的时要是执行任务耗时大于了定时周期那么就只能等到任务执行完后继续执行
public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);System.out.println("start..." + new Date());pool.scheduleAtFixedRate(() -> {System.out.println("running..." + new Date());Thread.sleep(2000);}, 1, 1, TimeUnit.SECONDS);
}/*start...Sat Apr 24 18:08:12 CST 2021
running...Sat Apr 24 18:08:13 CST 2021
running...Sat Apr 24 18:08:15 CST 2021
running...Sat Apr 24 18:08:17 CST 2021

  • 定时任务 scheduleWithFixedDelay:一次任务的结束到下一次任务的启动之间等于间隔时间,抢占到 CPU 就会立即执行,这个方法才是真正的设置两个任务之间的间隔
public static void main(String[] args){ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);System.out.println("start..." + new Date());pool.scheduleWithFixedDelay(() -> {System.out.println("running..." + new Date());Thread.sleep(2000);}, 1, 1, TimeUnit.SECONDS);
}
/*start...Sat Apr 24 18:11:41 CST 2021
running...Sat Apr 24 18:11:42 CST 2021
running...Sat Apr 24 18:11:45 CST 2021
running...Sat Apr 24 18:11:48 CST 2021
开发要求

阿里巴巴 Java 开发手册要求:

  • 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
    • 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题
    • 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者过度切换的问题
  • 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式更加明确线程池的运行规则,规避资源耗尽的风险
    Executors 返回的线程池对象弊端如下:
    • FixedThreadPool 和 SingleThreadPool:请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
    • CacheThreadPool 和 ScheduledThreadPool:允许创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,导致 OOM
创建多大容量的线程池合适?

  • 一般来说池中总线程数是核心池线程数量两倍,确保当核心池有线程停止时,核心池外有线程进入核心池
  • 过小会导致程序不能充分地利用系统资源、容易导致饥饿
  • 过大会导致更多的线程上下文切换,占用更多内存
    上下文切换:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态,任务从保存到再加载的过程就是一次上下文切换

线程池创建多少线程合适:

  • CPU 密集型任务 (N+1): 这种任务消耗的是 CPU 资源,可以将核心线程数设置为 N (CPU 核心数) + 1,比 CPU 核心数多出来的一个线程是为了防止线程发生缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 某个核心就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间
    CPU 密集型简单理解就是利用 CPU 计算能力的任务比如在内存中对大量数据进行分析
  • I/O 密集型任务: 这种系统 CPU 处于阻塞状态,用大部分的时间来处理 I/O 交互,远程RPC调用时,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用,因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N 或 CPU 核数/ (1-阻塞系数),阻塞系数在 0.8~0.9 之间
    IO 密集型就是涉及到网络读取,文件读取此类任务 ,特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上

经验公式:

线程数=核数*期望CPU利用率*总时间(CPU计算时间+等待时间)/CPU计算时间

public class ExcutorsTest {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {AtomicInteger t=new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"线程--"+(t.getAndIncrement()+1));}});pool.execute(()->{System.out.println(Thread.currentThread().getName()+"1");});pool.execute(()->{System.out.println(Thread.currentThread().getName()+"2");});pool.execute(()->{System.out.println(Thread.currentThread().getName()+"3");});}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Scrapy 分布式爬虫框架 Scrapy-Redis
  • 手撕顺序表
  • 无需多部备用机,云手机方便又便宜!
  • gptk是什么意思?Mac电脑如何在crossover里安装gptk2.0测试版?借助GPTK玩《原神》《黑神话悟空》游戏
  • 【算法】深入浅出聚类算法:原理、应用与Java实现
  • Spring Boot实战:通过Spring Cloud Sentinel实现流量控制
  • 代码随想录 刷题记录-17 贪心算法(2)习题
  • Unity--AnimationCurve动画曲线设置
  • 创建vue项目
  • 深入理解 Go 语言并发编程之系统调用底层原理
  • IP子网划分之网络工程师软考中级
  • 分子属性梯度引导的3D分子生成扩散模型 TAGMOL - 评测
  • 【celery-2】python-Django发送邮件-短信-钉钉通知
  • 软件架构设计——关联对象
  • 【初阶数据结构】顺序表和链表算法题(上)
  • 【译】JS基础算法脚本:字符串结尾
  • Angular 响应式表单之下拉框
  • CAP 一致性协议及应用解析
  • create-react-app做的留言板
  • Iterator 和 for...of 循环
  • Java IO学习笔记一
  • JavaScript HTML DOM
  • Java小白进阶笔记(3)-初级面向对象
  • 阿里云ubuntu14.04 Nginx反向代理Nodejs
  • 不用申请服务号就可以开发微信支付/支付宝/QQ钱包支付!附:直接可用的代码+demo...
  • 聊聊springcloud的EurekaClientAutoConfiguration
  • 融云开发漫谈:你是否了解Go语言并发编程的第一要义?
  • 如何优雅地使用 Sublime Text
  • 微信小程序设置上一页数据
  • 为什么要用IPython/Jupyter?
  • media数据库操作,可以进行增删改查,实现回收站,隐私照片功能 SharedPreferences存储地址:
  • 测评:对于写作的人来说,Markdown是你最好的朋友 ...
  • ​LeetCode解法汇总2304. 网格中的最小路径代价
  • # Swust 12th acm 邀请赛# [ K ] 三角形判定 [题解]
  • #Linux(权限管理)
  • $.ajax()方法详解
  • (a /b)*c的值
  • (rabbitmq的高级特性)消息可靠性
  • (第一天)包装对象、作用域、创建对象
  • (二十四)Flask之flask-session组件
  • (七)Java对象在Hibernate持久化层的状态
  • (十六)Flask之蓝图
  • (十一)手动添加用户和文件的特殊权限
  • (转)shell调试方法
  • (转)人的集合论——移山之道
  • *算法训练(leetcode)第四十五天 | 101. 孤岛的总面积、102. 沉没孤岛、103. 水流问题、104. 建造最大岛屿
  • .Net core 6.0 升8.0
  • .Net CoreRabbitMQ消息存储可靠机制
  • .NET Entity FrameWork 总结 ,在项目中用处个人感觉不大。适合初级用用,不涉及到与数据库通信。
  • .NET Micro Framework 4.2 beta 源码探析
  • .Net Redis的秒杀Dome和异步执行
  • .net 前台table如何加一列下拉框_如何用Word编辑参考文献
  • .NET/ASP.NETMVC 大型站点架构设计—迁移Model元数据设置项(自定义元数据提供程序)...
  • .NET/C# 利用 Walterlv.WeakEvents 高性能地定义和使用弱事件
  • .NET开源项目介绍及资源推荐:数据持久层