作為一名Java開發工程師,想必性能問題是不可避免的。通常,在遇到性能瓶頸時第一時間肯定會想到利用緩存來解決問題,然而緩存雖好用,但也並非萬能,某些場景依然無法覆蓋。比如:需要實時、多次調用第三方API時,該場景緩存則無法適用。
然
多線程並發的方式則很好的解決了上述問題。
但若每次都在任務開始時創建新的線程,在任務結束后銷毀線程,這樣增加了資源的消耗,也降到了響應速度。針對此,線程池通過維護多個線程的方式來降低創建線程的次數,並且對線程的創建、銷毀以及數量加以很好的控制,保證對內核的充分利用。
總結起來:
- 降低了資源的消耗:通過池化技術,重復利用已創建好的線程。
- 增加了響應的速度:若有空閑線程時,直接執行任務,無需等待創建新的線程。
- 增加了系統的穩定性和可管理性:對系統而言,過多的線程會導致資源調度失衡,降低了系統的穩定性。線程池進行統一的管理(調優,監控和分配等),減少碎片化信息。
I. 線程池的類關系圖
【JDK1.8】

從線程池類圖關系可見,Java中線程池核心實現類是ThreadPoolExecutor, ThreadPoolExecutor實現的頂層接口是Executor,從圖中可以發現:
- Executor接口只有一個execute()方法,將任務的提交和運行進行解耦。
- ExecutorService接口在Executor的基礎上,增加了生命周期的控制(線程池狀態轉換)和submit()方法生成Future結果。
- AbstractExecutorService是一個抽象類,實現了ExecutorService接口中的submit()方法,並實現了任務的執行流程。
- ThreadPoolExecutor主要實現兩個功能:線程池生命周期管理,任務的執行execute()方法。
作為整個架構中最核心的東西-
ThreadPoolExecutor,接下來便從源碼的角度來分析一二。

II. 生命周期管理
線程池的信息變量ctl是一個原子整型變量,封裝了兩個概念行字段:
- workerCount: 表示線程池中有效的線程數量
- runState: 表示線程池當前的運行狀態
ctl信息
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor

- COUNT_BITS=29(十進制)
- CAPACITY=0000 1111 1111 1111 1111 1111 1111 1111(二進制)
信息獲取:
- int c = ctl.get();
- workerCount = workerCountOf(c); //ctl的低28位表示線程數量
- runState = runStateOf(c); //ctl的高四位表示狀態
運行狀態
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor

從源碼中可見,線程池主要有5種狀態:
RUNNING,
SHUTDOWN,
STOP,
TIDYING,
TERMINATED,狀態之間的關系如下圖:

- RUNNING: 可以接受新的任務請求,也可處理阻塞隊列中的任務
- SHUTDOWN: 不接受新的任務請求,但會處理阻塞隊列中的任務
- STOP: 不接受新的任務請求,阻塞隊列也會直接清空,正在處理的任務也會被直接中斷
- TIDYING: 所有的任務都已經終止,線程池中不存在有效線程
- TERMINATED: 線程池終止
從上述狀態轉換圖,我們發現狀態的切換主要由
shutdown(),
shutdownNow(), tryTerminate()以及
terminated()幾個方法實現。
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.shutdown()

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.interruptIdleWorkers(): 終止空閑線程。

從代碼中可以發現,執行
shutdown()函數會將線程池狀態置為
SHUTDOWN狀態,並且會終止所有的空閑線程。這里通過
java.util.concurrent.ThreadPoolExecutor.Worker.tryLock()方法嘗試獲取當前線程的
AQS獨占模式鎖,如果目標線程處於空閑狀態,則可成功獲取鎖;反之,若線程正在執行task,則獲取鎖失敗,以此來判斷線程是否處於空閑狀態。具體的流程圖如下:

注意:
shutdown()函數僅僅是終止了線程池中
空閑的線程,正在執行任務的線程依舊可以正常工作,所以處於
SHUTDOWN狀態下的線程池依舊可以從阻塞隊列中獲取任務並執行,但不會接受新的task請求(詳情見添加worker部分)。
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.shutdownNow()

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.interruptWorkers(): 終止線程池中所有的線程。

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.Worker.interruptIfStarted(): 中斷當前的線程。

從代碼中可見,執行
shutdownNow()函數會將線程池置為
STOP狀態,終止線程池中所有的線程,由於線程池無法執行task,所以這里會將阻塞隊列中所有的task取出並返回。具體的流程圖如下:

注意:由於所有的線程都被終止了,所以處於
STOP狀態下的線程池會中斷當前正在執行的任務,無法從阻塞隊列中獲取任務並執行,也無法接受新任務的請求。
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.tryTerminate()

從代碼中可見,
tryTerminate()函數僅在線程池處於
STOP
狀態或者
SHUTDOWN
狀態下阻塞隊列為空的場景下,才會進一步執行,否則執行結束。需要注意的是,處於
SHUTDOWN狀態下,雖然阻塞隊列為空,但是依然存在線程正在執行任務的可能,所以需要進行二次檢查。在將線程池狀態置為
TIDYING狀態前,如果線程池信息發生任何變化,都需要重新檢查,避免一些突發異常的發生。具體流程圖如下:

注意:線程池處於
TIDYING狀態下會直接執行
terminated()方法,默認該方法為空(當然用戶可進行自定義重寫),之后將線程池狀態置為
TERMINATED,理論上處於該狀態下的線程池已經結束,所以喚醒所有等待線程池結束的線程,執行其任務。
III. 運行機制

從圖中可見整個運行機制主要分為以下幾個模塊:
- 任務請求
- 任務分配
- 添加worker
- 運行worker
- 任務拒絕
接下來,我們看看對應的源碼是如何處理的。
任務請求
ThreadPoolExecutor實現了任務請求和執行的解耦,用戶無需關心是如何執行的任務,只需要提供一個可執行的任務,然后調用
execute()方法即可。但在實際編碼中,我們常常需要獲取任務執行的結果,因此,ExecutorService接口在Executor的基礎上增加
submit()方法,將任務封裝成
RunnableFuture對象,將執行結果保存為
Future對象。綜上所言,任務請求有兩種方法,分別如下:
- void execute(Runnable command):不需要獲取任務結果。
- <T> Future<T> submit(Callable<T> task):需要獲取任務結果。
execute(Runnable command)見任務分配模塊。
submit(Callalbe<T> task)見獲取任務結果模塊。
任務分配
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.execute(Runnable command)

從代碼中可見,執行
execute(Runnable command)函數分配任務時主要有以下四個選擇:
- 線程池線程數<核心線程數:創建一條新的線程,並在該線程上直接執行任務;
- 線程池線程數>=核心線程數 && 阻塞隊列未滿: 將任務推入阻塞隊列中,並創建一條新的線程(若此時線程池存在有效線程則不創建),該線程獲取阻塞隊列頭部的任務並執行;
- 線程池線程數>=核心線程數 && 阻塞隊列已滿 && 線程池線程數<最大線程數:創建一條新的線程,並在該線程直接執行任務;
- 線程池線程數>最大線程數 && 阻塞隊列已滿:任務拒絕。
詳細流程圖如下:

注意:
線程池線程數>=核心線程數 && 阻塞隊列未滿場景下僅當線程池無有效線程時才會創建新的線程,因為當線程池中線程執行完任務后會再次嘗試去阻塞隊列獲取任務並執行,直到阻塞隊列為空才會處於空閑狀態。所以,多數場景下此時線程池的線程數=核心線程數,無需創建新的線程。當然,也存在臨界場景,比如:此時正好所有的線程都恰好執行完任務並被銷毀,為了避免運行異常,則需創建一個新的線程,從阻塞隊列中獲取任務並執行。
添加worker
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.addWorker(Runnable firstTask, boolean core)

從代碼可見,
addWorker(Runnable firstTask, boolean core)函數擁有兩個入參:
- firstTask:新創建線程執行的第一個任務,這里特指新提交的任務
- core:ture表示核心線程數,false表示最大線程數
此外,只有線程池處於(
RUNNING狀態)或者處於(
SHUTDOWN狀態+阻塞隊列不為空)時,才可以創建worker對象,並且在
SHUTDOWN狀態下
firstTask(新提交的任務)必須為空,才會將新創建的worker對象加入到workers列表中,否則添加worker任務失敗,即
SHUTDOWN狀態下不接受新提交的任務請求。

注意:當使用
java.util.concurrent.ThreadPoolExecutor.compareAndIncrementWorkerCount()方法嘗試登記線程失敗時,需要判斷下是否已經影響了線程池的狀態,如果有,則重新獲取線程池狀態進行相關校驗,若沒有,則重新獲取線程池線程數量,並進行線程池線程數量的檢測。
運行線程任務通過
thread.start()方法,即調用了worker對象的
run()方法,所以執行worker線程任務會調用
worker.run()方法。
運行worker
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.Worker.run()

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.runWorker(Worker w)

從代碼可見,
1)
runWorker(Worker w)函數會先執行
firstTask(如果不為空),等第一個任務執行完會繼續嘗試從阻塞隊列中獲取任務,然后繼續執行;直到阻塞隊列為空。
2)這里並沒有使用前面常見的
可重入鎖ReentranLock,而是使用了自身繼承的
AQS(AbstractQueuedSynchronizer)鎖。當獲取任務成功后,目標線程獲取
AQS獨占模式鎖,表示該線程處於忙碌狀態,直到任務執行完畢才會釋放。如果上個任務執行完畢后,目標線程一直無法獲取新的任務(阻塞隊列為空),則不會獲取
AQS獨占模式鎖,表示目標線程處於空閑狀態。
3)任務的最終執行還是調用任務的
run()方法,所以請求的任務是
可執行命令Runnable task.
具體的流程圖如下:

注意:在實際執行任務前,需要獲取線程池的狀態,確保此時調用
shutdownNow()方法可以及時中斷任務。此外,前置任務和后置任務默認為空,用戶可自定義重寫。
任務拒絕
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.reject(Runnable command)

從代碼可見,拒絕任務時調用了
handler對象的
rejectedExecution()方法。
ThreadPoolExecutor實現了
四種不同的策略類,分別為:
源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy:調用主線程(調用ThreadPoolExecuotr的線程)直接執行任務。

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.AbortPolicy: 直接放棄任務,拋出java.util.concurrent.RejectedExecutionException異常。

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.DiscardPolicy: 直接放棄任務,不做任何處理。

源碼分析(JDK1.8)java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy: 獲取阻塞隊列頭部的任務,並直接將其拋棄;然后,重新請求任務。

獲取任務結果
在實際編碼中,我們常常需要獲取任務執行的結果,因此,
ExecutorService接口在
Executor的基礎上增加
submit()方法,將任務封裝成
RunnableFuture對象,將執行結果保存為
Future對象。
源碼分析(JDK1.8)java.util.concurrent.AbstractExecutorService.submit(Callalbe<T> task)

注意:
AbstractExecutorrService同樣實現
submit(Runnable<T> task)方法,這里僅以
submit(Callalbe<T> task)方法進行分析。
源碼分析(JDK1.8)java.util.concurrent.AbstractExecutorService.newTaskFor(Callable<T> callable): 將任務封裝成RunableFuture對象。

從上述運行worker模塊可知,線程上執行任務時,會調用任務的
run()方法,這里封裝成
java.util.concurrent.FutureTask對象,在線程執行任務時,會調用
FutureTask.run()方法
源碼分析(JDK1.8)java.util.concurrent.FutureTask.run()

源碼分析(JDK1.8)java.util.concurrent.FutureTask.set():將結果保存到outcome屬性中,以便后面獲取結果。

源碼分析(JDK1.8)java.util.concurrent.FutureTask.get():取出運行結果。

注意:這里調用
get()方法會一直等待任務執行結束或拋出異常,然后返回結果。
源碼分析(JDK1.8)java.util.concurrent.FutureTask.get(long timeout, TimeUnit unit):取出運行結果,超時拋出
java.util.concurrent.TimeoutException異常。

注意:
get(long timeout, TimeUnit unit)中
unit表示時間單位(例如:SECONDS),
timeout表示時間值。調用該函數獲取運行結果時,如果等待時間超時,會直接拋出
java.util.concurrent.TimeoutException異常。
具體流程圖如下:

IV.實際案例
submit(Callable task)
public class MultiThreadPool { private static List<String> hello = Arrays.asList("h", "e", "l", "l", "o"); private static String task(String args) throws Exception {
Thread.sleep(1000); System.out.println(String.format("submit - thread name: %s, args: %s", Thread.currentThread().getName(), args)); return args; } private static void submitTask() throws InterruptedException, ExecutionException, TimeoutException { Map<Future, String> futures = new ConcurrentHashMap<>(); final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 6, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()); try { for (String str : hello) { Thread.sleep(1); Future f = executor.submit(() -> task(str)); futures.put(f, str); } for (Future f: futures.keySet()) { String result = (String) f.get(60, TimeUnit.SECONDS); System.out.println(String.format("submit - thread name: %s, result: %s", Thread.currentThread().getName(), result)); } } finally { executor.shutdown(); } } public static void main(String[] args) throws Exception { submitTask(); } }
輸出結果:
submit - thread name: pool-1-thread-1, args: h
submit - thread name: pool-1-thread-3, args: l
submit - thread name: pool-1-thread-4, args: l
submit - thread name: pool-1-thread-2, args: e
submit - thread name: main, result: h
submit - thread name: main, result: e
submit - thread name: main, result: l
submit - thread name: main, result: l
submit - thread name: pool-1-thread-3, args: o
submit - thread name: main, result: o
execute(Runnable task)
public class MultiThreadPool { private static List<String> hello = Arrays.asList("h", "e", "l", "l", "o"); private static class Task implements Runnable { private String arg; Task(String arg) { this.arg = arg; } public void run() { System.out.println(String.format("execute - thread name: %s, args: %s", Thread.currentThread().getName(), arg)); } } private static void executeTask() throws InterruptedException {final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 6, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()); try { for (String str : hello) { Thread.sleep(1); executor.execute(new Task(str)); } } finally { executor.shutdown(); } } public static void main(String[] args) throws Exception { executeTask(); } }
輸出結果:
execute - thread name: pool-1-thread-3, args: l
execute - thread name: pool-1-thread-1, args: h execute - thread name: pool-1-thread-4, args: l execute - thread name: pool-1-thread-2, args: e execute - thread name: pool-1-thread-3, args: o