ForkJoinPool的優勢在於,可以充分利用多cpu,多核cpu的優勢,把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上並行執行;當多個“小任務”執行完成之后,再將這些執行結果合並起來即可。
Java7 提供了ForkJoinPool來支持將一個任務拆分成多個“小任務”並行計算,再把多個“小任務”的結果合並成總的計算結果。
ForkJoinPool是ExecutorService的實現類,因此是一種特殊的線程池。
使用方法:創建了ForkJoinPool實例之后,就可以調用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)方法來執行指定任務了。
其中ForkJoinTask代表一個可以並行、合並的任務。ForkJoinTask是一個抽象類,它還有兩個抽象子類:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任務,而RecusiveAction代表沒有返回值的任務。
Code:
RecusiveAction實現方法:
package com.qhong.thread.ForkJoinPoolDemo; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; public class ForkJoinPoolDemo extends RecursiveAction { private static final long serialVersionUID = 1L; //定義一個分解任務的閾值——50,即一個任務最多承擔50個工作量 private int THRESHOLD=50; //任務量 private int task_Num=0; ForkJoinPoolDemo(int Num){ this.task_Num=Num; } public static void main (String[] args) throws Exception { //創建一個支持分解任務的線程池ForkJoinPool ForkJoinPool pool=new ForkJoinPool(); ForkJoinPoolDemo task=new ForkJoinPoolDemo(120); pool.submit(task); pool.awaitTermination(20, TimeUnit.SECONDS);//等待20s,觀察結果 pool.shutdown(); } /** * @author qhong * @param * @return * @date 2018/4/18 17:13 * @description 實現recursiveAction中抽象方法 */ @Override protected void compute() { if(task_Num<=THRESHOLD){ System.out.println(Thread.currentThread().getName()+"承擔了"+task_Num+"份工作"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }else{ //隨機解成兩個任務 Random m=new Random(); int x=m.nextInt(50); ForkJoinPoolDemo left=new ForkJoinPoolDemo(x); ForkJoinPoolDemo right=new ForkJoinPoolDemo(task_Num-x); left.fork(); right.fork(); } } }
Output:
ForkJoinPool-1-worker-1承擔了6份工作 ForkJoinPool-1-worker-2承擔了2份工作 ForkJoinPool-1-worker-3承擔了30份工作 ForkJoinPool-1-worker-0承擔了9份工作 ForkJoinPool-1-worker-1承擔了46份工作 ForkJoinPool-1-worker-2承擔了17份工作 ForkJoinPool-1-worker-0承擔了0份工作 ForkJoinPool-1-worker-3承擔了10份工作
RecusiveTask的具體實現:
package com.qhong.thread.ForkJoinPoolDemo; import java.util.Arrays; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; import java.util.stream.LongStream; /** * @author qhong * @date 2018/4/18 16:14 * @description **/ public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; public ForkJoinCalculator() { // 也可以使用公用的 ForkJoinPool: // pool = ForkJoinPool.commonPool() pool = new ForkJoinPool(); } public static void main(String[] args) { ForkJoinCalculator forkJoinCalculator=new ForkJoinCalculator(); long[] numbers=LongStream.range(1,20).toArray(); System.out.println(Arrays.toString(numbers)); long result=forkJoinCalculator.sumUp(numbers); System.out.println("result:"+result); } private static class SumTask extends RecursiveTask<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } @Override protected Long compute() { // 當需要計算的數字小於6時,直接計算結果 if (to - from < 4) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } System.out.println(String.format("currentThread:%s,total:%s,from:%s,to:%s",Thread.currentThread().getName(),total,from,to)); return total; // 否則,把任務一分為二,遞歸計算 } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle+1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } @Override public long sumUp(long[] numbers) { return pool.invoke(new SumTask(numbers, 0, numbers.length-1)); } }
Output:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] currentThread:ForkJoinPool-1-worker-2,total:6,from:0,to:2 currentThread:ForkJoinPool-1-worker-3,total:36,from:10,to:12 currentThread:ForkJoinPool-1-worker-2,total:9,from:3,to:4 currentThread:ForkJoinPool-1-worker-3,total:29,from:13,to:14 currentThread:ForkJoinPool-1-worker-2,total:21,from:5,to:7 currentThread:ForkJoinPool-1-worker-3,total:70,from:15,to:18 currentThread:ForkJoinPool-1-worker-2,total:19,from:8,to:9 result:190
分析:
根據上面的示例代碼,可以看出 fork()
和 join()
是 Fork/Join Framework “魔法”的關鍵。我們可以根據函數名假設一下 fork()
和 join()
的作用:
fork()
:開啟一個新線程(或是重用線程池內的空閑線程),將任務交給該線程處理。join()
:等待該任務的處理線程處理完畢,獲得返回值。
並不是每個 fork()
都會促成一個新線程被創建,而每個 join()
也不是一定會造成線程被阻塞。
Fork/Join Framework 的實現算法並不是那么“顯然”,而是一個更加復雜的算法——這個算法的名字就叫做 work stealing 算法。
ForkJoinPool
的每個工作線程都維護着一個工作隊列(WorkQueue
),這是一個雙端隊列(Deque),里面存放的對象是任務(ForkJoinTask
)。- 每個工作線程在運行中產生新的任務(通常是因為調用了
fork()
)時,會放入工作隊列的隊尾,並且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。 - 每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其他工作線程的工作隊列),竊取的任務位於其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務時,使用的是 FIFO 方式。
- 在遇到
join()
時,如果需要 join 的任務尚未完成,則會先處理其他任務,並等待其完成。 - 在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。
fork()
做的工作只有一件事,既是把任務推入當前工作線程的工作隊列里。
join()
的工作則復雜得多,也是 join()
可以使得線程免於被阻塞的原因——不像同名的 Thread.join()
。
- 檢查調用
join()
的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當前線程,等待任務完成。如果是,則不阻塞。 - 查看任務的完成狀態,如果已經完成,直接返回結果。
- 如果任務尚未完成,但處於自己的工作隊列內,則完成它。
- 如果任務已經被其他的工作線程偷走,則竊取這個小偷的工作隊列內的任務(以 FIFO 方式),執行,以期幫助它早日完成欲 join 的任務。
- 如果偷走任務的小偷也已經把自己的任務全部做完,正在等待需要 join 的任務時,則找到小偷的小偷,幫助它完成它的任務。
- 遞歸地執行第5步。
所謂work-stealing模式,即每個工作線程都會有自己的任務隊列。當工作線程完成了自己所有的工作后,就會去“偷”別的工作線程的任務。
假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,於是把這些子任務分別放到不同的隊列里,並為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等着,不如去幫其他線程干活,於是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
submit
其實除了前面介紹過的每個工作線程自己擁有的工作隊列以外,ForkJoinPool
自身也擁有工作隊列,這些工作隊列的作用是用來接收由外部線程(非 ForkJoinThread
線程)提交過來的任務,而這些工作隊列被稱為 submitting queue 。
submit()
和 fork()
其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對象,因此當其中的任務被一個工作線程成功竊取時,就意味着提交的任務真正開始進入執行階段。
ForkJoinPool與ThreadPoolExecutor區別:
1.ForkJoinPool中的每個線程都會有一個隊列,而ThreadPoolExecutor只有一個隊列,並根據queue類型不同,細分出各種線程池
2.ForkJoinPool能夠使用數量有限的線程來完成非常多的具有父子關系的任務,ThreadPoolExecutor中根本沒有什么父子關系任務
3.ForkJoinPool在使用過程中,會創建大量的子任務,會進行大量的gc,但是ThreadPoolExecutor不需要,因此單線程(或者任務分配平均)
4.ForkJoinPool在多任務,且任務分配不均是有優勢,但是在單線程或者任務分配均勻的情況下,效率沒有ThreadPoolExecutor高,畢竟要進行大量gc子任務
ForkJoinPool在多線程情況下,能夠實現工作竊取(Work Stealing),在該線程池的每個線程中會維護一個隊列來存放需要被執行的任務。當線程自身隊列中的任務都執行完畢后,它會從別的線程中拿到未被執行的任務並幫助它執行。
ThreadPoolExecutor因為它其中的線程並不會關注每個任務之間任務量的差異。當執行任務量最小的任務的線程執行完畢后,它就會處於空閑的狀態(Idle),等待任務量最大的任務執行完畢。
因此多任務在多線程中分配不均時,ForkJoinPool效率高。
stream中應用ForkJoinPool
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s]\n", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s]\n", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s]\n", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s]\n", s, Thread.currentThread().getName()));
parallelStream讓部分Java代碼自動地以並行的方式執行
最后:
有一點要注意,就是手動設置ForkJoinPool的線程數量時,實際線程數為設置的線程數+1,因為還有一個main主線程
即使將ForkJoinPool的通用線程池的線程數量設置為1,實際上也會有2個工作線程。因此線程數為1的ForkJoinPool通用線程池和線程數為2的ThreadPoolExecutor是等價的。
與ForkJoinPool對應的是CompletableFuture
Future
以及相關使用方法提供了異步執行任務的能力,但是對於結果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務的結果。
阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,而且也不能及時地得到計算結果
CompletableFuture就是利用觀察者設計模式當計算結果完成及時通知監聽者
在Java 8中, 新增加了一個包含50個方法左右的類: CompletableFuture,提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復雜性,提供了函數式編程的能力,可以通過回調的方式處理計算結果,並且提供了轉換和組合CompletableFuture的方法。
具體講解鏈接:http://colobu.com/2016/02/29/Java-CompletableFuture/
http://colobu.com/2018/03/12/20-Examples-of-Using-Java%E2%80%99s-CompletableFuture/
http://www.cnblogs.com/lixuwu/p/7979480.html#undefined
http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/