1、介紹
Fork/Join 框架是 Java7 提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。在多核計算機中正確使用可以很好的發揮cpu的作用,提高程序的執行效率。框架采用工作竊取算法,當有子任務線程處理完當前任務時,它會從其他線程執行的任務隊列里竊取任務來執行,從而提高整體的執行效率。為了減少線程間的任務資源競爭,隊列通常使用雙端隊列,別竊取任務線程永遠從啥UN廣大UN隊列的嘔吐不獲取任務執行,而竊取任務的線程永遠從雙端隊列的尾部獲取任務執行。
2、使用
根據業務場景來考慮是否需要使用Fork/Join框架來進行任務的拆分和匯總操作。當需要時,比如說需要執行一個很大的業務計算之類的,此時使用Fork/Join框架分以下兩步:
- 對任務進行分割 把大任務分割成子任務,有可能子任務還是很大,所以還需要不停的分割,直到分割出的子任務足夠小
- 執行分割的子任務並匯總結果 分割的子任務分別放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據
具體實現以上兩步:
- 創建ForkJoinTask 它提供在任務中執行 fork() 和 join() 操作的機制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,Fork/Join 框架提供了以下兩個子類: RecursiveAction:用於沒有返回結果的任務 RecursiveTask :用於有返回結果的任務
- 使用ForkJoinPool執行ForkJoinTask 任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務
3、示例
3.1 使用fork
public class SumTask extends RecursiveTask <Integer>{ private static final Integer THRESHOLD = 1000; private int start; private int end; public SumTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { Integer sum = 0; boolean isOk = (end - start) <= THRESHOLD; if(isOk) { for(int i = start; i <= end; i ++) { sum += i; } return sum; } int middle = (end + start) / 2; //子任務遞歸 SumTask sumSubTask = new SumTask(start, middle); SumTask sumSubTask1 = new SumTask(middle + 1, end); //fork子任務 sumSubTask.fork(); sumSubTask1.fork(); //join子任務 Integer join = sumSubTask.join(); Integer join1 = sumSubTask1.join(); sum = join + join1; //計算結果 return sum; } }
3.2 使用invokeAll
public class SumTask2 extends RecursiveTask <Integer>{ private static final Integer THRESHOLD = 1000; private int start; private int end; public SumTask2(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { Integer sum = 0; boolean isOk = end - start <= THRESHOLD; if(isOk) { for(int i = start; i <= end; i ++) { sum += i; } // System.out.println(String.format("compute %d-%d = %d", start, end, sum)); return sum; } //除以2 int middle = (end + start) / 2; //子任務遞歸 // System.out.println(String.format("fork %d-%d => %d-%d&%d-%d", start, end, start, middle - 1, middle, end)); SumTask2 sumSubTask = new SumTask2(start, middle - 1); SumTask2 sumSubTask1 = new SumTask2(middle, end); //fork子任務 invokeAll(sumSubTask, sumSubTask1); //join子任務 Integer join = sumSubTask.join(); Integer join1 = sumSubTask1.join(); sum = join + join1; //計算結果 return sum; } }
測試
ForkJoinPool fjp2 = new ForkJoinPool(); SumTask2 sumTask2 = new SumTask2(start, end); long begin3 = System.currentTimeMillis(); Integer invoke = fjp2.invoke(sumTask2); long end3 = System.currentTimeMillis(); System.out.println("計算結果3為 sum = " + invoke + ",計算時長為" + begin3 + "-" + end3 + "--- " + (end3 - begin3) + "ms"); ForkJoinPool fjp = new ForkJoinPool(); long begin2 = System.currentTimeMillis(); SumTask sumTask = new SumTask(start, end); ForkJoinTask<Integer> submit = fjp.submit(sumTask); Integer join = submit.join(); long end2 = System.currentTimeMillis(); System.out.println("計算結果2為 sum = " + join + ",計算時長為" + begin2 + "-" + end2 + "--- " + (end2 - begin2) + "ms");
結果
從結果(可以多次運行測試)可以看出,使用invokeAll方式效率比使用單獨fork方式高,所以在使用時盡量采用invokeAll方式,這樣可以充分利用線程池中的線程去執行任務。