Fork/Join是什么?
Fork意思是分叉,Join為合並。Fork/Join是一個將任務分割並行運行,然后將最終結果合並成為大任務的結果的框架,父任務可以分割成若干個子任務,子任務可以繼續分割,提供我們一種方便的並行任務功能,滿足實際場景的業務需求,思想類似於MapReduce。任務的分割必須保證子任務獨立,不會相互依賴結果。
從哪里開始?
Fork/Join框架主要有如下接口和類:
- ForkJoinPool:一個線程池,用於執行調度分割的任務,實現了ExecutorService接口。提供三種執行任務的方式:
1、execute:最原生的執行方式,以異步執行,並且無返回結果。
2、submit:異步執行,有返回結果,返回結果是封裝后的Future對象。
3、invoke和invokeAll:異步執行,有返回結果,會等待所有任務執行執行完成,返回的結果為無封裝的泛型T。
- ForkJoinTask:抽象的分割任務,提供以分叉的方式執行,以及合並執行結果。
- RecursiveAction:異步任務,無返回結果。通常自定義的任務要繼承,並重寫compute方法,任務執行的就是compute方法。
- RecursiveTask:異步任務,有返回結果。通常自定義的任務要繼承,並重寫compute方法,任務執行的就是compute方法。
核心類圖
從核心類圖看出,要想開始一個分割的並行任務,可以創建一個ForkJoinPool線程池,同時創建無返回結果的任務RecursiveAction或有返回結果的任務RecursiveTask,最后調用線程池ForkJoinPool的execute或submit或invoke方法執行任務,完成后合並結果。
實例
我們以一個有返回結果的並行任務實例進行測試。計算從起始值到結束值得連續數的累加結果,利用Fork/Join框架。並對比普通計算和並行計算的耗時差異。
package com.misout.forkjoin; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; /** * 計算從起始值到結束值得連續數的累加結果,利用Fork/Join框架 * @author Misout * @date 2018-01-13 16:06:44 */ public class SumTask extends RecursiveTask<Long> { private static final long serialVersionUID = 4828818665955149519L; /** 每個任務最多允許計算的數字個數閾值,超過這個閾值,任務進行拆分 */ private static final long THRESHOLD = 1000L; /** 起始值 */ private Long startNumber; /** 結束值 */ private Long endNumber; public SumTask(Long startNumber, Long endNumber) { this.startNumber = startNumber; this.endNumber = endNumber; } /** * 累加數的個數超過閾值1000個,拆分成2個子任務執行。子任務繼續作拆分。計算完,合並結果。 */ @Override protected Long compute() { if(startNumber > endNumber) { System.out.println("start number should be smaller than end number"); return 0L; } if(endNumber - startNumber < THRESHOLD) { return this.getCount(startNumber, endNumber); } else { Long mid = (startNumber + endNumber) / 2; RecursiveTask<Long> subTask1 = new SumTask(startNumber, mid); RecursiveTask<Long> subTask2 = new SumTask(mid + 1, endNumber); subTask1.fork(); subTask2.fork(); return subTask1.join() + subTask2.join(); } } /** * 普通累加執行方法 * @param start 起始數 * @param end 結束數 * @return 累加和 */ protected Long getCount(Long start, Long end) { Long sum = 0L; for(long i = start; i <= end; i++) { sum += i; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); Long start = 5L; Long end = 3463434L; SumTask task = new SumTask(start, end); Long startTime = System.currentTimeMillis(); Long sum = forkJoinPool.invoke(task); Long endTime = System.currentTimeMillis(); System.out.println("fork/join : sum = " + sum + ", cost time = " + (endTime - startTime) + "ms"); startTime = System.currentTimeMillis(); Long sum2 = task.getCount(start, end); endTime = System.currentTimeMillis(); System.out.println("normal : sum = " + sum2 + ", cost time = " + (endTime - startTime) + "ms"); } }
說明:SumTask繼承RecursiveTask,並實現了compute方法。在compute方法中會進行任務分割,並繼續生成子任務,子任務仍然以分割的方式運行。
運行結果對比:
fork/join : sum = 5997689267885, cost time = 290ms
normal : sum = 5997689267885, cost time = 41ms
注意事項:任務拆分的深度最好不要太多,否則很容易因創建的線程過多影響系統性能。
work-stealing規則
在Java的API說明中提到,ForkJoinPool線程池與ThreadPoolExecutor線程池不同的地方在於,ForkJoinPool善於利用竊取工作執行加快任務的總體執行速度。實際上,在ForkJoinPool線程池中,若一個工作線程的任務隊列為空沒有任務執行時,便從其他工作線程中獲取任務主動執行。為了實現工作竊取,在工作線程中維護了雙端隊列,竊取任務線程從隊尾獲取任務,被竊取任務線程從隊頭獲取任務。這種機制充分利用線程進行並行計算,減少了線程競爭。但是當隊列中只存在一個任務了時,兩個線程去取反而會造成資源浪費。
