一、線程池定義和使用
jdk 1.5 之后就引入了線程池。
1.1 定義
從上面的空間切換看得出來,線程是稀缺資源,它的創建與銷毀是一個相對偏重且耗資源的操作,而Java線程依賴於內核線程,創建線程需要進行操作系統狀態切換。為避免資源過度消耗需要設法重用線程執行多個任務。線程池就是一個線程緩存,負責對線程進行統一分配、調優與監控。(數據庫連接池也是一樣的道理)
什么時候使用線程池?
單個任務處理時間比較短;需要處理的任務數量很大。
線程池優勢?
- 重用存在的線程,減少線程創建、消亡的開銷,提高性能、提高響應速度。
- 當任務到達時,任務可以不需要等到線程創建就能立即執行。
- 提高線程的可管理性,可統一分配,調優和監控。
1.2 線程池在 jdk 已有的實現
- 在 juc 包下,有一個接口:Executor :
- Executor 又有兩個子接口:ExecutorService 和 ScheduledExecutorService,常用的接口是 ExecutorService。
- 同時常用的線程池的工具類叫 Executors。
例如:
ExecutorService service = Executors.newCachedThreadPool();
Executor 框架雖然提供了如 newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()、newScheduledThreadPool() 等創建線程池的方法,但都有其局限性,不夠靈活。
上面的幾種方式點進去會發現,都是用 ThreadPoolExecutor 進行創建的:
- newSingleThreadExecutor 字面意思 簡單線程執行器。

- newFixedThreadPool 字面意思固定的線程池,傳參就是線程固定數目,適用於執行長期任務的場景。

- newCachedThreadPool 字面意思 緩存線程池,核心線程0,最大線程非常大,動態創建的特點。

- newScheduledThreadPool 字面意思 時間安排線程池,指定核心線程數。


- newSingleThreadScheduledExecutor 字面意思 單線程安排執行器,也就是基於只有一個核心線程的執行器之外,又可以擴展。其中又用 DelegatedExecutorService 委托執行器服務進行了包裝。

可以看到,上面直接用 Executors 工具類默認的一些實現 new 出來的線程池都是用的 ThreadPoolExecutor 線程執行器這個類進行構造的,不過參數不同,導致了效果的側重點不同。
因此,自己創建線程池推薦的方法就是,直接使用 ThreadPoolExecutor 進行個性化的創建:

構造方法種的參數有 7 個:
- corePoolSize:線程池維護線程的最少數量 (core : 核心)
- maximumPoolSize:線程池維護線程的最大數量,顯然必須>=1
- keepAliveTime:線程池維護的多余的線程所允許的空閑時間,最長可以空閑多久,時間到了,如果超過 corePoolSize 的線程一直空閑,他們就會被銷毀。
- unit:線程池維護線程所允許的空閑時間的單位
- workQueue:線程池所使用的緩沖隊列,已經提交但是沒有執行的任務會放進這里
- threadFactory:生成線程池種工作線程的線程工廠,一般使用默認
- handler:線程池對拒絕任務的處理策略,當隊列滿且工作線程已經達到maximumPoolSize。
阿里的 java 開發手冊,強制要求,通過 ThreadPoolExecutor 來自定義,不能使用內置的,避免資源耗盡。這個很好理解,1 的類型就只有一個核心線程和最大現場,2 沒有擴展性,3、4、5的最大線程數太大,內存會爆炸。
1.3 線程池使用方法
這里我們用固定線程池來測試,傳入核心線程數為 5,最大數量自然就也是 5,
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try {
//模擬10個顧客辦理業務
for (int i = 0; i < 10; i++){
//execute 執行方法,傳入參數為實現了 Runnable 接口的類
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"號線程辦理業務");
});
}
} catch (Exception e){
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
其中,execute 方法就是將任務提交的方法,我們用 lambda 表達式給 execute 方法傳入了參數,實際上相當於一個完整的實現了 Runnable 接口的類。
執行結果:

可以看到,我們循環了 10 次,執行任務,但是線程只用到了 1-5 ,其中有多次復用。
再比如,我們按照各種類型的線程池,自己定義一個線程池,核心線程數 2, 最大線程數 5,阻塞隊列長度為 3:
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
//模擬10個顧客辦理業務
for (int i = 0; i < 10; i++){
//execute 執行方法,傳入參數為實現了 Runnable 接口的類
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"號線程辦理業務");
});
}
} catch (Exception e){
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
同樣 10 個線程,執行起來:

可以看到,執行了 8 個任務后,就拋出了異常,說明執行了拒絕策略。
上面兩個示例,我們的任務本身都是沒有返回值的,如果創建的任務本身需要有返回值就需要實現 Callable 接口,然后搭配FutureTask 來傳入任務,那么線程池就應該調用 submit 方法而不是 execute。
二、線程池底層原理
2.1 線程池執行邏輯
處理的流程核心就 execute() 方法,他接收一個實現了 Runnable 接口的任務,決定對這個任務的處理策略。

下圖是一個比較形象的策略流程:

可能的情況有四種,也就是圖中的1234:
- 如果線程池中的線程數量少於corePoolSize,就創建新的核心線程來執行新添加的任務
- 如果線程池中的線程數量大於等於corePoolSize,但隊列workQueue未滿,則將新添加的任務放到隊列workQueue中
- 如果線程池中的線程數量大於等於corePoolSize,且隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會創建新的非核心線程來處理被添加的任務
- 如果線程池中的線程數量等於了maximumPoolSize,就用RejectedExecutionHandler來執行拒絕策略。會拋出異常,一般的拒絕策略是RejectedExecutionException
注意,執行的順序,在 java 里有一個不合理的地方:
在池里安排任務的時候,我們的核心線程,隊列,非核心線程里面排的任務順序應該是 1 2 3;
但是真正實現上,如果三個都滿了,開始執行的時候,依次執行的順序卻是 核心線程,非核心線程,隊列。也就是執行順序會變成 1 3 2
2.2 拒絕策略
有些時候,我們並不希望拒絕策略是直接拋出異常,那么 jdk 里面提供的默認拒絕策略有 4 種,他們體現在代碼中就是 ThreadPoolExecutor 的四個靜態內部類:

2.2.1 CallerRunsPolicy:調用者運行策略。
這種策略不會拋棄任務,也不拋出異常,而是將某些任務回退給調用者,從而降低新任務的流量。

實現非常簡單,那就是如果說 e 這個線程池已經 shutdown 了,那么就什么也不干,也就是這個任務直接丟了;否則,r.run() ,相當於調用這個方法的線程里直接執行了這個 Runnable 任務。
此時我們可以把 1.3 里的代碼修改一下,只修改策略為 CallerRunsPolicy:

可以看到,有些任務會在 main 線程里處理。
2.2.2 AbortPolicy:終止策略。
拋異常。前面已經試過了,這個是默認的拒絕策略。

2.2.3 DiscardPolicy:丟棄任務。
可以看到,源碼里就是是什么也不做。如果場景中允許任務丟失,這個是最好的策略。

2.2.4 DiscardOldestPolicy:拋棄隊列中等待最久的任務。
拋棄隊列中等待最久的任務,然后把當前的任務加入隊列中,嘗試再次提交當前任務。
源碼里也就是利用隊列操作,進行一次出隊操作,然后重新調用 execute 方法。

2.3 線程池的五種狀態
和一個正常的線程的生命周期區別開,這個是線程池里線程的狀態。
- Running,能接受新任務以及處理已添加的任務;
- Shutdown,不接受新任務,可以處理已經添加的任務,也就是不能再調用execute或者submit了;
- Stop,不接受新任務,不處理已經添加的任務,並且中斷正在處理的任務;
- Tidying,所有的任務已經終止,CTL記錄的任務數量為0,CTL負責記錄線程池的運行狀態與活動線程數量;
- Terminated,線程池徹底終止,則線程池轉變為terminated的狀態。

如圖所示,從running狀態轉換為 shutdown,調用 shutdown()方法;如果調用shutdownNow()方法,就直接會變成stop。
terminated()是鈎子函數,默認是什么也不做的,我們可以重寫,然后決定結束之前要做一些別的處理邏輯。這個鈎子函數,就是模板模式的方法。
三、阻塞隊列
線程池里的 BlockingQueue,阻塞隊列,事實上在消費者生產者問題里的管程法實現,我們的策略也是類似阻塞隊列的,用它來做一個緩存池的作用。
阻塞隊列:任意時刻,不管並發有多高,永遠保證只有一個線程能夠進行隊列的入隊或出隊操作。也就意味着他是能夠保證線程安全的。
另外,阻塞隊列分為有界和無界隊列,理論上來說一個是隊列的size有固定,另一個是無界的。對於有界隊列來說,如果隊列存滿,只能出隊了,入隊操作就只能阻塞。
在 juc 包里,阻塞隊列的實現有很多:
- ArrayBlockingQueue:有界阻塞隊列;
- LinkedBlockingQueue:鏈表結構(大小默認值為Integer.MAX_VALUE)的阻塞隊列;
- PriorityBlockingQueue:支持優先級排序的無界阻塞隊列;
- DelayQueue:使用優先級隊列實現的延遲無界阻塞隊列;
- SynchronousQueue:不存儲元素的阻塞隊列,相當於只有一個元素;
- LinkedTransferQueue:鏈表組成的無界阻塞隊列;
- LinkedBlockingDeque:鏈表組成的雙向阻塞隊列。
對於 BlockingQueue 來說,核心操作主要有幾類:插入、刪除、查找。

其中的四種異常策略:
- 拋異常:如果阻塞隊列滿,再往隊列里 add 插入元素會拋 IllegalStateException:Queue full,如果阻塞隊列空,再 remove 就會拋 NoSuchElementException。
- 特殊值:offer 方法:成功 true,失敗 false,poll 方法,成功就返回元素,沒有就返回 null。
- 阻塞:阻塞隊列滿的時候,生產者線程繼續 put 元素,隊列就會阻塞直到可以 put 數據或者響應中斷然后退出,阻塞隊列空的時候,消費者線程繼續 take 元素,隊列就會一直阻塞直到有元素可以 take。
- 超時退出:阻塞隊列滿的時候,會阻塞生產者線程且超時退出,空的時候會阻塞消費者線程且超時退出。
那么使用的時候,增刪的方法按對應的同一組使用比較合理。(其實這個策略的設計對應的在單線程集合里也有,那就是Deque接口的實現類 LinkedList 使用的時候,不同的增刪方法策略不同)