Java中使用線程池技術一般都是使用Executors
這個工廠類,它提供了非常簡單方法來創建各種類型的線程池:
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
核心的接口其實是Executor
,它只有一個execute
方法抽象為對任務(Runnable接口)的執行, ExecutorService
接口在Executor
的基礎上提供了對任務執行的生命周期的管理,主要是submit
和shutdown
方法, AbstractExecutorService
對ExecutorService
一些方法做了默認的實現,主要是submit和invoke方法,而真正的任務執行 的Executor接口execute
方法是由子類實現,就是ThreadPoolExecutor
,它實現了基於線程池的任務執行框架,所以要了解 JDK的線程池,那么就得先看這個類。
再看execute
方法之前需要先介幾個變量或類。
ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
這個變量是整個類的核心,AtomicInteger
保證了對這個變量的操作是原子的,通過巧妙的操作,ThreadPoolExecutor用這一個變量保存了兩個內容:
- 所有有效線程的數量
- 各個線程的狀態(runState)
低29位存線程數,高3位存runState
,這樣runState有5個值:
- RUNNING:-536870912
- SHUTDOWN:0
- STOP:536870912
- TIDYING:1073741824
- TERMINATED:1610612736
線程池中各個狀態間的轉換比較復雜,主要記住下面內容就可以了:
- RUNNING狀態:線程池正常運行,可以接受新的任務並處理隊列中的任務;
- SHUTDOWN狀態:不再接受新的任務,但是會執行隊列中的任務;
- STOP狀態:不再接受新任務,不處理隊列中的任務
圍繞ctl變量有一些操作,了解這些方法是看懂后面一些晦澀代碼的基礎:

/** * 這個方法用於取出runState的值 因為CAPACITY值為:00011111111111111111111111111111 * ~為按位取反操作,則~CAPACITY值為:11100000000000000000000000000000 * 再同參數做&操作,就將低29位置0了,而高3位還是保持原先的值,也就是runState的值 * * @param c * 該參數為存儲runState和workerCount的int值 * @return runState的值 */ private static int runStateOf(int c) { return c & ~CAPACITY; } /** * 這個方法用於取出workerCount的值 * 因為CAPACITY值為:00011111111111111111111111111111,所以&操作將參數的高3位置0了 * 保留參數的低29位,也就是workerCount的值 * * @param c * ctl, 存儲runState和workerCount的int值 * @return workerCount的值 */ private static int workerCountOf(int c) { return c & CAPACITY; } /** * 將runState和workerCount存到同一個int中 * “|”運算的意思是,假設rs的值是101000,wc的值是000111,則他們位或運算的值為101111 * * @param rs * runState移位過后的值,負責填充返回值的高3位 * @param wc * workerCount移位過后的值,負責填充返回值的低29位 * @return 兩者或運算過后的值 */ private static int ctlOf(int rs, int wc) { return rs | wc; } // 只有RUNNING狀態會小於0 private static boolean isRunning(int c) { return c < SHUTDOWN; }
corePoolSize
核心線程池大小,活動線程小於corePoolSize則直接創建,大於等於則先加到workQueue中,隊列滿了才創建新的線程。
keepAliveTime
線程從隊列中獲取任務的超時時間,也就是說如果線程空閑超過這個時間就會終止。
Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable ...
內部類Worker
是對任務的封裝,所有submit的Runnable都被封裝成了Worker,它本身也是一個Runnable, 然后利用AQS框架(關於AQS可以看我這篇文章)實現了一個簡單的非重入的互斥鎖, 實現互斥鎖主要目的是為了中斷的時候判斷線程是在空閑還是運行,可以看后面shutdown
和shutdownNow
方法的分析。
// state只有0和1,互斥 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true;// 成功獲得鎖 } // 線程進入等待隊列 return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; }
之所以不用ReentrantLock是為了避免任務執行的代碼中修改線程池的變量,如setCorePoolSize
,因為ReentrantLock是可重入的。
execute
execute
方法主要三個步驟:
- 活動線程小於corePoolSize的時候創建新的線程;
- 活動線程大於corePoolSize時都是先加入到任務隊列當中;
- 任務隊列滿了再去啟動新的線程,如果線程數達到最大值就拒絕任務。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 活動線程數 < corePoolSize if (workerCountOf(c) < corePoolSize) { // 直接啟動新的線程。第二個參數true:addWorker中會重新檢查workerCount是否小於corePoolSize if (addWorker(command, true)) // 添加成功返回 return; c = ctl.get(); } // 活動線程數 >= corePoolSize // runState為RUNNING && 隊列未滿 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // double check // 非RUNNING狀態 則從workQueue中移除任務並拒絕 if (!isRunning(recheck) && remove(command)) reject(command);// 采用線程池指定的策略拒絕任務 // 線程池處於RUNNING狀態 || 線程池處於非RUNNING狀態但是任務移除失敗 else if (workerCountOf(recheck) == 0) // 這行代碼是為了SHUTDOWN狀態下沒有活動線程了,但是隊列里還有任務沒執行這種特殊情況。 // 添加一個null任務是因為SHUTDOWN狀態下,線程池不再接受新任務 addWorker(null, false); // 兩種情況: // 1.非RUNNING狀態拒絕新的任務 // 2.隊列滿了啟動新的線程失敗(workCount > maximumPoolSize) } else if (!addWorker(command, false)) reject(command); }
注釋比較清楚了就不再解釋了,其中比較難理解的應該是addWorker(null, false);
這一行,這要結合addWorker一起來看。 主要目的是防止HUTDOWN狀態下沒有活動線程了,但是隊列里還有任務沒執行這種特殊情況。
addWorker
這個方法理解起來比較費勁。

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);// 當前線程池狀態 // Check if queue empty only if necessary. // 這條語句等價:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || // workQueue.isEmpty()) // 滿足下列調價則直接返回false,線程創建失敗: // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時不再接受新的任務,且所有任務執行結束 // rs = SHUTDOWN:firtTask != null 此時不再接受任務,但是仍然會執行隊列中的任務 // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null, // false),任務為null && 隊列為空 // 最后一種情況也就是說SHUTDONW狀態下,如果隊列不為空還得接着往下執行,為什么?add一個null任務目的到底是什么? // 看execute方法只有workCount==0的時候firstTask才會為null結合這里的條件就是線程池SHUTDOWN了不再接受新任務 // 但是此時隊列不為空,那么還得創建線程把任務給執行完才行。 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 走到這的情形: // 1.線程池狀態為RUNNING // 2.SHUTDOWN狀態,但隊列中還有任務需要執行 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))// 原子操作遞增workCount break retry;// 操作成功跳出的重試的循環 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs)// 如果線程池的狀態發生變化則重試 continue retry; // else CAS failed due to workerCount change; retry inner loop } } // wokerCount遞增成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 並發的訪問線程池workers對象必須加鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); // RUNNING狀態 || SHUTDONW狀態下清理隊列中剩余的任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 將新啟動的線程添加到線程池中 workers.add(w); // 更新largestPoolSize int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 啟動新添加的線程,這個線程首先執行firstTask,然后不停的從隊列中取任務執行 // 當等待keepAlieTime還沒有任務執行則該線程結束。見runWoker和getTask方法的代碼。 if (workerAdded) { t.start();// 最終執行的是ThreadPoolExecutor的runWoker方法 workerStarted = true; } } } finally { // 線程啟動失敗,則從wokers中移除w並遞減wokerCount if (!workerStarted) // 遞減wokerCount會觸發tryTerminate方法 addWorkerFailed(w); } return workerStarted; }
runWorker
任務添加成功后實際執行的是runWorker
這個方法,這個方法非常重要,簡單來說它做的就是:
- 第一次啟動會執行初始化傳進來的任務firstTask;
- 然后會從workQueue中取任務執行,如果隊列為空則等待keepAliveTime這么長時間。

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // Worker的構造函數中抑制了線程中斷setState(-1),所以這里需要unlock從而允許中斷 w.unlock(); // 用於標識是否異常終止,finally中processWorkerExit的方法會有不同邏輯 // 為true的情況:1.執行任務拋出異常;2.被中斷。 boolean completedAbruptly = true; try { // 如果getTask返回null那么getTask中會將workerCount遞減,如果異常了這個遞減操作會在processWorkerExit中處理 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 任務執行前可以插入一些處理,子類重載該方法 beforeExecute(wt, task); Throwable thrown = null; try { task.run();// 執行用戶任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 和beforeExecute一樣,留給子類去重載 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 結束線程的一些清理工作 processWorkerExit(w, completedAbruptly); } }
getTask

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 1.rs > SHUTDOWN 所以rs至少等於STOP,這時不再處理隊列中的任務 // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,這時還需要處理隊列中的任務除非隊列為空 // 這兩種情況都會返回null讓runWoker退出while循環也就是當前線程結束了,所以必須要decrement // wokerCount if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 遞減workerCount值 decrementWorkerCount(); return null; } // 標記從隊列中取任務時是否設置超時時間 boolean timed; // Are workers subject to culling? // 1.RUNING狀態 // 2.SHUTDOWN狀態,但隊列中還有任務需要執行 for (;;) { int wc = workerCountOf(c); // 1.core thread允許被超時,那么超過corePoolSize的的線程必定有超時 // 2.allowCoreThreadTimeOut == false && wc > // corePoolSize時,一般都是這種情況,core thread即使空閑也不會被回收,只要超過的線程才會 timed = allowCoreThreadTimeOut || wc > corePoolSize; // 從addWorker可以看到一般wc不會大於maximumPoolSize,所以更關心后面半句的情形: // 1. timedOut == false 第一次執行循環, 從隊列中取出任務不為null方法返回 或者 // poll出異常了重試 // 2.timeOut == true && timed == // false:看后面的代碼workerQueue.poll超時時timeOut才為true, // 並且timed要為false,這兩個條件相悖不可能同時成立(既然有超時那么timed肯定為true) // 所以超時不會繼續執行而是return null結束線程。(重點:線程是如何超時的???) if (wc <= maximumPoolSize && !(timedOut && timed)) break; // workerCount遞減,結束當前thread if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl // 需要重新檢查線程池狀態,因為上述操作過程中線程池可能被SHUTDOWN if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { // 1.以指定的超時時間從隊列中取任務 // 2.core thread沒有超時 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true;// 超時 } catch (InterruptedException retry) { timedOut = false;// 線程被中斷重試 } } }
processWorkerExit
線程退出會執行這個方法做一些清理工作。

private void processWorkerExit(Worker w, boolean completedAbruptly) { // 正常的話再runWorker的getTask方法workerCount已經被減一了 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 累加線程的completedTasks completedTaskCount += w.completedTasks; // 從線程池中移除超時或者出現異常的線程 workers.remove(w); } finally { mainLock.unlock(); } // 嘗試停止線程池 tryTerminate(); int c = ctl.get(); // runState為RUNNING或SHUTDOWN if (runStateLessThan(c, STOP)) { // 線程不是異常結束 if (!completedAbruptly) { // 線程池最小空閑數,允許core thread超時就是0,否則就是corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果min == 0但是隊列不為空要保證有1個線程來執行隊列中的任務 if (min == 0 && !workQueue.isEmpty()) min = 1; // 線程池還不為空那就不用擔心了 if (workerCountOf(c) >= min) return; // replacement not needed } // 1.線程異常退出 // 2.線程池為空,但是隊列中還有任務沒執行,看addWoker方法對這種情況的處理 addWorker(null, false); } }
tryTerminate
processWorkerExit方法中會嘗試調用tryTerminate來終止線程池。這個方法在任何可能導致線程池終止的動作后執行:比如減少wokerCount或SHUTDOWN狀態下從隊列中移除任務。

final void tryTerminate() { for (;;) { int c = ctl.get(); // 以下狀態直接返回: // 1.線程池還處於RUNNING狀態 // 2.SHUTDOWN狀態但是任務隊列非空 // 3.runState >= TIDYING 線程池已經停止了或在停止了 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return; // 只能是以下情形會繼續下面的邏輯:結束線程池。 // 1.SHUTDOWN狀態,這時不再接受新任務而且任務隊列也空了 // 2.STOP狀態,當調用了shutdownNow方法 // workerCount不為0則還不能停止線程池,而且這時線程都處於空閑等待的狀態 // 需要中斷讓線程“醒”過來,醒過來的線程才能繼續處理shutdown的信號。 if (workerCountOf(c) != 0) { // Eligible to terminate // runWoker方法中w.unlock就是為了可以被中斷,getTask方法也處理了中斷。 // ONLY_ONE:這里只需要中斷1個線程去處理shutdown信號就可以了。 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 進入TIDYING狀態 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 子類重載:一些資源清理工作 terminated(); } finally { // TERMINATED狀態 ctl.set(ctlOf(TERMINATED, 0)); // 繼續awaitTermination termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
shutdown和shutdownNow
shutdown這個方法會將runState置為SHUTDOWN
,會終止所有空閑的線程。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 線程池狀態設為SHUTDOWN,如果已經至少是這個狀態那么則直接返回 advanceRunState(SHUTDOWN); // 注意這里是中斷所有空閑的線程:runWorker中等待的線程被中斷 → 進入processWorkerExit → // tryTerminate方法中會保證隊列中剩余的任務得到執行。 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow方法將runState置為STOP。和shutdown方法的區別,這個方法會終止所有的線程。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // STOP狀態:不再接受新任務且不再執行隊列中的任務。 advanceRunState(STOP); // 中斷所有線程 interruptWorkers(); // 返回隊列中還沒有被執行的任務。 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
主要區別在於shutdown調用的是interruptIdleWorkers
這個方法,而shutdownNow實際調用的是Worker類的interruptIfStarted
方法:
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // w.tryLock能獲取到鎖,說明該線程沒有在運行,因為runWorker中執行任務會先lock, // 因此保證了中斷的肯定是空閑的線程。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
void interruptIfStarted() { Thread t; // 初始化時state == -1 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
這就是前面提到的Woker類實現AQS的主要作用。
注意:shutdown方法可能會在finalize被隱式的調用。
這篇博客基本都是代碼跟注釋,所以如果不是分析ThreadPoolExecutor
源碼的話看起來會非常無聊。