当前位置: 首页 > news >正文

Java 8 并行流:必备技巧

Java 8 并行流(parallel stream)采用共享线程池,对性能造成了严重影响。可以包装流来调用自己的线程池解决性能问题。

问题

Java 8 的并行流可以让我们相对轻松地执行并行任务。

myList.parallelStream.map(obj -> longRunningOperation()) 
复制代码

但是这样存在一个严重的问题:在 JVM 的后台,使用通用的 fork/join
池来完成上述功能,该池是所有并行流共享的。默认情况,fork/join
池会为每个处理器分配一个线程。假设你有一台16核的机器,这样你就只能创建16个线程。对 CPU
密集型的任务来说,这样是有意义的,因为你的机器确实只能执行16个线程。但是真实情况下,不是所有的任务都是 CPU 密集型的。例如:

myList.parallelStream  
   .map(this::retrieveFromA)
   .map(this::processUsingB)
   .forEach(this::saveToC)
 
myList.parallelStream  
   .map(this::retrieveFromD)
   .map(this::processUsingE)
   .forEach(this::saveToD)
复制代码

这两个流很大程度上是受限于IO操作,所以会等待其他系统。但这两个流使用相同的(小)线程池,因此会相互等待而被阻塞。这个非常不好,可以改进。我们以一个流为例:

final List<Integer> firstRange = buildIntRange();  
   firstRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
});
复制代码

完整的代码可以在gist上查看。

在执行期间,我获取了一份线程dump的文件。这是相关的线程(在我的Macbook上):

ForkJoinPool.commonPool-worker-1 
ForkJoinPool.commonPool-worker-2 
ForkJoinPool.commonPool-worker-3 
ForkJoinPool.commonPool-worker-4
复制代码

现在,我要并行的执行这两个并行流

Runnable firstTask = () -> {  
   firstRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
   });
};
 
Runnable secondTask = () -> {  
   secondRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
   });
};
// run threads
复制代码

完整的代码可以在gist上查看。

这次我们再看一下线程dump文件:

ForkJoinPool.commonPool-worker-1 
ForkJoinPool.commonPool-worker-2 
ForkJoinPool.commonPool-worker-3 
ForkJoinPool.commonPool-worker-4
复制代码

正如你所见,结果是一样的。我们只使用了4个线程。

一种变通方案

正如我所提到的,JVM 后台使用 fork/join 池,在 ForkJoinTask 的文档中,我们可以看到:

如果合适,安排一个异步执行的任务到当前正在运行的池中。如果任务不在inForkJoinPool()中,也可以调用ForkJoinPool.commonPool()获取新的池来执行。

让我试一试……

ForkJoinPool forkJoinPool = new ForkJoinPool(3);  
forkJoinPool.submit(() -> {  
    firstRange.parallelStream().forEach((number) -> {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) { }
    });
});
 
ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);  
forkJoinPool2.submit(() -> {  
    secondRange.parallelStream().forEach((number) -> {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
        }
    });
});
复制代码

完整的代码可以在gist上查看。

现在,我们再次查看线程池:

ForkJoinPool-1-worker-1 
ForkJoinPool-1-worker-2 
ForkJoinPool-1-worker-3 
ForkJoinPool-1-worker-4 
ForkJoinPool-2-worker-1 
ForkJoinPool-2-worker-2 
ForkJoinPool-2-worker-3 
ForkJoinPool-1-worker-4
复制代码

因为我们创建自己的线程池,所以可以避免共享线程池,如果有需要,甚至可以分配比处理机数量更多的线程。

ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>); 
复制代码


欢迎关注知乎专栏《跟上Java8》,分享优秀的Java8中文指南、教程,同时欢迎投稿高质量的文章。


原文链接: tobyhobson 翻译: ImportNew.com - paddx
译文链接: www.importnew.com/16801.html


相关文章:

  • 百度收购被收购传闻四起,UC 向左Or向右?
  • 维护keepalived与mysql漂移脚本
  • 题目分类
  • linux命令学习系列-用户切换su,sudo
  • 2013-04-24
  • XVIII Open Cup named after E.V. Pankratiev. Eastern Grand Prix
  • Mac下 Java开发配置MyEclipse对应的Tomcat/对Tomcat文件授权
  • 代码审查
  • 网站性能优化之黄金守则
  • Java并发系列学习(三)
  • axis1.4 websercice服务客户端开发
  • TCp
  • 黑马Java学习笔记之-----Java常见异常总结 (转)
  • ALVIX无人机来了,拥有环形收纳可变形旋翼
  • 错误信息原因[置顶] Android开发错误汇总(转)
  • 【翻译】babel对TC39装饰器草案的实现
  • 【面试系列】之二:关于js原型
  • Angular数据绑定机制
  • canvas 高仿 Apple Watch 表盘
  • gf框架之分页模块(五) - 自定义分页
  • Java 11 发布计划来了,已确定 3个 新特性!!
  • leetcode46 Permutation 排列组合
  • Python 反序列化安全问题(二)
  • Python利用正则抓取网页内容保存到本地
  • rc-form之最单纯情况
  • Service Worker
  • UMLCHINA 首席专家潘加宇鼎力推荐
  • 包装类对象
  • 持续集成与持续部署宝典Part 2:创建持续集成流水线
  • 对话 CTO〡听神策数据 CTO 曹犟描绘数据分析行业的无限可能
  • 检测对象或数组
  • 如何优雅的使用vue+Dcloud(Hbuild)开发混合app
  • 入门级的git使用指北
  • 微服务框架lagom
  • 小程序开发中的那些坑
  • 新手搭建网站的主要流程
  • [Shell 脚本] 备份网站文件至OSS服务(纯shell脚本无sdk) ...
  • Prometheus VS InfluxDB
  • ​io --- 处理流的核心工具​
  • #、%和$符号在OGNL表达式中经常出现
  • #我与Java虚拟机的故事#连载01:人在JVM,身不由己
  • $(document).ready(function(){}), $().ready(function(){})和$(function(){})三者区别
  • (aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器
  • (二)换源+apt-get基础配置+搜狗拼音
  • (附源码)springboot码头作业管理系统 毕业设计 341654
  • (十)DDRC架构组成、效率Efficiency及功能实现
  • (十三)Flask之特殊装饰器详解
  • (数位dp) 算法竞赛入门到进阶 书本题集
  • (原創) 如何優化ThinkPad X61開機速度? (NB) (ThinkPad) (X61) (OS) (Windows)
  • .CSS-hover 的解释
  • .equals()到底是什么意思?
  • .FileZilla的使用和主动模式被动模式介绍
  • .NET Core 中插件式开发实现
  • .net Stream篇(六)
  • .NET 设计模式初探