概念
從JDK1.7開始,Java提供ForkJoin框架用於並行執行任務,它的思想就是講一個大任務分割成若干小任務,最終匯總每個小任務的結果得到這個大任務的結果。作為一個並發框架在jdk7的時候就加入到了我們的java並發包java.util.concurrent中,並且在java 8 的lambda並行流中充當着底層框架的角色。
思維導圖

核心類介紹
-
ForkJoinPool:充當fork/join框架里面的管理者,最原始的任務都要交給它才能處理。它負責控制整個fork/join有多少個workerThread,workerThread的創建,激活都是由它來掌控。它還負責workQueue隊列的創建和分配,每當創建一個workerThread,它負責分配相應的workQueue。然后它把接到的活都交給workerThread去處理,它可以說是整個frok/join的容器。
-
ForkJoinWorkerThread:fork/join里面真正干活的"工人",本質是一個線程。里面有一個ForkJoinPool.WorkQueue的隊列存放着它要干的活,接活之前它要向ForkJoinPool注冊(registerWorker),拿到相應的workQueue。然后就從workQueue里面拿任務出來處理。它是依附於ForkJoinPool而存活,如果ForkJoinPool的銷毀了,它也會跟着結束。
-
ForkJoinPool.WorkQueue: 雙端隊列就是它,它負責存儲接收的任務。
-
ForkJoinTask:代表fork/join里面任務類型,我們一般用它的兩個子類RecursiveTask、RecursiveAction。這兩個區別在於RecursiveTask任務是有返回值,RecursiveAction沒有返回值。任務的處理邏輯包括任務的切分都集中在compute()方法里面。
代碼實現
我們以將一個0到10000的List集合的每個元素進行打印為例:
實現類:
package com.forkjoin.test.task; import java.util.List; import java.util.concurrent.RecursiveTask; /** * forkjoin拆分任務實現 */ public class ForkJoinTask extends RecursiveTask<List<Integer>> { private List<Integer> integerList; public ForkJoinTask(List<Integer> integerList) { this.integerList = integerList; } @Override protected List<Integer> compute() { System.out.println("數據條數:" + integerList.size()); // 當數據條數大於100時,將任務拆分 if(integerList.size() > 100) { int minSize = integerList.size() / 2; List<Integer> leftList = integerList.subList(0, minSize); List<Integer> rightList = integerList.subList(minSize, integerList.size()); // 拆分出兩個任務 並行執行 ForkJoinTask leftTask = new ForkJoinTask(leftList); ForkJoinTask rightTask = new ForkJoinTask(rightList); this.invokeAll(leftTask,rightTask); }else { // 一直到拆分出的數據不大於100條時,執行任務 integerList.stream().forEach(x -> { System.out.println(x.toString()); }); } return null; } }
調用類:
package com.forkjoin.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import com.forkjoin.test.task.ForkJoinTask; public class ForkJoinTest { public static void main(String[] args) { // 生成一個0-10000的整數集合 List<Integer> list = new ArrayList<>(); for (int i = 0; i <= 10000; i++) { list.add(i); } // forkjoin拆分任務調用 ForkJoinTask task = new ForkJoinTask(list); ForkJoinPool pool=ForkJoinPool.commonPool(); pool.submit(task); pool.shutdown(); } }
