前言
在上一篇《java線程池,阿里為什么不允許使用Executors?》中我們談及了線程池,同時又發現一個現象,當最大線程數還沒有滿的時候耗時的任務全部堆積給了單個線程, 代碼如下:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, //corePoolSize
100, //maximumPoolSize
100, //keepAliveTime
TimeUnit.SECONDS, //unit
new LinkedBlockingDeque<>(100));//workQueue
for (int i = 0; i < 5; i++) {
final int taskIndex = i;
executor.execute(() -> {
System.out.println(taskIndex);
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 輸出: 0
下圖很形象的說明了這個問題:

那么有沒有一種機制,在線程池中還有線程可以提供服務的時候幫忙分擔一些已經被分配給某一個線程的耗時任務呢?
答案當然是有的:工作竊取算法
工作竊取 (Work stealing)
這邊大家先不要將這個跟java掛鈎,因為這個屬於算法,一種思想和套路,並不是特定語言特有的東西,所以不同的語言對應的實現也不盡一樣,但核心思想一致。
這邊會用“工作者”來代替線程的說法,如果在java中這個工作者就是線程。
工作竊取核心思想是,自己的活干完了去看看別人有沒有沒干完的活,如果有就拿過來幫他干。
大多數實現機制是:為每個工作者程分配一個雙端隊列(本地隊列)用於存放需要執行的任務,當自己的隊列沒有數據的時候從其它工作者隊列中獲得一個任務繼續執行。
我們來看一張圖,這張圖是發生了工作竊取時的狀態。

可以看到工作者B的本地隊列中沒有了需要執行的規則,它正嘗試從工作者A的任務隊列中偷取一個任務。
為什么說嘗試?因為涉及到並行編程肯定涉及到並發安全的問題,有可能在偷取過程中工作者A提前搶占了這個任務,那么B的偷取就會失敗。大多數實現會盡量避免發生這個問題,所以大多數情況下不會發生。
並發安全的問題是怎么避免的呢?
一般是自己的本地隊列采取LIFO(后進先出),偷取時采用FIFO(先進先出),一個從頭開始執行,一個從尾部開始執行,由於偷取的動作十分快速,會大量降低這種沖突,也是一種優化方式。
Java中的工作竊取算法線程池
在Java 1.7新增了一個ForkJoinPool類,主要是實現了工作竊取算法的線程池,該類在1.8中被優化了,同時1.8在Executors類中還新增了兩個newWorkStealingPool工廠方法。
java7中的fork/join task 和 java8中的並行stream都是基於ForkJoinPool。
// 使用當前處理器數, 相當於調用 newWorkStealingPool(Runtime.getRuntime().availableProcessors());
public static ExecutorService newWorkStealingPool();
public static ExecutorService newWorkStealingPool(int parallelism);
同時 ForkJoinPool 還在全局建立了一個公共的線程池
ForkJoinPool.commonPool();
默認的並行度是當前JVM識別到的處理器數。這個值也是可以通過參數進行變更的,下面是可以通過JVM熟悉進行commonPool設置的參數。
前綴統一為: java.util.concurrent.ForkJoinPool.common.
比如 parallelism 就要寫為 java.util.concurrent.ForkJoinPool.common.parallelism
| 參數 | 描述 | 默認值 |
|---|---|---|
| parallelism | 並行級別 | JVM識別到的處理器數 |
| threadFactory | 線程工廠類名 | ForkJoinPool.DefaultForkJoinWorkerThreadFactory |
| exceptionHandler | 錯誤處理程序 | null |
| maximumSpares | 最大允許額外線程數 | 256 |
使用工作竊取算法的線程池來優化之前的代碼
ExecutorService executor = Executors.newWorkStealingPool(8);
for (int i = 0; i < 5; i++) {
final int taskIndex = i;
executor.execute(() -> {
System.out.println(taskIndex);
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 無序輸出 0~4
如果將Executors.newWorkStealingPool(8)改成ForkJoinPool.commonPool()會輸出什么?
如果你能知道輸出什么那么你對這個機制就算掌握了,會輸出當前運行環境中處理器(cpu)數量的次數(如果核算大於5就只會輸出5個結果)。
newWorkStealingPool 和 ForkJoinPool.commonPool 該優先選擇哪個?
這個沒有最優解,推薦執行的小任務(零散的)使用commonPool,而有特定目的的則使用newWorkStealingPool或 new ForkJoinPool。
使用ForkJoinPool.commonPool 需要注意的問題
commonPool默認使用當前環境的處理器格式來當做並行程度,如果遇上堵塞形任務一樣會遇到浪費算力的問題。
這點在容器化時需要特別注意,因為容器化的cpu個數限制往往不會太大。
這種時候可以通過設置默認的並行度或者使用newWorkStealingPool來手動指定並行度。
最后
為什么ForkJoinPool極少出現線程關鍵字?
現在許多語言淡化了線程這個概念,而golang中更是直接去掉了線程能力改為提供協程goroutine。
目的還是線程是OS的資源,OS對程序內部運行其實並沒有太了解,為了避免線程資源的浪費許多語言會自己管理線程。
對於程序來說我們關心的主要還是任務的並行運行,並不關心是線程還是協程。
下面是一些對應關系:
- CPU : 線程 (1:N)
- 線程 : 協程 (1:N)
CPU由OS管理,OS提供線程給程序使用,程序利用線程提供協程能力給應用使用。
ForkJoinPool一定更快嗎?
不,大家都知道做的事情越多邏輯越復雜效率會越低。
ForkJoinPool中的工作隊列,工作竊取都是需要額外管理的,同時也對線程調度和GC帶來了壓力。
所以ForkJoinPool並不是萬能葯大家根據具體需要去使用。
后面可能會跟大家分享下 Spring 中的 @Async。
