当前位置: 首页 > news >正文

Java 线程及线程池的创建方式

文章目录

    • 创建线程的三种方式
      • 继承Thread类
      • 实现Runable接口
      • 实现Callable接口(有返回值)
    • 线程池
      • 池化技术
      • 优点
      • 核心类介绍
        • Executor
        • ExecutorService
        • AbstractExecutorService
        • ThreadPoolExecutor
        • Executors
        • ThreadPoolTaskExecutor
      • 创建线程池的几种方法
        • 一、通过ThreadPoolExecutor构造方法创建
        • 二、通过Executors工具类创建
        • 三、通过ThreadPoolTaskExecutor构建
      • 线程池拒绝策略
        • 执行时机
        • JDK实现
        • 自定义实现举例
          • 实现RejectedExecutionHandler接口
          • 继承AbortPolicy类
          • 匿名内部类
      • 线程池队列类型

创建线程的三种方式

继承Thread类

步骤:

  1. 继承Thread类;
  2. 重写run()方法;
  3. 新建ExtendThread对象实例,并调用start()方法启动线程 ;

代码实现:

@Slf4j
public class ExtendThread extends Thread {

    @SneakyThrows
    @Override
    public void run() {
        Thread.sleep(1000);
        log.info("I'm thread:{} ", this.getName());
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            log.info("main ==> {}", i);
            if (i % 2 == 0) {
                new ExtendThread().start();
            }
        }
        Thread.sleep(2_1000);
    }
}

结果:

 main ==> 0
 main ==> 1
 main ==> 2
 main ==> 3
 main ==> 4
 main ==> 5
 main ==> 6
 main ==> 7
 main ==> 8
 main ==> 9
 I'm thread:Thread-0 
 I'm thread:Thread-1 
 I'm thread:Thread-4 
 I'm thread:Thread-2 
 I'm thread:Thread-3 

实现Runable接口

步骤:

  1. 实现Runnable接口及其run()方法;
  2. 创建该类的对象实例;
  3. 新建一个Thread对象实例,并将Runnable实例作为其Target;
  4. 调用线程对象的start()方法启动该线程;

代码实现:

@Slf4j
public class RunnableThread implements Runnable {
    private int index = 0;

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            log.info("Thread:{} index = {}", Thread.currentThread().getName(), index++);
        }

    }
    public static void main(String[] args) throws InterruptedException {
        RunnableThread runnableThread = new RunnableThread();
        new Thread(runnableThread).start();
        new Thread(runnableThread).start();
        Thread.sleep(2_1000);
    }

}

结果:可以看到Thread-3和Thread-4共享了 RunnableThread 实例的index变量。实际使用中需要加锁,否则会有线程安全问题。

    - Thread:Thread-3 index = 0
    - Thread:Thread-4 index = 1
    - Thread:Thread-3 index = 2
    - Thread:Thread-4 index = 3
    - Thread:Thread-3 index = 4
    - Thread:Thread-4 index = 5
    - Thread:Thread-3 index = 6
    - Thread:Thread-4 index = 7
    - Thread:Thread-3 index = 8
    - Thread:Thread-4 index = 9
    - Thread:Thread-3 index = 10
    - Thread:Thread-4 index = 11
    - Thread:Thread-3 index = 12
    - Thread:Thread-4 index = 13
    - Thread:Thread-3 index = 14
    - Thread:Thread-4 index = 15
    - Thread:Thread-3 index = 16
    - Thread:Thread-4 index = 17
    - Thread:Thread-3 index = 18
    - Thread:Thread-4 index = 19
    - Thread:Thread-3 index = 20
    

实现Callable接口(有返回值)

步骤:

  1. 实现Callable接口,并实现call()方法,该方法有返回值,再创建Callable实现类的实例;
  2. 使用FutureTask类来包装Callable对象,FutureTask实现了Runnable接口,并在run() 方法中调用了Callable实例的call()方法。
  3. 使用FutureTask对象作为Thread对象的target创建并启动新线程;
  4. 调用FutureTask对象的get() 或者 get(long timeout, TimeUnit unit)方法来获得子线程执行结束后的返回值。

代码实现:

@Slf4j
public class CallableThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        log.info("invoke callable thread...");
        Thread.sleep(1000);
        return "Kleven";
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CallableThread myCallable = new CallableThread();
        FutureTask futureTask = new FutureTask(myCallable);

        new Thread(futureTask).start();
        log.info("invoke main thread...");

        String result = (String) futureTask.get();
        log.info("the futureTask result is {}", result);
    }
}

结果:该实现方式可以有返回值。

16:52:36.302 [Thread-0] INFO ******.CallableThread - invoke callable thread...
16:52:36.302 [main] INFO ******.CallableThread - invoke main thread...
16:52:37.309 [main] INFO ******.CallableThread - the futureTask result is Kleven

线程池

池化技术

池化技术简单点来说,就是提前保存大量的资源,以备不时之需。在机器资源有限的情况下,使用池化技术可以大大的提高资源的利用率,提升性能等。
在编程领域,比较典型的池化技术有:线程池、连接池、内存池、对象池等。

优点

提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。可以避免频繁的创建和销毁线程所带来的开销。

核心类介绍

Executor

任务执行器接口,解耦任务的提交与执行,提交任务的时候无需关注任务执行细节。
提供了一个execute()方法用于执行Runnable任务,可以在主线程中直接执行,也可以新建一个线程执行。

public interface Executor {
    void execute(Runnable command);
}

ExecutorService

执行器服务接口,Executor接口的一个扩展,主要增加了以下两大类方法:

  1. 管理termination的相关方法
  2. 支持Callable任务(有返回值Future)的相关方法,单个任务或批量任务
    ExecutorService
public interface ExecutorService extends Executor {

    /**
     * 增加termination 相关方法
     */
    void shutdown();
    
    List<Runnable> shutdownNow();
    
    boolean isShutdown();
    
    boolean isTerminated();
    
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;


	/**
	* 增加有返回值Future的相关方法
	*/
    <T> Future<T> submit(Callable<T> task);
    
    <T> Future<T> submit(Runnable task, T result);
    
    Future<?> submit(Runnable task);
    
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
        
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
        
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
        
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService

实现执行器服务的一个抽象类。提供了ExecutorService中submit, invokeAny 和 invokeAll相关方法的默认实现,这些默认实现中调用execute时需要Runable实例,所以均需要调用newTaskFor方法将Runnable+value 和 Callable 参数转换成 RunnableFuture(同时实现了Runnable 和 Future 接口)。
AbstractExecutorService
RunnableFuture

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

	/**
	* submit,invokeAny,invokeAll 方法的基本实现,最终调用的是execute方法。
	* 没有实现termination相关方法和execute,所以子类需要实现。
	* 其他代码省略......
	*/
	public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
	/**
	* 其他代码省略......
	*/
	
}

ThreadPoolExecutor

线程池执行器,执行器的线程池方式实现类。
ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService {

	/**
	* 工作队列,用于存放任务。
	*/
	private final BlockingQueue<Runnable> workQueue;

	/**
	* 核心线程数,默认一直存活不会回收。allowCoreThreadTimeOut设置为true时,可以回收。
	*/
	 private volatile int corePoolSize;

	/**
	* 最大线程数,受内部容量限制。
	*/
	private volatile int maximumPoolSize;

	/**
	* 空闲线程存活时间,单位纳秒。
	*/
	private volatile long keepAliveTime;

	/**
	* 线程工厂类,用于创建线程池中的线程。
	*/
	private volatile ThreadFactory threadFactory;

    /**
     * 拒绝策略执行器
     */
    private volatile RejectedExecutionHandler handler;

	/**
	* 实现核心方法 execute
	* 1. 如果当前线程数小于核心线程数,启动一个新线程执行提交的任务。
	* 2. 如果当前线程数大于等于核心线程数,则将提交的任务加入到工作队列当中。
	* 3. 如果工作队列已满,则新建非核心线程执行提交的任务。
	* 4. 如果线程数已达到最大线程数限制且队列已满,则拒绝新任务,执行RejectedExecutionHandler。
	*
	*/
	public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
                             
}

Executors

用于构建 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, 和 Callable 实例的一个工具类。

ThreadPoolTaskExecutor

此类由Spring提供,对 ThreadPoolExecutor 进行了封装,提供了核心参数的默认值,并添加了任务装饰器TaskDecorator,默认创建的工作队列是LinkedBlockingQueue。

ThreadPoolExecutor

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
		implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

	private final Object poolSizeMonitor = new Object();

	private int corePoolSize = 1;

	private int maxPoolSize = Integer.MAX_VALUE;

	private int keepAliveSeconds = 60;

	private int queueCapacity = Integer.MAX_VALUE;

	private boolean allowCoreThreadTimeOut = false;

	@Nullable
	private TaskDecorator taskDecorator;

	@Nullable
	private ThreadPoolExecutor threadPoolExecutor;

    @Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

		ThreadPoolExecutor executor;
		if (this.taskDecorator != null) {
			//创建ThreadPoolExecutor实例
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler) {
				@Override
				public void execute(Runnable command) {
				    // 装饰器装饰任务
					Runnable decorated = taskDecorator.decorate(command);
					if (decorated != command) {
						decoratedTaskMap.put(decorated, command);
					}
					super.execute(decorated);
				}
			};
		}
		else {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler);

		}

		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}

		this.threadPoolExecutor = executor;
		return executor;
	}

	protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
		if (queueCapacity > 0) {
			return new LinkedBlockingQueue<>(queueCapacity);
		}
		else {
			return new SynchronousQueue<>();
		}
	}
}

创建线程池的几种方法

一、通过ThreadPoolExecutor构造方法创建

构造方法参数说明:

  1. int corePoolSize: 核心线程数
  2. int maximumPoolSize: 最大线程数
  3. long keepAliveTime: 空闲线程存活时间
  4. TimeUnit unit: 时间单位,与keepAliveTime对应
  5. BlockingQueue workQueue: 工作队列
  6. threadFactory: 线程工厂类,用于创建线程池中的线程
  7. RejectedExecutionHandler handler: 拒绝策略执行器

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
     	// some code ......

    }

二、通过Executors工具类创建

  1. newFixedThreadPool:创建执行只有n个固定核心线程的线程池。
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
  1. newWorkStealingPool: 创建ForkJoinPool线程池。
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

  1. newSingleThreadExecutor: 创建只有一个核心线程的执行器,严格意义上说不是线程池。可以保证所有任务按照先进先出的顺序执行。
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }


    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
  1. newCachedThreadPool: 创建一个可缓存的线程池,核心线程为0,但最大线程为整形最大值。相当于没有线程数限制,只要需要就创建新的线程,如果线程空闲60秒就回收。
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
  1. ScheduledExecutorService : 创建一个单线程的可以执行延迟任务的线程池。
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }
  1. newScheduledThreadPool: 创建一个指定核心线程数的可以执行延迟任务的线程池。
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

实际项目中不推荐使用工具类创建线程池,我们必须了解ThreadPoolExecutor构造方法中的每一个参数的含义,并为其设置合适的值,以规避不可预测的风险。

三、通过ThreadPoolTaskExecutor构建

如果是Spring项目,一般使用这种方式创建线程池。
代码示例:

    @Bean
    public Executor asyncExecutorService() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(1000);
        executor.setKeepAliveSeconds(300);
        executor.setThreadNamePrefix("My-Executor-");
        // 执行装饰器
        executor.setTaskDecorator((runnable) -> {
           	// 获取上级线程中的一些信息,一般存放在ThreadLocal中。
            long tid = Thread.currentThread().getId();
            return () -> {
                try {
                    if (Thread.currentThread().getId() != tid) {
                        // 将上级线程中获取的信息存到子线程中
                    }

                    runnable.run();
                } finally {
                    if (Thread.currentThread().getId() != tid) {
                        // 清空子线程ThreadLocal
                    }

                }

            };
        });
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

线程池拒绝策略

执行时机

当线程池队列已满,且达到最大线程数时,或者线程池已经中断,无法再执行新提交的任务,这时就会触发线程池拒绝策略。

JDK实现

  1. CallerRunsPolicy:如果线程池没有shutdown,则使用主线程执行task。
  2. AbortPolicy(ThreadPoolExecutor
    的默认拒绝策略):中断并抛出异常RejectedExecutionException。
  3. DiscardPolicy:忽略新任务。
  4. DiscardOldestPolicy:如果线程池没有shutdown,则从队列中删除一个最老的任务,然后再执行新任。

自定义实现举例

实现RejectedExecutionHandler接口
 private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                final Thread t = new Thread(r, "Temporary task executor");
                t.start();
            } catch (Throwable e) {
                throw new RejectedExecutionException(
                        "Failed to start a new thread", e);
            }
        }
    }
继承AbortPolicy类
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {


    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                        " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                        " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        throw new RejectedExecutionException(msg);
    }}
匿名内部类
new RejectedExecutionHandler() { 
    @Override 
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { 
        //1. 记录日志 
        log.warn... 
        //2 将task插入的redis中 
        redis.lpush... 
    } 
}

线程池队列类型

当任务数大于corePoolSize核心线程数时,workQueue就会起作用,用来保存等待被执行的任务的阻塞队列,JDK提供了多种阻塞队列:

  1. ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务
  2. LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务
  3. SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
  4. PriorityBlockingQuene:具有优先级的无界阻塞队列
  5. DelayQueue: 支持延时获取元素的无界阻塞队列,在创建元素时可以指定多久才能从队列中获取当前元素
  6. LinkedTransferQueue: 由一个链表结构组成的无界阻塞TransferQueue队列
  7. LinkedBlockingDeque: 由一个链表结构组成的双向阻塞队列,可以从队列的两端插入和移出元素

相关文章:

  • 分布式ID生成服务
  • Vue中的条件渲染v-if、v-show
  • 【Spring Boot】响应JSON实现原理
  • 基于51单片机交通信号灯仿真_东西管制+南北管制
  • 2022“杭电杯”中国大学生算法设计超级联赛(4)
  • AngelScript -- C++程序最好的脚本语言
  • 如何编写整洁的代码
  • leetcode: 122. 买卖股票的最佳时机II
  • 字符串习题总结3
  • Java 操作RestHighLevelClient查询详解
  • 有效 TCP RST
  • 46.全排列 | 51.N皇后
  • 正则表达式的说明》
  • 【Vue】基础系列(三三)指令语法-事件及其修饰符,动态样式,v-model的用法,数据持久化存在本地localStorage,自定义指令
  • 3D感知技术(3)相机成像模型及相机标定
  • CoolViewPager:即刻刷新,自定义边缘效果颜色,双向自动循环,内置垂直切换效果,想要的都在这里...
  • C语言笔记(第一章:C语言编程)
  • Hexo+码云+git快速搭建免费的静态Blog
  • JS数组方法汇总
  • js数组之filter
  • js作用域和this的理解
  • laravel with 查询列表限制条数
  • LeetCode算法系列_0891_子序列宽度之和
  • rc-form之最单纯情况
  • XML已死 ?
  • 搞机器学习要哪些技能
  • 后端_ThinkPHP5
  • 简析gRPC client 连接管理
  • 两列自适应布局方案整理
  • 普通函数和构造函数的区别
  • 赢得Docker挑战最佳实践
  • JavaScript 新语法详解:Class 的私有属性与私有方法 ...
  • RDS-Mysql 物理备份恢复到本地数据库上
  • 交换综合实验一
  • #HarmonyOS:Web组件的使用
  • #include
  • #大学#套接字
  • #我与虚拟机的故事#连载20:周志明虚拟机第 3 版:到底值不值得买?
  • (9)目标检测_SSD的原理
  • (八)光盘的挂载与解挂、挂载CentOS镜像、rpm安装软件详细学习笔记
  • (附源码)计算机毕业设计ssm本地美食推荐平台
  • (深度全面解析)ChatGPT的重大更新给创业者带来了哪些红利机会
  • (五)大数据实战——使用模板虚拟机实现hadoop集群虚拟机克隆及网络相关配置
  • (转)fock函数详解
  • (转)程序员技术练级攻略
  • * CIL library *(* CIL module *) : error LNK2005: _DllMain@12 already defined in mfcs120u.lib(dllmodu
  • .net mvc actionresult 返回字符串_.NET架构师知识普及
  • .net mvc部分视图
  • .net反混淆脱壳工具de4dot的使用
  • .net下的富文本编辑器FCKeditor的配置方法
  • @RequestParam,@RequestBody和@PathVariable 区别
  • [ Linux 长征路第五篇 ] make/Makefile Linux项目自动化创建工具
  • []sim300 GPRS数据收发程序
  • [20171102]视图v$session中process字段含义
  • [Android Studio 权威教程]断点调试和高级调试