为什么80%的码农都做不了架构师?>>>
- 原文地址:Thread Pools
- 作者: Jakob Jenkov
当需要在应用中限制同时运行的线程数量时,线程池就显得很有用了。 不论是从线程创建的开销,还是内存堆栈方面,线程池都有着不错的性能。
对比,为每一个任务都创建一个新线程的方式。线程池让任务可以在线程池中的线程间进行传递。 一旦线程池中有空闲的线程,任务就会分配给其中的一个线程来执行。 在内部,任务会被放置到一个阻塞队列中,线程池中的线程会依次从队列中获取任务。 当一个新的任务呗放入队列时,一个空闲线程就会从队列中获取这个任务,并开始执行任务。 其他的空闲线程,则继续在队列上阻塞等待下一个放入队列的任务。
线程池常常被用于多线程服务端的应用场景。 每一个连接到服务端的网络连接都会包装成一个任务放入线程池。 线程池中的线程来并发处理连接的请求。 下文会进一步讨论在Java中如何实现多线程服务端程序。
Java 5 在
java.util.concurrent
包中,引入了线程池API。 所以一般使用者,无需自己实现线程池了。 可以通过java.util.concurrent.ExecutorService
的API来了解更多内容。 但是想要用好线程池,还是有必要来了解它是如何实现的。
下面这个例子,就是一个简单的线程池实现类。 请注意,例子中使用了之前阻塞队列中实现的
BlockingQueue
。 实际中,一般还是会首选Java中的阻塞队列。
public class ThreadPool {
private BlockingQueue taskQueue = null;
private List<PoolThread> threads = new ArrayList<PoolThread>();
private boolean isStopped = false;
public ThreadPool(int noOfThreads, int maxNoOfTasks){
taskQueue = new BlockingQueue(maxNoOfTasks);
for(int i=0; i<noOfThreads; i++){
threads.add(new PoolThread(taskQueue));
}
for(PoolThread thread : threads){
thread.start();
}
}
public synchronized void execute(Runnable task) throws Exception{
if(this.isStopped) throw
new IllegalStateException("ThreadPool is stopped");
this.taskQueue.enqueue(task);
}
public synchronized void stop(){
this.isStopped = true;
for(PoolThread thread : threads){
thread.doStop();
}
}
}
public class PoolThread extends Thread {
private BlockingQueue taskQueue = null;
private boolean isStopped = false;
public PoolThread(BlockingQueue queue){
taskQueue = queue;
}
public void run(){
while(!isStopped()){
try{
Runnable runnable = (Runnable) taskQueue.dequeue();
runnable.run();
} catch(Exception e){
//log or otherwise report exception,
//but keep pool thread alive.
}
}
}
public synchronized void doStop(){
isStopped = true;
this.interrupt(); //break pool thread out of dequeue() call.
}
public synchronized boolean isStopped(){
return isStopped;
}
}
线程池的实现包含两个部分。
ThreadPool
类,提供了线程池的公用API;PoolThread
类,实现了线程执行任务的逻辑。
当通过
ThreadPool.execute(Runnable r)
方法来执行一个任务时,需要通过参数提供一个Runnable
实现类来增加一个任务。 阻塞队列中放置的都是待处理的Runnable
任务。
空闲的
PoolThread
会从队列中取出一个Runnable
来执行。 可以通过PoolThread.run()
方法看到取任务的执行逻辑。PoolThread
会循环着从队列中获取任务,直至停止。
可以通过
ThreadPool.stop()
方法,来停止一个ThreadPool
的运行。 停止动作会通过内部的isStopped
变量来通知线程。 依次调用每一个线程的doStop()
方法,让所有线程停止执行。 注意通过中断的方式让线程从阻塞队列的等待中退出,此时会抛出IllegalStateException
异常,所以需要处理一下。
这样的停止逻辑,会让当前正在执行的任务执行完成之后再停止。 注意
PoolThread.doStop()
中调用的this.interrupt()
方法。 这让阻塞队列上等待的线程得到一个中断异常InterruptedException
,处理这个异常之后,线程就有机会去判断isStopped
的状态。这样,线程才有机会退出。