当前位置: 首页 > news >正文

深入理解 CompletableFuture 的底层原理

引言

在现代 Java 编程中,异步编程变得越来越重要。为了实现高效和非阻塞的代码,Java 8 引入了 CompletableFuture,一个用于构建异步应用程序的强大工具。本文将详细探讨 CompletableFuture 的底层原理,展示其工作机制,并通过代码示例说明如何在实际应用中使用它。

异步编程的背景

异步编程是指在程序运行过程中,不等待某个操作完成,而是继续执行其他操作,待异步操作完成后再处理其结果。这样可以提高程序的效率,特别是在 I/O 操作和网络请求等耗时操作中。

在 Java 8 之前,实现异步编程主要依赖于 Future 接口。然而,Future 存在一些局限性,例如无法手动完成、不能链式调用等。为了解决这些问题,Java 8 引入了 CompletableFuture

什么是 CompletableFuture

CompletableFuture 是 Java 8 中新增的类,实现了 FutureCompletionStage 接口,提供了强大的异步编程能力。CompletableFuture 允许以非阻塞的方式执行任务,并且可以通过链式调用来组合多个异步操作。

CompletableFuture 的特点

  1. 手动完成:可以手动设置 CompletableFuture 的结果或异常。
  2. 链式调用:支持多个 CompletableFuture 的链式调用,形成复杂的异步任务流。
  3. 组合操作:提供了丰富的方法来组合多个异步任务,例如 thenCombinethenAcceptBoth 等。
  4. 异常处理:提供了灵活的异常处理机制,可以在任务链中处理异常。

CompletableFuture 的底层原理

工作机制

CompletableFuture 的核心是基于 ForkJoinPool 实现的。ForkJoinPool 是一种特殊的线程池,适用于并行计算任务。它采用了工作窃取算法,能够有效利用多核 CPU 的性能。

当我们提交一个任务给 CompletableFuture 时,它会将任务提交到默认的 ForkJoinPool.commonPool() 中执行。我们也可以指定自定义的线程池来执行任务。

状态管理

CompletableFuture 具有以下几种状态:

  • 未完成(Pending):任务尚未完成。
  • 完成(Completed):任务已经成功完成,并返回结果。
  • 异常(Exceptionally Completed):任务在执行过程中抛出了异常。

这些状态通过内部的 volatile 变量来管理,并使用 CAS(Compare-And-Swap) 操作保证线程安全。

任务调度

CompletableFuture 的任务调度机制基于 ForkJoinPool 的工作窃取算法。当一个线程完成当前任务后,会从其他线程的任务队列中窃取任务执行,从而提高 CPU 利用率。

下面我们通过一个简单的示例代码来理解 CompletableFuture 的基本用法。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureExample {public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建一个 CompletableFuture 实例CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Hello, World!";});// 阻塞等待结果String result = future.get();System.out.println(result);}
}

在上面的示例中,我们创建了一个 CompletableFuture 实例,并使用 supplyAsync 方法异步执行任务。supplyAsync 方法会将任务提交到默认的 ForkJoinPool 中执行。最后,我们使用 get 方法阻塞等待结果并打印输出。

链式调用

CompletableFuture 的一个重要特性是支持链式调用。通过链式调用,我们可以将多个异步任务组合在一起,形成一个任务流。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureChainExample {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new IllegalStateException(e);}return "Hello, World!";}).thenApply(result -> {return result + " from CompletableFuture";}).thenApply(String::toUpperCase);String finalResult = future.get();System.out.println(finalResult);}
}

在这个示例中,我们使用 thenApply 方法对前一个任务的结果进行处理,并返回一个新的 CompletableFuture 实例。通过链式调用,我们可以将多个任务串联在一起,形成一个任务流。

组合操作

CompletableFuture 提供了多种方法来组合多个异步任务。以下是一些常用的组合操作示例:

  1. thenCombine:组合两个 CompletableFuture,并将两个任务的结果进行处理。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureCombineExample {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 10);CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, Integer::sum);System.out.println(combinedFuture.get());  // 输出 15}
}

2. thenAcceptBoth:组合两个 CompletableFuture,并对两个任务的结果进行消费处理。

import java.util.concurrent.CompletableFuture;public class CompletableFutureAcceptBothExample {public static void main(String[] args) {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 5);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 10);future1.thenAcceptBoth(future2, (result1, result2) -> {System.out.println("Result: " + (result1 + result2));}).join();}
}

3. allOf:组合多个 CompletableFuture,并在所有任务完成后执行操作。

import java.util.concurrent.CompletableFuture;public class CompletableFutureAllOfExample {public static void main(String[] args) {CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new IllegalStateException(e);}System.out.println("Task 1 completed");});CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {throw new IllegalStateException(e);}System.out.println("Task 2 completed");});CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);combinedFuture.join();System.out.println("All tasks completed");}
}

异常处理

在异步任务中处理异常是非常重要的。CompletableFuture 提供了多种方法来处理任务执行过程中的异常。

  1. exceptionally:在任务抛出异常时,提供一个默认值。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureExceptionallyExample {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (true) {throw new RuntimeException("Exception occurred");}return "Hello, World!";}).exceptionally(ex -> {System.out.println("Exception: " + ex.getMessage());return "Default Value";});System.out.println(future.get());  // 输出 Default Value}
}

2. handle:无论任务是否抛出异常,都进行处理。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureHandleExample {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (true) {throw new RuntimeException("Exception occurred");}return "Hello, World!";}).handle((result, ex) -> {if (ex != null) {return "Default Value";}return result;});System.out.println(future.get());  // 输出 Default Value}
}

实战案例:构建异步数据处理管道

为了更好地理解 CompletableFuture 的实际应用,我们来构建一个异步数据处理管道。假设我们有一个数据源,需要对数据进行一系列的处理操作,并将处理结果输出到文件中。

数据源模拟

我们首先模拟一个数据源,该数据源会生成一系列数据。

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;public class DataSource {public List<Integer> getData() {return IntStream.range(0, 10).boxed().collect(Collectors.toList());}
}

数据处理

接下来,我们定义数据处理操作。假设我们需要对数据进行两步处理:首先对每个数据乘以 2,然后对结果进行累加。

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;public class DataProcessor {public List<Integer> processStep1(List<Integer> data) {return data.stream().map(x -> x * 2).collect(Collectors.toList());}public Integer processStep2(List<Integer> data) {return data.stream().reduce(0, Integer::sum);}public CompletableFuture<List<Integer>> processStep1Async(List<Integer> data) {return CompletableFuture.supplyAsync(() -> processStep1(data));}public CompletableFuture<Integer> processStep2Async(List<Integer> data) {return CompletableFuture.supplyAsync(() -> processStep2(data));}
}

结果输出

我们定义一个方法将处理结果输出到文件中。

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;public class ResultWriter {public void writeResult(String fileName, Integer result) throws IOException {Files.write(Paths.get(fileName), result.toString().getBytes());}public CompletableFuture<Void> writeResultAsync(String fileName, Integer result) {return CompletableFuture.runAsync(() -> {try {writeResult(fileName, result);} catch (IOException e) {throw new IllegalStateException(e);}});}
}

主程序

最后,我们在主程序中将上述组件组合在一起,构建异步数据处理管道。

import java.util.List;
import java.util.concurrent.CompletableFuture;public class Main {public static void main(String[] args) {DataSource dataSource = new DataSource();DataProcessor dataProcessor = new DataProcessor();ResultWriter resultWriter = new ResultWriter();List<Integer> data = dataSource.getData();CompletableFuture<List<Integer>> step1Future = dataProcessor.processStep1Async(data);CompletableFuture<Integer> step2Future = step1Future.thenCompose(dataProcessor::processStep2Async);CompletableFuture<Void> writeFuture = step2Future.thenCompose(result -> resultWriter.writeResultAsync("result.txt", result));writeFuture.join();System.out.println("Data processing completed");}
}

在这个例子中,我们使用 CompletableFuture 将数据处理步骤和结果输出串联在一起,形成了一个完整的异步数据处理管道。通过 thenCompose 方法,我们将前一个任务的结果传递给下一个异步任务,从而实现了链式调用。

总结

本文深入探讨了 CompletableFuture 的底层原理,展示了其工作机制,并通过多个代码示例说明了如何在实际应用中使用 CompletableFuture。通过理解 CompletableFuture 的异步编程模型、状态管理、任务调度和异常处理机制,我们可以更好地利用这一强大的工具构建高效、非阻塞的 Java 应用程序。

希望这篇文章能够帮助你全面理解 CompletableFuture,并在实际开发中灵活应用。如果你有任何问题或建议,欢迎随时交流!

相关文章:

  • 计算机视觉硬件整理(四):相机与镜头参数介绍
  • 【Kubernetes】常见面试题汇总(三十四)
  • python的逻辑控制
  • 高刷显示器哪个好?540Hz才有资格称高刷
  • 重修设计模式-行为型-责任链模式
  • 【玩转贪心算法专题】738. 单调递增的数字【中等】
  • 硬件设计很简单?合宙低功耗4G模组Air780E—开机启动及外围电路设计
  • 文件上传js代码
  • 华为认证HCIA篇--网络通信基础
  • JavaScript中if嵌套assert的方法
  • 【python append函数的一些细节】
  • 初步认识了解分布式系统
  • 货拉拉高级大数据平台算法工程师社招一面
  • 服务器数据恢复—SAN环境下LUN映射出错导致文件系统一致性出错的数据恢复案例
  • useCallback()
  • [ JavaScript ] 数据结构与算法 —— 链表
  • 【comparator, comparable】小总结
  • 【许晓笛】 EOS 智能合约案例解析(3)
  • 【跃迁之路】【444天】程序员高效学习方法论探索系列(实验阶段201-2018.04.25)...
  • IE报vuex requires a Promise polyfill in this browser问题解决
  • Java 9 被无情抛弃,Java 8 直接升级到 Java 10!!
  • java8-模拟hadoop
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • Python利用正则抓取网页内容保存到本地
  • sublime配置文件
  • Vue 重置组件到初始状态
  • Xmanager 远程桌面 CentOS 7
  • 初探 Vue 生命周期和钩子函数
  • 反思总结然后整装待发
  • 前端存储 - localStorage
  • 前端面试题总结
  • 前端之React实战:创建跨平台的项目架构
  • 人脸识别最新开发经验demo
  • 入门级的git使用指北
  • 温故知新之javascript面向对象
  • 由插件封装引出的一丢丢思考
  • d²y/dx²; 偏导数问题 请问f1 f2是什么意思
  • LevelDB 入门 —— 全面了解 LevelDB 的功能特性
  • 浅谈sql中的in与not in,exists与not exists的区别
  • 新海诚画集[秒速5センチメートル:樱花抄·春]
  • ​ssh免密码登录设置及问题总结
  • ​软考-高级-信息系统项目管理师教程 第四版【第19章-配置与变更管理-思维导图】​
  • ## 临床数据 两两比较 加显著性boxplot加显著性
  • #AngularJS#$sce.trustAsResourceUrl
  • #我与Java虚拟机的故事#连载11: JVM学习之路
  • $emit传递多个参数_PPC和MIPS指令集下二进制代码中函数参数个数的识别方法
  • %3cscript放入php,跟bWAPP学WEB安全(PHP代码)--XSS跨站脚本攻击
  • (02)Cartographer源码无死角解析-(03) 新数据运行与地图保存、加载地图启动仅定位模式
  • (13)Latex:基于ΤΕΧ的自动排版系统——写论文必备
  • (4) openssl rsa/pkey(查看私钥、从私钥中提取公钥、查看公钥)
  • (function(){})()的分步解析
  • (leetcode学习)236. 二叉树的最近公共祖先
  • (vue)el-checkbox 实现展示区分 label 和 value(展示值与选中获取值需不同)
  • (超简单)使用vuepress搭建自己的博客并部署到github pages上
  • (二刷)代码随想录第15天|层序遍历 226.翻转二叉树 101.对称二叉树2