当前位置: 首页 > news >正文

一些关于线程之间协作的心得

join

首先看代码

public class Join_Test {public static void main(String[] args) throws InterruptedException {//创建线程 ThreadA,当然,里面具体执行了什么其实不重要Thread threadA = new Thread(new ThreadA());//创建线程 ThreadB,当然,里面具体执行了什么其实不重要Thread threadB = new Thread(new ThreadB());//启动线程 ThreadAthreadA.start();//调用join()方法,使主线程等待线程 ThreadA 执行完毕threadA.join();//启动线程 ThreadBthreadB.start();//调用join()方法,使主线程等待线程 ThreadB 执行完毕threadB.join();//主线程执行完毕System.out.println("主线程执行完毕");}
}

其实我在搜索:java有哪些方法或者类 和 多线程有关的时候,join 总是出现其中,这就不免的让我感觉,join应该和 多线程,并行 这些词汇挂钩,于是我有了下面的疑问:

java中线程的join方法是让当前线程暂停,等待join方法执行完毕后,
再继续执行。那么我可不可以理解成join其实是一个串行的方法
在Java中,join() 方法的行为确实可以被理解为在某种程度上“串型化”了线程的执行,
但这种理解需要一些细微的调整和澄清。首先,join() 方法是 Thread 类的一个方法,用于让当前正在执行的线程(我们称之为“调用线程”)暂停执行,直到该线程(调用 join() 方法的线程)执行完毕。
这确实在效果上类似于将两个线程的执行顺序串型化,即先执行完一个线程,再执行另一个。然而,说 join() 是一个“串型的方法”可能不完全准确,因为它并不改变Java多线程并发执行的本质。Java的线程模型是基于并发执行的,即多个线程可以同时运行
(尽管在单核处理器上,这通常是通过时间片轮转来实现的)。join() 方法只是提供了一种机制,让程序员能够控制线程之间的执行顺序,但它并不改变Java多线程的并发特性。此外,join() 方法的使用场景通常是在需要等待某个线程完成其任务之后,再继续执行当前线程的情况下。这有助于维护程序状态的正确性和一致性,
特别是在多线程需要共享资源或数据的情况下。总结来说,join() 方法可以被视为在特定情况下“串型化”线程执行的一种手段,但它并不改变Java多线程并发执行的本质。
它提供了一种控制线程执行顺序的机制,使得程序能够按照预定的顺序执行多个线程的任务。

当我看完这个答复之后,我觉得 join 其实和 线程之间的执行顺序,或者说调度息息相关,它在多线程并发执行以用来提高效率方面,好像没那么明显。

CompletableFuture

首先要明确,CompletableFuture 更多的是作为一个工具类来协调多个线程之间的调度,而非一个实体类,所以我们一般不去new CompletableFuture()

看第一个例子:

public class 提交一个无返回值的异步任务 {public static void main(String[] args) {ThreadA threadA = new ThreadA();//ThreadA是一个实现了Runnable的类CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(threadA);System.out.println("主线程执行到了这里");//因为runAsync是异步的,// 会交给线程池去执行,主线程不会等待runAsync执行完毕,所以这段话会直接打印出来voidCompletableFuture.join();//这里这么做是为了让主线程等待runAsync执行完毕,// join()方法是让当前线程等待voidCompletableFuture执行完毕//如果不这么做,主线程一旦执行完,整个程序就结束了,runAsync的线程可能还没执行完,控制台也就看不到runAsync的输出了}
}

再看第二个例子:

public class 提交一个有返回值的异步任务 {public static void main(String[] args) throws Exception {ThreadC threadC = new ThreadC();FutureTask<String> stringFutureTask = new FutureTask<>(threadC);stringFutureTask.run();//这里并不是异步执行,而是同步执行。其实他是调用了ThreadC的call方法,// 并不是另起一个线程去执行call方法,没有Runnable的实现类的start()方法的效果//而且,代码执行到这里的时候,你是看不到 "线程C正在执行" 这句话的,但是你能看到 线程C正在执行xxx 这句话Thread.sleep(3000);System.out.println("主线程执行到了这里");System.out.println("stringFutureTask.get() = " + stringFutureTask.get());
//        CompletableFuture.supplyAsync(stringFutureTask)//这里是报错的,我们不能用这种方式来提交一个有返回值的任务}
}class ThreadC implements Callable<String> {@Overridepublic String call() throws Exception {for (int i = 0; i < 10; i++) {System.out.println("线程C正在执行" + i);Thread.sleep(1000);}return "线程C执行完毕";}
}
这两个无非就是提交一个Runnable接口的,还有提交一个Callable接口的差别
但是前者可以,后者就不可以,这是为什么。或者这么说:CompletableFuture.supplyAsync 
既然是一个有返回值的方法,为什么里面不能传 Callable 的实现类,
而CompletableFuture.runAsync是一个没有返回值的方法,
里面就可以传 Runnable 的实现类。这不是很矛盾吗?
这个问题实际上涉及到了CompletableFuture中supplyAsync和runAsync方法的设计意图和它们各自处理的任务类型。
这两个方法的设计并不矛盾,而是根据它们的功能需求来区分使用场景。CompletableFuture.supplyAsync 
CompletableFuture.supplyAsync方法用于异步地执行一个供应(Supplier)任务,这个任务完成后会返回一个结果。
Supplier是一个函数式接口,它只包含一个无参方法get(),该方法返回一个结果。由于supplyAsync需要能够处理
并返回这个任务的结果,因此它不能接收Callable接口的实现类作为参数,因为Callable接口的设计是为了同时支持
返回值和异常处理(通过call()方法抛出异常),而supplyAsync已经通过CompletableFuture的异常处理机制来
管理异常,所以不需要Callable的额外功能。CompletableFuture.runAsync
相反,CompletableFuture.runAsync方法用于异步地执行一个运行(Runnable)任务,这个任务不返回任何结果。
Runnable是一个函数式接口,它只包含一个无参无返回值的方法run()。由于runAsync不需要处理返回值,
因此它非常适合与Runnable接口一起使用。为什么不是“矛盾”的
这两个方法的设计并不矛盾,因为它们分别解决了不同的需求场景:如果你需要异步执行一个任务并获取其结果,你应该使用supplyAsync并传入一个Supplier。
如果你只需要异步执行一个不需要返回结果的任务,你应该使用runAsync并传入一个Runnable。
结论
CompletableFuture的这两个方法通过清晰地区分“有返回值的任务”和“无返回值的任务”,
提供了一种灵活而强大的方式来处理异步编程中的常见需求。这种设计避免了混淆,并使得代码更加清晰和易于理解。

第二个问题

为什么Runnable,或者说 Thread直接可以调用start方法来开启一个线程,Callable却没有这种机制,一定要借助线程池
Thread 类直接提供 start() 方法来开启一个线程,而 Callable 接口没有这种机制的原因主要涉及到它们的设计目的和用途。Thread 类
Thread 类是 Java 中用于表示和操作线程的一个类。它是 Java 线程机制的核心。当你创建一个 Thread 类的实例时,
你实际上是在创建一个新的线程。Thread 类提供了 start() 方法来启动这个线程,使其开始执行其内部的 run() 方法
(无论是通过继承 Thread 类并重写 run() 方法,还是通过实现 Runnable 接口并将其实例传递给 Thread 类的构造函数)。Callable 接口
与 Runnable 类似,Callable 也是一个函数式接口,但它提供了一种可以产生结果并能抛出异常的方式来执行任务。
然而,Callable 本身并不直接关联到线程的启动和管理。相反,它是为了与 ExecutorService 一起使用而设计的,
后者提供了一种更高级别的线程管理接口,能够处理任务的提交、执行、结果获取和异常处理等。为什么 Callable 需要借助线程池
结果和异常处理:Callable 的 call() 方法可以返回一个结果,并且可以抛出异常。为了处理这些结果和异常,
ExecutorService 提供了 Future 接口的实现(如 FutureTask),这些实现可以捕获 Callable 任务的结果和异常,
并在任务完成时提供给调用者。
线程复用和管理:线程池(如 ExecutorService)能够管理一组工作线程,并允许你提交任务给这些线程执行。
使用线程池可以显著提高性能,因为它减少了线程创建和销毁的开销,并允许你更好地控制同时运行的线程数量。
对于 Callable 任务来说,使用线程池可以更加高效地管理任务的执行。
解耦:将任务的执行(Callable 或 Runnable)与线程的管理(Thread 或 ExecutorService)解耦是一种更好的设计选择。
这样做可以使你的代码更加灵活和可重用。例如,你可以轻松地更改任务的执行策略(如使用不同的线程池或单线程执行器),
而无需修改任务本身的代码。
因此,尽管 Thread 类直接提供了 start() 方法来启动线程,但 Callable 并不这样做,而是与 ExecutorService 
等高级线程管理工具一起使用,以提供更强大、更灵活的任务执行和结果处理功能。
我们知道 callable 接口需要配合线程池一起使用,那么我有这样一个疑问,当我往线程池提交了一个callable的时候,
如果它3秒就执行完了,但是我5秒后才使用了get方法获取他的返回结果,那么中间这两秒,它是将结果放在了哪里?
还是一直占用着一个线程池中的线程,直到我使用get方法
在Java的线程池(如ExecutorService)中,当你提交一个Callable任务时,这个任务会被线程池中的一个线程执行。
Callable接口与Runnable接口的主要区别在于Callable能够返回一个结果,并且这个结果是可以通过Future对象获取的。当你提交一个Callable任务到线程池,并获取到一个Future对象时,这里有几个关键点需要理解:任务执行:一旦线程池中的某个线程被分配给这个任务,它就会开始执行这个任务。如果任务执行得非常快(
比如3秒内完成),那么执行完成后,线程会返回到线程池中等待下一次被分配任务。
结果存储:当Callable任务执行完成后,其结果会被存储在与之关联的Future对象中。但请注意,这个结果并不
是存储在某个特定的“地方”,而是与Future对象紧密关联的。换句话说,这个结果是通过Future对象来管理和访问的。
线程释放:一旦Callable任务执行完成,不管你是否立即通过Future.get()方法获取结果,执行该任务的线程都会被释
放回线程池。这意味着线程不会被这个任务持续占用,直到你调用get()方法。线程池中的线程是动态分配的,用于执行多个任务。
结果获取:当你调用Future.get()方法时,如果任务已经完成,get()方法会立即返回结果。如果任务尚未完成,
get()方法会阻塞当前线程(调用get()方法的线程,而不是执行Callable任务的线程),直到任务完成并返回结果。
这种阻塞是get()方法的行为,与线程池或执行任务的线程无关。
总结来说,当你往线程池提交了一个Callable任务,并且它很快执行完成后,即使你稍后通过Future.get()方法获取结果,
执行该任务的线程也会在任务完成后立即被释放回线程池。中间的时间,任务的结果是被存储在Future对象中,而不是占用线程池中的线程。
Future 对象中,哪个字段是用来存储结果的
在Java的Future接口中,实际上并没有直接暴露一个字段来存储结果。Future接口定义了一组方法,
用于查询计算是否完成、等待计算完成以及检索计算结果,但它并没有具体实现这些功能,而是留给了具体的实现类去完成。通常,当你通过ExecutorService提交一个Callable任务时,会返回一个Future实现类的对象(在JDK中,
这个实现类通常是FutureTask)。FutureTask类内部有一个Callable对象以及一个用于存储结果的字段
(尽管这个字段并不是Future接口的一部分,而是FutureTask类的实现细节)。在FutureTask类中,有一个Object类型的字段(在较新版本的JDK中,这个字段可能是泛型化的,比如V,
代表Callable的返回类型),用于存储Callable任务执行完成后的结果。然而,这个字段并不是Future接口
的一部分,它是FutureTask类为了实现Future接口而添加的。当你调用Future.get()方法时,FutureTask的实现会检查任务是否已经完成,并且结果是否已经被存储。
如果任务已经完成并且结果可用,它会立即返回这个结果。如果任务尚未完成,根据get()方法的参数
(是否有超时或是否中断当前线程),它可能会阻塞调用线程直到任务完成或等待超时。由于Future接口是抽象的,并且不直接提供存储结果的字段,因此你应该通过接口定义的方法(如get())
来与结果交互,而不是直接访问任何字段。这样,你的代码就与具体的实现(如FutureTask)解耦了,
这有助于提高代码的灵活性和可维护性。

通过上面的例子,我们了解到了 CompletableFuture 可以开启异步线程,但是我们也说过了,
CompletableFuture 一般还用于协调线程之间的执行,这时候就用到了组合这一功能。

这里暂时不做赘述,等我再研究研究

Semaphore 信号量

Semaphore(信号量)是一种控制多个线程对共享资源访问的同步工具。它主要用于限制对某个资源的同时访问数量
这句话很重要,他并不保证多线程的时候对共享变量的准确性

实际上,当 Semaphore(1) 的时候,的确可以达到互斥的效果,从而达到对共享变量操作的准确性。
但是当 Semaphore(2)或者是更多的时候,他的意思是,允许同时有两个线程执行代码,
仅仅只能限制这个而已。那么既然能有两个线程执行代码,那么就不可避免的会遇到操作共享变量出问题的时候,
所以说,还是需要去使用锁的,当然,具体看业务,如果业务中能够直接用 CopyOnWriteArrayList 这种并发类,当然最好
想象有一个管道,管道的一头是生产者,一头是消费者。 Semaphore 控制的是出入的流量,也就是限流问题,并不保证其他。
所以 Semaphore 的关键字是 :“限流”	“限流”	“限流”	,重要的事情说三遍

CyclicBarrier

先来看基础用法

public class WorkerDemoFromMySelf {//比如有10个人,白天上班,晚上下班,晚上下班后,大家集合吃饭,但是由于每个人的工作时间不一样,// 所以有的人晚一点到,有的人早一点到,但是必须等到所有人都到了,才能开始吃饭public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> {System.out.println("所有人都到了,开始吃饭");try {Thread.sleep(4000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("吃饭结束");});for (int i = 0; i < 10; i++) {new Thread(new Worker(i, cyclicBarrier)).start();}}
}class Worker implements Runnable {private final int id;private final CyclicBarrier cyclicBarrier;Worker(int id, CyclicBarrier cyclicBarrier) {this.id = id;this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {//随机一个10以内的数,模拟工作时间int workTime = (int) (Math.random() * 10);try {for (int i = 0; i < workTime; i++) {System.out.println("工人 " + id + " 正在工作...");Thread.sleep(1000);}cyclicBarrier.await();//这里就是等待所有人都到了,去执行 new CyclicBarrier 里面的代码//到这里说明吃饭都结束了,该回家了System.out.println("工人 " + id + " 回家了");} catch ( Exception e) {Thread.currentThread().interrupt(); // 保持中断状态System.out.println("工人 " + id + " 被中断或屏障破裂");}}
}

如果出现了异常怎么办?

public class WorkerDemoFromMySelf {//比如有3个人,白天上班,晚上下班,晚上下班后,大家集合吃饭,但是由于每个人的工作时间不一样,// 所以有的人晚一点到,有的人早一点到,但是必须等到所有人都到了,才能开始吃饭public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {System.out.println("所有人都到了,开始吃饭");try {java.lang.Thread.sleep(4000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("吃饭结束");});for (int i = 1; i < 4; i++) {new Thread(new Worker(i, cyclicBarrier)).start();}}
}class Worker implements Runnable {private final int id;private final CyclicBarrier cyclicBarrier;Worker(int id, CyclicBarrier cyclicBarrier) {this.id = id;this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {try {Thread.sleep(id * 2000L);if(id==2){throw new RuntimeException("模拟异常");}cyclicBarrier.await();System.out.println("屏障结束,开始吃饭");} catch (Exception e) {throw new RuntimeException(e);}}
}
//具体来说,这里发生了以下事情:
//
// 创建了一个 CyclicBarrier 对象,其参与者数量为 3,并设置了一个在屏障被触发时执行的 Runnable。
// 启动了三个线程(ID 为 1, 2, 3),每个线程都在其 run 方法中首先通过 Thread.sleep() 模拟不同的工作时间,然后尝试到达屏障点。
//但是,当 ID 为 2 的线程执行到 if(id==2) { throw new RuntimeException("模拟异常"); } 这一行时,它会抛出一个 RuntimeException。这个异常会被 catch (Exception e) 块捕获,并且由于 catch 块内部又抛出了一个新的 RuntimeException(这实际上是不必要的,但不影响结果),这个异常将终止线程 2 的执行。
//由于线程 2 在到达 cyclicBarrier.await() 之前就已经终止,它不会触发屏障。因此,屏障的参与者数量永远不会达到 3,所以线程 1 和线程 3 将永远等待在 cyclicBarrier.await() 调用上。
//由于没有线程能够到达屏障点并触发 Runnable(即打印开始吃饭和结束吃饭的消息),因此这些消息将不会被打印。
//同样,由于 cyclicBarrier.await() 从未被成功调用(因为线程 2 抛出了异常),所以 "屏障结束,开始吃饭" 这句话也永远不会被任何线程打印。

总结一下,就是说线程2在没有到达屏障点之前就出现异常了,意味着线程2永远不会到达屏障点了,但是其他两个线程感知不到,就一直死等,那么如果在到达屏障点之后又出现了异常呢?

public class WorkerDemoFromMySelf {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {System.out.println("所有人都到了,开始吃饭");int i = 1 / 0;dosomething();});for (int i = 1; i < 4; i++) {new Thread(new Worker(i, cyclicBarrier)).start();}}private static void dosomething() {System.out.println("WorkerDemoFromMySelf.dosomething");}
}class Worker implements Runnable {private final int id;private final CyclicBarrier cyclicBarrier;Worker(int id, CyclicBarrier cyclicBarrier) {this.id = id;this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {try {Thread.sleep(id * 2000L);cyclicBarrier.await();System.out.println("屏障结束,开始吃饭");} catch (Exception e) {throw new RuntimeException(e);}}
}
执行的结果就是,会执行 System.out.println("所有人都到了,开始吃饭"); 
然后线程奔溃 dosomething()不会执行,
三个 worker 也不会执行 System.out.println("屏障结束,开始吃饭");  
当然,如果我们将这个 算数异常捕获,那么程序还是可以执行成功的, 
三个 worker 会执行 System.out.println("屏障结束,开始吃饭")

那么worker在调用await()成功之后,又出现了异常怎么办,这里就不再给出例子,我们结合之前的经验去想一下,自己试验一下:ABC三个人一开始在工作,然后下班之后一起吃饭,吃完饭回家睡觉,睡觉的时候B着凉了(线程崩溃了),但是AC并不知道,所以AC还是会继续执行代码,知道遇到下一个 await(),但是 await() 需要三个人才能出发,AC只有两个人,B崩溃了,所以AC就死等。

countDownLatch

这里例子举一个和 CyclicBarrier 类似的,这样可以对比二者的差别

public class Test {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(3);for (int i = 1; i < 4; i++) {int finalI = i;new Thread(new Runnable() {@Overridepublic void run() {System.out.println("工人 " + finalI + " 正在工作...");try {Thread.sleep(finalI * 2000L);} catch (InterruptedException e) {e.printStackTrace();}countDownLatch.countDown();//不会阻塞,只是将计数值减1}}).start();}// 主线程等待,直到所有任务完成(即CountDownLatch的计数值减到0)countDownLatch.await(); // 阻塞当前线程,直到计数值为0System.out.println("所有人都到了,开始吃饭");}}
简单的说二者的区别
countDownLatch 是一个 分--总  的结构
CyclicBarrier 却可以做到 分--总--分 的结构就比如,老板早上把员工派出去了,中午集合,汇总一下工作。如果到这里就打住了,
那么可以使用countDownLatch,也可以使用CyclicBarrier,但是如果:
老板早上把员工派出去了,中午集合,汇总一下工作,下午继续把员工派出去,那么只能使用 CyclicBarrier了这两个都能达到 中午集合(等待其他线程),汇总工作(主线线程执行一段代码)。
但是区别就在于,主线程执行完代码后,还需不需要这些线程干别的事情

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 基于微信小程序的美食外卖管理系统
  • npm install --force or --legacy-peer-deps
  • 1网络安全的基本概念
  • LeetCode从入门到超凡(二)递归与分治算法
  • C++中move和forword的区别
  • spring自定义属性编辑器
  • SOCKS5代理为何比HTTP代理更快?
  • LeetCode63:不同路径II
  • 生成式AI:ChatGPT及其在各行业的应用前景
  • MyBatis-Plus 插件扩展
  • k8s部署jenkins集群时,使用ThinBackup进行定期备份
  • Mybatis Plus分页查询返回total为0问题
  • prometheus通过nginx-vts-exporter监控nginx
  • Android APN type 配置和问题
  • 数据结构之线性表(python)
  • (三)从jvm层面了解线程的启动和停止
  • [笔记] php常见简单功能及函数
  • 【comparator, comparable】小总结
  • HTTP中GET与POST的区别 99%的错误认识
  • js ES6 求数组的交集,并集,还有差集
  • node-glob通配符
  • Vue UI框架库开发介绍
  • 从零到一:用Phaser.js写意地开发小游戏(Chapter 3 - 加载游戏资源)
  • 服务器之间,相同帐号,实现免密钥登录
  • 今年的LC3大会没了?
  • 区块链将重新定义世界
  • 如何正确配置 Ubuntu 14.04 服务器?
  • 设计模式 开闭原则
  • 实战:基于Spring Boot快速开发RESTful风格API接口
  • 在Unity中实现一个简单的消息管理器
  • 你对linux中grep命令知道多少?
  • Spring第一个helloWorld
  • 好程序员web前端教程分享CSS不同元素margin的计算 ...
  • ​ubuntu下安装kvm虚拟机
  • #我与Java虚拟机的故事#连载15:完整阅读的第一本技术书籍
  • #预处理和函数的对比以及条件编译
  • (DFS + 剪枝)【洛谷P1731】 [NOI1999] 生日蛋糕
  • (二)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (二十五)admin-boot项目之集成消息队列Rabbitmq
  • (六)激光线扫描-三维重建
  • (实战篇)如何缓存数据
  • (一)使用Mybatis实现在student数据库中插入一个学生信息
  • (转)Java socket中关闭IO流后,发生什么事?(以关闭输出流为例) .
  • (转)Spring4.2.5+Hibernate4.3.11+Struts1.3.8集成方案一
  • **《Linux/Unix系统编程手册》读书笔记24章**
  • *2 echo、printf、mkdir命令的应用
  • .libPaths()设置包加载目录
  • .net MVC中使用angularJs刷新页面数据列表
  • .Net多线程Threading相关详解
  • .NET设计模式(2):单件模式(Singleton Pattern)
  • .NET中winform传递参数至Url并获得返回值或文件
  • ?.的用法
  • @SuppressWarnings注解
  • [ 物联网 ]拟合模型解决传感器数据获取中数据与实际值的误差的补偿方法
  • [20171102]视图v$session中process字段含义