理解Java线程池源码
一、线程池的核心设计思想
Java线程池(ThreadPoolExecutor
)的核心目标是通过复用线程资源、控制并发规模和管理任务队列,解决频繁创建/销毁线程的性能开销问题。其设计围绕以下几个关键问题:
- 如何复用线程?通过维护一组常驻工作线程(Worker),循环执行任务。
- 如何控制并发量?通过核心线程数(
corePoolSize
)和最大线程数(maximumPoolSize
)动态调整线程数量。 - 如何管理任务积压?利用阻塞队列(
BlockingQueue
)缓存任务,结合拒绝策略(RejectedExecutionHandler
)处理超限请求。
二、线程池的核心类与参数
1. 构造函数参数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数(常驻线程)
int maximumPoolSize, // 最大线程数(临时线程上限)
long keepAliveTime, // 非核心线程空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
2. 关键内部类
- Worker:封装线程和任务,实现
Runnable
接口,是线程池的工作单元。 - RejectedExecutionHandler:拒绝策略接口,包含
AbortPolicy
(默认)、CallerRunsPolicy
等实现
三、线程池的核心流程
1. 任务提交(execute()
方法)
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 1. 当前工作线程数 < corePoolSize → 创建新核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return;
c = ctl.get();
}
// 2. 任务入队(队列未满时)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 二次检查线程池状态,防止中途关闭
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 队列已满 → 尝试创建非核心线程
else if (!addWorker(command, false))
// 4. 非核心线程创建失败 → 执行拒绝策略
reject(command);
}
2. 线程创建(addWorker()
方法)
- 作用:创建
Worker
对象,并启动其关联的线程。 - 流程:
- 检查线程池状态,确保允许创建新线程。
- 通过CAS操作增加工作线程计数。
- 创建
Worker
实例,将任务作为其首个任务。 - 启动Worker线程,执行
runWorker()
方法。
3. 任务执行(runWorker()
方法)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
try {
// 循环从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 检查线程池状态,处理中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
wt.interrupt();
try {
beforeExecute(wt, task); // 钩子方法(可扩展)
task.run();
afterExecute(task, null); // 钩子方法
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 线程退出处理
}
}
4. 任务获取(getTask()
方法)
- 作用:从任务队列中获取任务,支持超时控制。
- 核心逻辑:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
// 检查线程池状态
if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty()))
return null;
// 判断是否允许超时回收线程(当前线程数 > corePoolSize)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 尝试从队列获取任务(poll或take)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
else if (timedOut)
return null;
}
}
四、线程池状态管理
线程池通过原子变量ctl
(32位整数)同时维护运行状态和工作线程数:
- 高3位:线程池状态(
RUNNING
,SHUTDOWN
,STOP
,TIDYING
,TERMINATED
)。 - 低29位:当前工作线程数。
状态 | 说明 |
---|---|
RUNNING | 接受新任务,处理队列中的任务。 |
SHUTDOWN | 不接受新任务,继续处理队列中的任务。 |
STOP | 不接受新任务,不处理队列中的任务,中断正在执行的任务。 |
TIDYING | 所有任务终止,工作线程数为0,准备执行terminated() 钩子方法。 |
TERMINATED | terminated() 方法执行完成。 |
五、拒绝策略的触发条件
当以下条件同时满足时触发拒绝策略:
1.任务队列已满。
2.线程池状态为RUNNING
。
3.工作线程数已达到maximumPoolSize
。
六、线程池的优雅关闭
1.shutdownNow()
:立即关闭,尝试中断所有线程,返回未执行的任务列表。
2.shutdown()
:平滑关闭,不再接受新任务,等待队列任务执行完毕。