Java 线程及线程池的创建方式
文章目录
- 创建线程的三种方式
- 继承Thread类
- 实现Runable接口
- 实现Callable接口(有返回值)
- 线程池
- 池化技术
- 优点
- 核心类介绍
- Executor
- ExecutorService
- AbstractExecutorService
- ThreadPoolExecutor
- Executors
- ThreadPoolTaskExecutor
- 创建线程池的几种方法
- 一、通过ThreadPoolExecutor构造方法创建
- 二、通过Executors工具类创建
- 三、通过ThreadPoolTaskExecutor构建
- 线程池拒绝策略
- 执行时机
- JDK实现
- 自定义实现举例
- 实现RejectedExecutionHandler接口
- 继承AbortPolicy类
- 匿名内部类
- 线程池队列类型
创建线程的三种方式
继承Thread类
步骤:
- 继承Thread类;
- 重写run()方法;
- 新建ExtendThread对象实例,并调用start()方法启动线程 ;
代码实现:
@Slf4j
public class ExtendThread extends Thread {
@SneakyThrows
@Override
public void run() {
Thread.sleep(1000);
log.info("I'm thread:{} ", this.getName());
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
log.info("main ==> {}", i);
if (i % 2 == 0) {
new ExtendThread().start();
}
}
Thread.sleep(2_1000);
}
}
结果:
main ==> 0
main ==> 1
main ==> 2
main ==> 3
main ==> 4
main ==> 5
main ==> 6
main ==> 7
main ==> 8
main ==> 9
I'm thread:Thread-0
I'm thread:Thread-1
I'm thread:Thread-4
I'm thread:Thread-2
I'm thread:Thread-3
实现Runable接口
步骤:
- 实现Runnable接口及其run()方法;
- 创建该类的对象实例;
- 新建一个Thread对象实例,并将Runnable实例作为其Target;
- 调用线程对象的start()方法启动该线程;
代码实现:
@Slf4j
public class RunnableThread implements Runnable {
private int index = 0;
@Override
public void run() {
for (int i = 0; i < 10; i++) {
log.info("Thread:{} index = {}", Thread.currentThread().getName(), index++);
}
}
public static void main(String[] args) throws InterruptedException {
RunnableThread runnableThread = new RunnableThread();
new Thread(runnableThread).start();
new Thread(runnableThread).start();
Thread.sleep(2_1000);
}
}
结果:可以看到Thread-3和Thread-4共享了 RunnableThread 实例的index变量。实际使用中需要加锁,否则会有线程安全问题。
- Thread:Thread-3 index = 0
- Thread:Thread-4 index = 1
- Thread:Thread-3 index = 2
- Thread:Thread-4 index = 3
- Thread:Thread-3 index = 4
- Thread:Thread-4 index = 5
- Thread:Thread-3 index = 6
- Thread:Thread-4 index = 7
- Thread:Thread-3 index = 8
- Thread:Thread-4 index = 9
- Thread:Thread-3 index = 10
- Thread:Thread-4 index = 11
- Thread:Thread-3 index = 12
- Thread:Thread-4 index = 13
- Thread:Thread-3 index = 14
- Thread:Thread-4 index = 15
- Thread:Thread-3 index = 16
- Thread:Thread-4 index = 17
- Thread:Thread-3 index = 18
- Thread:Thread-4 index = 19
- Thread:Thread-3 index = 20
实现Callable接口(有返回值)
步骤:
- 实现Callable接口,并实现call()方法,该方法有返回值,再创建Callable实现类的实例;
- 使用FutureTask类来包装Callable对象,FutureTask实现了Runnable接口,并在run() 方法中调用了Callable实例的call()方法。
- 使用FutureTask对象作为Thread对象的target创建并启动新线程;
- 调用FutureTask对象的get() 或者 get(long timeout, TimeUnit unit)方法来获得子线程执行结束后的返回值。
代码实现:
@Slf4j
public class CallableThread implements Callable<String> {
@Override
public String call() throws Exception {
log.info("invoke callable thread...");
Thread.sleep(1000);
return "Kleven";
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableThread myCallable = new CallableThread();
FutureTask futureTask = new FutureTask(myCallable);
new Thread(futureTask).start();
log.info("invoke main thread...");
String result = (String) futureTask.get();
log.info("the futureTask result is {}", result);
}
}
结果:该实现方式可以有返回值。
16:52:36.302 [Thread-0] INFO ******.CallableThread - invoke callable thread...
16:52:36.302 [main] INFO ******.CallableThread - invoke main thread...
16:52:37.309 [main] INFO ******.CallableThread - the futureTask result is Kleven
线程池
池化技术
池化技术简单点来说,就是提前保存大量的资源,以备不时之需。在机器资源有限的情况下,使用池化技术可以大大的提高资源的利用率,提升性能等。
在编程领域,比较典型的池化技术有:线程池、连接池、内存池、对象池等。
优点
提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。可以避免频繁的创建和销毁线程所带来的开销。
核心类介绍
Executor
任务执行器接口,解耦任务的提交与执行,提交任务的时候无需关注任务执行细节。
提供了一个execute()方法用于执行Runnable任务,可以在主线程中直接执行,也可以新建一个线程执行。
public interface Executor {
void execute(Runnable command);
}
ExecutorService
执行器服务接口,Executor接口的一个扩展,主要增加了以下两大类方法:
- 管理termination的相关方法
- 支持Callable任务(有返回值Future)的相关方法,单个任务或批量任务
public interface ExecutorService extends Executor {
/**
* 增加termination 相关方法
*/
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 增加有返回值Future的相关方法
*/
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService
实现执行器服务的一个抽象类。提供了ExecutorService中submit, invokeAny 和 invokeAll相关方法的默认实现,这些默认实现中调用execute时需要Runable实例,所以均需要调用newTaskFor方法将Runnable+value 和 Callable 参数转换成 RunnableFuture(同时实现了Runnable 和 Future 接口)。
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* submit,invokeAny,invokeAll 方法的基本实现,最终调用的是execute方法。
* 没有实现termination相关方法和execute,所以子类需要实现。
* 其他代码省略......
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* 其他代码省略......
*/
}
ThreadPoolExecutor
线程池执行器,执行器的线程池方式实现类。
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 工作队列,用于存放任务。
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 核心线程数,默认一直存活不会回收。allowCoreThreadTimeOut设置为true时,可以回收。
*/
private volatile int corePoolSize;
/**
* 最大线程数,受内部容量限制。
*/
private volatile int maximumPoolSize;
/**
* 空闲线程存活时间,单位纳秒。
*/
private volatile long keepAliveTime;
/**
* 线程工厂类,用于创建线程池中的线程。
*/
private volatile ThreadFactory threadFactory;
/**
* 拒绝策略执行器
*/
private volatile RejectedExecutionHandler handler;
/**
* 实现核心方法 execute
* 1. 如果当前线程数小于核心线程数,启动一个新线程执行提交的任务。
* 2. 如果当前线程数大于等于核心线程数,则将提交的任务加入到工作队列当中。
* 3. 如果工作队列已满,则新建非核心线程执行提交的任务。
* 4. 如果线程数已达到最大线程数限制且队列已满,则拒绝新任务,执行RejectedExecutionHandler。
*
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}
Executors
用于构建 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, 和 Callable 实例的一个工具类。
ThreadPoolTaskExecutor
此类由Spring提供,对 ThreadPoolExecutor 进行了封装,提供了核心参数的默认值,并添加了任务装饰器TaskDecorator,默认创建的工作队列是LinkedBlockingQueue。
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object();
private int corePoolSize = 1;
private int maxPoolSize = Integer.MAX_VALUE;
private int keepAliveSeconds = 60;
private int queueCapacity = Integer.MAX_VALUE;
private boolean allowCoreThreadTimeOut = false;
@Nullable
private TaskDecorator taskDecorator;
@Nullable
private ThreadPoolExecutor threadPoolExecutor;
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
//创建ThreadPoolExecutor实例
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
// 装饰器装饰任务
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue<>(queueCapacity);
}
else {
return new SynchronousQueue<>();
}
}
}
创建线程池的几种方法
一、通过ThreadPoolExecutor构造方法创建
构造方法参数说明:
- int corePoolSize: 核心线程数
- int maximumPoolSize: 最大线程数
- long keepAliveTime: 空闲线程存活时间
- TimeUnit unit: 时间单位,与keepAliveTime对应
- BlockingQueue workQueue: 工作队列
- threadFactory: 线程工厂类,用于创建线程池中的线程
- RejectedExecutionHandler handler: 拒绝策略执行器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// some code ......
}
二、通过Executors工具类创建
- newFixedThreadPool:创建执行只有n个固定核心线程的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- newWorkStealingPool: 创建ForkJoinPool线程池。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
- newSingleThreadExecutor: 创建只有一个核心线程的执行器,严格意义上说不是线程池。可以保证所有任务按照先进先出的顺序执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- newCachedThreadPool: 创建一个可缓存的线程池,核心线程为0,但最大线程为整形最大值。相当于没有线程数限制,只要需要就创建新的线程,如果线程空闲60秒就回收。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- ScheduledExecutorService : 创建一个单线程的可以执行延迟任务的线程池。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
- newScheduledThreadPool: 创建一个指定核心线程数的可以执行延迟任务的线程池。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
实际项目中不推荐使用工具类创建线程池,我们必须了解ThreadPoolExecutor构造方法中的每一个参数的含义,并为其设置合适的值,以规避不可预测的风险。
三、通过ThreadPoolTaskExecutor构建
如果是Spring项目,一般使用这种方式创建线程池。
代码示例:
@Bean
public Executor asyncExecutorService() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(300);
executor.setThreadNamePrefix("My-Executor-");
// 执行装饰器
executor.setTaskDecorator((runnable) -> {
// 获取上级线程中的一些信息,一般存放在ThreadLocal中。
long tid = Thread.currentThread().getId();
return () -> {
try {
if (Thread.currentThread().getId() != tid) {
// 将上级线程中获取的信息存到子线程中
}
runnable.run();
} finally {
if (Thread.currentThread().getId() != tid) {
// 清空子线程ThreadLocal
}
}
};
});
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
线程池拒绝策略
执行时机
当线程池队列已满,且达到最大线程数时,或者线程池已经中断,无法再执行新提交的任务,这时就会触发线程池拒绝策略。
JDK实现
- CallerRunsPolicy:如果线程池没有shutdown,则使用主线程执行task。
- AbortPolicy(ThreadPoolExecutor
的默认拒绝策略):中断并抛出异常RejectedExecutionException。 - DiscardPolicy:忽略新任务。
- DiscardOldestPolicy:如果线程池没有shutdown,则从队列中删除一个最老的任务,然后再执行新任。
自定义实现举例
实现RejectedExecutionHandler接口
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
继承AbortPolicy类
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
throw new RejectedExecutionException(msg);
}}
匿名内部类
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
//1. 记录日志
log.warn...
//2 将task插入的redis中
redis.lpush...
}
}
线程池队列类型
当任务数大于corePoolSize核心线程数时,workQueue就会起作用,用来保存等待被执行的任务的阻塞队列,JDK提供了多种阻塞队列:
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务
- LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务
- SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
- PriorityBlockingQuene:具有优先级的无界阻塞队列
- DelayQueue: 支持延时获取元素的无界阻塞队列,在创建元素时可以指定多久才能从队列中获取当前元素
- LinkedTransferQueue: 由一个链表结构组成的无界阻塞TransferQueue队列
- LinkedBlockingDeque: 由一个链表结构组成的双向阻塞队列,可以从队列的两端插入和移出元素