一、介紹
使用 java8 lambda 表達式大半年了,一直都知道底層使用的是 Fork/Join 框架,今天終於有機會來學學 Fork/Join 框架了。
Fork/Join 框架是 Java 7 提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。
Fork/Join 的運行流程示意圖:
比如,一個 1+2+3+...+100 的工作任務,我們可以把它 Fork 成 10 個子任務,分別計算這 10 個子任務的運行結果。最后再把 10 個子任務的結果 Join 起來,匯總成最后的結果。
為了減少線程間的競爭,通常把這些子任務分別放到不同的隊列里,並為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應。但是,有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等着,不如去幫其它線程干活,於是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。線程的這種執行方式,我們稱之為“工作竊取”算法。
二、設計
實現 Fork/Join 框架的設計,大抵需要兩步:
1. 分割任務
首先我們需要創建一個 ForkJoin 任務,把大任務分割成子任務,如果子任務不夠小,則繼續往下分,直到分割出的子任務足夠小。
在 Java 中我們可以使用 ForkJoinTask 類,它提供在任務中執行 fork() 和 join() 操作的機制,通常情況下,我們只需要繼承它的子類:
- RecursiveAction — 用於沒有返回結果的任務
- RecursiveTask — 用於有返回結果的任務
2. 任務執行並返回結果
分割的子任務分別放在雙端隊列里,然后啟動幾個線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據。
在 Java 中任務的執行需要通過 ForkJoinPool 來執行。
三、示例
來一個阿里面試題:百萬級 Integer 數據量的一個 array 求和。
public class ArrayCountTask extends RecursiveTask<Long> {
/**
* 閾值
*/
private static final Integer THRESHOLD = 10000;
private Integer[] array;
private Integer start;
private Integer end;
public ArrayCountTask(Integer[] array, Integer start, Integer end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
// 最小子任務計算
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
sum += array[i];
}
} else {
// 把大於閾值的任務繼續往下拆分,有點類似遞歸的思維。 recursive 就是遞歸的意思。
int middle = (start + end) >>> 1;
ArrayCountTask leftArrayCountTask = new ArrayCountTask(array, start, middle);
ArrayCountTask rightArrayCountTask = new ArrayCountTask(array, middle, end);
// 執行子任務
//leftArrayCountTask.fork();
//rightArrayCountTask.fork();
// invokeAll 方法使用
invokeAll(leftArrayCountTask, rightArrayCountTask);
//等待子任務執行完,並得到其結果
Long leftJoin = leftArrayCountTask.join();
Long rightJoin = rightArrayCountTask.join();
// 合並子任務的結果
sum = leftJoin + rightJoin;
}
return sum;
}
}
public static void main(String[] args) {
// 1. 造一個 int 類型的百萬級別數組
Integer[] array = new Integer[150000000];
for (int i = 0; i < array.length; i++) {
array[i] = new Random().nextInt(100);
}
// 2.普通方式計算結果
long start = System.currentTimeMillis();
long sum = 0;
for (int i = 0; i < array.length; i++) {
sum += array[i];
}
long end = System.currentTimeMillis();
System.out.println("普通方式計算結果:" + sum + ",耗時:" + (end - start));
long start2 = System.currentTimeMillis();
// 3.fork/join 框架方式計算結果
ArrayCountTask arrayCountTask = new ArrayCountTask(array, 0, array.length);
ForkJoinPool forkJoinPool = new ForkJoinPool();
sum = forkJoinPool.invoke(arrayCountTask);
long end2 = System.currentTimeMillis();
System.out.println("fork/join 框架方式計算結果:" + sum + ",耗時:" + (end2 - start2));
// 結論:
// 1. 電腦 i5-4300m,雙核四線程
// 2. 數組量少的時候,fork/join 框架要進行線程創建/切換的操作,性能不明顯。
// 3. 數組量超過 100000000,fork/join 框架的性能才開始體現。
}
ForkJoinTask 與一般任務的主要區別在於它需要實現 compute 方法,在這個方法里,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用 fork 方法時,又會進入 compute 方法,看看當前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用 join 方法會等待子任務執行完並得到其結果。
在執行子任務時調用 fork 方法並不是最佳的選擇,最佳的選擇是 invokeAll 方法。因為執行 compute() 方法的線程本身也是一個 worker 線程,當對兩個子任務調用 fork() 時,這個worker 線程就會把任務分配給另外兩個 worker,但是它自己卻停下來等待不干活了!這樣就白白浪費了 Fork/Join 線程池中的一個 worker 線程,導致了4個子任務至少需要7個線程才能並發執行。
比如甲把 400 分成兩個 200 后,fork() 寫法相當於甲把一個 200 分給乙,把另一個 200 分給丙,然后,甲成了監工,不干活,等乙和丙干完了他直接匯報工作。乙和丙在把 200 分拆成兩個 100 的過程中,他倆又成了監工,這樣,本來只需要 4 個工人的活,現在需要 7 個工人才能完成,其中有3個是不干活的。
ForkJoinPool 由 ForkJoinTask 數組和 ForkJoinWorkerThread 數組組成。ForkJoinTask 數組負責將存放程序提交給 ForkJoinPool 的任務;而 ForkJoinWorkerThread 數組負責執行這些任務,ForkJoinWorkerThread 體現的就是“工作竊取”算法。
- 當我們調用 ForkJoinTask 的 fork 方法時,程序會調用 ForkJoinWorkerThread 的 pushTask 方法異步地執行這個任務,然后立即返回結果。
- 當我們調用 ForkJoinTask 的 join 方法時,程序會阻塞當前線程並等待獲取結果。
ForkJoinPool 使用 submit 或 invoke 提交的區別:invoke 同步執行,調用之后需要等待任務完成,才能執行后面的代碼;submit 是異步執行,只有在 Future 調用 get 的時候會阻塞。
ForkJoinPool 繼承自 AbstractExecutorService, 不是為了替代 ExecutorService,而是它的補充,在某些應用場景下性能比 ExecutorService 更好。