当前位置: 首页 > news >正文

线程池吞掉异常的case:源码阅读与解决方法

1. 问题背景

有一天给同事CR,看到一段这样的代码

try {for (param : params) {//并发处理,func无返回值ThreadPool.submit(func(param));}
} catch (Exception e) {log.info("func抛异常啦,参数是:{}", param)
}

我:你这段代码是利用并发降低RT对吧,如果func内部抛异常,你确定可以catch到吗

同事:可以啊, 为什么不可以(...

我:不如你run一把,在func mock一个异常出来试试

同事:我靠还真是

我:你可以用execute,改动比较小

同事:那么是为什么呢

2. 同事的例子

import java.util.concurrent.*;public class ThreadPoolTest {public static void main(String[] args) throws Exception {ExecutorService executorService = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());testExecute(executorService);Thread.sleep(2000);testSubmit1(executorService);Thread.sleep(2000);testSubmit2(executorService);}private static void testExecute(ExecutorService executorService) {executorService.execute(() -> {System.out.println("执行线程池execute方法");throw new RuntimeException("execute方法抛出异常");});}private static void testSubmit1(ExecutorService executorService) {executorService.submit(() -> {System.out.println("执行线程池submit方法1");throw new RuntimeException("submit方法1抛出异常");});}private static void testSubmit2(ExecutorService executorService) throws Exception {Future<Object> feature = executorService.submit(() -> {System.out.println("执行线程池submit方法2");throw new RuntimeException("submit方法2抛出异常");});feature.get();}
}

执行结果:

执行线程池execute方法
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: execute方法抛出异常at ThreadPoolTest.lambda$testExecute$0(ThreadPoolTest.java:23)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
执行线程池submit方法1
执行线程池submit方法2
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: submit方法2抛出异常at java.util.concurrent.FutureTask.report(FutureTask.java:122)at java.util.concurrent.FutureTask.get(FutureTask.java:192)at ThreadPoolTest.testSubmit2(ThreadPoolTest.java:39)at ThreadPoolTest.main(ThreadPoolTest.java:17)
Caused by: java.lang.RuntimeException: submit方法2抛出异常at ThreadPoolTest.lambda$testSubmit2$2(ThreadPoolTest.java:37)at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)at java.util.concurrent.FutureTask.run(FutureTask.java)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)

3. 原理分析

3.1 线程池包的继承结构

3.2 submit和execute方法的差异

3.2.1 execute

方法定义在最顶层的Executor接口,并且Executor接口有且仅有这一个方法

public interface Executor {/*** Executes the given command at some time in the future.  The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/void execute(Runnable command);
}

方法实现在ThreadPoolExecutor:

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}

实际执行的过程,在worker(是runnable的实现类)的run方法,run方法实际执行的是runWorker方法

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

可以看到执行过程中,如果task.run();发生异常,没有catch处理,异常会层层向外抛出;最终进入finally块,执行processWorkerExit;

3.2.2 submit

submit方法定义在ExecutorService

public interface ExecutorService extends Executor {/*** Submits a value-returning task for execution and returns a* Future representing the pending results of the task. The* Future's {@code get} method will return the task's result upon* successful completion.** <p>* If you would like to immediately block waiting* for a task, you can use constructions of the form* {@code result = exec.submit(aCallable).get();}** <p>Note: The {@link Executors} class includes a set of methods* that can convert some other common closure-like objects,* for example, {@link java.security.PrivilegedAction} to* {@link Callable} form so they can be submitted.** @param task the task to submit* @param <T> the type of the task's result* @return a Future representing pending completion of the task* @throws RejectedExecutionException if the task cannot be*         scheduled for execution* @throws NullPointerException if the task is null*/<T> Future<T> submit(Callable<T> task);/*** Submits a Runnable task for execution and returns a Future* representing that task. The Future's {@code get} method will* return the given result upon successful completion.** @param task the task to submit* @param result the result to return* @param <T> the type of the result* @return a Future representing pending completion of the task* @throws RejectedExecutionException if the task cannot be*         scheduled for execution* @throws NullPointerException if the task is null*/<T> Future<T> submit(Runnable task, T result);/*** Submits a Runnable task for execution and returns a Future* representing that task. The Future's {@code get} method will* return {@code null} upon <em>successful</em> completion.** @param task the task to submit* @return a Future representing pending completion of the task* @throws RejectedExecutionException if the task cannot be*         scheduled for execution* @throws NullPointerException if the task is null*/Future<?> submit(Runnable task);
}

实现在AbstractExecutorService

    public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}

可以看到这里创建了RunnableFuture(而不是基础的worker),顾名思义,RunnableFuture同时实现了Runnable和Future接口,也就意味着可以对该任务执行get操作,看看RunnableFuture的run方法:

    public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

catch块对方法异常做了处理,与执行结果一同在Future中暂存起来;submit()执行完毕后返回Future对象,执行future.get()会触发异常的抛出;

当然了,如果你只是执行了submit,没有获取future,异常就会“神奇地”消失。

参考:

Java线程池实现原理及其在美团业务中的实践 - 美团技术团队

https://zhuanlan.zhihu.com/p/651997713

相关文章:

  • 【Python支持多种数据类型及案列】
  • ROS系统中解析通过CAN协议传输的超声波传感器数据
  • nginx安装环境部署(完整步骤)
  • java如何截取字符串
  • 【亲测可用】docker进入正在运行的容器
  • 【代码随想录训练营】【Day 50】【动态规划-9】| Leetcode 198, 213, 337
  • 访问jlesage/firefox镜像创建的容器中文乱码问题
  • Mac 终端报错 zsh: command not found: brew 解决方案
  • JVM 三色标记算法
  • Linux的操作命令(2)
  • 计算机SCI期刊,IF=13.3+,期刊质量非常高,声誉佳
  • Linux系统学习——指令二
  • 无人机RTMP推流EasyDSS直播平台推流成功,不显示直播按钮是什么原因?
  • 阿三再现强盗行为,vivo、OPPO或彻底失去印度市场
  • 【proteus仿真】基于51单片机的电压检测系统
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • CentOS从零开始部署Nodejs项目
  • Dubbo 整合 Pinpoint 做分布式服务请求跟踪
  • input的行数自动增减
  • Next.js之基础概念(二)
  • swift基础之_对象 实例方法 对象方法。
  • vue:响应原理
  • webpack+react项目初体验——记录我的webpack环境配置
  • 工程优化暨babel升级小记
  • 京东美团研发面经
  • 排序算法之--选择排序
  • 普通函数和构造函数的区别
  • 微信公众号开发小记——5.python微信红包
  •  一套莫尔斯电报听写、翻译系统
  • Oracle Portal 11g Diagnostics using Remote Diagnostic Agent (RDA) [ID 1059805.
  • 函数计算新功能-----支持C#函数
  • ​如何防止网络攻击?
  • ​云纳万物 · 数皆有言|2021 七牛云战略发布会启幕,邀您赴约
  • # Kafka_深入探秘者(2):kafka 生产者
  • #162 (Div. 2)
  • $Django python中使用redis, django中使用(封装了),redis开启事务(管道)
  • (30)数组元素和与数字和的绝对差
  • (ZT)薛涌:谈贫说富
  • (非本人原创)我们工作到底是为了什么?​——HP大中华区总裁孙振耀退休感言(r4笔记第60天)...
  • (附源码)spring boot公选课在线选课系统 毕业设计 142011
  • (附源码)springboot宠物医疗服务网站 毕业设计688413
  • (附源码)计算机毕业设计SSM基于健身房管理系统
  • (企业 / 公司项目)前端使用pingyin-pro将汉字转成拼音
  • (生成器)yield与(迭代器)generator
  • .bat批处理(十一):替换字符串中包含百分号%的子串
  • .NET Conf 2023 回顾 – 庆祝社区、创新和 .NET 8 的发布
  • .NET值类型变量“活”在哪?
  • .net中的Queue和Stack
  • /使用匿名内部类来复写Handler当中的handlerMessage()方法
  • :中兴通讯为何成功
  • @Autowired和@Resource的区别
  • [ IO.File ] FileSystemWatcher
  • [2]十道算法题【Java实现】
  • [8-27]正则表达式、扩展表达式以及相关实战
  • [AIGC] SQL中的数据添加和操作:数据类型介绍