前言
這是Java並發包提供的最后一個線程池實現,也是最復雜的一個線程池。針對這一部分的代碼太復雜,由於目前理解有限,只做簡單介紹。通常大家說的Fork/Join框架其實就是指由ForkJoinPool作為線程池、ForkJoinTask(通常實現其三個抽象子類)為任務、ForkJoinWorkerThread作為執行任務的具體線程實體這三者構成的任務調度機制。通俗的說,ForkJoin框架的作用主要是為了實現將大型復雜任務進行遞歸的分解,直到任務足夠小才直接執行,從而遞歸的返回各個足夠小的任務的結果匯集成一個大任務的結果,依次類推最終得出最初提交的那個大型復雜任務的結果,這和方法的遞歸調用思想是一樣的。當然ForkJoinPool線程池為了提高任務的並行度和吞吐量做了非常多而且復雜的設計實現,其中最著名的就是任務竊取機制。
對照前面介紹的ThreadPoolExecutor執行的任務是Future的實現類FutureTask、執行線程的實體是內部類Worker,ForkJoinPool執行的任務就是Future的實現類ForkJoinTask、執行線程就是ForkJoinWorkerThread。
ForkJoinWorkerThread
該類直接繼承了Thread,但是僅僅是為了增加一些額外的功能,並沒有對線程的調度執行做任何更改。ForkJoinWorkerThread是被ForkJoinPool管理的工作線程,在創建出來之后都被設置成為了守護線程,由它來執行ForkJoinTasks。該類主要為了維護創建線程實例時通過ForkJoinPool為其創建的任務隊列,與其他兩個線程池整個線程池只有一個任務隊列不同,ForkJoinPool管理的所有工作線程都擁有自己的工作隊列,為了實現任務竊取機制,該隊列被設計成一個雙端隊列,而ForkJoinWorkerThread的首要任務就是執行自己的這個雙端任務隊列中的任務,其次是竊取其他線程的工作隊列,以下是其代碼片段:

1 public class ForkJoinWorkerThread extends Thread { 2 3 final ForkJoinPool pool; // 這個線程工作的ForkJoinPool池 4 final ForkJoinPool.WorkQueue workQueue; // 這個線程擁有的工作竊取機制的工作隊列 5 6 //創建在給定ForkJoinPool池中執行的ForkJoinWorkerThread。 7 protected ForkJoinWorkerThread(ForkJoinPool pool) { 8 // Use a placeholder until a useful name can be set in registerWorker 9 super("aForkJoinWorkerThread"); 10 this.pool = pool; 11 this.workQueue = pool.registerWorker(this); //向ForkJoinPool執行池注冊當前工作線程,ForkJoinPool為其分配一個工作隊列 12 } 13 14 //該工作線程的執行內容就是執行工作隊列中的任務 15 public void run() { 16 if (workQueue.array == null) { // only run once 17 Throwable exception = null; 18 try { 19 onStart(); 20 pool.runWorker(workQueue); //執行工作隊列中的任務 21 } catch (Throwable ex) { 22 exception = ex; //記錄異常 23 } finally { 24 try { 25 onTermination(exception); 26 } catch (Throwable ex) { 27 if (exception == null) 28 exception = ex; 29 } finally { 30 pool.deregisterWorker(this, exception); //撤銷工作 31 } 32 } 33 } 34 } 35 36 ..... 37 }
ForkJoinTask
與FutureTask一樣, ForkJoinTask也是Future的子類,不過它是一個抽象類,其實現過程中與ForkJoinPool相互交叉,因此其源碼在不理解ForkJoinPool的情況下很難全部看明白,這里只了解大概,ForkJoinTask的作用就是根據任務的分解實現(exec抽象方法),將任務進行拆分,並等待子任務的執行結果,由此可以組合成父任務的結果,以此類推。
ForkJoinTask有一個int類型的status字段,其高16位存儲任務執行狀態例如NORMAL、CANCELLED或EXCEPTIONAL,低16位預留用於用戶自定義的標記。任務未完成之前status大於等於0,完成之后就是NORMAL、CANCELLED或EXCEPTIONAL這幾個小於0的值,這幾個值也是按大小順序的:0(初始狀態) > NORMAL > CANCELLED > EXCEPTIONAL.

1 public abstract class ForkJoinTask<V> implements Future<V>, Serializable { 2 3 /** 該任務的執行狀態 */ 4 volatile int status; // accessed directly by pool and workers 5 static final int DONE_MASK = 0xf0000000; // mask out non-completion bits 6 static final int NORMAL = 0xf0000000; // must be negative 7 static final int CANCELLED = 0xc0000000; // must be < NORMAL 8 static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED 9 static final int SIGNAL = 0x00010000; // must be >= 1 << 16 10 static final int SMASK = 0x0000ffff; // short bits for tags 11 12 // 異常哈希表 13 14 //被任務拋出的異常數組,為了報告給調用者。因為異常很少見,所以我們不直接將它們保存在task對象中,而是使用弱引用數組。注意,取消異常不會出現在數組,而是記錄在statue字段中 15 //注意這些都是 static 類屬性,所有的ForkJoinTask共用的。 16 private static final ExceptionNode[] exceptionTable; //異常哈希鏈表數組 17 private static final ReentrantLock exceptionTableLock; 18 private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收之后,相應的異常節點對象的引用隊列 19 20 /** 21 * 固定容量的exceptionTable. 22 */ 23 private static final int EXCEPTION_MAP_CAPACITY = 32; 24 25 26 //異常數組的鍵值對節點。 27 //該哈希鏈表數組使用線程id進行比較,該數組具有固定的容量,因為它只維護任務異常足夠長,以便參與者訪問它們,所以在持續的時間內不應該變得非常大。但是,由於我們不知道最后一個joiner何時完成,我們必須使用弱引用並刪除它們。我們對每個操作都這樣做(因此完全鎖定)。此外,任何ForkJoinPool池中的一些線程在其池變為isQuiescent時都會調用helpExpungeStaleExceptions 28 static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> { 29 final Throwable ex; 30 ExceptionNode next; 31 final long thrower; // 拋出異常的線程id 32 final int hashCode; // 在弱引用消失之前存儲hashCode 33 ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) { 34 super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收之后,會將該節點加入隊列exceptionTableRefQueue 35 this.ex = ex; 36 this.next = next; 37 this.thrower = Thread.currentThread().getId(); 38 this.hashCode = System.identityHashCode(task); 39 } 40 } 41 42 ................. 43 }
除了status記錄任務的執行狀態之外,其他字段主要是為了對任務執行的異常的處理,ForkJoinTask采用了哈希數組 + 鏈表的數據結構(JDK8以前的HashMap實現方法)存放所有(以為這些字段是static)的ForkJoinTask任務的執行異常。
fork--安排任務異步執行
源碼很簡單,就不貼了,該方法其實就是將任務通過push方法加入到當前工作線程的工作隊列或者提交隊列(外部非ForkJoinWorkerThread線程通過submit、execute方法提交的任務),等待被線程池調度執行,這是一個非阻塞的立即返回方法。這里需要知道,ForkJoinPool線程池通過哈希數組+雙端隊列的方式將所有的工作線程擁有的任務隊列和從外部提交的任務分別映射到哈希數組的不同槽位上,下一篇會介紹。將新任務始終push到隊列一端的方式可以保證比較大的任務在隊列的頭部,越小的任務越在尾部,這時候擁有該任務隊列的線程如果按照先進后出的方式pop彈出任務執行的話(這時候的任務隊列就是當着棧來使用),將會是先從小任務開始,逐漸往大任務進行。而竊取任務的其他線程從對列頭部開始竊取的話,將會幫助它完成大任務。
join---等待執行結果

1 //當計算完成時返回計算結果。此方法與get()的不同之處在於,異常完成會導致RuntimeException或Error,而不是ExecutionException,調用線程被中斷不會通過拋出InterruptedException導致方法突然返回。 2 public final V join() { 3 int s; 4 if ((s = doJoin() & DONE_MASK) != NORMAL) 5 reportException(s); //非正常結束,拋出相關的異常堆棧信息 6 return getRawResult(); //正常結束,返回結果 7 } 8 9 //等待任務執行結束並返回其狀態status,該方法實現了join, get, quietlyJoin. 直接處理已經完成的,外部等待和unfork+exec的情況,其它情況轉發到ForkJoinPool.awaitJoin 10 //如果 status < 0 則返回s; 11 //否則,若不是ForkJoinWorkerThread ,則等待 externalAwaitDone() 返回 12 //否則,若 (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 則 返回s; 13 //否則,返回 wt.pool.awaitJoin(w, this, 0L) 14 private int doJoin() { 15 int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; 16 return (s = status) < 0 ? s : //status為負數表示任務已經執行結束,直接返回status。 17 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 18 (w = (wt = (ForkJoinWorkerThread)t).workQueue). 19 tryUnpush(this) && (s = doExec()) < 0 ? s : //調用pool的執行邏輯,並等待返回執行結果狀態 20 wt.pool.awaitJoin(w, this, 0L) : //調用pool的等待機制 21 externalAwaitDone(); //不是ForkJoinWorkerThread, 22 } 23 24 //拋出與給定狀態關聯的異常(如果有),被取消是CancellationException。 25 private void reportException(int s) { 26 if (s == CANCELLED) 27 throw new CancellationException(); 28 if (s == EXCEPTIONAL) 29 rethrow(getThrowableException()); 30 } 31 32 public abstract V getRawResult(); 33 34 //返回給定任務的執行異常(如果有的話),為了提供准確的異常堆棧信息,若異常不是由當前線程拋出的,將嘗試以記錄的異常為原因創建一個與拋出異常類型相同的新異常。 35 //如果沒有那樣的構造方法將嘗試使用無參的構造函數,並通過設置initCause方法以達到同樣的效果,盡管它可能包含誤導的堆棧跟蹤信息。 36 private Throwable getThrowableException() { 37 if ((status & DONE_MASK) != EXCEPTIONAL) 38 return null; 39 40 //1. 通過當前任務對象的哈希值到哈希鏈表數組中找到相應的異常節點 41 int h = System.identityHashCode(this); //當前任務的hash值 42 ExceptionNode e; 43 final ReentrantLock lock = exceptionTableLock; 44 lock.lock(); //加鎖 45 try { 46 expungeStaleExceptions(); //清理被GC回收的任務的異常節點 47 ExceptionNode[] t = exceptionTable; 48 e = t[h & (t.length - 1)]; //通過取模對應得索引獲取哈希數組槽位中得節點 49 while (e != null && e.get() != this) 50 e = e.next; //遍歷找到當前任務對應的異常節點 51 } finally { 52 lock.unlock(); 53 } 54 Throwable ex; 55 if (e == null || (ex = e.ex) == null) //表示沒有出現任何異常 56 return null; 57 if (e.thrower != Thread.currentThread().getId()) { //有異常但是不是由當前線程拋出的 58 Class<? extends Throwable> ec = ex.getClass(); 59 try { 60 Constructor<?> noArgCtor = null; 61 Constructor<?>[] cs = ec.getConstructors();// public ctors only 62 //通過反射找到構造方法,並構造新異常 63 for (int i = 0; i < cs.length; ++i) { 64 Constructor<?> c = cs[i]; 65 Class<?>[] ps = c.getParameterTypes(); 66 if (ps.length == 0) 67 noArgCtor = c; //記錄下無參構造方法,以備沒有找到期望的構造方法時使用 68 else if (ps.length == 1 && ps[0] == Throwable.class) { 69 Throwable wx = (Throwable)c.newInstance(ex); //發現了我們期望的Throwable類型的參數的構造方法 70 return (wx == null) ? ex : wx; 71 } 72 } 73 if (noArgCtor != null) { //沒有找到期望的構造方法,只能通過無參構造方法創建新異常 74 Throwable wx = (Throwable)(noArgCtor.newInstance()); 75 if (wx != null) { 76 wx.initCause(ex); //將原始異常設置進去 77 return wx; 78 } 79 } 80 } catch (Exception ignore) { 81 } 82 } 83 return ex; 84 } 85 86 87 88 //清除哈希鏈表數組中已經被GC回收掉的任務的異常節點。從exceptionTableRefQueue節點引用隊列中獲取異常節點並移除哈希鏈表數組中得對應節點 89 private static void expungeStaleExceptions() { 90 for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { 91 if (x instanceof ExceptionNode) { 92 int hashCode = ((ExceptionNode)x).hashCode; //節點hash 93 ExceptionNode[] t = exceptionTable; 94 int i = hashCode & (t.length - 1); //取模得到哈希表索引 95 ExceptionNode e = t[i]; 96 ExceptionNode pred = null; 97 while (e != null) { 98 ExceptionNode next = e.next; 99 if (e == x) { //找到了目標節點 100 if (pred == null) 101 t[i] = next; 102 else 103 pred.next = next; 104 break; 105 } 106 pred = e; //往后遍歷鏈表 107 e = next; 108 } 109 } 110 } 111 } 112 113 114 //竊取任務的主要執行方法,除非已經完成了,否則調用exec()並記錄完成時的狀態。 115 final int doExec() { 116 int s; boolean completed; 117 if ((s = status) >= 0) { //任務還未完成 118 try { 119 completed = exec(); 調用exec()並記錄完成時的狀態。 120 } catch (Throwable rex) { 121 return setExceptionalCompletion(rex); //記錄異常並返回相關狀態,並喚醒通過join等待此任務的線程。 122 } 123 if (completed) 124 s = setCompletion(NORMAL); //更新狀態為正常結束,並喚醒通過join等待此任務的線程。 125 } 126 return s; 127 } 128 129 //立即執行此任務的基本操作。返回true表示該任務已經正常完成,否則返回false表示此任務不一定完成(或不知道是否完成)。 130 //此方法還可能拋出(未捕獲的)異常,以指示異常退出。此方法旨在支持擴展,一般不應以其他方式調用。 131 protected abstract boolean exec(); 132 133 //等待未完成的非ForkJoinWorkerThread線程提交的任務執行結束,並返回任務狀態status 134 private int externalAwaitDone() { 135 136 //若是CountedCompleter任務,等待ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>)this, 0) 返回 137 //否則,若ForkJoinPool.common.tryExternalUnpush(this),返回 doExec() 結果; 138 //否則,返回0 139 int s = ((this instanceof CountedCompleter) ? // try helping 140 ForkJoinPool.common.externalHelpComplete( 141 (CountedCompleter<?>)this, 0) : //輔助完成外部提交的CountedCompleter任務 142 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); //輔助完成外部提交的非CountedCompleter任務 143 if (s >= 0 && (s = status) >= 0) { //表示任務還沒結束,需要阻塞等待。 144 boolean interrupted = false; 145 do { 146 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //標記有線程需要被喚醒 147 synchronized (this) { 148 if (status >= 0) { 149 try { 150 wait(0L); //任務還沒結束,無限期阻塞直到被喚醒 151 } catch (InterruptedException ie) { 152 interrupted = true; 153 } 154 } 155 else 156 notifyAll(); //已經結束了喚醒所有阻塞的線程 157 } 158 } 159 } while ((s = status) >= 0); 160 if (interrupted) 161 Thread.currentThread().interrupt(); //恢復中斷標識 162 } 163 return s; 164 } 165 166 167 //記錄異常,更新status狀態,喚醒所有等待線程 168 private int setExceptionalCompletion(Throwable ex) { 169 int s = recordExceptionalCompletion(ex); 170 if ((s & DONE_MASK) == EXCEPTIONAL) 171 internalPropagateException(ex); //調用鈎子函數傳播異常 172 return s; 173 } 174 175 /** 176 * 對任務異常結束的異常傳播支持的鈎子函數 177 */ 178 void internalPropagateException(Throwable ex) { 179 } 180 181 //記錄異常並設置狀態status 182 final int recordExceptionalCompletion(Throwable ex) { 183 int s; 184 if ((s = status) >= 0) { 185 int h = System.identityHashCode(this); //哈希值 186 final ReentrantLock lock = exceptionTableLock; 187 lock.lock(); //加鎖 188 try { 189 expungeStaleExceptions(); 190 ExceptionNode[] t = exceptionTable; 191 int i = h & (t.length - 1); 192 for (ExceptionNode e = t[i]; ; e = e.next) { 193 if (e == null) { //遍歷完了都沒找到,說明哈希鏈表數組中不存在該任務對於的異常節點 194 t[i] = new ExceptionNode(this, ex, t[i]); //創建一個異常節點用頭插法插入哈希鏈表數組 195 break; 196 } 197 if (e.get() == this) // 哈希鏈表數組中已經存在相應的異常節點,退出 198 break; 199 } 200 } finally { 201 lock.unlock(); 202 } 203 s = setCompletion(EXCEPTIONAL); 204 } 205 return s; 206 } 207 208 //標記任務完成標志,並喚醒通過join等待此任務的線程。 209 private int setCompletion(int completion) { 210 for (int s;;) { 211 if ((s = status) < 0) 212 return s; 213 if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { //更新狀態 214 if ((s >>> 16) != 0) 215 synchronized (this) { notifyAll(); } //喚醒所有等待線程 216 return completion; 217 } 218 } 219 }
join方法就是ForkJoinTask最核心也最復雜的方法,就是等待任務執行結束並返回執行結果,若任務被取消拋出CancellationException異常,若是其他異常導致異常結束則拋出相關RuntimeException或Error信息,這些異常還可能包括由於內部資源耗盡而導致的RejectedExecutionException,比如分配內部任務隊列失敗。異常的處理利用了另一個哈希數組 + 鏈表的結構。該方法不會由於線程被中斷而拋出InterruptedException異常,而是會在等到任務執行結束之后再將中斷狀態復位。
該方法的執行過程中調用了一些未實現的抽象方法:exec方法就是執行任務的入口,任務的邏輯與拆分策略都由該方法實現,只有返回true才表示任務正常完成。該方法可以拋出異常以指示異常結束。getRawResult方法用於返回任務正常結束的執行結果。internalPropagateException方法則是當任務異常的回調鈎子函數。一般來講,我們都會在exec方法里面實現如下的貌似遞歸的拆分邏輯(偽代碼):
1 if 任務足夠小 then 2 執行任務; 3 返回結果; 4 else 5 拆分成兩個子任務t1、t2 6 t1.fork(); //提交到任務隊列 7 t2.fork(); //提交到任務隊列 8 Object result = t1.join() + t2.join(); //合並結果,這里的加號僅僅代表合並結果,並不是做加法運行 9 return result; //返回最終結果
我們知道,fork負責將任務推入隊列,等待被調度執行,join則是等待執行任務,並返回結果,而join在執行任務的時候最終就是調用的exec,而exec中任務已經足夠小就直接執行,否則會拆分任務之后通過fork將拆分出的子任務再次加入隊列,其子任務執行的時候依然會執行exec(假設子任務的exec也是這樣的實現),到時候又會繼續拆分,或者足夠小就直接執行,兩個子任務合並結果之后是其父任務的結果,兩個父任務的結果又合並成祖父任務的結果,以此類推就是遞歸的完成了整個任務。
get---獲取異步任務結果
既然ForkJoinTask也是Future的子類,那么Future最重要的獲取異步任務結果的get方法也必然要實現:

1 //如果需要,等待計算完成,然后檢索其結果。 2 public final V get() throws InterruptedException, ExecutionException { 3 int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : //是ForkJoinWorkerThread,執行doJoin 4 externalInterruptibleAwaitDone(); //執行externalInterruptibleAwaitDone 5 Throwable ex; 6 if ((s &= DONE_MASK) == CANCELLED) 7 throw new CancellationException(); //被取消的拋出CancellationException 8 if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) 9 throw new ExecutionException(ex); //執行中出現異常的拋出相應的異常 10 return getRawResult(); //返回正常結果 11 } 12 13 //阻塞非ForkJoinWorkerThread線程,直到完成或中斷。 14 private int externalInterruptibleAwaitDone() throws InterruptedException { 15 int s; 16 if (Thread.interrupted()) 17 throw new InterruptedException(); 18 if ((s = status) >= 0 && 19 (s = ((this instanceof CountedCompleter) ? 20 ForkJoinPool.common.externalHelpComplete( 21 (CountedCompleter<?>)this, 0) : 22 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 23 0)) >= 0) { //根據不同的任務類型 返回執行或暫時等待被執行的狀態 24 while ((s = status) >= 0) { //需要阻塞等待 25 if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { 26 synchronized (this) { 27 if (status >= 0) 28 wait(0L); //阻塞等待 29 else 30 notifyAll(); //喚醒所有等待線程 31 } 32 } 33 } 34 } 35 return s; 36 }
get方法也是通過實現join方法的doJoin方法實現的,不同的是,調用get方法的線程如果被中斷的話,get方法會立即拋出InterruptedException異常,而join方法則不會;另外任務異常完成的的相關異常,get方法會將相關異常都封裝成ExecutionException異常,而join方法則是原樣拋出相關的異常不會被封裝成ExecutionException異常。get方法采用的wait/notifyAll這種線程通信機制來實現阻塞與喚醒。另外還有超時版本的get方法這里就不貼代碼了,由此可見get支持可中斷和/或定時等待完成。
invoke---立即執行任務,並等待返回結果

1 //開始執行此任務,如果需要等待其完成,並返回其結果,如果底層執行此任務時出現異常,則拋出相應的(未捕獲的)RuntimeException或Error。 2 public final V invoke() { 3 int s; 4 if ((s = doInvoke() & DONE_MASK) != NORMAL) 5 reportException(s); 6 return getRawResult(); 7 } 8 9 // invoke, quietlyInvoke的實現 10 private int doInvoke() { 11 int s; Thread t; ForkJoinWorkerThread wt; 12 return (s = doExec()) < 0 ? s : //執行此任務,完成返回其status 13 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //若未完成或需要等待就根據不同任務類型執行不同的等待邏輯 14 (wt = (ForkJoinWorkerThread)t).pool. 15 awaitJoin(wt.workQueue, this, 0L) : 16 externalAwaitDone(); 17 }
invoke的實現會利用當前調用invoke的線程立即執行exec方法,當然如果exec方法的實現使用了fork/join,其還是會利用ForkJoinPool線程池的遞歸調度執行策略,等待子任務執行完成,一步步的合並成最終的任務結果,並返回。值得注意的是,該方法不會因為線程被中斷而立即返回,而必須在等到任務執行有了結果之后才會對中斷狀態進行補償。
invokeAll----批量執行任務,並等待它們執行結束

1 //執行兩個任務 2 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) { 3 int s1, s2; 4 t2.fork(); //t2任務交給線程池調度執行 5 if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) //t1任務立即由當前線程執行 6 t1.reportException(s1); //若t1異常結束,則拋出異常,包括被取消的CancellationException 7 if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) //等待t2執行結束 8 t2.reportException(s2); //若t2異常結束,則拋出異常,包括被取消的CancellationException 9 } 10 11 //執行任務數組 12 public static void invokeAll(ForkJoinTask<?>... tasks) { 13 Throwable ex = null; 14 int last = tasks.length - 1; 15 for (int i = last; i >= 0; --i) { 16 ForkJoinTask<?> t = tasks[i]; 17 if (t == null) { 18 if (ex == null) //都不能為null 19 ex = new NullPointerException(); 20 } 21 else if (i != 0) 22 t.fork(); //除了第一個任務都交給線程池調度執行 23 else if (t.doInvoke() < NORMAL && ex == null) //由當前線程執行第一個任務 24 ex = t.getException(); //記錄第一個任務的異常 25 } 26 for (int i = 1; i <= last; ++i) { 27 ForkJoinTask<?> t = tasks[i]; 28 if (t != null) { 29 if (ex != null) //第一個任務異常結束,取消其他所有任務 30 t.cancel(false); 31 else if (t.doJoin() < NORMAL) //有任務異常結束,記錄異常 32 ex = t.getException(); 33 } 34 } 35 if (ex != null) 36 rethrow(ex); //若有任務異常結束,拋出數組最前面那個異常結束的任務的異常 37 } 38 39 //批量執行任務,返回每個任務對應的ForkJoinTask實例, 40 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) { 41 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) { 42 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()])); //將任務封裝成ForkJoinTask,調用上面那個方法實現 43 return tasks; 44 } 45 //下面的邏輯與上面那個invokeAll也是一樣的。 46 @SuppressWarnings("unchecked") 47 List<? extends ForkJoinTask<?>> ts = (List<? extends ForkJoinTask<?>>) tasks; 48 Throwable ex = null; 49 int last = ts.size() - 1; 50 for (int i = last; i >= 0; --i) { 51 ForkJoinTask<?> t = ts.get(i); 52 if (t == null) { 53 if (ex == null) 54 ex = new NullPointerException(); 55 } 56 else if (i != 0) 57 t.fork(); 58 else if (t.doInvoke() < NORMAL && ex == null) 59 ex = t.getException(); 60 } 61 for (int i = 1; i <= last; ++i) { 62 ForkJoinTask<?> t = ts.get(i); 63 if (t != null) { 64 if (ex != null) 65 t.cancel(false); 66 else if (t.doJoin() < NORMAL) 67 ex = t.getException(); 68 } 69 } 70 if (ex != null) 71 rethrow(ex); 72 return tasks; 73 }
批量任務的執行其實現都是排在前面的任務(只有兩個參數是,第一個參數就是排在前面的任務,是數組或者隊列時,索引越小的就是排在越前面的)由當前線程執行,后面的任務交給線程池調度執行,如果有多個任務都出現異常,只會拋出排在最前面那個任務的異常。
quietlyInvoke(),quietlyJoin()----不需要執行結果的invoke和join
源碼就不貼了,quietlyInvoke(),quietlyJoin()這兩個方法就僅僅了是調用了doInvoke和doJoin,然后就沒有然后了,它們就是不關心執行結果版本的invoke和Join,當然異常結束的也不會將異常拋出來,當執行一組任務並且需要將結果或異常的處理延遲到全部任務完成時,這可能很有用。
cancel---嘗試取消任務的執行
public boolean cancel(boolean mayInterruptIfRunning) { return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; }
其主要通過setCompletion標記尚未完成的任務的狀態為CANCELLED,並喚醒通過join等待此任務的線程。已經執行完成的任務無法被取消,返回true表示取消成功。注意該方法傳入的mayInterruptIfRunning並沒有使用,因此,ForkJoinTask不支持在取消任務時中斷已經開始執行的任務,當然ForkJoinTask的子類可以重寫實現。
tryUnfork---取消fork,即從任務隊列中移除任務

1 //取消任務的執行計划。如果此任務是當前線程最近才剛剛通過fork安排執行,並且尚未在另一個線程中開始執行,則此方法通常會成功,但也不是100%保證會成功。 2 public boolean tryUnfork() { 3 Thread t; 4 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 5 ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : //針對ForkJoinWorkerThread的取消邏輯 6 ForkJoinPool.common.tryExternalUnpush(this)); //針對外部提交任務的取消邏輯 7 }
tryUnfork嘗試將該任務從任務隊列中彈出,彈出之后線程池自然不會再調度該任務。該方法的實現只會在任務剛剛被推入任務隊列,並且還處於任務隊列的棧頂時才可能會成功,否則100%失敗。
reinitialize---重新初始化該任務

1 public void reinitialize() { 2 if ((status & DONE_MASK) == EXCEPTIONAL) //有異常 3 clearExceptionalCompletion(); //從哈希鏈表數組中移除當前任務的異常節點,並將status重置為0 4 else 5 status = 0; 6 }
如果任務異常結束,會從異常哈希表中清除該任務的異常記錄,該方法僅僅是將任務狀態status重置為0,使得該任務可以被重新執行。
任務的完成狀態查詢----isDone、isCompletedNormally、isCancelled、isCompletedAbnormally
任務的執行狀態可以在多個詳細級別上查詢:
- 如果任務以任何方式完成(包括任務在未執行的情況下被取消),則isDone為true。
- 如果任務在沒有取消或沒有遇到異常的情況下完成,則 isCompletedNormally 為true。
- 如果任務被取消(在這種情況下getException方法返回一個CancellationException),則 isCancelled 為true。
- 如果任務被取消或遇到異常,則isCompletedAbnormally異常為true,在這種情況下,getException將返回遇到的異常或java.util.concurrent.CancellationException。
為Runnable和Callable提供的adapt方法
adapt方法主要是為了兼容傳統的Runnable和Callable任務,通過adapt方法可以將它們封裝成ForkJoinTask任務,當將 ForkJoinTask 與其他類型的任務混合執行時,可以使用這些方法。
其他一些方法
getPool可以返回執行該任務的線程所在的線程池實例,inForkJonPool可以判定當前任務是否是由ForkJoinWorkerThread線程提交的,一般來說這意味着當前任務是內部拆分之后的子任務。
getQueuedTaskCount方法返回已經通過fork安排給當前工作線程執行,但還沒有被執行的任務數量,該值是一個瞬間值。因為工作線程調度執行的任務通過fork提交的任務還是進入的該工作線程的任務隊列,因此可以通過該任務得知該值。
其它一些方法:

1 //可能會在承載當前任務的執行池處於靜默(空閑)狀態時執行任務。這個方法可能在有很多任務都通過fork被安排執行,但是一個顯示的join調用都沒有,直到它們都被執行完的設計中使用。 2 //其實就是如果有一批任務被安排執行,並且不知道它們什么時候結束,如果希望在這些任務都執行結束之后再安排一個任務,就可以使用helpQuiesce。 3 public static void helpQuiesce() { 4 Thread t; 5 //根據執行線程的不同類型,調用不同的靜默執行邏輯 6 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { 7 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t; 8 wt.pool.helpQuiescePool(wt.workQueue); 9 } 10 else 11 ForkJoinPool.quiesceCommonPool(); 12 } 13 14 //返回被當前工作線程持有的任務數a比其它可能竊取其任務的其它工作線程持有的任務數b多多少的估計值,就是 a - b 的差值。若當前工作線程不是在ForkJoinPool中,則返回0 15 //通常該值被恆定在一個很小的值3,若超過這個閾值,則就在本地處理。 16 public static int getSurplusQueuedTaskCount() { 17 return ForkJoinPool.getSurplusQueuedTaskCount(); 18 } 19 20 //獲取但不移除(即不取消執行計划)安排給當前線程的可能即將被執行的下一個任務。但不能保證該任務將在接下來實際被立即執行。該方法可能在即使任務存在但因為競爭而不可訪問而返回null 21 //該方法主要是為了支持擴展,否則可能不會被使用。 22 protected static ForkJoinTask<?> peekNextLocalTask() { 23 Thread t; ForkJoinPool.WorkQueue q; 24 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 25 q = ((ForkJoinWorkerThread)t).workQueue; 26 else 27 q = ForkJoinPool.commonSubmitterQueue(); 28 return (q == null) ? null : q.peek(); 29 } 30 31 //獲取並且移除(即取消執行)安排給當前線程的可能即將被執行的下一個任務。 32 //該方法主要是為了支持擴展,否則可能不會被使用。 33 protected static ForkJoinTask<?> pollNextLocalTask() { 34 Thread t; 35 return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 36 ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : 37 null; 38 } 39 40 //如果當前線程被ForkJoinPool運行,獲取並且移除(即取消執行)當前線程即將可能執行的下一個任務。該任務可能是從其它線程中竊取來的。 41 //返回nulll並不一定意味着此任務正在操作的ForkJoinPool處於靜止狀態。該方法主要是為了支持擴展,否則可能不會被使用。 42 protected static ForkJoinTask<?> pollTask() { 43 Thread t; ForkJoinWorkerThread wt; 44 return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? 45 (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) : 46 null; 47 }
一些說明
通常ForkJoinTask只適用於非循環依賴的純函數的計算或孤立對象的操作,否則,執行可能會遇到某種形式的死鎖,因為任務循環地等待彼此。但是,這個框架支持其他方法和技術(例如使用Phaser、helpQuiesce和complete),這些方法和技術可用於構造解決這種依賴任務的ForkJoinTask子類,為了支持這些用法,可以使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地標記一個short類型的值,並使用getForkJoinTaskTag進行檢查。ForkJoinTask實現沒有將這些受保護的方法或標記用於任何目的,但是它們可以用於構造專門的子類,由此可以使用提供的方法來避免重新訪問已經處理過的節點/任務。
ForkJoinTask應該執行相對較少的計算,並且應該避免不確定的循環。大任務應該被分解成更小的子任務,通常通過遞歸分解。如果任務太大,那么並行性就不能提高吞吐量。如果太小,那么內存和內部任務維護開銷可能會超過處理開銷。
ForkJoinTask是可序列化的,這使它們能夠在諸如遠程執行框架之類的擴展中使用。只在執行之前或之后序列化任務才是明智的,而不是在執行期間。
ForkJoinTask 的三個抽象子類
通常我們不會直接實現ForkJoinTask,而是實現其三個抽象子類,ForkJoinTask僅僅是為了配合ForkJoinPool實現任務的調度執行,通常我們使用的時候,僅僅只需要提供任務的拆分與執行即可,RecursiveAction 用於大多數不返回結果的計算, RecursiveTask 用於返回結果的計算, CountedCompleter 用於那些操作完成之后觸發其他操作的操作。
RecursiveAction --- 不返回結果的任務

1 public abstract class RecursiveAction extends ForkJoinTask<Void> { 2 private static final long serialVersionUID = 5232453952276485070L; 3 4 /** 5 * The main computation performed by this task. 6 */ 7 protected abstract void compute(); 8 9 /** 10 * Always returns {@code null}. 11 * 12 * @return {@code null} always 13 */ 14 public final Void getRawResult() { return null; } 15 16 /** 17 * Requires null completion value. 18 */ 19 protected final void setRawResult(Void mustBeNull) { } 20 21 /** 22 * Implements execution conventions for RecursiveActions. 23 */ 24 protected final boolean exec() { 25 compute(); 26 return true; 27 } 28 29 }
RecursiveAction很簡單,作為不返回結果的任務,其getRawResult方法永遠返回null,setRawResult方法則什么都不做,它增加了一個無返回值的compute抽象方法,用於當ForkJoinTask被調度執行exec方法時調用,exec方法在執行完compute之后直接返回true,表示任務正常結束,而compute方法就是留給我們去實現大任務如何拆小任務,小任務怎么執行的邏輯。
RecursiveTask --- 要返回結果的任務

1 public abstract class RecursiveTask<V> extends ForkJoinTask<V> { 2 private static final long serialVersionUID = 5232453952276485270L; 3 4 /** 5 * The result of the computation. 6 */ 7 V result; 8 9 /** 10 * The main computation performed by this task. 11 * @return the result of the computation 12 */ 13 protected abstract V compute(); 14 15 public final V getRawResult() { 16 return result; 17 } 18 19 protected final void setRawResult(V value) { 20 result = value; 21 } 22 23 /** 24 * Implements execution conventions for RecursiveTask. 25 */ 26 protected final boolean exec() { 27 result = compute(); 28 return true; 29 } 30 31 }
RecursiveTask也很簡單,既然要返回結果,所以它定義了一個表示執行結果的result字段,getRawResult/setRawResult就用來操作該字段,它增加了一個有返回值的compute抽象方法,用於當ForkJoinTask被調度執行exec方法時調用,exec方法在執行完compute之后,將compute的返回結果作為任務的執行結果賦值給result,並最終返回true表示任務正常結束,同樣compute方法也是留給我們去實現大任務如何拆小任務,小任務怎么執行,並且返回任務執行結果的邏輯。
CountedCompleter --- 操作完成觸發鈎子函數的操作
CountedCompleter是Java8才新增的一個ForkJoinTask的抽象子類,比前面兩個要復雜的多(反正我目前是沒怎么弄明白),它是為了在任務完成之后調用鈎子函數,它可以支持返回結果(這時應該重寫方法getRawResult()以提供join()、invoke()和相關方法的結果),也可以不返回結果,默認其getRawResult方法永遠返回null,setRawResult方法也什么都不做。
它也增加了一個無返回值的compute抽象方法,用於當ForkJoinTask被調度執行exec方法時調用,但它的exec方法在正常執行完compute之后,永遠都返回的是false,表示任務沒有正常結束,根據ForkJoinTask的doExec方法實現,對於CountedCompleter正常結束但返回false這種實現,將導致不會執行setCompletion即改變任務的狀態也不會喚醒等待該任務的線程,這都交給了CountedCompleter自己來完成,而compute若異常結束則還是按原來的邏輯記錄異常到哈希鏈表數組中,然后改變任務的狀態為EXCEPTIONAL,因此只有在顯式調用complete(T)、ForkJoinTask.cancel(boolean)、ForkJoinTask.completeExceptionally(Throwable)或compute異常完成時,任務狀態才會更改。
CountedCompleter在創建實例的時候還可以傳入一個CountedCompleter實例,因此可以形成樹狀的任務結構,樹上的所有任務是可以並行執行的,且每一個子任務完成后都可以通過tryComplete輔助其父任務的完成,CountedCompleter的代碼量很少區區300行,但其設計理念卻很有用,CountedCompleters在存在子任務停滯和阻塞的情況下比其他形式的ForkJoinTasks更健壯,但編程的直觀性較差。CountedCompleter的使用類似於其他基於完成的組件(例如java.nio.channel.completionhandler),除了在完成時觸發onCompletion(CountedCompleter)可能需要多個掛起任務的完成,而不僅僅是一個。除非進行了其他初始化,否則掛起計數從0開始,但是可以使用setPendingCount、addToPendingCount和compareAndSetPendingCount方法(原子性地)更改掛起計數。在調用tryComplete時,如果掛起的操作計數非零,則遞減;否則,將觸發onCompletion操作,如果該CountedCompleters本身還有一個CountedCompleters,則繼續對其CountedCompleters進行該過程。
一個CountedCompleter實現類必須實現compute方法,在大多數情況下(如下所示),在其返回之前應該調用一次tryComplete()。該類還可以選擇性地重寫方法onCompletion(CountedCompleter),以便在正常完成時執行想要的操作,以及方法onExceptionalCompletion(Throwable, CountedCompleter),以便在任何異常時執行操作。
CountedCompleters通常不返回結果,在這種情況下,它們通常被聲明為CountedCompleter<Void>,並且總是返回null作為結果值。在其他情況下,您應該重寫方法getRawResult來提供來自join()、invoke()和相關方法的結果。通常,該方法應該返回CountedCompleter對象的一個字段(或一個或多個字段的函數)的值,該對象在完成時保存結果。方法setRawResult在CountedCompleters中默認什么也不做。重寫此方法以維護包含結果數據的其他對象或字段是可能的,但很少適用。
一個沒有其他CountedCompleter(例如getCompleter返回null)的CountedCompleter可以作為一個具有附加功能的常規的ForkJoinTask。然而,任何一個有其他CountedCompleter的CountedCompleter,那么它只能作為其他計算的內部輔助,因此它自身的任務狀態(例如ForkJoinTask.isDone等方法報告的)將是無意義的,其狀態只在顯示的調用complete,ForkJoinTask.cancel, ForkJoinTask.completeExceptionally(Throwable) 或者compute方法拋出異常時才會被改變。在任何異常完成之后,如果存在一個並且還沒有以其他方式完成的任務,則可能會將異常傳播給任務的 CountedCompleter(以及它的CountedCompleter,以此類推)。類似地,取消一個內部CountedCompleter只會對該completer產生局部影響,因此通常不太有用。
CountedCompleters是一個抽象類,要理解它有點難度,其編程實現可以達到的目的也靈活多變,就Java doc提供了幾個示例可以看出,CountedCompleters可以將任務遞歸分解成基於樹的形狀,基於樹的技術通常也比直接fork葉子任務更可取,因為它們減少了線程間的通信並提高了負載平衡。它還可以用於搜索查找過程中,當一個線程搜索到目標就可以讓根節點任務完成,其他子節點任務自發終止。還可以用於計算任務時子節點任務合並結果得父節點任務又與其兄弟節點合並結果得到祖父任務結果,以此類推這樣的場景等等,總之CountedCompleter類的使用非常靈活,如java並行流中涉及到的運行集拆分,結果合並,運算調度等就有用到它。