本文將主要介紹我們平時最常用的線程池 ThreadPoolExecutor
,有可能你平時沒有直接使用這個類,而是使用 Executors
的工廠方法創建線程池,雖然這樣很簡單,但是很可能因為這個線程池發生 OOM ,具體情況文中會詳細介紹;
二、ThreadPoolExecutor 概覽
ThreadPoolExecutor
的繼承關系如圖所示:

其中:
- Executor:定義了
executor(Runnable command)
異步接口,但是沒有強制要求異步; - ExecutorService:提供了生命周期管理的方法,以及有返回值的任務提交;
- AbstractExecutorService:提供了
ExecutorService
的默認實現;
1. 主體結構
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 狀態控制變量,核心
private final BlockingQueue<Runnable> workQueue; // 任務等待隊列
private final HashSet<Worker> workers = new HashSet<Worker>(); // 工作線程集合
private volatile ThreadFactory threadFactory; // 線程構造工廠
private volatile RejectedExecutionHandler handler; // 拒絕策略
private volatile long keepAliveTime; // 空閑線程的存活時間(非核心線程)
private volatile int corePoolSize; // 核心線程大小
private volatile int maximumPoolSize; // 工作線程最大容量
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
...
}
這里已經可以大致看出 ThreadPoolExecutor
的結構了:

2. Worker 結構
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // 持有線程,只有在線程工廠運行失敗時為空
Runnable firstTask; // 初始化任務,不為空的時候,任務直接運行,不在添加到隊列
volatile long completedTasks; // 完成任務計數
Worker(Runnable firstTask) {
setState(-1); // AQS 初始化狀態
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this); // 循環取任務執行
}
...
// AQS 鎖方法
}
這里很容易理解的是 thread
和 firstTask
;但是 Worker
還繼承了 AQS
做了一個簡易的互斥鎖,主要是在中斷或者 worker
狀態改變的時候使用;具體 AQS
的詳細說明可以參考,AbstractQueuedSynchronizer 源碼分析 ;
3. ctl 控制變量
ctl 控制變量(簡記 c)是一個 AtomicInteger
類型的變量,由兩部分信息組合而成(兩個值互補影響,又可以通過簡單的大小比較判斷狀態):
- 線程池的運行狀態 (runState,簡記 rs),由 int 高位的前三位表示;
- 線程池內有效線程的數量 (workerCount,簡記 wc),由 int 地位的29位表示;
源碼如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 用來表示線程數量的位數
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 線程最大容量
// 狀態量
private static final int RUNNING = -1 << COUNT_BITS; // 高位 111,第一位是符號位,1表示負數
private static final int SHUTDOWN = 0 << COUNT_BITS; // 高位 000
private static final int STOP = 1 << COUNT_BITS; // 高位 001
private static final int TIDYING = 2 << COUNT_BITS; // 高位 010
private static final int TERMINATED = 3 << COUNT_BITS; // 高位 011
private static int runStateOf(int c) { return c & ~CAPACITY; } // 運行狀態,取前3位
private static int workerCountOf(int c) { return c & CAPACITY; } // 線程數量,取后29位
private static int ctlOf(int rs, int wc) { return rs | wc; } // 狀態和數量合成
private static boolean runStateLessThan(int c, int s) { return c < s; } // 狀態比較
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
private static boolean isRunning(int c) { return c < SHUTDOWN; } // RUNNING 是負數,必然小於 SHUTDOWN
代碼中可以看到狀態判斷的時候都是直接比較的,這是因為 TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
;他們的狀態變遷關系如下:

其中:
- RUNNING:運行狀態,可接收新任務;
- SHUTDOWN:不可接收新任務,繼續處理已提交的任務;
- STOP:不接收、不處理任務,中斷正在進行的任務
- TIDYING:所有任務清空,線程停止;
- TERMINATED:鈎子方法,執行后的最終狀態;
三、ThreadPoolExecutor 源碼分析
1. 增加工作線程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 這里正常情況下,只要大於SHUTDOWN,則必然不能添加線程;但是這里做了一個優化,
// 如果線程池還在繼續處理任務,則可以添加線程加速處理,
// SHUTDOWN 表示不接收新任務,但是還在繼續處理,
// firstTask 不為空時,是在添加線程的時候,firstTask 不入隊,直接處理
// workQueue 不為空時,則還有任務需要處理
// 所以連起來就是 rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || // 容量超出,則返回
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry; // 線程數增加成功,則跳出循環
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 如果線程狀態改變時,重頭開始重試
continue retry;
}
}
// 此時線程計數,增加成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) { // 線程創建失敗時,直接退出
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 這里同樣檢查上面的優化條件
if (t.isAlive()) // 如果線程已經啟動,則狀態錯誤;
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) largestPoolSize = s; // 記錄工作線程的最大數,統計峰值用
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 啟動線程
workerStarted = true;
}
}
} finally {
if (! workerStarted) addWorkerFailed(w); // 添加失敗清除
}
return workerStarted;
}
2. 提交任務
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 如果小於核心線程,直接添加
if (addWorker(command, true)) return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 任務入隊
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) // 再次檢查,狀態不是RUNNING的時候,拒絕並移除任務
reject(command);
else if (workerCountOf(recheck) == 0) // 這里是防止狀態為SHUTDOWN時,已經添加的任務無法執行
addWorker(null, false);
}
else if (!addWorker(command, false)) // 任務入隊失敗時,直接添加線程,並運行
reject(command);
}
流程圖如下:

所以影響任務提交的因數就有:
- 核心線程的大小;
- 是否為阻塞隊列;
- 線程池的大小;
3. 處理任務
工作線程啟動之后,首先處理 firstTask 任務(特別注意,這個任務是沒有入隊的),然后從 workQueue 中取出任務處理,隊列為空時,超時等待 keepAliveTime ;
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 獲取任務
w.lock();
// 總體條件表示線程池停止的時候,需要中斷線程,
// 如果沒有停止,則清除中斷狀態,確保未中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 回調方法
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 回調方法
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 退出時清理
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 此處保證 SHUTDOWN 狀態繼續處理任務,STOP 狀態停止處理
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否關閉空閑線程
if ((wc > maximumPoolSize || (timed && timedOut)) // 如果線程大於最大容量,或者允許關閉,且第一次沒取到
&& (wc > 1 || workQueue.isEmpty())) { // 返回空,最后由 processWorkerExit 清理
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 是否超時獲取
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
4. 停止線程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 檢查停止權限
advanceRunState(SHUTDOWN); // 設置線程池狀態
interruptIdleWorkers(); // 設置所有線程中斷
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate(); // 繼續執行等待隊列中的任務,完畢后設置 TERMINATED 狀態
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue(); // 清空所有等待隊列的任務,並返回
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
可以看到 shutdownNow
只比 shutdown
多了,清空等待隊列,但是正在執行的任務還是會繼續執行;
四、拒絕策略
之前提到了,提交任務失敗的時候,會執行拒絕操作,在 JDk 中為我們提供了四種策略:
- AbortPolicy:直接拋出
RejectedExecutionException
異常,這是默認的拒絕策略; - CallerRunsPolicy:由調用線程本身運行任務,以減緩提交速度;
- DiscardPolicy:不處理,直接丟棄掉;
- DiscardOldestPolicy:丟棄最老的任務,並執行當前任務;
五、Executors 工廠方法
另外就是根據線程池參數的不同,Executors
為我們提供了4種典型的用法:
SingleThreadExecutor:單線程的線程池,提交任務順序執行;
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
如代碼所示,就是最大線程、核心線程都是1,和無界隊列組成的線程池,提交任務的時候就會,直接將任務加入隊列順序執行;
FixedThreadPool:固定線程數量線程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
同 SingleThreadExecutor
一樣,只是線程數量由用戶決定;
CachedThreadPool:動態調節線程池;
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
這里核心線程為0,隊列是 SynchronousQueue
容量為1的阻塞隊列,而線程數最大,存活60s,所以有任務的時候直接創建新的線程,超時空閑60s;
ScheduledThreadPool:定時任務線程池,功能同 Timer
類似,具體細節后續還會講到;
總結
- 決定線程池運行邏輯的主要有三個變量,核心線程大小,隊列容量,線程池容量
- 最后發現其實 Executors 提供的幾種實現,都很典型;但是卻容易發生 OOM ,所以最好還是自己手動創建比較好;