前言
前兩天和粉絲聊天的時候,粉絲問了我一個挺有意思的問題,說他之前在面試的時候被問到線程池的線程復用原理,當時我跟他簡單的說了一下,沒想到過了幾天又來問我這個問題了,說他最近又被問到了這個問題.......想了想,干脆寫篇文章把這個東西講清楚吧,滿滿的干貨都放在下面了
1.什么是線程復用?
在線程池中,通過同一個線程去執行不同的任務,這就是線程復用。
假設現在有 100 個任務,我們創建一個固定線程的線程池(FixedThreadPool),核心線程數和最大線程數都是 3,那么當這個 100 個任務執行完,都只會使用三個線程。
示例:
public class FixedThreadPoolDemo {
static ExecutorService executorService = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "-> 執行");
});
}
// 關閉線程池
executorService.shutdown();
}
}
執行結果:
pool-1-thread-1-> 執行 pool-1-thread-2-> 執行 pool-1-thread-3-> 執行 pool-1-thread-1-> 執行 pool-1-thread-3-> 執行 pool-1-thread-2-> 執行 pool-1-thread-3-> 執行 pool-1-thread-1-> 執行 ...
2.線程復用的原理
線程池將線程和任務進行解耦,線程是線程,任務是任務,擺脫了之前通過 Thread 創建線程時的一個線程必須對應一個任務的限制。
在線程池中,同一個線程可以從阻塞隊列中不斷獲取新任務來執行,其核心原理在於線程池對 Thread 進行了封裝,並不是每次執行任務都會調用 Thread.start() 來創建新線程,而是讓每個線程去執行一個“循環任務”,在這個“循環任務”中不停的檢查是否有任務需要被執行,如果有則直接執行,也就是調用任務中的 run 方法,將 run 方法當成一個普通的方法執行,通過這種方式將只使用固定的線程就將所有任務的 run 方法串聯起來。
3.線程池執行流程
這部分內容在 Java 線程池的各個參數的含義 討論過,這里我們再復習一次,再從中去了解線程復用。
3.1 流程圖
3.2 線程創建的流程
當任務提交之后,線程池首先會檢查當前線程數,如果當前的線程數小於核心線程數(corePoolSize),比如最開始創建的時候線程數為 0,則新建線程並執行任務。
當提交的任務不斷增加,創建的線程數等於核心線程數(corePoolSize),新增的任務會被添加到 workQueue 任務隊列中,等待核心線程執行完當前任務后,重新從 workQueue 中獲取任務執行。
假設任務非常多,達到了 workQueue 的最大容量,但是當前線程數小於最大線程數(maximumPoolSize),線程池會在核心線程數(corePoolSize)的基礎上繼續創建線程來執行任務。
假設任務繼續增加,線程池的線程數達到最大線程數(maximumPoolSize),如果任務繼續增加,這個時候線程池就會采用拒絕策略來拒絕這些任務。
在任務不斷增加的過程中,線程池會逐一進行以下 4 個方面的判斷
核心線程數(corePoolSize)
任務隊列(workQueue)
最大線程數(maximumPoolSize)
拒絕策略
3.3 ThreadPoolExecutor#execute 源碼分析
java.util.concurrent.ThreadPoolExecutor#execute
public void execute(Runnable command) {
// 如果傳入的Runnable的空,就拋出異常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 線程池中的線程比核心線程數少
if (workerCountOf(c) < corePoolSize) {
// 新建一個核心線程執行任務
if (addWorker(command, true))
return;
c = ctl.get();
}
// 核心線程已滿,但是任務隊列未滿,添加到隊列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 任務成功添加到隊列以后,再次檢查是否需要添加新的線程,因為已存在的線程可能被銷毀了
if (! isRunning(recheck) && remove(command))
// 如果線程池處於非運行狀態,並且把當前的任務從任務隊列中移除成功,則拒絕該任務
reject(command);
else if (workerCountOf(recheck) == 0)
// 如果之前的線程已經被銷毀完,新建一個非核心線程
addWorker(null, false);
}
// 核心線程池已滿,隊列已滿,嘗試創建一個非核心新的線程
else if (!addWorker(command, false))
// 如果創建新線程失敗,說明線程池關閉或者線程池滿了,拒絕任務
reject(command);
}
3.4 逐行分析
//如果傳入的Runnable的空,就拋出異常 if (command == null) throw new NullPointerException();
execute 方法中通過 if 語句判斷 command ,也就是 Runnable 任務是否等於 null,如果為 null 就拋出異常。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
判斷當前線程數是否小於核心線程數,如果小於核心線程數就調用 addWorker() 方法增加一個 Worker,這里的 Worker 就可以理解為一個線程。
addWorker 方法的主要作用是在線程池中創建一個線程並執行傳入的任務,如果返回 true 代表添加成功,如果返回 false 代表添加失敗。
第一個參數表示傳入的任務
第二個參數是個布爾值,如果布爾值傳入 true 代表增加線程時判斷當前線程是否少於 corePoolSize,小於則增加新線程(核心線程),大於等於則不增加;同理,如果傳入 false 代表增加線程時判斷當前線程是否少於 maximumPoolSize,小於則增加新線程(非核心線程),大於等於則不增加,所以這里的布爾值的含義是以核心線程數為界限還是以最大線程數為界限進行是否新增非核心線程的判斷
這一段判斷相關源碼如下
private boolean addWorker(Runnable firstTask, boolean core) {
...
int wc = workerCountOf(c);//當前工作線程數
//判斷當前工作線程數>=最大線程數 或者 >=核心線程數(當core = true)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
...
最核心的就是 core ? corePoolSize : maximumPoolSize 這個三目運算。
// 核心線程已滿,但是任務隊列未滿,添加到隊列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 任務成功添加到隊列以后,再次檢查是否需要添加新的線程,因為已存在的線程可能被銷毀了
if (! isRunning(recheck) && remove(command))
// 如果線程池處於非運行狀態,並且把當前的任務從任務隊列中移除成功,則拒絕該任務
reject(command);
else if (workerCountOf(recheck) == 0)
// 如果之前的線程已經被銷毀完,新建一個非核心線程
addWorker(null, false);
}
如果代碼執行到這里,說明當前線程數大於或等於核心線程數或者 addWorker 失敗了,那么就需要通過
if (isRunning(c) && workQueue.offer(command)) 檢查線程池狀態是否為 Running,如果線程池狀態是 Running 就通過 workQueue.offer(command) 將任務放入任務隊列中,
任務成功添加到隊列以后,再次檢查線程池狀態,如果線程池不處於 Running 狀態,說明線程池被關閉,那么就移除剛剛添加到任務隊列中的任務,並執行拒絕策略,代碼如下:
if (! isRunning(recheck) && remove(command))
// 如果線程池處於非運行狀態,並且把當前的任務從任務隊列中移除成功,則拒絕該任務
reject(command);
下面我們再來看后一個 else 分支:
else if (workerCountOf(recheck) == 0)
// 如果之前的線程已經被銷毀完,新建一個非核心線程
addWorker(null, false);
進入這個 else 說明前面判斷到線程池狀態為 Running,那么當任務被添加進來之后就需要防止沒有可執行線程的情況發生(比如之前的線程被回收了或意外終止了),所以此時如果檢查當前線程數為 0,也就是 workerCountOf(recheck) == 0,那就執行 addWorker() 方法新建一個非核心線程。
我們再來看最后一部分代碼:
// 核心線程池已滿,隊列已滿,嘗試創建一個非核心新的線程
else if (!addWorker(command, false))
// 如果創建新線程失敗,說明線程池關閉或者線程池滿了,拒絕任務
reject(command);
執行到這里,說明線程池不是 Running 狀態,又或者線程數 >= 核心線程數並且任務隊列已經滿了,根據規則,此時需要添加新線程,直到線程數達到“最大線程數”,所以此時就會再次調用 addWorker 方法並將第二個參數傳入 false,傳入 false 代表增加非核心線程。
addWorker 方法如果返回 true 代表添加成功,如果返回 false 代表任務添加失敗,說明當前線程數已經達到 maximumPoolSize,然后執行拒絕策略 reject 方法。
如果執行到這里線程池的狀態不是 Running,那么 addWorker 會失敗並返回 false,所以也會執行拒絕策略 reject 方法。
4.線程復用源碼分析
java.util.concurrent.ThreadPoolExecutor#runWorker
省略掉部分和復用無關的代碼之后,代碼如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 釋放鎖 設置work的state=0 允許中斷
boolean completedAbruptly = true;
try {
//一直執行 如果task不為空 或者 從隊列中獲取的task不為空
while (task != null || (task = getTask()) != null) {
task.run();//執行task中的run方法
}
}
completedAbruptly = false;
} finally {
//1.將 worker 從數組 workers 里刪除掉
//2.根據布爾值 allowCoreThreadTimeOut 來決定是否補充新的 Worker 進數組 workers
processWorkerExit(w, completedAbruptly);
}
}
可以看到,實現線程復用的邏輯主要在一個不停循環的 while 循環體中。
通過獲取 Worker 的 firstTask 或者通過 getTask 方法從 workQueue 中獲取待執行的任務
直接通過 task.run() 來執行具體的任務(而不是新建線程)
在這里,我們找到了線程復用最終的實現,通過取 Worker 的 firstTask 或者 getTask 方法從 workQueue 中取出了新任務,並直接調用 Runnable 的 run 方法來執行任務,也就是如之前所說的,每個線程都始終在一個大循環中,反復獲取任務,然后執行任務,從而實現了線程的復用。
總結
這篇關於線程池的線程復用原理的文章就到這里了,大家看完有什么不懂的歡迎在下方留言評論,也可以私信問我,我看到了一般都會回復的!
