当前位置: 首页 > news >正文

fork/join使用示例

fork/join框架是用多线程的方式实现分治法来解决问题。fork指的是将问题不断地缩小规模,join是指根据子问题的计算结果,得出更高层次的结果。

fork/join框架的使用有一定的约束条件:

1. 除了fork()  和  join()方法外,线程不得使用其他的同步工具。线程最好也不要sleep()

2. 线程不得进行I/O操作

3. 线程不得抛出checked exception

此框架有几个核心类:ForkJoinPool是实现了工作窃取算法的线程池。ForkJoinTask是任务类,他有2个子类:RecursiveAction无返回值,RecursiveTask有返回值,在定义自己的任务时,一般都是从这2类中挑一个,通过继承的方式定义自己的新类。由于ForkJoinTask类实现了Serializable接口,因此,定义自己的任务类时,应该定义serialVersionUID属性。

在编写任务时,推荐的写法是这样的:

If (problem size > default size){  
   task s = divide(task);  
   execute(tasks);  
} else {  
   resolve problem using another algorithm;  
}  

ForkJoinPool实现了工作窃取算法(work-stealing),线程会主动寻找新创建的任务去执行,从而保证较高的线程利用率。它使用守护线程(deamon)来执行任务,因此无需对他显示的调用shutdown()来关闭。一般情况下,一个程序只需要唯一的一个ForkJoinPool,因此应该按如下方式创建它:

static final ForkJoinPool mainPool = new ForkJoinPool(); //线程的数目等于CPU的核心数

下面给出一个非常简单的例子,功能是将一个数组中每一个元素的值加1。具体实现为:将大数组不断分解为更短小的子数组,当子数组长度不超过10的时候,对其中所有元素进行加1操作。

package forkjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class Test {

    public final static ForkJoinPool mainPool = new ForkJoinPool();

    public static void main(String[] args) {
        int n = 26;
        int[] a = new int[n];
        System.out.println("before:");
        for (int i = 0; i < n; i++) {
            a[i] = i;
            System.out.print(a[i] + " ");
        }
        SubTask task = new SubTask(a, 0, n);
        mainPool.invoke(task);
        System.out.println();
        System.out.println("after:");
        for (int i = 0; i < n; i++) {
            System.out.print(a[i] + " ");
        }
    }
}

class SubTask extends RecursiveAction {

    private static final long serialVersionUID = 1L;

    private int[] a;
    private int beg;
    private int end;

    public SubTask(int[] a, int beg, int end) {
        super();
        this.a = a;
        this.beg = beg;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - beg > 10) {
            int mid = (beg + end) / 2;
            SubTask t1 = new SubTask(a, beg, mid);
            SubTask t2 = new SubTask(a, mid, end);
            invokeAll(t1, t2);
        } else {
            for (int i = beg; i < end; i++) {
                a[i] = a[i] + 1;
            }
        }
    }
}

结果:

before:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
after:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26

例子2,任务拥有返回值。随机生成一个数组,每个元素均是0-999之间的整数,统计该数组中每个数字出现1的次数的和。

实现方法,将该数组不断的分成更小的数组,直到每个子数组的长度为1,即只包含一个元素。此时,统计该元素中包含1的个数。最后汇总,得到数组中每个数字共包含了多少个1。

package forkjoin.demo2;

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Test {

    public final static ForkJoinPool mainPool = new ForkJoinPool();

    public static void main(String[] args) {
        int n = 26;
        int[] a = new int[n];
        Random rand = new Random();
        System.out.println("before:");
        for (int i = 0; i < n; i++) {
            a[i] = rand.nextInt(1000);
            System.out.print(a[i] + " ");
        }
        SubTask task = new SubTask(a, 0, n);
        int count = mainPool.invoke(task);
        System.out.println();
        System.out.println("after:");
        for (int i = 0; i < n; i++) {
            System.out.print(a[i] + " ");
        }
        System.out.println("\n数组中共出现了" + count + "个1");
    }
}

class SubTask extends RecursiveTask<Integer> {

    private static final long serialVersionUID = 1L;

    private int[] a;
    private int beg;
    private int end;

    public SubTask(int[] a, int beg, int end) {
        super();
        this.a = a;
        this.beg = beg;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int result = 0;
        if (end - beg > 1) {
            int mid = (beg + end) / 2;
            SubTask t1 = new SubTask(a, beg, mid);
            SubTask t2 = new SubTask(a, mid, end);
            invokeAll(t1, t2);
            try {
                result = t1.get() + t2.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        } else {
            result = count(a[beg]);
        }
        return result;
    }

    // 统计一个整数中出现了几个1
    private int count(int n) {
        int result = 0;
        while (n > 0) {
            if (n % 10 == 1) {
                result++;
            }
            n = n / 10;
        }
        return result;
    }
}

结果:

before:
466 581 913 818 611 871 10 748 903 797 830 426 887 198 416 945 592 409 993 408 368 663 117 120 802 510 
after:
466 581 913 818 611 871 10 748 903 797 830 426 887 198 416 945 592 409 993 408 368 663 117 120 802 510 
数组中共出现了13个1

例子3,异步执行任务。前面两个例子都是同步执行任务,当启动任务后,主线程陷入了阻塞状态,直到任务执行完毕。若创建新任务后,希望当前线程能继续执行而非陷入阻塞,则需要异步执行。ForkJoinPool线程池提供了execute()方法来异步启动任务,而作为任务本身,可以调用fork()方法异步启动新的子任务,并调用子任务的join()方法来取得计算结果。需要注意的是,异步使用ForkJoin框架,无法使用“工作窃取”算法来提高线程的利用率,针对每个子任务,系统都会启动一个新的线程。

本例的功能是查找硬盘上某一类型的文件。给定文件扩展名后,将硬盘上所有该类型的文件名打印显示出来。作为主程序,启动任务后,继续显示任务的执行进度,每3秒钟打印显示一个黑点,表示任务在继续。最后,当所有线程都结束了,打印显示结果。

package forkjoin.demo3;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class ThreadLocalTest {

    public static void main(String[] args) throws Exception {
        Path p = Paths.get("D:/");
        List<Path> roots = (List<Path>) FileSystems.getDefault().getRootDirectories();
        List<Path> result = new ArrayList<>();
        List<MyTask> tasks = new ArrayList<>();
        ForkJoinPool pool = new ForkJoinPool();
        for (Path root : roots) {
            MyTask t = new MyTask(root, "pdf");
            pool.execute(t);
            tasks.add(t);
        }

        System.out.print("正在处理中");
        while (isAllDone(tasks) == false) {
            System.out.print(". ");
            TimeUnit.SECONDS.sleep(3);
        }

        for (MyTask t : tasks) {
            result.addAll(t.get());
        }

        for (Path pp : result) {
            System.out.println(pp);
        }
    }

    private static boolean isAllDone(List<MyTask> tasks) {
        boolean result = true;
        for (MyTask t : tasks) {
            if (t.isDone() == false) {
                result = false;
                break;
            }
        }
        return result;
    }
}

class MyTask extends RecursiveTask<List<Path>> {

    private static final long serialVersionUID = 1L;

    private Path path;
    private String fileExtention;

    public MyTask(Path path, String fileExtention) {
        super();
        this.path = path;
        this.fileExtention = fileExtention;
    }

    @Override
    protected List<Path> compute() {
        List<Path> result = new ArrayList<>();
        try {
            DirectoryStream<Path> paths = Files.newDirectoryStream(path);
            List<MyTask> subTasks = new ArrayList<>();
            for (Path p : paths) {
                if (Files.isDirectory(p)) {
                    MyTask t = new MyTask(p, fileExtention);
                    t.fork();
                    subTasks.add(t);
                } else if (Files.isRegularFile(p)) {
                    if (p.toString().toLowerCase().endsWith("." + fileExtention)) {
                        result.add(p);
                    }
                }
            }

            for (MyTask t : subTasks) {
                result.addAll(t.join());
            }
        } catch (IOException e) {
        }
        return result;
    }
}

结果:

正在处理中. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 

 

相关文章:

  • Tomcat6.0 for Linux X86_64 Install
  • HLSL学习实践记录: RenderMonkey实现(三):天空盒子
  • eclipse安装颜色主题,个性化你的IDE,让你的IDE焕然一新
  • extjs form 取值 赋值 重置
  • linux网络编程涉及的函数
  • coffeescript 基本语法
  • ASP.NET MVC3-第02节-添加一个Controller (C#)
  • for test
  • What’s New in Python 2.7 — Python 3.4.0b2 documentation
  • C# 集合类 Array,Arraylist,List,Hashtable,Dictionary...
  • 转观念 变架构 补短板——析科华恒盛向数据中心方案商转型
  • spring security3.x学习(5)_如何拦截用户请求
  • php.ini中文解释
  • ubuntu下gvim字体设置
  • 《JAVA面向对象的特征 》
  • [PHP内核探索]PHP中的哈希表
  • 【编码】-360实习笔试编程题(二)-2016.03.29
  • 【剑指offer】让抽象问题具体化
  • 8年软件测试工程师感悟——写给还在迷茫中的朋友
  • bearychat的java client
  • CSS进阶篇--用CSS开启硬件加速来提高网站性能
  • eclipse的离线汉化
  • export和import的用法总结
  • GitUp, 你不可错过的秀外慧中的git工具
  • HTTP--网络协议分层,http历史(二)
  • iBatis和MyBatis在使用ResultMap对应关系时的区别
  • JS进阶 - JS 、JS-Web-API与DOM、BOM
  • js作用域和this的理解
  • leetcode98. Validate Binary Search Tree
  • OSS Web直传 (文件图片)
  • PAT A1050
  • Python进阶细节
  • v-if和v-for连用出现的问题
  • windows-nginx-https-本地配置
  • 从PHP迁移至Golang - 基础篇
  • 电商搜索引擎的架构设计和性能优化
  • 个人博客开发系列:评论功能之GitHub账号OAuth授权
  • 工作中总结前端开发流程--vue项目
  • 如何使用Mybatis第三方插件--PageHelper实现分页操作
  • 少走弯路,给Java 1~5 年程序员的建议
  • 小程序开发之路(一)
  • 原创:新手布局福音!微信小程序使用flex的一些基础样式属性(一)
  • k8s使用glusterfs实现动态持久化存储
  • 昨天1024程序员节,我故意写了个死循环~
  • #LLM入门|Prompt#3.3_存储_Memory
  • #NOIP 2014# day.1 T3 飞扬的小鸟 bird
  • $(function(){})与(function($){....})(jQuery)的区别
  • $(selector).each()和$.each()的区别
  • %@ page import=%的用法
  • (HAL库版)freeRTOS移植STMF103
  • (分布式缓存)Redis持久化
  • (附源码)计算机毕业设计ssm电影分享网站
  • (一)SpringBoot3---尚硅谷总结
  • (转) SpringBoot:使用spring-boot-devtools进行热部署以及不生效的问题解决
  • (转)Google的Objective-C编码规范