概述
Fork/Join基於分而治之的算法,它可以將一個大的任務拆分成多個子任務進行並行處理,最后將子任務結果合並成最后的計算結果,並進行輸出。本文中對Fork/Join框架的講解,基於JDK1.8+中的Fork/Join框架實現,參考的Fork/Join框架主要源代碼也基於JDK1.8+。

基本使用
work類
實現這個框架需要繼承RecursiveTask 或者 RecursiveAction ,RecursiveTask是有返回值的,Action則沒有。下面是計算一個計算數據和的示例:
public class ForkJoinWork extends RecursiveTask<Long> { private Long start;//起始值 private Long end;//結束值 public static final Long critical = 100000L;//臨界值 public ForkJoinWork(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { // return null; //判斷是否是拆分完畢 Long lenth = end - start; //起始值差值 if (lenth <= critical) { //如果拆分完畢就相加 Long sum = 0L; for (Long i = start; i <= end; i++) { sum += i; } return sum; } else { //沒有拆分完畢就開始拆分 Long middle = (end + start) / 2;//計算的兩個值的中間值 ForkJoinWork right = new ForkJoinWork(start, middle); right.fork();//拆分,並壓入線程隊列 ForkJoinWork left = new ForkJoinWork(middle + 1, end); left.fork();//拆分,並壓入線程隊列 //合並 return right.join() + left.join(); } } }
測試
public class ForkJoinWorkTest { @Test public void test() { //ForkJoin實現 long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool();//實現ForkJoin 就必須有ForkJoinPool的支持 ForkJoinTask<Long> task = new ForkJoinWork(0L, 10000000000L);//參數為起始值與結束值 Long invoke = forkJoinPool.invoke(task); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + invoke + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 56418 //ForkJoinWork forkJoinWork = new ForkJoinWork(0L, 10000000000L); } @Test public void test2() { //普通線程實現 Long x = 0L; Long y = 10000000000L; long l = System.currentTimeMillis(); for (Long i = 0L; i <= y; i++) { x += i; } long l1 = System.currentTimeMillis(); System.out.println("invoke = " + x + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 64069 } @Test public void test3() { //Java 8 並行流的實現 long l = System.currentTimeMillis(); long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum); //long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, (a, b) -> a+b); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + reduce + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 2152 } }
分析
我們觀察上面可以看出來執行10000000000L的相加操作各自執行完畢的時間不同。觀察到當數據很大的時候ForkJoin比普通線程實現有效的多,但是相比之下ForkJoin的實現實在是有點麻煩,這時候Java 8 就為我們提供了一個並行流來實現ForkJoin實現的功能。可以看到並行流比自己實現ForkJoin還要快。
Java 8 中將並行流進行了優化,我們可以很容易的對數據進行並行流的操作,Stream API可以聲明性的通過parallel()與sequential()在並行流與串行流中隨意切換!
Fork/Join與傳統線程池的區別
Fork/Join采用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨即線程中偷一個並把它加入自己的隊列中。
就比如兩個CPU上有不同的任務,這時候A已經執行完,B還有任務等待執行,這時候A就會將B隊尾的任務偷過來,加入自己的隊列中,對於傳統的線程,ForkJoin更有效的利用的CPU資源!
工作順序圖
下圖展示了以上代碼的工作過程概要,但實際上Fork/Join框架的內部工作過程要比這張圖復雜得多,例如如何決定某一個recursive task是使用哪條線程進行運行;再例如如何決定當一個任務/子任務提交到Fork/Join框架內部后,是創建一個新的線程去運行還是讓它進行隊列等待。
所以如果不深入理解Fork/Join框架的運行原理,只是根據之上最簡單的使用例子觀察運行效果,那么我們只能知道子任務在Fork/Join框架中被拆分得足夠小后,並且其內部使用多線程並行完成這些小任務的計算后再進行結果向上的合並動作,最終形成頂層結果。不急,一步一步來,我們先從這張概要的過程圖開始討論。

圖中最頂層的任務使用submit方式被提交到Fork/Join框架中,后者將前者放入到某個線程中運行,工作任務中的compute方法的代碼開始對這個任務T1進行分析。如果當前任務需要累加的數字范圍過大(代碼中設定的是大於200),則將這個計算任務拆分成兩個子任務(T1.1和T1.2),每個子任務各自負責計算一半的數據累加,請參見代碼中的fork方法。如果當前子任務中需要累加的數字范圍足夠小(小於等於200),就進行累加然后返回到上層任務中。
ForkJoinPool
構造函數
ForkJoinPool有四個構造函數,其中參數最全的那個構造函數如下所示:
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode)
-
parallelism:可並行級別,Fork/Join框架將依據這個並行級別的設定,決定框架內並行執行的線程數量。並行的每一個任務都會有一個線程進行處理,但是千萬不要將這個屬性理解成Fork/Join框架中最多存在的線程數量,也不要將這個屬性和ThreadPoolExecutor線程池中的corePoolSize、maximumPoolSize屬性進行比較,因為ForkJoinPool的組織結構和工作方式與后者完全不一樣。而后續的討論中,讀者還可以發現Fork/Join框架中可存在的線程數量和這個參數值的關系並不是絕對的關聯(有依據但並不全由它決定)。
-
factory:當Fork/Join框架創建一個新的線程時,同樣會用到線程創建工廠。只不過這個線程工廠不再需要實現ThreadFactory接口,而是需要實現ForkJoinWorkerThreadFactory接口。后者是一個函數式接口,只需要實現一個名叫newThread的方法。在Fork/Join框架中有一個默認的ForkJoinWorkerThreadFactory接口實現:DefaultForkJoinWorkerThreadFactory。
-
handler:異常捕獲處理器。當執行的任務中出現異常,並從任務中被拋出時,就會被handler捕獲。
-
asyncMode:這個參數也非常重要,從字面意思來看是指的異步模式,它並不是說Fork/Join框架是采用同步模式還是采用異步模式工作。Fork/Join框架中為每一個獨立工作的線程准備了對應的待執行任務隊列,這個任務隊列是使用數組進行組合的雙向隊列。即是說存在於隊列中的待執行任務,即可以使用先進先出的工作模式,也可以使用后進先出的工作模式。

當asyncMode設置為ture的時候,隊列采用先進先出方式工作;反之則是采用后進先出的方式工作,該值默認為false
...... asyncMode ? FIFO_QUEUE : LIFO_QUEUE, ......
ForkJoinPool還有另外兩個構造函數,一個構造函數只帶有parallelism參數,既是可以設定Fork/Join框架的最大並行任務數量;另一個構造函數則不帶有任何參數,對於最大並行任務數量也只是一個默認值——當前操作系統可以使用的CPU內核數量(Runtime.getRuntime().availableProcessors())。實際上ForkJoinPool還有一個私有的、原生構造函數,之上提到的三個構造函數都是對這個私有的、原生構造函數的調用。
...... private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); } ......
如果你對Fork/Join框架沒有特定的執行要求,可以直接使用不帶有任何參數的構造函數。也就是說推薦基於當前操作系統可以使用的CPU內核數作為Fork/Join框架內最大並行任務數量,這樣可以保證CPU在處理並行任務時,盡量少發生任務線程間的運行狀態切換(實際上單個CPU內核上的線程間狀態切換基本上無法避免,因為操作系統同時運行多個線程和多個進程)。
fork方法和join方法
Fork/Join框架中提供的fork方法和join方法,可以說是該框架中提供的最重要的兩個方法,它們和parallelism“可並行任務數量”配合工作,可以導致拆分的子任務T1.1、T1.2甚至TX在Fork/Join框架中不同的運行效果。例如TX子任務或等待其它已存在的線程運行關聯的子任務,或在運行TX的線程中“遞歸”執行其它任務,又或者啟動一個新的線程運行子任務……
fork方法用於將新創建的子任務放入當前線程的work queue隊列中,Fork/Join框架將根據當前正在並發執行ForkJoinTask任務的ForkJoinWorkerThread線程狀態,決定是讓這個任務在隊列中等待,還是創建一個新的ForkJoinWorkerThread線程運行它,又或者是喚起其它正在等待任務的ForkJoinWorkerThread線程運行它。
這里面有幾個元素概念需要注意,ForkJoinTask任務是一種能在Fork/Join框架中運行的特定任務,也只有這種類型的任務可以在Fork/Join框架中被拆分運行和合並運行。ForkJoinWorkerThread線程是一種在Fork/Join框架中運行的特性線程,它除了具有普通線程的特性外,最主要的特點是每一個ForkJoinWorkerThread線程都具有一個獨立的任務等待隊列(work queue),這個任務隊列用於存儲在本線程中被拆分的若干子任務。

join方法用於讓當前線程阻塞,直到對應的子任務完成運行並返回執行結果。或者,如果這個子任務存在於當前線程的任務等待隊列(work queue)中,則取出這個子任務進行“遞歸”執行。其目的是盡快得到當前子任務的運行結果,然后繼續執行。
