並發編程之 Fork-Join 分而治之框架


前言

“分而治之” 一直是一個有效的處理大量數據的方法。著名的 MapReduce 也是采取了分而治之的思想。簡單來說,就是如果你要處理1000個數據,但是你並不具備處理1000個數據的能力,那么你可以只處理其中的10個,然后,分階段處理100次,將100次的結果進行合成,那就是最終想要的對原始的1000個數據的處理結果。

Fork & Join 的具體含義

Fork 一詞的原始含義是吃飯用的叉子,也有分叉的意思。在Linux 平台中,函數 fork()用來創建子進程,使得系統進程可以多一個執行分支。在 Java 中也沿用了類似的命名方式。

而 Join() 的含義和 Thread 類的 join 類似,表示等待。也就是使用 fork() 后系統多了一個執行分支(線程),所以需要等待這個執行分支執行完畢,才有可能得到最終的結果,因此 join 就是表示等待。

在實際使用中,如果毫無顧忌的使用 fork 開啟線程進行處理,那么很有可能導致系統開啟過多的線程而嚴重影響性能。所以,在JDK中,給出一個 ForkJoinPool 線程池,對於 fork() 方法並不急着開啟線程,而是提交給 ForkJoiinPool 線程池進行處理,以節省系統資源。

由於線程池的優化,提交的任務和線程數量並不是一對一的關系。在絕大多數情況下,一個物理線程實際上是需要處理多個邏輯任務的。因此,每個線程必然需要擁有一個任務隊列。因此,在實際執行過程中,可能遇到這么一種情況:線程A已經把自己的任務都處理完了,而線程B還有一堆任務等着處理,此時,線程A就會“幫助” 線程B,從線程 B的任務隊列中拿一個任務來處理,盡可能的達到平衡。值得注意的是:當線程試圖幫助別人時,總是從任務隊列的底部開始拿數據,而線程試圖執行自己的任務時,則從相反的頂部開始拿。因此這種行為也十分有利於避免數據競爭。

我們看看線程池 ForkJoinPool 的一個接口:

    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }

你可以向 ForkJoinPool 線程池提交一個 ForkJoinTask 任務。所謂 ForkJoinTask 任務就是支持 fork () 分解以及 join()等待的任務。 ForkJoinTask 有兩個重要的子類,RecursiveAction 和 RecursiveTask。他們粉筆表示沒有返回值的任務和可以攜帶返回值的任務。有點像 Rannable 和 Callable。

下面來要給簡單的例子展示 Fork/Join 框架的使用。這里用來計算求和。

/**
 *  Fork/Join 核心思想:分而治之
 *
 * 著名的 MapReduce 也是這個思想。將任務進行分解,然后合並所有的結果。
 *
 */
public class CountTask extends RecursiveTask<Long> {

  /**
   * 閥值
   */
  static final int THRESHOLD = 10000;
  long start;
  long end;

  public CountTask(long start, long end) {
    this.start = start;
    this.end = end;
  }

  /**
   * 有返回值的
   * @return
   */
  @Override
  protected Long compute() {

    long sum = 0;
    // 當閥值小於10000則不分解了
    boolean canCompute = (end - start) < THRESHOLD;
    if (canCompute) {
      for (long i = start; i <= end; i++) {
        sum += i;
      }
    } else {
      // 2000
      long step = (start + end) / 100;
      ArrayList<CountTask> subTasks = new ArrayList<>();
      long pos = start;
      for (int i = 0; i < 100; i++) {
        long lastOne = pos + step;
        if (lastOne > end) {
          lastOne = end;
        }
        //0-2000 個計算任務 * 100
        CountTask subTask = new CountTask(pos, lastOne);
        pos += step + 1;
        subTasks.add(subTask);
        subTask.fork();// fork
      }

      for (CountTask t : subTasks) {
        sum += t.join();
      }
    }
    return sum;

  }

  public static void main(String[] args) {

    ForkJoinPool forkJoinPool = new ForkJoinPool();
    CountTask task = new CountTask(0, 200000L);
    // 將一個大的任務提交到池中
    ForkJoinTask<Long> result = forkJoinPool.submit(task);
    long res = 0;
    try {
      // 等待運算結果
      res = result.get();
      System.out.println("sum = " + res);
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }

  }
}

由於計算求和必須需要返回值,因此我們選擇了 RecursiveTask 作為任務的模型。首先我們構造了一個大任務,提交給線程池,線程池會返回一個攜帶結果的任務,通過 get 方法可以得到最終結果。如果執行 get 方法時任務沒有結束,那么主線程就會在 get 方法等待。

再看看 CountTask 的實現,首先 CountTask 繼承自 RecursiveTask ,可以攜帶返回值,這里的返回值類型設置為 long,定義一個 THRESHOLD 設置了任務分解的規模,也就是如果需要求和的總數大於 THRESHOLD 個,那么任務就需要再次分解,否則就直接執行。 每次分解時,簡單的將原有任務划分成100個規模相等的小任務,並使用 fork() 提交子任務。之后,等待所有的子任務結束,並將結果再次求和。

再使用 ForkJoin的時候注意:如果任務的划分層次很深,一直得不到返回,那么可能出現兩種情況: 第一,系統內的線程數量越來越多,導致性能嚴重下降。第二,函數的調用層次變的很深,最終導致棧溢出。

此外,ForkJoin 線程池使用一個無鎖的棧來管理空閑線程,如果一個工作線程暫時取不到可用的任務,則可能會被掛起,掛起的線程將會被壓入由線程池維護的棧中,待將來有任務可用時,再從棧中喚醒這些線程。

總結

本文來源自 《Java 高並發程序設計》,沒有什么自己的見解。因為使用場景太少了。不過還是可以看看源碼來漲漲姿勢的。嘿嘿。

good luck !!!!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM