当前位置: 首页 > news >正文

线程池异常处理之重启线程处理任务

线程池异常处理之重启线程处理任务

本文记录一下在使用线程池过程中,如何处理 while(true)循环长期运行的任务,在业务处理逻辑中,如果抛出了运行时异常时怎样重新提交任务。

这种情形在Kafka消费者中遇到,当为每个Consumer开启一个线程时, 在线程的run方法中会有while(true)循环中消费Topic数据。

本文会借助Google Guava包中的com.google.common.util.concurrent.ThreadFactoryBuilder类创建线程工厂,因为它能不仅很方便地为线程池设置一个易读的名称,而且很方便地设置线程执行过程中出现异常时 用来处理异常的 异常处理器,示例如下:

 MyExceptionHandler exceptionHandler = new MyExceptionHandler();
//设置线程名称
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
//设置异常处理器
                .setUncaughtExceptionHandler(exceptionHandler).build();

当线程执行过程中出现了异常,MyExceptionHandler#uncaughtException(...)方法就会由JVM调用。在java.lang.ThreadGroup#uncaughtException方法注释提到:由于每个线程都隶属于某个线程组,如果该线程所属的线程组有父线程组,则调用父线程组中指定的异常处理器;若没有父线程组,则判断 有没有 为线程自定义 异常处理器,而在本文中,定义了自己的异常处理器:MyExceptionHandler,因此线程执行异常时就会调用MyExceptionHandler#uncaughtException(...)

创建好了线程工厂,接下来就是创建线程池了。

CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);

CustomThreadPoolExecutor 继承ThreadPoolExecutor扩展线程池的功能:若线程执行某任务失败时 需要重新提交该任务,可以重写CustomThreadPoolExecutor#afterExecute方法,在该方法中实现提交任务。

public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        //若线程执行某任务失败了,重新提交该任务
        if (t != null) {
            Runnable task =  r;
            System.out.println("restart task...");
            execute(task);
        }
    }
}

如果在new ThreadPoolExecutor时未传入 ThreadFactory参数,如下:

BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue);

其实是调用Executors.defaultThreadFactory()创建默认的ThreadFactory:

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

它为每个创建的线程设置了名字:"pool-xxx-thread-xxx"。而采用默认的ThreadFactory时相应的默认的异常处理器执行逻辑是由java.lang.ThreadGroup#uncaughtException方法来处理的,其中处理异常的相关源码如下:

else if (!(e instanceof ThreadDeath)) {
                System.err.print("Exception in thread \""
                                 + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }

如果线程执行过程中抛出的错误 不是 ThreadDeath对象,那么只是简单地:打印线程名称,并将堆栈信息记录到控制台中,任务结束。如果是一个ThreadDeath对象,看ThreadDeath类的源码注释可知:异常处理器不会被调用,程序不会输出任何日志信息。(有木有碰到这种情况,线程池中的线程不知不觉地消失了……)

The ThreadGroup#uncaughtException top-level error handler does not print out a message if ThreadDeath is never caught.

在本文的示例程序CustomThreadPoolExecutorTest.java中,为了模拟在while(true)循环中抛出异常,定义一个 Boolean 变量 stop 使得线程执行一段时间抛出一个异常:也即先让test线程运行一段时间,然后主线程设置 stop 变量的值,使得test线程抛出运行时异常。(完整代码可参考文末)

if (stop) {
    throw new RuntimeException("running encounter exception");
 }

线程池提交 while(true)循环任务:

        threadPoolExecutor.execute(()->{
            //提交的是一个while(true)任务,正常运行时这类任务不会结束
            while (true) {
                System.out.println("start processing");
                try {
                    //模拟任务每次执行耗时1000ms
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //ignore
                }
                System.out.println("finish processing");

                if (stop) {
                    throw new RuntimeException("running encounter exception");
                }
            }
        });

threadPoolExecutor.execute提交了一个任务,这会耗费一个线程来执行该任务,由于任务是个while(true)循环,正常情况下该任务不会终止。换句话说,这个任务会"永久"占用线程池中的一个线程。因此,对于while(true)循环的任务需要注意:

创建线程池new ThreadPoolExecutor(...)时,指定的 corePoolSize 不能小于 需要提交的任务个数,否则有些任务不能立即启动,线程池需要增加线程(最大增加到maximumPoolSize 个线程)来处理。如果 maximumPoolSize 小于需要提交的任务个数,由于每个任务永久地占用一个线程执行,那么有些任务就只能一直堆积在taskQueue 任务队列中了

而在本示例中,main 线程通过设置 stop 变量让 test 线程抛出异常,自定义的异常处理器MyExceptionHandler就会处理该异常,并且在该任务执行“完成”后,JVM会调用线程池的afterExecute(...)方法,又重新提交该任务。

总结

这篇文章总结了本人在使用JAVA线程池中的一些理解,写代码以线程池方式提交任务,程序跑一段时间,没有数据输出了,好像暂停了,看堆栈信息线程莫名其妙地消失了,或者阻塞在任务队列上拿不到Task了……因此需要明白线程池底层执行的机制。

  1. 在实现Kafka消费者过程中,每个消费者一个线程,使用线程池来管理线程、提交任务。但总过一段时间后Kafka Broker Rebalance,看后台日志是Kafka Consumer在解析一些消息时抛出了运行时异常。这样线程池就结束了这个任务,由于没有重写afterExecute()方法 当任务出现异常时重新提交任务。因此,这意味着永久丢失了一个消费者线程。而少了一个消费者,Kafka就发生了Rebalance。
  2. 尽量使用线程池来管理线程,而不是自己 new Thread(),一方面是采用线程池可方便地为每个线程设置合理的名称,这样便于debug。另一方面,通过 implements Thread.UncaughtExceptionHandler自定义线程运行时异常处理器,可方便地打印出线程异常日志。
  3. 可继承ThreadPoolExecutor扩展线程池功能,比如在任务执行完成后,执行一些额外的操作。关于如何扩展线程池,ElasticSearch源码中线程池模块很值得借鉴。
  4. 上文中提到的异常处理器 和 向线程池提交任务的拒绝策略RejectedExecutionHandler是两回事。另外,为了图方便,直接在main方法中创建线程池了,实际应用中肯定不能这样。这里给出的代码只是Examples。

最后给出一个思考问题:针对需要长期运行的任务,比如每隔一段时间从Redis中读取若干条数据。是提交一个Runnable任务,这个Runnable任务里是个while(true)循环读取数据:

        executor.execute(()->{
            while (true) {
                //读若干条数据
                read();
                sleep(1000);
            }
        });

还是:在一个外部while循环中,不断地向 taskQueue 任务队列中提交任务呢?

        while (true) {
            executor.execute(()->{
                read();
            });
            sleep(1000);
        }

CustomThreadPoolExecutorTest.java 完整代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class CustomThreadPoolExecutorTest {
    private static volatile boolean stop = false;
    public static void main(String[] args)throws InterruptedException {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
        //定义 线程执行过程中出现异常时的 异常处理器
        MyExceptionHandler exceptionHandler = new MyExceptionHandler();
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("test-%d")
                .setUncaughtExceptionHandler(exceptionHandler).build();
        CustomThreadPoolExecutor threadPoolExecutor = new CustomThreadPoolExecutor(1, 2, 1, TimeUnit.HOURS, taskQueue,threadFactory);

        threadPoolExecutor.execute(()->{
            //提交的是一个while(true)任务,正常运行时这类任务不会结束
            while (true) {
                System.out.println("start processing");
                try {
                    //模拟任务每次执行耗时1000ms
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //ignore
                }
                System.out.println("finish processing");

                if (stop) {
                    throw new RuntimeException("running encounter exception");
                }
            }
        });

        Thread.sleep(2000);
        //模拟 test- 线程 在执行任务过程中抛出异常
        stop = true;
        Thread.sleep(1000);
        stop = false;
    }

    private static class MyExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            System.out.println(String.format("thread name %s, msg %s", t.getName(), e.getMessage()));
        }
    }
}

ThreadPoolExecutorTest.java 测试线程在执行过程中抛出ThreadDeath对象:

import java.util.concurrent.*;
public class ThreadPoolExecutorTest {
    private static volatile boolean stop = false;
    public static void main(String[] args) throws InterruptedException{
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, taskQueue);
        executor.execute(()->{
            while (true) {
                System.out.println("start processing");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //ignore
                }
                System.out.println("finish processing");
                if (stop) {
                    throw new ThreadDeath();
//                    throw new RuntimeException("runtime exception");
                }
            }
        });
        Thread.sleep(3000);
        stop = true;
        Thread.sleep(2000);

        executor.execute(()->{
            //能够继续提交任务执行
            System.out.println("continue submit runnable task,is All thread in thread pool dead?");
        });
    }
}

参考资料:

  • Reexecute task from within UncaughtExceptionHandler?

原文:https://www.cnblogs.com/hapjin/p/10240863.html

转载于:https://www.cnblogs.com/hapjin/p/10240863.html

相关文章:

  • ==和equals
  • Windows + Ubuntu 16.04 双系统安装
  • 蚂蚁课堂:压测工具
  • 简单的登录注册逻辑。
  • nginx 301跳转https后post请求失效问题解决
  • 解析范式(1NF-4NF)
  • P4345 [SHOI2015]超能粒子炮·改 Lucas
  • boost库:字符串处理
  • OpenSSL生成私钥和公钥
  • centos7.5配置双网卡上网
  • 工作总结报告
  • 孤荷凌寒自学python第七十八天开始写Python的第一个爬虫8
  • java 多线程
  • .NET Core IdentityServer4实战-开篇介绍与规划
  • Matplotlib中plt.rcParams用法(设置图像细节)
  • [译]如何构建服务器端web组件,为何要构建?
  • 「前端早读君006」移动开发必备:那些玩转H5的小技巧
  • classpath对获取配置文件的影响
  • Git初体验
  • IDEA常用插件整理
  • mac修复ab及siege安装
  • Sublime text 3 3103 注册码
  • 分享一个自己写的基于canvas的原生js图片爆炸插件
  • 简析gRPC client 连接管理
  • 将 Measurements 和 Units 应用到物理学
  • 爬虫模拟登陆 SegmentFault
  • 腾讯优测优分享 | Android碎片化问题小结——关于闪光灯的那些事儿
  • 在Mac OS X上安装 Ruby运行环境
  • 阿里云ACE认证之理解CDN技术
  • ​LeetCode解法汇总2670. 找出不同元素数目差数组
  • ​创新驱动,边缘计算领袖:亚马逊云科技海外服务器服务再进化
  • #1015 : KMP算法
  • #AngularJS#$sce.trustAsResourceUrl
  • #laravel 通过手动安装依赖PHPExcel#
  • #在 README.md 中生成项目目录结构
  • #在线报价接单​再坚持一下 明天是真的周六.出现货 实单来谈
  • (echarts)echarts使用时重新加载数据之前的数据存留在图上的问题
  • (ZT)出版业改革:该死的死,该生的生
  • (附源码)node.js知识分享网站 毕业设计 202038
  • (附源码)springboot课程在线考试系统 毕业设计 655127
  • (附源码)计算机毕业设计ssm本地美食推荐平台
  • (免费领源码)Java#Springboot#mysql农产品销售管理系统47627-计算机毕业设计项目选题推荐
  • (篇九)MySQL常用内置函数
  • (十) 初识 Docker file
  • (译) 理解 Elixir 中的宏 Macro, 第四部分:深入化
  • (转)清华学霸演讲稿:永远不要说你已经尽力了
  • (转)使用VMware vSphere标准交换机设置网络连接
  • (转载)微软数据挖掘算法:Microsoft 时序算法(5)
  • (自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载
  • **PyTorch月学习计划 - 第一周;第6-7天: 自动梯度(Autograd)**
  • .helper勒索病毒的最新威胁:如何恢复您的数据?
  • .net用HTML开发怎么调试,如何使用ASP.NET MVC在调试中查看控制器生成的html?
  • @Autowired自动装配
  • @private @protected @public
  • @WebServiceClient注解,wsdlLocation 可配置