轉載自 https://baeldung-cn.com/java-fork-join
1. 概述
fork/join 框架在 Java 7 中引入。它基於分而治之的思想,通過嘗試利用所有可用處理器內核來幫助加速並行計算。
什么是分而治之?它分為任務分解,和結果合並兩個階段。
首先 fork 通過遞歸方式將一個復雜任務分解為更小的獨立子任務,直至子任務簡單到無需再分。
分完之后, “join” 部分開始,將所有子任務結果遞歸地合並為一個結果。如果任務的返回值為 void
類型,那么程序只需等待所有子任務執行完畢。
為了提高並行計算效率,fork/join 框架使用一個名為 ForkJoinPool
的線程池。該線程池負責管理類型為 ForkJoinWorkerThread
的工作線程。
ForkJoinPool
ForkJoinPool
是整個框架的核心,它實現了 ExecutorService 接口。
我們知道一個工作線程(Worker Thread)同一時間只能執行一個任務, ForkJoinPool
不會為每個子任務創建一個獨立的線程,而是每個線程都維護了一個雙端隊列(deque),用來存儲需要執行的任務。
這種架構對於借助工作竊取算法平衡線程的工作負載至關重要。
2.1. 工作竊取算法
何為工作竊取算法?
簡單來說 – 空閑的線程嘗試從其他繁忙線程的deque雙端隊列中竊取一個任務來執行。
默認情況下,一個工作線程從 deque 頭部讀取任務。如果隊列為空,則該線程會從其他繁忙線程的 deque 尾部或全局隊列中獲取一個任務。
這種算法最大限度地避免發生線程競爭任務,同時減少線程尋找任務的次數。
2.2. 實例化 ForkJoinPool
在 Java 8 中,獲取 ForkJoinPool
實例最便捷的方法是使用其靜態方法 commonPool()
。顧名思義,它返回公共池的引用,公共池是每個 ForkJoinTask 的默認線程池。
根據 Oracle文檔,建議使用預定義的公共線程池以減少資源消耗,避免每個任務都創建一個單獨的線程池。
Java 7 中,需要我們自己實現單例模式,例如用餓漢式:
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);
獲取實例:
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
通過 ForkJoinPool
構造函數我們可以創建自定義線程池,自定義並行級別(parallelism),線程創建工廠(ThreadFactory),異常處理器(ExceptionHandler)。上面例子中,parallelism參數為2, 表示該線程池將使用2個處理器內核。
3. ForkJoinTask <V>
ForkJoinTask
是我們任務的基類。實際中,我們應該繼承它的兩個子類:無返回值的 RecursiveAction
和帶返回值的 RecursiveTask<V>。兩者都有一個抽象方法 compute()
,在里面實現我們的任務執行邏輯。
3.1. RecursiveAction 示例
下面例子中,我們將變量 workload
的所有字母轉為大寫並打印日志。本例僅僅是用於演示目的,這個任務沒有實際意義。
為了演示框架任務分解行為,本例使用 createSubtask()
方法在workload.length()大於設定閾值時分解任務。
workload
被遞歸地分解為子串,並創建基於這些子串的 CustomRecursiveTask
實例。
結果返回一個子任務集合 List<CustomRecursiveAction>。
使用 invokeAll()
將集合中的任務提交到 ForkJoinPool
。
public class CustomRecursiveAction extends RecursiveAction {
private String workload = "";
private static final int THRESHOLD = 4;
private static Logger logger =
Logger.getAnonymousLogger();
public CustomRecursiveAction(String workload) {
this.workload = workload;
}
@Override
protected void compute() {
if (workload.length() > THRESHOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}
private List<CustomRecursiveAction> createSubtasks() {
List<CustomRecursiveAction> subtasks = new ArrayList<>();
String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());
subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));
return subtasks;
}
private void processing(String work) {
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
}
}
可以套用此模版開發我們自己的 RecursiveAction
類。創建一個對象表示我們的總任務,選擇一個合適的閾值,定義一個用於分解任務的方法,以及實際處理任務的方法。
3.2. RecursiveTask<V>
對於帶返回值的任務,實現邏輯類似,只是需要把每個子任務的結果合並到一個結果中:
public class CustomRecursiveTask extends RecursiveTask<Integer> {
private int[] arr;
private static final int THRESHOLD = 20;
public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}
@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}
private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}
private Integer processing(int[] arr) {
return Arrays.stream(arr)
.filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
}
}
本例中,變量 arr
表示我們的任務。createSubtasks()
方法遞歸地將一個大任務分解為小任務,直到小於閾值時不再分解。然后,invokeAll()
方法將子任務提交到公共池,並返回一個 Future
集合。
為了觸發執行,為每個子任務調用 join()
方法。
這里使用了 Java 8 中的 Stream API 實現。sum()
方法將子結果合並為最終結果。
4. 提交任務到 ForkJoinPool 中
要將任務提交到 ForkJoinPool
線程池中,可以使用:
submit() 或 execute() 方法 :
forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();
invoke()
方法 fork 任務並等待返回結果,不需要手動 join 操作。
int result = forkJoinPool.invoke(customRecursiveTask);
invokeAll()
方法將 ForkJoinTask
任務批量提交到 ForkjoinPool
。將任務作為參數傳入(該方法有多個重載方法,可以傳2個任務,或變長參數,或集合形式),fork然后按順序返回一個 Future
集合。
或者,你也可以單獨使用 fork() 和 join() 方法。fork()
提交一個任務到線程池中,但不觸發執行。這種情況下,必須手動調用 join
方法。如果是 RecursiveAction
類型的任務,join()
返回 null
,如果是RecursiveTask<V>類型,則返回任務執行結果。
customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();
在上面 RecursiveTask<V> 例子中我們使用 invokeAll()
批量提交子任務到線程池中。 同樣的工作也可以通過 fork()
和 join()
來完成,不過這會影響結果的排序。
為了避免混淆,通常最好使用 invokeAll()
方法將多個任務提交到 ForkJoinPool
。
5. 總結
在處理大型任務時,使用 fork/join 框架能加快處理速度。但前提是遵守以下幾個原則:
- 盡可能少使用線程池 – 大多數場景下,一個應用最好使用一個線程池
- 使用默認的公共線程池l, 如果不需要特殊調優
- 使用一個合理的閾值 將 ForkJoinTask 拆分為子任務
- 避免在ForkJoinTask中編寫阻塞代碼