Java中的Fork /Join框架指南


1.概述

fork / join框架在Java 7中提供。它提供了一些工具,通過嘗試使用所有可用的處理器內核來幫助加速並行處理 - 這是通過分而治之的方法實現的

實際上,這意味着框架首先“forks”,遞歸地將任務分解為較小的獨立子任務,直到它們足夠簡單以便異步執行。

之后,“join”部分開始,其中所有子任務的結果以遞歸方式連接到單個結果中,或者在返回void的任務的情況下,程序只是等待直到執行完每個子任務。

為了提供有效的並行執行,fork / join框架使用一個名為ForkJoinPool的線程池,它管理ForkJoinWorkerThread類型的工作線程

2. ForkJoinPool

ForkJoinPool是框架的心臟。它是ExecutorService一個實現,它管理工作線程(ForkJoinWorkerThread)並為我們提供工具來獲取有關線程池狀態和性能的信息。

一個工作線程同一時間只能執行一個任務,但是ForkJoinPool不會為所有子任務創建單獨的線程。

相反,池中的每個線程都有自己的雙端隊列(deque),用於存儲任務。

這種架構對於在工作竊取算法的幫助下平衡線程的工作負載至關重要

大致描述:

  就是建立一個線程池,每個線程有一個任務隊列,

  提交的任務被算法(自己代碼定義的)拆分成很多個小任務(任務實現這兩個接口之一:RecursiveAction或者RecursiveTask),放到各個線程的雙端隊列中,

  每個線程從自己的隊列中取任務執行,自己的隊列執行完畢則會去同線程池的其它線程隊列中取任務來自行。

 

 

圖片來源:https://www.cnblogs.com/cjsblog/p/9078341.html

2.1.工作竊取算法

簡單地說 - 空閑線程試圖從繁忙線程的deques中“竊取”工作。

默認情況下,工作線程從其自己的雙端隊列中獲取任務。

當它為空時,線程從另一個忙線程的雙端隊列尾部或全局入口隊列中獲取任務,因為這是待完成任務可能位於的位置。

這種方法最大限度地減少了線程競爭任務的可能性。它還減少了線程必須尋找工作的次數,因為它首先在最大可用工作塊上工作。

2.2.ForkJoinPool實例化

在Java 8中,訪問ForkJoinPool實例的最方便方法是使用其靜態方法commonPool()顧名思義,這將提供對公共池的引用,公共池是每個ForkJoinTask的默認線程池

根據Oracle的文檔,使用預定義的公共池可以減少資源消耗,因為這會阻止為每個任務創建單獨的線程池。

 
ForkJoinPool commonPool = ForkJoinPool.commonPool();

通過創建ForkJoinPool並將其分配給實用程序類公共靜態字段,可以在Java 7中實現相同的行為

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

使用ForkJoinPool的構造函數,可以創建具有特定級別的並行性,線程工廠和異常處理程序的自定義線程池。在上面的示例中,池的並行度為2.這意味着池將使用2個處理器核心。

3. ForkJoinTask <V>

ForkJoinTaskForkJoinPool中執行的任務的基本類型

在實踐中,它的兩個子類之一應該被繼承:RecursiveAction返回值為空,RecursiveTask <V>有返回值。 

它們都有一個抽象方法compute(),其中定義了任務的邏輯,也就是要覆寫的方法。

3.1.RecursiveAction - 一個例子

在下面的示例中,要處理的工作單元由稱為工作負載String表示出於演示目的,該任務是一個荒謬的任務:它只是j簡單的轉為大寫並打印它。

為了演示框架的分支行為,如果workload.length() 大於指定的閾值則使用createSubtask()方法該示例將分割任務

String被遞歸地划分為子串,創建基於這些子串的CustomRecursiveTask實例。

因此,該方法返回List <CustomRecursiveAction>。

使用invokeAll()方法將列表提交給ForkJoinPool

 
public class CustomRecursiveAction extends RecursiveAction {
 
    private String workload = "";
    private static final int THRESHOLD = 4;
 
    private static Logger logger = 
      Logger.getAnonymousLogger();
 
    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }
 
    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
           processing(workload);
        }
    }
 
    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();
 
        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());
 
        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));
 
        return subtasks;
    }
 
    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
          + Thread.currentThread().getName());
    }
}

此模式可用於開發自己的RecursiveAction要執行此操作,請創建一個表示工作總量的對象,選擇合適的閾值,定義分割工作的方法,並定義執行工作的方法。

3.2.RecursiveTask <V>

對於返回值的任務,此處的邏輯類似,除了每個子任務的結果合並在一個結果中:

 
public class CustomRecursiveTask extends RecursiveTask<Integer> {
    private int[] arr;
 
    private static final int THRESHOLD = 20;
 
    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }
 
    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .mapToInt(ForkJoinTask::join)
              .sum();
        } else {
            return processing(arr);
        }
    }
 
    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }
 
    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
          .filter(a -> a > 10 && a < 27)
          .map(a -> a * 10)
          .sum();
    }
}

在此示例中,工作由存儲在CustomRecursiveTaskarr數組中

createSubtask() 方法遞歸地將所述任務分成小塊任務,直到每個任務是小於閾值

然后,invokeAll() 方法將子任務提交給公共線程池並返回Future列表

要觸發執行,為每個子任務調用join()方法。

4.將任務提交給ForkJoinPool

要將任務提交到線程池,可以使用很少的方法。

submit()execute() 方法(它們的使用情況是一樣的):

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

invoke()方法拆分任務並等待結果,並且不需要任何手動join:

int result = forkJoinPool.invoke(customRecursiveTask);

invokeAll()方法是提交多個orkJoinTasksForkJoinPool的便捷方式。

它將任務作為參數,forks它們將按照生成它們的順序返回Future對象的集合

或者,您可以使用單獨的fork()join()方法。

fork()方法將任務提交到一個線程池中,但它不會觸發它的執行。

join()方法被用於觸發執行。RecursiveAction的情況下join()只返回null ; 對於RecursiveTask <V>,它返回任務執行的結果:

customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();

在我們的RecursiveTask <V>示例中,我們使用invokeAll()方法向池提交一系列子任務。使用fork()join()可以完成相同的工作,但這會對結果的排序產生影響。

為避免混淆,使用invokeAll()方法向ForkJoinPool提交多個任務通常是個好主意

5.結論

使用fork / join框架可以加速處理大型任務,但要實現這一結果,應遵循一些指導原則:

  • 使用盡可能少的線程池 - 在大多數情況下,最好的決定是為每個應用程序或系統使用一個線程池
  • 如果不需要特定調整,請使用默認的公共線程池
  • 使用合理的閾值ForkJoingTask拆分為子任務
  • 避免在 ForkJoingTasks中出現任何阻塞


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM