线程池工具类
线程池简述
为什么需要一个线程池工具类?
答:整个项目,用到线程执行任务的地方很多,不可能哪里用到就在那里直接new一个线程执行,这样资源得不到重复利用,一旦线程过多就会导致内存不足。
线程池的好处是什么?
答:使用线程池执行线程任务,当一个线程执行完成一个任务之后,线程资源回到线程池,资源得到重复利用。
线程池为什么使用自定义方式?
阿里文档推荐使用自定义线程池,因为java自带线程池都会有可能造成内存不足的问题。自定义线程池,根据服务器配置定制线程池核心线程、最大线程等,是最好的方式。
二、工具类代码和测试代码
导包
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</version></dependency>
工具类
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;/*** 自定义线程创建工具类,创建线程池后不需要关闭** @author liangxn*/
public class ThreadPoolUtils {private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolUtils.class);private static ThreadPoolExecutor threadPool = null;private static final String POOL_NAME = "myPool";// 等待队列长度private static final int BLOCKING_QUEUE_LENGTH = 20000;// 闲置线程存活时间private static final int KEEP_ALIVE_TIME = 60 * 1000;private ThreadPoolUtils() {throw new IllegalStateException("utility class");}/*** 无返回值直接执行** @param runnable 需要运行的任务*/public static void execute(Runnable runnable) {getThreadPool().execute(runnable);}/*** 有返回值执行* 主线程中使用Future.get()获取返回值时,会阻塞主线程,直到任务执行完毕** @param callable 需要运行的任务*/public static <T> Future<T> submit(Callable<T> callable) {return getThreadPool().submit(callable);}public static synchronized ThreadPoolExecutor getThreadPool() {if (threadPool == null) {// 获取处理器数量int cpuNum = Runtime.getRuntime().availableProcessors();// 根据cpu数量,计算出合理的线程并发数int maximumPoolSize = cpuNum * 2 + 1;// 核心线程数、最大线程数、闲置线程存活时间、时间单位、线程队列、线程工厂、当前线程数已经超过最大线程数时的异常处理策略threadPool = new ThreadPoolExecutor(maximumPoolSize - 1,maximumPoolSize,KEEP_ALIVE_TIME,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(BLOCKING_QUEUE_LENGTH),new ThreadFactoryBuilder().setNameFormat(POOL_NAME + "-%d").build(),new ThreadPoolExecutor.AbortPolicy() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {LOGGER.warn("线程爆炸了,当前运行线程总数:{},活动线程数:{}。等待队列已满,等待运行任务数:{}",e.getPoolSize(),e.getActiveCount(),e.getQueue().size());}});}return threadPool;}
}
线程池的七个参数讲解
- corePoolSize:线程池核心大小。线程池中会维护最小的线程线程数量,即使这些线程处于空闲状态,他们也不会销毁,除非设置了allowCoreThreadTimeOut。
- maximumPoolSize:线程池最大线程数量。一个任务被提交到线程池以后,首先会找有没有空闲存活的线程,如果有则直接将任务提交给空闲线程,如果没有,就会缓存到工作队列中。如果工作队列满了,才会新建一个线程,然后从工作队列的头部去除一个任务交由新线程来处理,而将刚提交的任务放入工作队列的尾部。线程池不会无限制的去创建新线程,它会有一个最大线程数量限制,这个数量即由maximunPoolSize指定。
- keepAliveTime:空闲线程存活时间。一个线程如果处于空闲状态,且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程就会被销毁。
- unit:空闲线程存活时间单位。keepAliveTime的计量单位。
- workQueue:工作队列。新任务被提交以后,会进入到此工作队列中,任务调度时再从队列中取出任务。
- threadFactory:线程工厂。创建一个新线程使用的工厂,可以用来设定线程名
- hander:拒绝策略。当工作队列中的任务已达到最大限制,并且线程池中的线程数量已达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题。
四种工作队列
- ArrayBlockingQueue:基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中的线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经满了,则创建一个新的线程,如果线程数量已经达到了maxPoolSize,则会执行拒绝策略。
- LinkedBlockingQuene:基于链表的无界阻塞队列(其实最大容量为Integer.MAX)。按FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到了corePoolSize之后,再有新任务进来,会一直存在该队列,而不会去创建新的线程,知道maxPoolSize,因此使用该工作队列时,参数ma'xPoolSize其实是不起作用的。
- SynchronousQuene:一个不缓存任务的阻塞队列。生产者放入一个任务,必须等到消费者取出这个任务,也就是说,新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
- PriorityBlockingQueue:具有优先级的无界阻塞队列,直到资源耗尽。默认情况下,元素采用自然排序升序排列。也可以自定义实现类compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。但需要注意的是,不能保证同优先级元素的顺序。
四种拒绝策略
ThreadPoolExecutor.AbortPolicy | 直接抛出异常(默认拒绝策略) |
ThreadPoolExecutor.DiscardPolicy | 丢弃当前被拒绝的任务,而不抛出异常 |
ThreadPoolExecutor.DiscardOldestPolicy | 将工作任务中最老的任务丢弃,然后重新尝试接纳被拒绝的任务 |
ThreadPoolExecutor.CallerRunsPolicy | 在客户端中执行被拒绝的任务 |
例子1
Future<String> future = ThreadPoolUtils.submit(() -> {return "我有返回值哦";});try {logger.info(future.get());} catch (InterruptedException | ExecutionException e) {logger.error("任务超过指定时间未返回值,线程超时退出");}// 控制台打印日志:21:04:19.428 [main] INFO - 我有返回值哦
例子2
Future<String> future = ThreadPoolUtils.submit(() -> {return "我有返回值哦";});try {logger.info(future.get());} catch (InterruptedException | ExecutionException e) {logger.error("任务超过指定时间未返回值,线程超时退出");}// 控制台打印日志:21:04:19.428 [main] INFO - 我有返回值哦
例子3
int loop = 40;
for (int i = 0; i < loop; i++) {logger.info("任务{}", i);ThreadPoolUtils.execute(() -> {logger.info("干活好累");try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}logger.info("终于干完了");});}logger.info("我在这儿等着你回来等你回来");// 控制台打印:21:08:08.494 [main] INFO - 任务0............21:08:08.540 [main] INFO - 任务521:08:08.541 [main] INFO - 任务621:08:08.540 [myPool-4] INFO - 干活好累21:08:08.540 [myPool-1] INFO - 干活好累21:08:08.540 [myPool-3] INFO - 干活好累............21:08:08.543 [main] INFO - 任务2121:08:08.548 [main] INFO - 任务2221:08:08.548 [main] INFO - 任务2321:08:08.548 [myPool-21] INFO - 干活好累21:08:08.549 [main] INFO - 任务2421:08:08.549 [myPool-22] INFO - 干活好累21:08:08.549 [main] INFO - 任务2521:08:08.549 [myPool-23] INFO - 干活好累21:08:08.549 [main] INFO - 任务26............21:08:08.551 [myPool-1] INFO - 干活好累21:08:08.551 [myPool-6] INFO - 终于干完了21:08:08.551 [myPool-7] INFO - 终于干完了21:08:08.551 [myPool-5] INFO - 干活好累21:08:08.551 [main] INFO - 任务3521:08:08.551 [main] INFO - 任务3621:08:08.551 [main] INFO - 任务3721:08:08.551 [main] INFO - 任务3821:08:08.551 [main] INFO - 任务3921:08:08.551 [main] INFO - 我在这儿等着你回来等你回来21:08:08.551 [myPool-2] INFO - 干活好累21:08:08.551 [myPool-3] INFO - 干活好累21:08:08.551 [myPool-8] INFO - 干活好累21:08:08.551 [myPool-6] INFO - 干活好累21:08:08.551 [myPool-7] INFO - 干活好累21:08:08.552 [myPool-13] INFO - 终于干完了21:08:08.552 [myPool-12] INFO - 终于干完了............21:08:08.561 [myPool-7] INFO - 终于干完了21:08:08.561 [myPool-3] INFO - 终于干完了
例子4
// 测试10个线程使用工具类
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {executorService.submit(new Runnable() {@Overridepublic void run() {final String name = Thread.currentThread().getName();ThreadPoolUtils.execute(() -> {logger.info("[{}],干活好累", name);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}logger.info("[{}],终于干完了", name);});}});}logger.info("不用等他,我们先干");// 控制台打印:21:11:49.946 [main] INFO - 不用等他,我们先干21:11:49.991 [myPool-4] INFO - [pool-2-thread-7],干活好累21:11:49.991 [myPool-3] INFO - [pool-2-thread-2],干活好累21:11:49.991 [myPool-5] INFO - [pool-2-thread-5],干活好累21:11:49.991 [myPool-8] INFO - [pool-2-thread-6],干活好累21:11:49.991 [myPool-1] INFO - [pool-2-thread-3],干活好累21:11:49.991 [myPool-2] INFO - [pool-2-thread-9],干活好累21:11:49.991 [myPool-9] INFO - [pool-2-thread-10],干活好累21:11:49.991 [myPool-7] INFO - [pool-2-thread-1],干活好累21:11:49.991 [myPool-6] INFO - [pool-2-thread-4],干活好累21:11:49.991 [myPool-0] INFO - [pool-2-thread-8],干活好累21:11:50.091 [myPool-7] INFO - [pool-2-thread-1],终于干完了21:11:50.091 [myPool-4] INFO - [pool-2-thread-7],终于干完了21:11:50.091 [myPool-5] INFO - [pool-2-thread-5],终于干完了21:11:50.091 [myPool-2] INFO - [pool-2-thread-9],终于干完了21:11:50.091 [myPool-0] INFO - [pool-2-thread-8],终于干完了21:11:50.091 [myPool-1] INFO - [pool-2-thread-3],终于干完了21:11:50.091 [myPool-8] INFO - [pool-2-thread-6],终于干完了21:11:50.091 [myPool-6] INFO - [pool-2-thread-4],终于干完了21:11:50.091 [myPool-3] INFO - [pool-2-thread-2],终于干完了21:11:50.091 [myPool-9] INFO - [pool-2-thread-10],终于干完了
例子5
int loop = 2000;
for (int i = 0; i < loop; i++) {ThreadPoolUtils.execute(() -> {logger.info("干活好累");try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}logger.info("终于干完了");});}logger.info("不用等他,我们先干");// 控制台打印:............21:13:25.083 [myPool-19] INFO - 干活好累21:13:25.083 [myPool-8] INFO - 干活好累21:13:25.083 [myPool-30] INFO - 干活好累21:13:25.085 [main] WARN - 线程爆炸了,当前运行线程总数:33,活动线程数:33。等待队列已满,等待运行任务数:100021:13:25.085 [main] WARN - 线程爆炸了,当前运行线程总数:33,活动线程数:33。等待队列已满,等待运行任务数:100021:13:25.085 [main] WARN - 线程爆炸了,当前运行线程总数:33,活动线程数:33。等待队列已满,等待运行任务数:100021:13:25.085 [main] WARN - 线程爆炸了,当前运行线程总数:33,活动线程数:33。等待队列已满,等待运行任务数:100021:13:25.106 [myPool-7] INFO - 干活好累21:13:25.106 [myPool-11] INFO - 干活好累21:13:25.106 [main] WARN - 线程爆炸了,当前运行线程总数:33,活动线程数:33。等待队列已满,等待运行任务数:100021:13:25.106 [myPool-6] INFO - 干活好累21:13:25.106 [myPool-4] INFO - 干活好累21:13:25.106 [main] WARN - 线程爆炸了,当前运行线程总数:33,活动线程数:33。等待队列已满,等待运行任务数:100021:13:25.106 [main] WARN - 线程爆炸了,当前运行线程总数:33,活动线程数:33。等待队列已满,等待运行任务数:1000............