前言
前面介紹了ForkJoinPool相關的兩個類ForkJoinTask、ForkJoinWorkerThread,現在開始了解ForkJoinPool。ForkJoinPool也是實現了ExecutorService的線程池。但ForkJoinPool不同於其他類型的ExecutorService,主要是因為它使用了竊取工作機制:池中的所有線程都試圖查找和執行提交給池和/或由其他活動任務創建的任務(如果不存在工作,則最終阻塞等待工作)。但ForkJoinPool並不是為了代替其他兩個線程池,大家所適用的場景各不相同。ForkJoinPool主要是為了執行ForkJoinTask而存在,而ForkJoinTask在上一文已經講過是一種可以將任務進行遞歸分解執行從而提高執行並行度的任務,那么ForkJoinPool線程池當然主要就是為了完成這些可遞歸分解任務的調度執行,加上一些對線程池生命周期的控制,以及提供一些對池的狀態檢查方法(例如getStealCount),用於幫助開發、調優和監視fork/join應用程序。同樣,方法toString以方便的形式返回池狀態的指示,以便進行非正式監視。
由於ForkJoinPool的源碼太長並且其中涉及到的設計實現非常復雜,目前理解有限,只能做大概的原理闡述以及一些使用示例。
ForkJoinPool原理
在原理開始之前,先了解一下其構造方法,其參數最多的一個構造方法如下:
1 public ForkJoinPool(int parallelism, 2 ForkJoinWorkerThreadFactory factory, 3 UncaughtExceptionHandler handler, 4 boolean asyncMode) { 5 //........... 6 }
構造ForkJoinPool可以指定如下四個參數:
parallelism: 並行度,默認為CPU核心數,最小為1。為1的時候相當於單線程執行。
factory:工作線程工廠,用於創建ForkJoinWorkerThread。
handler:處理工作線程運行任務時的異常情況類,默認為null。
asyncMode:是否為異步模式,默認為 false。這里的同步/異步並不是指F/J框架本身是采用同步模式還是采用異步模式工作,而是指其中的工作線程的工作方式。在F/J框架中,每個工作線程(Worker)都有一個屬於自己的任務隊列(WorkQueue),這是一個底層采用數組實現的雙向隊列。同步是指:對於工作線程(Worker)自身隊列中的任務,采用后進先出(LIFO)的方式執行;異步是指:對於工作線程(Worker)自身隊列中的任務,采用先進先出(FIFO)的方式執行。為true的異步模式只在不join任務結果的消息傳遞框架中非常有用,因此一般如果任務的結果需要通過join合並,則該參數都設為false。
創建 ForkJoinPool實例除了使用構造方法外,從JDK8開始,還提供了一個靜態的commonPool(),該方法可以通過指定系統參數的方式(System.setProperty(?,?))定義“並行度、線程工廠和異常處理類”;並且它使用的是同步模式,也就是說可以支持任務合並(join)。使用該方法返回的ForkJoinPool實例一般來說都能滿足大多數的使用場景。
內部數據結構
ForkJoinPool采用了哈希數組 + 雙端隊列的方式存放任務,但這里的任務分為兩類,一類是通過execute、submit 提交的外部任務,另一類是ForkJoinWorkerThread工作線程通過fork/join分解出來的工作任務,ForkJoinPool並沒有把這兩種任務混在一個任務隊列中,對於外部任務,會利用Thread內部的隨機probe值映射到哈希數組的偶數槽位中的提交隊列中,這種提交隊列是一種數組實現的雙端隊列稱之為Submission Queue,專門存放外部提交的任務。對於ForkJoinWorkerThread工作線程,每一個工作線程都分配了一個工作隊列,這也是一個雙端隊列,稱之為Work Queue,這種隊列都會被映射到哈希數組的奇數槽位,每一個工作線程fork/join分解的任務都會被添加到自己擁有的那個工作隊列中。
在ForkJoinPool中的屬性 WorkQueue[] workQueues 就是我們所說的哈希數組,其元素就是內部類WorkQueue實現的基於數組的雙端隊列。該哈希數組的長度為2的冪,並且支持擴容。如下就是該哈希數組的示意結構圖:

如圖,提交隊列位於哈希數組workQueue的奇數索引槽位,工作線程的工作隊列位於偶數槽位,默認情況下,asyncMode為false,因此工作線程把工作隊列當着棧一樣使用,將分解的子任務推入工作隊列的top端,取任務的時候也從top端取(前面關於雙端隊列的介紹中,凡是雙端隊列都會有兩個分別指向隊列兩端的指針,這里就是圖上畫出的base和top),而當某些工作線程的任務為空的時候,就會從其他隊列(不限於workQueue,也會是提交隊列)竊取(steal)任務,如圖示擁有workQueue2的工作線程從workQueue1中竊取了一個任務,竊取任務的時候采用的是先進先出FIFO的策略(即從base端竊取任務),這樣不但可以避免在取任務的時候與擁有其隊列的工作線程發生沖突,從而減小競爭,還可以輔助其完成比較大的任務。而當asyncMode為true的話,擁有該工作隊列的工作線程將按照先進先出的策略從base端取任務,這一般只用於不需要返回結果的任務,或者事件消息傳遞框架。
上圖展示了提交任務與分解的任務在ForkJoinPool內部的組織形式,並且簡單的闡述了工作竊取機制的原理,其實,工作竊取機制的實現過程還包含很多細節,並沒有怎么簡單,例如還有一種“互助機制”,假設工作線程2竊取了工作線程1的任務之后,通過fork/join又分解產生了子任務,這些子任務會進入工作線程2的工作隊列中,這時候如果工作線程1把剩余的任務都完成了,當他發現自己的任務被別人竊取的話,那么它會試着去竊取工作線程2的任務,(你偷了我的,現在我就要偷你的),這就是互助機制。
關於ForkJoinPool的源碼太復雜就不分析了,可以參考如下幾篇文章:https://www.jianshu.com/apps?utm_medium=desktop&utm_source=navbar-apps 和https://www.cnblogs.com/zhuxudong/p/10122688.html
一些public方法
除了同ThreadPoolExecutor線程池一樣重寫了AbstractExecutorService的一些方法例如,submit(異步提交任務返回Future)、execute(異步提交任務無返回值)、invokeAll(批量執行該類沒有重寫)、invokeAny(只要有一個執行結束就返回,該類沒有重寫)等方法之外,ForkJoinPool增加了一些自己特有的用於監視其狀態的方法可供使用者調用,但一般來說這些方法都用不上,除了那些提交任務的方法:
awaitQuiescence,等待線程池空閑
isQuiescent,如果線程池處於空閑狀態,返回true。
getActiveThreadCount,獲取正在執行任務或竊取任務的線程個數。
getQueuedSubmissionCount(),獲取提交提交給線程池但還沒開始執行的任務個數。就是所有提交隊列中任務之和。
hasQueuedSubmissions(),若getQueuedSubmissionCount不為0,返回true,表示存在任務還在提交隊列沒被執行。
getQueuedTaskCount(),返回所有工作線程工作隊列中的所有任務個數。
getRunningThreadCount(),返回正在運行的沒有被阻塞(例如調用Join會被阻塞)的線程個數。
getStealCount(),返回線程從另一個線程的工作隊列中竊取的任務總數的估計值。
getParallelism(),返回線程池的並行度,構造方法中有這個參數。
getAsyncMode(),返回工作線程從其任務隊列中取走任務的模式,默認為false,表示LIFO后進先出,否則是FIFO,構造方法中有這個參數。
使用示例
對於Fork/Join的使用,也是圍繞ForkJoinTask的三個抽象子類的不同作用進行的,我們知道ForkJoinTask的三個子類是RecursiveAction、RecursiveTask和CountedCompleter,分別用於不返回結果,返回結果以及完成觸發指定操作的操作。
對於RecursiveAction的使用,最容易讓人想到的例子就是,最一個集合或者數組中的所有元素進行自增運行的操作:
1 public class IncrementTask extends RecursiveAction { 2 3 static final int THRESHOLD = 10; 4 final long[] array; 5 final int lo, hi; 6 7 IncrementTask(long[] array, int lo, int hi) { // 構造方法,指定數組下標范圍 8 this.array = array; 9 this.lo = lo; 10 this.hi = hi; 11 } 12 13 // 實現抽象類的接口,這個任務所執行的主要計算。 14 protected void compute() { 15 if (hi - lo < THRESHOLD) { // 數組索引區間小於閾值(任務足夠小) 16 for (int i = lo; i < hi; ++i) 17 array[i]++; // 對每個元素自增1 18 } else { 19 int mid = (lo + hi) >>> 1; // 拆分一半 20 invokeAll(new IncrementTask(array, lo, mid), new IncrementTask(array, mid, hi)); // 一起執行 21 } 22 } 23 24 public static void main(String[] args) { 25 long[] array = ... ;//一個很長的數組 26 new IncrementTask(array, 0, array.length).invoke(); //隱式的使用了ForkJoinPool.commonPool() 27 } 28 }
對於RecursiveAction這種不返回結果的並行運算其實還有很多應用場景,比如Java Doc中使用其來對一個數組的元素進行排序,有比如要將一批數據持久化到數據庫等。
1 static class SortTask extends RecursiveAction { 2 final long[] array; final int lo, hi; 3 4 SortTask(long[] array, int lo, int hi) { //構造方法,指定數組下標范圍 5 this.array = array; this.lo = lo; this.hi = hi; 6 } 7 SortTask(long[] array) { this(array, 0, array.length); } //構造方法,默認全部數組 8 9 //實現抽象類的接口,這個任務所執行的主要計算。 10 protected void compute() { 11 if (hi - lo < THRESHOLD) //數組索引區間小於閾值(任務足夠小) 12 sortSequentially(lo, hi); //排序 13 else { 14 int mid = (lo + hi) >>> 1; //拆分一半 15 invokeAll(new SortTask(array, lo, mid), new SortTask(array, mid, hi)); //一起執行 16 merge(lo, mid, hi); //合並結果 17 } 18 } 19 // implementation details follow: 20 static final int THRESHOLD = 1000; //數組索引最小區間閾值 21 22 void sortSequentially(int lo, int hi) { 23 Arrays.sort(array, lo, hi); 24 } 25 void merge(int lo, int mid, int hi) { //合並結果 26 long[] buf = Arrays.copyOfRange(array, lo, mid); 27 for (int i = 0, j = lo, k = mid; i < buf.length; j++) 28 array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++]; 29 } 30 }
對於RecursiveTask的使用,也是最常用的,例如求和運算1+2+3+4+…+1000000000。
1 public class SumTask extends RecursiveTask<Long>{ 2 3 private static final int THRESHOLD = 100000; 4 private int start;//開始下標 5 private int end;//結束下標 6 7 8 public SumTask(int start, int end) { 9 this.start = start; 10 this.end = end; 11 } 12 13 private long computeByUnit() { 14 long sum = 0L; 15 for (int i = start; i < end; i++) { 16 sum += i; 17 } 18 return sum; 19 } 20 21 @Override 22 protected Long compute() { 23 //如果當前任務的計算量在閾值范圍內,則直接進行計算 24 if (end - start < THRESHOLD) { 25 return computeByUnit(); 26 } else {//如果當前任務的計算量超出閾值范圍,則進行計算任務拆分 27 //計算中間索引 28 int middle = (start + end) / 2; 29 //定義子任務-迭代思想 30 SumTask left = new SumTask(start, middle); 31 SumTask right = new SumTask(middle, end); 32 //划分子任務-fork 33 left.fork(); 34 //right.fork(); 尾部遞歸消除 35 //合並計算結果-join 36 return left.join() + right.invoke(); 37 } 38 } 39 40 public static void main(String[] args) throws Exception{ 41 //這兩種方式提交都可以 42 long result = new SumTask(1, 1000000001).invoke(); //不響應中斷,必須等待執行完成 43 44 Future<Long> future = ForkJoinPool.commonPool().submit(new SumTask(1, 1000000001)); 45 result = future.get(); //可以被中斷 46 } 47 48 }
提交任務到ForkJoinPool可以直接使用ForkJoinTask的invoke,隱式的使用ForkJoinPool.commonPool()池,也可以顯示的創建ForkJoinPool實例通過submit提交,通過Future.get的方法獲取結果,區別在於后者支持中斷,前者必須等待任務完成才會返回。
對於CountedCompleter的使用,由於該類設計成可以構建任務樹,所以使用起來靈活多變,CountedCompleter類本身只是提供了一種並發執行任務的設計理念,想要使用它必須要對其有很深的理解才行。這里列舉一下Java Doc給出的示例吧:
示例一及其三個改進版本,該示例只是一種簡單的遞歸分解任務,但比起傳統的fork/join來說,其樹形任務結構依然更加可取,因為它們減少了線程間的通信並提高了負載平衡:
1 //版本一 2 class MyOperation { void apply(E e) { ... } } //具體的任務執行邏輯 3 4 class ForEach extends CountedCompleter { 5 6 public static void forEach(E[] array, MyOperation op) { 7 new ForEach(null, array, op, 0, array.length).invoke(); //隱式調用ForkJoinPool.commonPool()執行該任務 8 } 9 10 final E[] array; 11 final MyOperation op; 12 final int lo, hi; 13 14 ForEach(CountedCompleter p, E[] array, MyOperation op, int lo, int hi) { 15 super(p); 16 this.array = array; 17 this.op = op; 18 this.lo = lo; 19 this.hi = hi; 20 } 21 22 public void compute() { // version 1 23 if (hi - lo >= 2) { //分解任務 24 int mid = (lo + hi) >>> 1; 25 setPendingCount(2); // 必須在fork之前設置掛起計數器 26 new ForEach(this, array, op, mid, hi).fork(); // fork右子節點 27 new ForEach(this, array, op, lo, mid).fork(); // fork左子節點 28 } 29 else if (hi > lo) //直接執行足夠小的任務 30 op.apply(array[lo]); 31 tryComplete(); //嘗試完成該任務,要么遞減掛起任務數,已經到0的話,就將父節點的掛起數減1,以此類推 32 } 33 } 34 35 //版本2,尾部遞歸消除 36 class ForEach<E> ... 37 public void compute() { // version 2 38 if (hi - lo >= 2) { 39 int mid = (lo + hi) >>> 1; 40 setPendingCount(1); // 掛起計數器為1 41 new ForEach(this, array, op, mid, hi).fork(); // fork右子節點 42 new ForEach(this, array, op, lo, mid).compute(); // 直接執行左子節點,尾部遞歸消除 43 } 44 else { 45 if (hi > lo) 46 op.apply(array[lo]); 47 tryComplete(); 48 } 49 } 50 } 51 52 //版本3 53 public void compute() { // version 3 54 int l = lo, h = hi; 55 while (h - l >= 2) { //迭代循環fork每一個任務,而不用創建左子節點 56 int mid = (l + h) >>> 1; 57 addToPendingCount(1);//每一次設置掛起任務數為1 58 new ForEach(this, array, op, mid, h).fork(); // right child 59 h = mid; 60 } 61 if (h > l) 62 op.apply(array[l]); 63 propagateCompletion(); //傳播完成 64 } 65 }
示例二,搜索數組:
1 class Searcher extends CountedCompleter { 2 final E[] array; 3 final AtomicReference result; //記錄搜索結果 4 final int lo, hi; 5 6 Searcher(CountedCompleter p, E[] array, AtomicReference result, int lo, int hi) { 7 super(p); 8 this.array = array; this.result = result; this.lo = lo; this.hi = hi; 9 } 10 11 public E getRawResult() { return result.get(); }//返回結果 12 13 public void compute() { // 與ForEach 版本3類似 14 int l = lo, h = hi; 15 while (result.get() == null && h >= l) { //迭代循環創建一個任務 16 if (h - l >= 2) { //分解任務 17 int mid = (l + h) >>> 1; 18 addToPendingCount(1); //設置掛起任務數為1 19 new Searcher(this, array, result, mid, h).fork(); 20 h = mid; 21 } 22 else { //足夠小,只有一個元素了 23 E x = array[l]; 24 //找到我們想要的數據了,設置到result中 25 if (matches(x) && result.compareAndSet(null, x)) 26 quietlyCompleteRoot(); // 根任務現在可以獲取結果了 27 break; 28 } 29 } 30 tryComplete(); // normally complete whether or not found 31 } 32 boolean matches(E e) { ... } // 如果找到了返回true 33 34 public static E search(E[] array) { 35 return new Searcher(null, array, new AtomicReference(), 0, array.length).invoke(); 36 } 37 }
示例三,map-reduce:
1 class MyMapper { E apply(E v) { ... } } 2 3 class MyReducer { E apply(E x, E y) { ... } } 4 5 class MapReducer extends CountedCompleter { 6 final E[] array; final MyMapper mapper; 7 final MyReducer reducer; final int lo, hi; 8 MapReducer sibling; 9 E result; 10 MapReducer(CountedCompleter p, E[] array, MyMapper mapper, MyReducer reducer, int lo, int hi) { 11 super(p); 12 this.array = array; this.mapper = mapper; 13 this.reducer = reducer; this.lo = lo; this.hi = hi; 14 } 15 public void compute() { 16 if (hi - lo >= 2) { //分解任務 17 int mid = (lo + hi) >>> 1; 18 MapReducer left = new MapReducer(this, array, mapper, reducer, lo, mid); 19 MapReducer right = new MapReducer(this, array, mapper, reducer, mid, hi); 20 left.sibling = right; 21 right.sibling = left; 22 setPendingCount(1); // 只有右任務的掛起的 23 right.fork(); //fork又任務 24 left.compute(); // 直接執行左子節點 25 } 26 else { //任務足夠小 27 if (hi > lo) 28 result = mapper.apply(array[lo]); //直接執行 29 tryComplete(); 30 } 31 } 32 public void onCompletion(CountedCompleter caller) { 33 if (caller != this) { 34 MapReducer child = (MapReducer)caller; 35 MapReducer sib = child.sibling; //兄弟任務 36 if (sib == null || sib.result == null) 37 result = child.result; //沒有兄弟任務那就是最小那個任務 38 else 39 //合並兄弟任務的結果 40 result = reducer.apply(child.result, sib.result); 41 } 42 } 43 public E getRawResult() { return result; } //返回結果 44 45 public static E mapReduce(E[] array, MyMapper mapper, MyReducer reducer) { 46 return new MapReducer(null, array, mapper, reducer, 47 0, array.length).invoke(); 48 } 49 }
示例四,完全遍歷:
1 class MapReducer extends CountedCompleter { // version 2 2 final E[] array; 3 final MyMapper mapper; 4 final MyReducer reducer; final int lo, hi; 5 MapReducer forks, next; // record subtask forks in list 6 E result; 7 MapReducer(CountedCompleter p, E[] array, MyMapper mapper, 8 MyReducer reducer, int lo, int hi, MapReducer next) { 9 super(p); 10 this.array = array; this.mapper = mapper; 11 this.reducer = reducer; this.lo = lo; this.hi = hi; 12 this.next = next; 13 } 14 public void compute() { 15 int l = lo, h = hi; 16 while (h - l >= 2) { //循環創建任務 17 int mid = (l + h) >>> 1; 18 addToPendingCount(1); 19 (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork(); 20 h = mid; 21 } 22 if (h > l) //任務足夠小,直接執行 23 result = mapper.apply(array[l]); 24 25 // 通過減少和推進子任務鏈接來完成流程 26 //利用firstComplete,nextComplete兩個方法遍歷 27 for (CountedCompleter c = firstComplete(); c != null; c = c.nextComplete()) { 28 for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next) 29 t.result = reducer.apply(t.result, s.result); 30 } 31 } 32 public E getRawResult() { return result; } 33 34 public static E mapReduce(E[] array, MyMapper mapper, MyReducer reducer) { 35 return new MapReducer(null, array, mapper, reducer, 36 0, array.length, null).invoke(); 37 } 38 }}
示例五,觸發器:
1 class HeaderBuilder extends CountedCompleter<...> { ... } 2 3 class BodyBuilder extends CountedCompleter<...> { ... } 4 5 class PacketSender extends CountedCompleter<...> { 6 PacketSender(...) { super(null, 1); ... } // trigger on second completion 7 public void compute() { } // never called 8 public void onCompletion(CountedCompleter caller) { sendPacket(); } 9 } 10 11 // sample use: 12 PacketSender p = new PacketSender(); 13 new HeaderBuilder(p, ...).fork(); 14 new BodyBuilder(p, ...).fork();
說實話,這幾個示例除了前面三個,其他的我基本上也沒完全理解,先記錄着吧,關於RecursiveAction、RecursiveTask的更多示例,可以查看Java Doc的類注釋。其實一般來說,利用那兩個簡單的RecursiveAction、RecursiveTask已經足夠解決我們的問題了,在沒有理解CountedCompleter之前,最好不要使用它。
總結
其實要想詳細的理解ForkJoinPool,也就是Fork/Join框架的實現過程是一個非常耗時耗精力的事情,我這里只是記錄了我自己對ForkJoinPool粗淺的理解,所謂總結也就談不上有多深入了,但我們學習這些框架結構的時候,最主要的其實也還是學習其設計思想與理念,並不一定非要每一句代碼都分析到,對於ForkJoinPool線程池和ForkJoinTask、ForkJoinWorkerThread三者實現的Fork/Join框架,目前按我的理解它就是為了利於現有計算機多核心的架構,將一些可以進行分解的任務進行遞歸分解之后並行的執行,有些地方也將這種思想稱之為分治法,並在執行完成之后利於遞歸的原理將任務結果匯聚起來,從而達到快速解決復雜任務的高效編程設計。對於ForkJoinPool其內部采用了很多為了加快並行度而實現的設計思想,例如工作竊取機制,互助機制,垃圾回收機制等等都是我們值得學習借鑒的地方。
