前言
线程池是多并发编程中经常用到,了解是线程池的使用和原理是java程序员的必修课。编写多线程的程序推荐使用线程池而不是自己创建线程,因为线程池中的线程可以复用,复用线程可以降低线程创建和销毁的资源消耗,线程池帮助管理、调度、监控线程,可以防止无限制的创建线程,消耗完系统资源
简单使用
几种常用的线程池:
- newCachedThreadPool,可缓存的线程池,线程池容量几乎为无限(Interger. MAX_VALUE),当一个新线程任务提交,如果线程池没有空闲的线程,则创建一个新线程执行,否则使用空闲线程。
- newFixedThreadPool,定长的线程池,线程池维持定长的线程,即使没有任务运行也不会关闭线程,当新任务提交,线程池中线程都在执行,则将任务放入阻塞队列中排队等待空闲线程运行。
- newScheduledThreadPool,定长的线程池,支持定时及周期性执行任务,newFixedThreadPool的特殊化,线程池容量为1
- newSingleThreadExecutor,线程池只有一个线程,所有任务按照提交的先后顺序执行
使用例子
1 | public class TreadPoolTest { |
线程池实现
线程池类的关系图:
Executor接口:定义了运行新任务的execute接口
ExecutorService接口:继承了Executor接口,增加添加了一些用来管理执行器生命周期和任务生命周期的方法、支持Future任务的方法
AbstractExecutorService抽象类:实现了一些ExecutorService的通用接口
ThreadPoolExecutor类:实现了线程池的完整的方法
Scheduled相关的类:有关定期执行任务
Executors:是个工厂类,内部实际上是根据不同的线程池选择不同的参数生产ThreadPoolExecutor对象
因此线程池的实现方法主要在ThreadPoolExecutor,接下来会重点分析ThreadPoolExecutor来了解java线程池的实现
Executors
当我们使用线程池通常是通过Executors的静态方法得到上述的几种线程池,其实Executors是个工厂类,内部是返回ThreadPoolExecutor的实例化对象,ScheduledThreadPoolExecutor提供定时启动线程执行任务的线程池,这里先不做介绍。
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
可以看出是通过在实例化ThreadPoolExecutor对象时传入不同的参数得到不同的线程池,接下来我们具体看ThreadPoolExecutor类
ThreadPoolExecutor
常用变量
ctl是ThreadPoolExecutor的重要变量,记录了线程池的运行状态runState和线程池有效的线程数workerCount(这里的workerCount指的正在运行任务的线程数,不算入空闲的线程数)。
ctl是一个AtomicInteger类,其后29位记录workerCount的数量,因此总共可以记录(2^29)-1 (约5亿),第30位记录了线程池的运行状态,总共有五种状态:
- RUNNING:接受新任务并且能够处理阻塞队列中的任务
- SHUTDOWN:不能接受新任务但是能够继续处理阻塞队列中的任务,当线程池处于RUNNING状态时,调用shutdown()方法进入该状态,这时线程池不接受新的任务,但是任然会继续处理完池中正在运行的任务和阻塞队列中的任务
STOP:不能接受新任务也不处理队列中的任务,并且会中断正在运行的任务,当线程池处于RUNNING状态时,调用shutdownNow()方法进入该状态
TIDYING:所有的任务都结束,线程池的数量为0,由SHUTDOWN或STOP状态转化而来,线程池进入该状态会调用terminated()的钩子方法
- TERMINATED:terminated()方法调用完毕,线程池生命周期结束
线程池状态转化图:
线程池同时维护了线程的集合(其中存放Worker对象,即线程池中线程的封装对象)和一个ReentrantLock锁,后文会看到其使用
1 | private final ReentrantLock mainLock = new ReentrantLock(); |
构造函数
1 | // 线程池的重要参数,前三位表示线程池的状态,后29位表示池中的线程数 |
构造函数传入的字段主要为:
- corePoolSize:线程池的核心线程数,线程池维持的定长线程数,如果线程池的线程数量小于线程池的核心线程数,即使线程上没有任务也不会关闭
- maximumPoolSize:线程池中的最大的线程数量
- keepAliveTime:当线程池中的线程大于corePoolSize,空闲线程将会等待新任务,如果等待时间超过keepAliveTime,将会结束该空闲线程
- unit:keepAliveTime的时间单位
workQueue:阻塞队列,当任务超过corePoolSize,将会进入等待队列,该队列是一个阻塞队列
threadFactory:用来创建新线程的工厂类,用户可以传入定制的threadFactory来创建定制的线程
- handler:线程池的饱和策略,当线程池线程数量大于等于maximumPoolSize并且等待队列满时,新的任务将会交给handler处理
当新的任务提交时,先后会通过与corePoolSize,workQueue,maximumPoolSize,handler判断任务的去向
如果线程池的线程数小于corePoolSize,则会创建新线程运行任务,即使线程池中还有空闲线程。
如果线程池的线程数大于等于corePoolSize
- 如果等待队列中还没满,则将其放入等待队列中
- 如果等待队列满
- 如果线程池的线程数小于maximumPoolSize,则创建新线程运行任务
- 如果线程池的线程数大于等于maximumPoolSize,则交给handler来处理
线程池的线程如果执行完任务处于空闲状态时:
- 如果线程池的线程数小于corePoolSize,则继续保留空闲线程,等待新任务
- 如果线程池的线程数大于等于corePoolSize,则空闲线程等待新任务,等待时间超过keepAliveTime,则会回收空闲线程
任务提交
任务通过execute方法提交 ,如上述所说的,如果线程池中线程个数小于corePoolSize,调用addWorker判断并新建一个线程执行任务,否则尝试将任务放入等待队列中,如果等待队列满了,则需要和线程池的最大容量比较,如果比其小,则新建一个线程运行,addWorker方法中的第二个参表示比较是corePoolSize还是maximumPoolSize,从下文的adddWorker方法中可以看到。如果都失败,则调用reject方法将任务交给handle处理。
1 | public void execute(Runnable command) { |
addWorker用于创建一个新的线程用来执行任务,firstTask是线程的第一个任务,core为true时表示当前线程池的线程数量小于corePoolSize,false表示数量大于corePoolSize且小于maximumPoolSize,因为不论是线程池的数量小于corePoolSize,还是等待队列满,线程池的线程数量大于corePoolSize且小于maximumPoolSize都需要创建新线程,这是两种情况下创建新线程。
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
addWorker方法用于新建一个线程,启动并加入到workers当中
通过addWorker方法看来,只有当线程池中线程的数量符合要求,并且线程池的状态处于RUNNING或者处于SHUTDOWN并且新任务为空的时候才会新建线程运行任务,新线程在运行之前需要加锁并判断是否符合运行条件,加锁防止线程池的状态在这之间发生了变化,最后通过t.start()启动新线程,实际上Worker是一个Runnable,t是新建Worker对象w的Thread封装,start启动的是Worker线程,运行worker的run方法,通过接下来查看Worker代码可以看到。
Worker
1 | private final class Worker |
可以看到Worker实现了Runnable,在构造函数中通过getThreadFactory().newThread(this)创建了新线程,因此在addWorker方法中启动的实际上是Worker对象的线程,线程调用start启动实际上是运行run方法中的代码,Worker中的run方法中调用了runWorker方法,在看runWorker方法之前,我们先来看Worker还继承了AQS,使用AQS实现独占锁的功能。
- 为什要设置锁:因为Worker对象不管是在初始化还是在运行过程当中都不希望被中断,使用AQS独占锁在获取独占锁之后,线程是不会被中断,只有线程运行结束才会进行中断的判断(详见AQS独占锁实现),同时我们可以通过判断线程是否占有独占锁来判断线程是否在运行,通过isHeldExclusively方法返回false说明所已经被占有,线程正在运行。
- 为什么不用ReentrantLock:通过tryAcquire方法可以看到,Worker的锁是不可重入的,这是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
同时Worker在初始化时是不能被中断,也不能获得其锁,getState(-1),所以在runWorker方法中一开始需要unlock。
接下来让我们看看runWorker方法,了解线程池中的线程是怎么被复用的
runWorker
1 | final void runWorker(Worker w) { |
runWorker方法是将worker中的任务作为第一个任务,while循环不断从等待队列取出任务执行,在循环中的过程:
首先判断线程池的状态是不是STOP或者后续转态,如果是且线程不是中断转态,则需要中断线程,if语句的写法有点看不懂。
调用beforeExecute方法,执行任务运行前的操作
调用任务的run方法,执行任务
调用afterExecute方法,执行任务结束后的操作
ThreadPoolExecutor中的beforeExecute和afterExecute方法都为空,留给子类实现在任务执行前后的操作。最后如果等待队列没有元素或者等待任务超过了等待的最长时间,退出循环,执行processWorkerExit方法。
可以从上面看出,虽然传入的任务是Runnable类型,但是实际运行并没有将任务作为一个独立的线程运行(没有将任务封装成Thread,调用start运行),而是worker对象的线程不断从等待队列中获取任务,运行任务的run方法,线程池就是通过这样的方式复用线程:启用一个线程,调用任务的run方法,而不是将任务当成线程启动。
接下来我们来看从等待队列获取任务的方法getTask
getTask
1 | /** |
在getTask中线程的获取任务的最大等待时间实际上转变为阻塞队列调用poll上设置的最长等待时间。
回到runWorker的最后,我们来看如何销毁线程
processWorkerExit
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
如果线程是正常结束,只要从worker集合中移除就好了,线程池维持的工作线程数量如果是线程正常结束,已经getTask减去1了,如果线程是中断意外结束的,需要在processWorkerExit中减去1(if (completedAbruptly){decrementWorkerCount()}
)。
每一个线程结束之后都需要调用tryTerminate判断是否线程池可以结束,尝试将线程池的转态转为TERMINATED,后文的tryTerminate方法中可以看到
最后要确保线程池中线程的数量,对于中断意外退出线程的情况,需要新建一个线程
如果没有设置过allowCoreThreadTimeOut,则要保证线程池中的线程数量不小于corePoolSize,否则需要新建一个线程
1 | // 常数将线程池的转态转为TERMINATED |
其余方法
剩下的线程池的方法不一一细讲,介绍一下方法的作用
- shutdown:将线程池的状态转为SHUTDOWN,不接受新任务,中断空闲线程,但是会将正在执行的任务和等待队列中的任务执行完
shutdownNow:将线程池的状态转为SHUTDOWN,不接受新任务,并将线程池运行的线程全部中断,等待队列清空
getTaskCount:线程池已经执行和未执行的任务总数
getCompletedTaskCount:线程池已经执行完的线程数
getLargestPoolSize:线程池中同时存在最大线程数
getPoolSize:线程池当前的线程数量
getActiveCount:当前线程池中正在执行任务的线程数量
总结
线程池中任务提交处理和线程的处理方式:
当一个新任务通过execute提交
- 首先判断线程池线程数量,如果小于corePoolSize,则创建一个新线程处理,该任务作为其第一个任务
- 如果大于corePoolSize并且等待队列中未满,放入等待队列中
- 如果大于corePoolSize并且等待队列中已经满了,则需要和maximumPoolSize比较
- 如果大于maximumPoolSize,则交给饱和策略handle处理
- 如果小于maximumPoolSize,则创建一个新线程处理,该任务作为其第一个任务
当创建一个新的Worker对象线程时:
- 在新建一个Worker对象之前首先需要将workCount通过CAS加1,接着新建一个Worker对象,将其放入workers中
- Worker初始化将自身对象传入Thread中,创建一个新线程,调用start,启动线程
- 线程不断从等待队列中取出任务,调用其run运行任务
- 在从等待队列中取任务会发生阻塞,如果当前线程池数量大于corePoolSize或者设置allowCoreThreadTimeOut为true,当阻塞时间超过keepAliveTime,就销毁线程
- 每一个线程结果判断线程池是否可以结束,如果是则转移线程池的转态到TERMINATED,同时需要判断线程池的线程是否小于corePoolSize或者设置allowCoreThreadTimeOut为true下的1,如果小则addWorker