Java ForkJoinPool 用法及原理


轉載自 https://baeldung-cn.com/java-fork-join

1. 概述

fork/join 框架在 Java 7 中引入。它基於分而治之的思想,通過嘗試利用所有可用處理器內核來幫助加速並行計算。

什么是分而治之?它分為任務分解,和結果合並兩個階段。

首先 fork 通過遞歸方式將一個復雜任務分解為更小的獨立子任務,直至子任務簡單到無需再分。

分完之后, “join” 部分開始,將所有子任務結果遞歸地合並為一個結果。如果任務的返回值為 void類型,那么程序只需等待所有子任務執行完畢。

為了提高並行計算效率,fork/join 框架使用一個名為 ForkJoinPool 的線程池。該線程池負責管理類型為 ForkJoinWorkerThread 的工作線程。

ForkJoinPool

ForkJoinPool 是整個框架的核心,它實現了 ExecutorService 接口。

我們知道一個工作線程(Worker Thread)同一時間只能執行一個任務, ForkJoinPool 不會為每個子任務創建一個獨立的線程,而是每個線程都維護了一個雙端隊列(deque),用來存儲需要執行的任務。

這種架構對於借助工作竊取算法平衡線程的工作負載至關重要。

2.1. 工作竊取算法

何為工作竊取算法?

簡單來說 – 空閑的線程嘗試從其他繁忙線程的deque雙端隊列中竊取一個任務來執行

默認情況下,一個工作線程從 deque 頭部讀取任務。如果隊列為空,則該線程會從其他繁忙線程的 deque 尾部或全局隊列中獲取一個任務。

這種算法最大限度地避免發生線程競爭任務,同時減少線程尋找任務的次數。

2.2. 實例化 ForkJoinPool

在 Java 8 中,獲取 ForkJoinPool 實例最便捷的方法是使用其靜態方法 commonPool()。顧名思義,它返回公共池的引用,公共池是每個 ForkJoinTask 的默認線程池。

根據 Oracle文檔,建議使用預定義的公共線程池以減少資源消耗,避免每個任務都創建一個單獨的線程池。

Java 7 中,需要我們自己實現單例模式,例如用餓漢式:

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

獲取實例:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

通過 ForkJoinPool 構造函數我們可以創建自定義線程池,自定義並行級別(parallelism),線程創建工廠(ThreadFactory),異常處理器(ExceptionHandler)。上面例子中,parallelism參數為2, 表示該線程池將使用2個處理器內核。

3. ForkJoinTask <V>

ForkJoinTask 是我們任務的基類。實際中,我們應該繼承它的兩個子類:無返回值的 RecursiveAction 和帶返回值的 RecursiveTask<V>。兩者都有一個抽象方法 compute(),在里面實現我們的任務執行邏輯。

3.1. RecursiveAction 示例

下面例子中,我們將變量 workload 的所有字母轉為大寫並打印日志。本例僅僅是用於演示目的,這個任務沒有實際意義。

為了演示框架任務分解行為,本例使用 createSubtask() 方法在workload.length()大於設定閾值時分解任務

workload 被遞歸地分解為子串,並創建基於這些子串的 CustomRecursiveTask 實例。

結果返回一個子任務集合 List<CustomRecursiveAction>。

使用 invokeAll() 將集合中的任務提交到 ForkJoinPool

    public class CustomRecursiveAction extends RecursiveAction {
    
        private String workload = "";
        private static final int THRESHOLD = 4;
    
        private static Logger logger = 
          Logger.getAnonymousLogger();
    
        public CustomRecursiveAction(String workload) {
            this.workload = workload;
        }
    
        @Override
        protected void compute() {
            if (workload.length() > THRESHOLD) {
                ForkJoinTask.invokeAll(createSubtasks());
            } else {
               processing(workload);
            }
        }
    
        private List<CustomRecursiveAction> createSubtasks() {
            List<CustomRecursiveAction> subtasks = new ArrayList<>();
    
            String partOne = workload.substring(0, workload.length() / 2);
            String partTwo = workload.substring(workload.length() / 2, workload.length());
    
            subtasks.add(new CustomRecursiveAction(partOne));
            subtasks.add(new CustomRecursiveAction(partTwo));
    
            return subtasks;
        }
    
        private void processing(String work) {
            String result = work.toUpperCase();
            logger.info("This result - (" + result + ") - was processed by " 
              + Thread.currentThread().getName());
        }
    }

可以套用此模版開發我們自己的 RecursiveAction類。創建一個對象表示我們的總任務,選擇一個合適的閾值,定義一個用於分解任務的方法,以及實際處理任務的方法。

3.2. RecursiveTask<V>

對於帶返回值的任務,實現邏輯類似,只是需要把每個子任務的結果合並到一個結果中:

    public class CustomRecursiveTask extends RecursiveTask<Integer> {
        private int[] arr;
    
        private static final int THRESHOLD = 20;
    
        public CustomRecursiveTask(int[] arr) {
            this.arr = arr;
        }
    
        @Override
        protected Integer compute() {
            if (arr.length > THRESHOLD) {
                return ForkJoinTask.invokeAll(createSubtasks())
                  .stream()
                  .mapToInt(ForkJoinTask::join)
                  .sum();
            } else {
                return processing(arr);
            }
        }
    
        private Collection<CustomRecursiveTask> createSubtasks() {
            List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
            dividedTasks.add(new CustomRecursiveTask(
              Arrays.copyOfRange(arr, 0, arr.length / 2)));
            dividedTasks.add(new CustomRecursiveTask(
              Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
            return dividedTasks;
        }
    
        private Integer processing(int[] arr) {
            return Arrays.stream(arr)
              .filter(a -> a > 10 && a < 27)
              .map(a -> a * 10)
              .sum();
        }
    }

本例中,變量 arr 表示我們的任務。createSubtasks() 方法遞歸地將一個大任務分解為小任務,直到小於閾值時不再分解。然后,invokeAll() 方法將子任務提交到公共池,並返回一個 Future 集合。

為了觸發執行,為每個子任務調用 join() 方法。

這里使用了 Java 8 中的 Stream API 實現。sum()方法將子結果合並為最終結果。

4. 提交任務到 ForkJoinPool 中

要將任務提交到 ForkJoinPool 線程池中,可以使用:

submit()execute() 方法 :

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

invoke() 方法 fork 任務並等待返回結果,不需要手動 join 操作。

int result = forkJoinPool.invoke(customRecursiveTask);

invokeAll() 方法將 ForkJoinTask 任務批量提交到 ForkjoinPool。將任務作為參數傳入(該方法有多個重載方法,可以傳2個任務,或變長參數,或集合形式),fork然后按順序返回一個 Future 集合。

或者,你也可以單獨使用 fork() 和 join() 方法。fork() 提交一個任務到線程池中,但不觸發執行。這種情況下,必須手動調用 join方法。如果是 RecursiveAction 類型的任務,join() 返回 null,如果是RecursiveTask<V>類型,則返回任務執行結果。

customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();

在上面 RecursiveTask<V> 例子中我們使用 invokeAll() 批量提交子任務到線程池中。 同樣的工作也可以通過 fork()join() 來完成,不過這會影響結果的排序。

為了避免混淆,通常最好使用 invokeAll() 方法將多個任務提交到 ForkJoinPool

5. 總結

在處理大型任務時,使用 fork/join 框架能加快處理速度。但前提是遵守以下幾個原則:

  • 盡可能少使用線程池 – 大多數場景下,一個應用最好使用一個線程池
  • 使用默認的公共線程池l, 如果不需要特殊調優
  • 使用一個合理的閾值 將 ForkJoinTask 拆分為子任務
  • 避免在ForkJoinTask中編寫阻塞代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM