线程池
线程回顾
创建线程的方式
- 继承 Thread 类
- 实现 Runnable 接口
创建后的线程有如下状态:
NEW:新建的线程,无任何操作
1 | public static void main(String[] args) { |
RUNNABLE:可执行的线程,在 JVM 中执行但是在等待操作系统的资源
1 | public static void main(String[] args) { |
BOLCKED:阻塞,获取不到锁
1 | public static void main(String[] args) { |
WAITING:等待,等待其他线程进行操作,时间不确定
1 | public static void main(String[] args) { |
TIMED_WAITING:等待,等待的时间是确定的
1 | public static void main(String[] args) { |
TERMINATED:终止,线程已经运行完毕
1 | public static void main(String[] args) { |
引入线程池
上述创建线程的方式存在如下缺陷:
- 线程使用完后会被销毁,高并发场景下频繁创建和销毁线程的性能开销不可忽略
- 无法控制线程并发数量,线程过多会导致 JVM 宕机
线程池是一种池化思想,由于创建和销毁线程需要时间,以及系统资源开销,我们需要一个“管理者”来统一管理线程及任务分配,减少这些开销,解决资源不足的问题。
在主要大厂的编程规范中,不允许在应用中自行显式地创建线程,线程必须通过线程池提供。
线程池解决了什么问题
- 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗:
- 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行:
- 方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM
- 节省CPU切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)。
- 提供更强大的功能,延时定时线程池
线程池引发了什么问题
- 异步任务提交后,如果JVM宕机,已提交的任务会丢失,需要考虑确认机制
- 使用不合理可能导致内存溢出问题
- 参数过多,代码结构引入数据结构与算法,增加使用难度
线程池概述
线程池继承结构

- 最常用的是 ThreadPoolExecutor
- 调度用 ScheduledThreadPoolExecutor,类似 Timer 和 TimerTask。
- 任务拆分合并用 ForkJoinPool
- Executors是工具类,协助创建线程池的
线程池工作状态

- RUNNING(运行状态):这是线程池的初始状态。
- 在此状态下,线程池接受新任务,并且也会处理等待队列中的任务。
- 线程池的线程会一直运行,直到被转换到其他状态。
- SHUTDOWN(关闭状态):当调用线程池的shutdown()方法时,线程池会进入此状态。
- 在此状态下,线程池不接受新任务,但会继续处理等待队列中的任务。
- 等待队列中的任务处理完毕后,线程池中的线程会逐渐结束,直到所有线程结束运行。
- STOP(停止状态):当调用线程池的shutdownNow()方法时,线程池会进入此状态。
- 在此状态下,线程池不接受新任务,同时也不处理等待队列中的任务
- 而是尝试停止所有正在执行的任务,并且回收线程池中的所有线程。
- TIDYING(整理状态):当所有的任务已经终止,workerCount(工作线程数)为0时,线程池会进入此状态。
- 此时,会执行terminated()钩子方法,允许线程池执行一些收尾工作。
- TERMINATED(终止状态):terminated()钩子方法执行完毕后,线程池会进入此状态。
- 在终止状态下,线程池的任务完全结束,不再有任何活动。
七个核心参数
| 参数名 | 描述 |
|---|---|
corePoolSize |
核心线程池基本大小,核心线程数 |
maximumPoolSize |
线程池最大线程数 |
keepAliveTime |
线程空闲后的存活时间 |
TimeUnit unit |
线程空闲后的存活时间单位 |
BlockingQueue workQueue |
存放任务的阻塞队列 |
ThreadFactory threadFactory |
创建线程的工厂 |
RejectedExecutionHandler handler |
当阻塞队列和最大线程池都满了之后的饱和策略 |
corePoolSize
核心线程数
- 线程池刚创建时,线程数量为0,当每次执行 execute 添加新的任务时会在线程池创建一个新的线
程,直到线程数量达到 corePoolSize 为止。 - 核心线程会一直存活,即使没有任务需要执行,当线程数小于核心线程数时,即使有线程空闲,线程
池也会优先创建新线程处理 - 设置 allowCoreThreadTimeout=true (默认false)时,核心线程超时会关闭
maximumPoolSize
最大线程数
- 当池中的线程数 >= corePoolSize ,且任务队列已满时。线程池会创建新线程来处理任务
- 当池中的线程数 = maximumPoolSize ,且任务队列已满时,线程池会拒绝处理任务而抛出异常
如果使用无界的阻塞队列,该参数不生效
BlockingQueue
阻塞队列
- 当线程池正在运行的线程数量已经达到 corePoolSize ,那么再通过 execute 添加新的任务则会被加到 workQueue 队列中
- 任务会在队列中排队等待执行,而不会立即执行
- 一般来说,这里的阻塞队列有以下几种选择:ArrayBlockingQueue , LinkedBlockingQueue , SynchronousQueue
keepAliveTime & TimeUnit
保活时间及其单位
- 当线程空闲时间达到 keepAliveTime 时,线程会退出,直到线程数量 =corePoolSize
- 如果 allowCoreThreadTimeout=true ,则会直到线程数量=0
keepAliveTime 是时间的大小,TimeUnit 是时间单位
ThreadFactory
线程工厂
- 主要用来创建线程
- 通过newThread()方法提供创建线程,该方法创建的线程都是“非守护线程”而且“线程优先级都是默认优先级”
RejectedExecutionHandler
拒绝策略
- 当线程数已经达到 maxPoolSize ,且队列已满,会拒绝新任务
- 当线程池被调用 shutdown() 后,会等待线程池里的任务执行完毕,再 shutdown 。如果在调用shutdown() 和线程池真正 shutdown 之间提交任务,会拒绝新任务
- 当拒绝处理任务时线程池会调用 rejectedExecutionHandler 来处理这个任务。
如果没有设置默认是 AbortPolicy ,另外在 ThreadPoolExecutor 类有几个内部实现类来处理这类情况:
- ThreadPoolExecutor.AbortPolicy :丢弃任务并抛出 RejectedExecutionException 异常。
- ThreadPoolExecutor.CallerRunsPolicy :由调用线程处理该任务
- ThreadPoolExecutor.DiscardPolicy :也是丢弃任务,但是不抛出异常
- ThreadPoolExecutor.DiscardOldestPolicy :丢弃队列最前面的任务,然后重新尝试执行任务
另外实现 RejectedExecutionHandler 接口即可实现自定义的拒绝策略
线程池逻辑结构
线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部协调空闲的线程,如果有,则将任务交给某个空闲的华线程。
一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。

当一个任务被提交,线程池会进行如下工作:
- 首先判断当前的核心线程数量如果小于核心线程数,创建一个核心线程并执行任务
- 如果大于核心线程数,则尝试将其放入等待队列,如果队列没有满则放入队列等待执行
- 如果队列已满,则判断非核心线程数的数量+核心线程数是否小于最大线程数量
- 小于:则创建一个非核心线程并执行任务(并不会取队列中的任务)
- 大于:执行拒绝策略
线程池线程数设置
虽然使用线程池的好处很多,但是如果其线程数配置得不合理,不仅可能达不到预期效果,反而可能降低应用的性能。
因此按照任务类型分类,对不同的任务类型确定不同的线程数量。
任务类型分类
- IO密集型任务:
- 此类任务主要是执行 IO 操作。由于执行 lO 操作的时间较长,导致 CPU 的利用率不高,这类任务 CPU 常处于空闲状态。
- Netty 的 IO 读写操作为此类任务的典型例子
- CPU 密集型任务:
- 此类任务主要是执行计算任务。由于响应时间很快,CPU 一直在运行,这种任务 CPU 的利用率很高。
- 例如设计加密解密算法等大量需要 CPU 运算的场景
- 混合型任务:
- 此类任务既要执行逻辑计算,又要进行 IO 操作(如 RPC 调用、数据库访问)相对来说,由于执行 IO 操作的耗时较长(一次网络往返往往在数百毫秒级别),这类任务的 CPU 利用率也不是太高。
- Web 服务器的 HTTP 请求处理操作为此类任务的典型例子
确定任务线程数
IO 密集型任务:
- 由于 IO 密集型任务的CPU使用率较低,导致线程空余时间很多,因此通常需要开 CPU 核心数两倍的线程
- Netty 的 IO 处理任务就是典型的 IO 密集型任务,所以,Netty 的 Reactor 实现类(定制版的线程池)的 IO 处理线程数默认正好为 CPU 核数的两倍
CPU密集型任务:
- CPU 密集型的任务并行的任务越多,花在任务切换的时间就越多,CPU 执行任务的效率就越低,一般开等于 CPU 的核心数的线程数量
- 比如 4 个核心的 CPU,通过 4 个线程并行地执行 4 个 CPU 密集型任务,此时的效率是最高的。但是如果线程数远远超出 CPU 核心数量,就需要频繁地切换线程,线程上下文切换时需要消耗时间,反而会使得任务效率下降。
混合型任务:
业界有一个比较成熟的估算公式,具体如下:
最佳线程数 = ((线程等待时间 + 线程 CPU 时间) / 线程 CPU 时间) * CPU核数1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
- 通过公式可以看出:等待时间所占的比例越高,需要的线程就越多,CPU 耗时所占的比例越高,需要的线程就越少
- 比如在 Web 服务器处理 HTTP 请求时,假设平均线程 CPU 运行时间为 100 毫秒,而线程等待时间(比如包括 DB 操作、RPC 操作作、缓存操作等)为 900 毫秒,如果 CPU 核数为 8 那么根据上面这个公式,估算如下:`(900 毫秒+100 毫秒) / 100 毫秒 * 8 = 10 * 8 = 80`,最好开 80 个线程
------
## 关键源码剖析
------
### 属性 & 构造方法
------
对于 ThreadPoolExecutor 有几个关键的属性,这里需要先大致了解:
```java
public class ThreadPoolExecutor extends AbstractExecutorService {
// 控制变量-存放状态和线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 任务队列,必须是阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
// 全局锁
private final ReentrantLock mainLock = new ReentrantLock();
// awaitTermination方法使用的等待条件变量
private final Condition termination = mainLock.newCondition();
// 线程工厂,用于创建新的线程实例
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器,对应不同的拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲线程等待任务的时间周期,单位是纳秒
private volatile long keepAliveTime;
// 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数
private volatile int corePoolSize;
// 线程池容量
private volatile int maximumPoolSize;
......
}
对于其构造方法如下:
1 | public ThreadPoolExecutor(int corePoolSize, |
由此可见,当我们手动创建线程池时,线程池并不会直接工作,而是将我们手动设置的关键参数进行初始化。
execute 方法
ThreadPoolExecutor 的最基本使用方式就是通过 execute 方法提交一个 Runnable 任务,首先看图理解 execute 的执行逻辑

查看源码中的 execute 方法,这里我们分层次来看,先看最主要的外部的几个条件判断:
1 | public void execute(Runnable command) { |
其实源码里面的注释解释的已经很明白了,这里的逻辑其实已经说明了一些常见的问题:
- 当提交任务时,只要核心线程数没有满,都会新建核心线程执行任务,不会使用以前创建好的空闲的核心线程去执行
- 当无法向队列提交任务时,如果可以新建非核心线程,则会新建非核心线程并执行任务,不会在创建之后从队列中取任务执行
addWorker 方法
接下来我们灌注一下 addWorker 方法:
1 | // 添加工作线程,如果返回false说明没有新创建工作线程,如果返回true说明创建和启动工作线程成功 |
先看看 addWork 的两个参数,第一个是需要提交的线程Runnable firstTask,第二个参数是 boolean 类型,表示是否为核心线程。
其中三处调用 addWork:
- 第一次,条件if (workerCountOf(c) < corePoolSize):这个很好理解,工作线程数少于核心线程数,提交任务。所以addWorker(command, true)
- 第二次,如果 workerCountOf(recheck) == 0: 如果worker的数量为0,那就 addWorker(null,false),由于之前已经把command提交到阻塞队列了workQueue.offer(command)。所以提交一个空线程,直接从阻塞队列里面取就可以了。
- 第三次,如果线程池没有RUNNING或者offer阻塞队列失败,addWorker(command,false):说明阻塞队列满了,将任务提交到非核心线程池。
Worker 对象
线程池中的每一个线程被封装成了内部的一个Worker对象,ThreadPool维护的其实就是一组Worker对象:
1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ |
runWorker 方法
Worker 方法中的 run 方法实际调用的是线程池中的 runWorker 方法:
1 | final void runWorker(Worker w) { |
runWorker 方法的本质其实就是通过一个 while 循环不断地通过 getTask() 方法获取任务,在调用方法执行的时候会先获取外部传递的任务,如果没有获取到外部传递的任务则调用 getTask() 方法获取任务队列中的任务并执行
getTask 方法
1 | private Runnable getTask() { |
综上所述,线程池内部将我们提交的任务封装成了一个个 Woker 对象来管理,我们调用线程池的 execute 方法,实际上:
- execute 根据策略调用 -> addWoker 方法
- addWoker 方法调用 Worker 的 run 方法
- Worker 的 run 方法调用 runWorker 方法
- runWorker 方法不断去用 getTask 获取任务执行