理解Java线程池源码

理解Java线程池源码

一、线程池的核心设计思想

Java线程池(ThreadPoolExecutor)的核心目标是通过复用线程资源控制并发规模管理任务队列,解决频繁创建/销毁线程的性能开销问题。其设计围绕以下几个关键问题:

  1. 如何复用线程?通过维护一组常驻工作线程(Worker),循环执行任务。
  2. 如何控制并发量?通过核心线程数(corePoolSize)和最大线程数(maximumPoolSize)动态调整线程数量。
  3. 如何管理任务积压?利用阻塞队列(BlockingQueue)缓存任务,结合拒绝策略(RejectedExecutionHandler)处理超限请求。

二、线程池的核心类与参数

1. 构造函数参数

public ThreadPoolExecutor(
int corePoolSize, // 核心线程数(常驻线程)
int maximumPoolSize, // 最大线程数(临时线程上限)
long keepAliveTime, // 非核心线程空闲存活时间
TimeUnit unit, // 时间单位
BlockingQueue workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)

2. 关键内部类

  • Worker:封装线程和任务,实现Runnable接口,是线程池的工作单元。
  • RejectedExecutionHandler:拒绝策略接口,包含AbortPolicy(默认)、CallerRunsPolicy等实现

三、线程池的核心流程

1. 任务提交(execute()方法)

public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 1. 当前工作线程数 < corePoolSize → 创建新核心线程
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true)) return;
    c = ctl.get();
}

// 2. 任务入队(队列未满时)
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // 二次检查线程池状态,防止中途关闭
    if (!isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}

// 3. 队列已满 → 尝试创建非核心线程
else if (!addWorker(command, false))
    // 4. 非核心线程创建失败 → 执行拒绝策略
    reject(command);
}

2. 线程创建(addWorker()方法)

  • 作用:创建Worker对象,并启动其关联的线程。
  • 流程
    1. 检查线程池状态,确保允许创建新线程。
    2. 通过CAS操作增加工作线程计数。
    3. 创建Worker实例,将任务作为其首个任务。
    4. 启动Worker线程,执行runWorker()方法。

3. 任务执行(runWorker()方法)

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        // 循环从队列中获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 检查线程池状态,处理中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                wt.interrupt();
            try {
                beforeExecute(wt, task);  // 钩子方法(可扩展)
                task.run();
                afterExecute(task, null); // 钩子方法
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly); // 线程退出处理
    }
}

4. 任务获取(getTask()方法)

  • 作用:从任务队列中获取任务,支持超时控制。
  • 核心逻辑
private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        // 检查线程池状态
        if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty()))
            return null;
        
        // 判断是否允许超时回收线程(当前线程数 > corePoolSize)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 尝试从队列获取任务(poll或take)
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        else if (timedOut)
            return null;
    }
}

四、线程池状态管理

线程池通过原子变量ctl(32位整数)同时维护运行状态工作线程数

  • 高3位:线程池状态(RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED)。
  • 低29位:当前工作线程数。
状态说明
RUNNING接受新任务,处理队列中的任务。
SHUTDOWN不接受新任务,继续处理队列中的任务。
STOP不接受新任务,不处理队列中的任务,中断正在执行的任务。
TIDYING所有任务终止,工作线程数为0,准备执行terminated()钩子方法。
TERMINATEDterminated()方法执行完成。

五、拒绝策略的触发条件

当以下条件同时满足时触发拒绝策略:

1.任务队列已满。

2.线程池状态为RUNNING

3.工作线程数已达到maximumPoolSize

六、线程池的优雅关闭

1.shutdownNow():立即关闭,尝试中断所有线程,返回未执行的任务列表。

2.shutdown():平滑关闭,不再接受新任务,等待队列任务执行完毕。

留下回复