經典關閉線程池代碼
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.shutdown();
while (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("線程池中還有任務在處理");
}
shutdown 做了什么?
先上源碼
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 線程安全性檢查
checkShutdownAccess();
// 更新線程池狀態為 SHUTDOWN
advanceRunState(SHUTDOWN);
// 嘗試關閉空閑線程
interruptIdleWorkers();
// 空實現
onShutdown();
} finally {
mainLock.unlock();
}
// 嘗試中止線程池
tryTerminate();
}
每個方法都有特定的目的,其中 checkShutdownAccess()
和 advanceRunState(SHUTDOWN)
比較簡單,所以這里不再描述了,而 interruptIdleWorkers()
和 tryTerminate()
。
interruptIdleWorkers 做了什么?
關閉當前空閑線程。
onlyOne = true:至多關閉一個空閑worker,可能關閉0個。
onlyOne = false:遍歷所有的worker,只要是空閑的worker就關閉。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 嘗試獲得鎖,如果獲得鎖則表明該worker是空閑狀態
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
w.tryLock()
:該方法並不會阻塞,嘗試一次如果不成功就返回false,成功則放回true。
在方法 runWorker
中一旦work獲得了任務,就會調用調用了 w.lock()
,從而倘若 worker 是無鎖狀態,就是空閑狀態。
因此 Worker 之所以繼承 AbstractQueuedSynchronizer
實際上是為了達到用無鎖,有鎖標識worker空閑態與忙碌態,從而方便控制worker的銷毀操作。
tryTerminate 做了什么?
這個只是嘗試將線程池的狀態置為 TERMINATE
態,如果還有worker在執行,則嘗試關閉一個worker。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || // 是運行態
runStateAtLeast(c, TIDYING) || // 是 TIDYING 或者 TERMINATE 態
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // SHUTDOWN 態且隊列中有任務
return;
if (workerCountOf(c) != 0) {
// 還存在 worker,則嘗試關閉一個worker
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 通過CAS,先置為 TIDYING 態
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
// TIDYING 態
try {
// 空方法
terminated();
} finally {
// 最終更新為 TERMINATED 態
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
通過如下的方法簡單做一些過濾。因為狀態是從 TIDYING 態往后才到 TERMINATE 態。從這個過濾條件,可以看出如果是STOP態也是會通過的。不過如果線程池到了STOP應該就不會再使用了吧,所以也是不會有什么影響。
if (isRunning(c) || // 是運行態
runStateAtLeast(c, TIDYING) || // 是 TIDYING 或者 TERMINATE 態
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // SHUTDOWN 態且隊列中有任務
return;
如果該線程池中有worker,則中止1個worker。通過上面的過濾條件,到了這一步,那么肯定是 SHUTDOWN 態(忽略 STOP 態),並且任務隊列已經被處理完成。
if (workerCountOf(c) != 0) {
// 還存在 worker,則嘗試關閉一個worker
interruptIdleWorkers(ONLY_ONE);
return;
}
如果該線程池中有多個worker,終止1個worker之后 tryTerminate() 方法就返回了,那么剩下的worker在哪里被處理的呢?(看后面的解答)
假設線程池中的worker都已經關閉並且隊列中也沒有任務,那么后面的代碼將會將線程池狀態置為 TERMINATE 態。terminate()
是空實現,用於有需要的自己實現處理,線程池關閉之后的邏輯。
awaitTermination 做了什么
這個方法只是判斷當前線程池是否為 TERMINATED 態,如果不是則睡眠指定的時間,如果睡眠中途線程池變為終止態則會被喚醒。這個方法並不會處理線程池的狀態變更的操作,純粹是做狀態的判斷,所以得要在循環里邊做判斷。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED)) // 是否為 TERMINATED 態
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos); // native 方法,睡一覺
}
} finally {
mainLock.unlock();
}
}
問題
從 shutdown() 方法源碼來看有很大概率沒有完全線程池,而awaitTermination() 方法則只是判斷線程池狀態,並沒有關閉線程池狀態,那么剩下的worker什么時候促發關閉呢?關鍵代碼邏輯在一個worker被關閉之后,觸發了哪些事情。
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 省略其他不需要的代碼
try{
task.run();
} finally {
task = null;
w.unlock();
}
}
// 用於標識是否是task.run報錯
completedAbruptly = false;
} finally {
// 線程關閉的時候調用
processWorkerExit(w, completedAbruptly);
}
}
completedAbruptly
:用於標識是否是task.run報錯,如果值為 TRUE,則是task.run 異常;反之,則沒有發生異常
從上面的核心代碼來看,當一個worker被關閉之后會調用 processWorkerExit() 方法。看看它做了什么。
processWorkerExit 做了什么
對一個worker退出之后做善后工作,比如統計完成任務數,將線程池的關閉態傳播下去,根據條件補充 worker。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 如果有worker則嘗試關閉一個,否則置為TERMINATE態
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
線程池關閉的關鍵就在於一個worker退出之后,會調用 tryTerminate()
方法,將退出的信號傳遞下去,這樣其他的線程才能夠被依次處理,最后線程池會變為 TERMINATE 態。