ThreadPool线程池
使用线程池可以带来一系列好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
1. 线程池简介
线程池(Thread Pool)是一种基于池化思想管理线程的工具,Java中的线程池核心实现类是ThreadPoolExecutor,ThreadPoolExecutor的继承关系如下: 
1.1 常用参数
corePoolSize线程池的核心线程数maximumPoolSize能容纳的最大线程数keepAliveTime空闲线程存活时间unit存活的时间单位workQueue存放提交但未执行任务的队列threadFactory创建线程的工厂类handler等待队列满后的拒绝策略
1.2 阻塞队列
线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
使用不同的队列可以实现不一样的任务存取策略。在这里,我们可以再介绍下阻塞队列的成员:
2. 生命周期管理
线程池内部使用一个变量ctl维护两个值:运行状态(runState)和线程数量 (workerCount):
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));ctl的高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。这样设计因为代码中有要同时判断线程池运行状态和线程数量的情况,而且用一个变量去存储两个值减少了锁资源的消耗。ctl的维护通过位运算的方式比普通方式性能更高。ThreadPoolExecutor的运行状态有5种,分别为:
其生命周期转换如下入所示: 
3. 任务执行机制
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
其执行流程如下图所示:
3.1 任务申请
务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。其执行流程如下图所示:
3.2 任务拒绝
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。拒绝策略是一个接口,其设计如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}线程池中,有三个重要的参数,决定影响了拒绝策略:
corePoolSize-核心线程数,也即最小的线程数。workQueue-阻塞队列。maximumPoolSize-最大线程数。
当提交任务数大于corePoolSize的时候,会优先将任务放到workQueue阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize最大线程数配置。此时再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(workQueue.size()+maximumPoolSize),就会触发线程池的拒绝策略,拒绝策略如下:
- CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大。
- AbortPolicy: 丢弃任务,并抛出拒绝执行RejectedExecutionException异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
- DiscardPolicy: 直接丢弃,其他啥都没有。
- DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列workQueue中最老的一个任务,并将新任务加入。
4. Worker线程管理
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。我们来看一下它的部分代码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}其中Worker持有thread成员,thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。Worker执行任务的模型如下图所示:
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
- 如果正在执行任务,则不应该中断线程。
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。
在线程回收过程中就使用到了这种特性,回收过程如下图所示:
4.1 Worker线程增加
增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,其执行流程如下图所示: 
4.2 Worker线程回收
线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。
try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}线程回收的工作是在processWorkerExit方法完成的。
事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。
4.3 Worker线程执行任务
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
- while循环不断地通过getTask()方法获取任务。
- getTask()方法从阻塞队列中取任务。
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
- 执行任务。
- 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

执行流程如下图所示:
5. 线程池的创建
参考阿里开发手册的话,需要摒弃通过Executors创建线程池的方式,而是使用ThreadPoolExecutor的构造函数。 
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数目
3, // 最大线程数量
10L, // 存活时间 15秒
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), // 阻塞等待队列
Executors.defaultThreadFactory(), // 创建线程工厂
new ThreadPoolExecutor.DiscardOldestPolicy()); // 拒绝策略
System.out.println("请求前-----当前线程池中线程总数目: "+ executor.getPoolSize());
for (int i = 0; i < 6; i++) {
int finalI = i + 1;
executor.execute(() -> {
System.out.println("客户" + finalI + "来办理业务, " + Thread.currentThread().getName() + "处理请求");
});
}
System.out.println("请求中------当前线程池中线程总数目: "+ executor.getPoolSize());
TimeUnit.SECONDS.sleep(10);
System.out.println("当前线程池中线程总数目: "+ executor.getPoolSize());
executor.shutdown();
System.out.println("结束-----当前线程池中线程总数目: "+ executor.getPoolSize());
}运行结果:
可以看出线程是在execute()方法执行才创建线程的,shutdown()方法会把所有线程在线程池中清理掉。
4. 线程池底层工作原理

- 在创建了线程池后,线程池中的线程数为零。
- 当调用
execute()方法添加一个请求任务时,线程池会做出如下判断:- 如果正在运行的线程数量<corePoolSize,那么马上创建线程运行这个任务(比如银行去办理业务,窗口还有3个可以直接办);
- 如果正在运行的线程数量≥corePoolSize,那么将这个任务放入阻塞队列(窗口满了,在银行的板凳坐着等);
- 如果这个时候队列满了且正在运行的线程数量<maximumPoolSize, 那么会创建非核心线程立刻运行这个任务(银行窗口和等候的板凳都满了,额外的工作人员拉你去机器上处理)
- 如果队列满了且正在运行的线程数量≥maximumPoolSize,那么线程池会启动饱和拒绝策略来执行(银行处于饱和状态,银行工作人员建议你去其他分行办)。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行
- 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
- 如果当前运行的线程数>corePoolSize,那么这个线程就被停掉。
- 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
5. 注意事项
- 项目中创建多线程时,实际生产一般自己通过ThreadPoolExecutor的7个参数,自定义线程池。
- 创建线程池推荐适用ThreadPoolExecutor及其7个参数手动创建:
corePoolSize线程池的核心线程数maximumPoolSize能容纳的最大线程数keepAliveTime空闲线程存活时间unit存活的时间单位workQueue存放提交但未执行任务的队列threadFactory创建线程的工厂类handler等待队列满后的拒绝策略
6. 线程池优化
如果阻塞队列满了,会进行拒绝策略执行,如果不想请求不被拒绝,常见的解决办法有:
6.1 调整阻塞队列参数
把阻塞队列设置很大,甚至使用无界队列(不推荐,毕竟服务器资源是有限的,有造成OOM的风险)
6.2 调整拒绝策略
设置拒绝策略为CallerRunsPolicy,使得任务交给调用方去执行,从而避免任务被丢弃。但后果就是导致业务线程被拖慢,没有办法去及时处理请求。所以这种拒绝策略只适用于低并发又不想丢弃任务的场景。
6.3 任务持久化
- 方法1:
通过自定义拒绝策略,把需要拒绝的任务信息放到MySQL、Redis或者Kafka中,然后重新线程池的afterExecute()方法,当线程池执行完一个任务的时候,判断阻塞队列有任务不多以及空闲的线程比较多,就可以读取MySQL、Redis或者Kafka中的任务信息,然后放入线程池中的阻塞队列中再进行处理。 - 方法2:
通过自定义阻塞队列,当线程从阻塞队列中取任务的时候,判断阻塞队列中任务量的多少,如果阻塞队列中任务比较少,那就从MYSQL/REDIS/MQ中去读数据,然后放入线程池中的阻塞队列中再进行处理。
7. 线程池动态调整
7.1 使用get/set方法
线程池ThreadPool提供了get/set方法,方便我们进行监控并动态调整(就是不重启机器)线程池参数:
7.2 使用配置中心
把线程池的参数配置到配置中心,比如Appllo\Nacos, 然后让线程池去监听配置的变化,就能够通过set方法动态的调整线程池参数。
如果需要动态调整阻塞队列的大小,队列大小被final修饰,就需要重写一个阻塞队列。
8. 线程池监控
8.1 监控的具体指标
- 线程维度:核心线程数、最大线程数、活跃线程数、历史最大线程数等
- 任务维度:总任务数、排队任务数、已完成任务数、拒绝任务数等
- 性能维度:任务平均耗时、线程空闲(保活)时间、线程池负载率等
ThreadPoolExecutor类中提供了获取这些指标数据的API:
- poo1.getcorePoosize()- 核心线程数
- poo1.getMaximumPoolsize()-最大线程数
- poo1.getActivecount()- 活跃线程数(正在执行的线程数)
- poo1.getQueue().size()-队列中等待的任务数
- pool.getcompletedTaskcount()-已完成任务数
- pool.getTaskCount()-总任务数
- pool.getLargestPoolSize()-运行的最大任务数
8.2 实时采集方式
- 自定义带监视功能的线程池类(继承ThreadPoolExecutor)
- 监控位埋点(任务执行前、任务执行后、线程池终止后)
注意
实时采集是通过在线程池预先"埋点",对每个运行的任务实时采集统计,对性能有影响
public class MonitorThreadPoolExecutor extends ThreadPoolExecutor {
public MonitorThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
@Override
protected void terminated() {
super.terminated();
}
}8.3 定时采集
SpringBoot项目为演示案例:
2. CommandLineRunner, ApplicationRunner两个接口在SpringBoot应用启动后自动执行,通过实现他们的run()方法可以完成初始化逻辑、提交后台任务等。
3. CommandLineRunner -run- 循环不断的往线程池中提交任务(间隔1s/次) 4. ApplicationRunner -run- 定时采集线程池的指标数据(首次间隔5s、间隔10s/次)
8.4 总结
没有监控的线程池相当于没有消防报警的大厦;
埋点实时监控,迅速反映但对性能有影响; 定时监控,数据有延迟但降低了对性能的影响。
完善的监控体系:高峰期扩容、低谷期节能地健康运行(削峰填谷)。
