1.Fork/Join框架:(分治算法思想)
在必要的情況下,將一個大任務,進行拆分(fork) 成若干個子任務(拆到不能再拆,這里就是指我們制定的拆分的臨界值),再將一個個小任務的結果進行join匯總。
2.Fork/Join工作方式:
ForkJoinTask需要通過ForkJoinPool來執行。
ForkJoinTask可以理解為類線程但比線程輕量的實體, 在ForkJoinPool中運行的少量ForkJoinWorkerThread可以持有大量的ForkJoinTask和它的子任務.
ForkJoinTask同時也是一個輕量的Future,使用時應避免較長阻塞和io.
ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinTask數組負責將存放程序提交給ForkJoinPool,而ForkJoinWorkerThread負責執行這些任務。
任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。
當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務(工作竊取算法)。
也就是說Fork/Join采用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨即線程中偷一個並把它加入自己的隊列中。
就比如兩個CPU上有不同的任務,這時候A已經執行完,B還有任務等待執行,這時候A就會將B隊尾的任務偷過來,加入自己的隊列中,對於傳統的線程,ForkJoin更有效的利用的CPU資源!
ForkJoinWorkerThread線程是一種在Fork/Join框架中運行的特性線程,它除了具有普通線程的特性外,最主要的特點是每一個ForkJoinWorkerThread線程都具有一個獨立的任務等待隊列(work queue),這個任務隊列用於存儲在本線程中被拆分的若干子任務。
3.Fork/Join框架實現
實現這個框架需要繼承RecursiveTask 或者 RecursiveAction ,
RecursiveTask是有返回值的,RecursiveAction 則沒有。
下面是計算一個計算數據和的示例:
public class ForkJoinWork extends RecursiveTask<Long> { private Long start;//起始值 private Long end;//結束值 public static final Long critical = 100000L;//臨界值 public ForkJoinWork(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { // return null; //判斷是否是拆分完畢 Long lenth = end - start; //起始值差值 if (lenth <= critical) { //如果拆分完畢就相加 Long sum = 0L; for (Long i = start; i <= end; i++) { sum += i; } return sum; } else { //沒有拆分完畢就開始拆分 Long middle = (end + start) / 2;//計算的兩個值的中間值 ForkJoinWork right = new ForkJoinWork(start, middle); right.fork();//拆分,並壓入線程隊列 ForkJoinWork left = new ForkJoinWork(middle + 1, end); left.fork();//拆分,並壓入線程隊列 //合並 return right.join() + left.join(); } } }
測試:
public class ForkJoinWorkTest {
@Test public void test() { //ForkJoin實現 long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool();//實現ForkJoin 就必須有ForkJoinPool的支持 ForkJoinTask<Long> task = new ForkJoinWork(0L, 10000000000L);//參數為起始值與結束值 Long invoke = forkJoinPool.invoke(task); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + invoke + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 56418 //ForkJoinWork forkJoinWork = new ForkJoinWork(0L, 10000000000L); } @Test public void test2() { //普通線程實現 Long x = 0L; Long y = 10000000000L; long l = System.currentTimeMillis(); for (Long i = 0L; i <= y; i++) { x += i; } long l1 = System.currentTimeMillis(); System.out.println("invoke = " + x + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 64069 } @Test public void test3() { //Java 8 並行流的實現 long l = System.currentTimeMillis(); long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum); //long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, (a, b) -> a+b); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + reduce + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 2152 } }
4.分析:
我們觀察上面可以看出來執行10000000000L的相加操作各自執行完畢的時間不同。觀察到當數據很大的時候ForkJoin比普通線程實現有效的多,但是相比之下ForkJoin的實現實在是有點麻煩,這時候Java 8 就為我們提供了一個並行流來實現ForkJoin實現的功能。可以看到並行流比自己實現ForkJoin還要快。
Java 8 中將並行流進行了優化,我們可以很容易的對數據進行並行流的操作,Stream API可以聲明性的通過parallel()與sequential()在並行流與串行流中隨意切換!
5.ForkJoinPool
ForkJoinTask需要通過ForkJoinPool來執行。位於java.util.concurrent包下,構造函數:
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
-
parallelism:可並行級別,Fork/Join框架將依據這個並行級別的設定,決定框架內並行執行的線程數量。並行的每一個任務都會有一個線程進行處理,但是千萬不要將這個屬性理解成Fork/Join框架中最多存在的線程數量,也不要將這個屬性和ThreadPoolExecutor線程池中的corePoolSize、maximumPoolSize屬性進行比較,因為ForkJoinPool的組織結構和工作方式與后者完全不一樣。而后續的討論中,還可以發現Fork/Join框架中可存在的線程數量和這個參數值的關系並不是絕對的關聯(有依據但並不全由它決定)。
-
factory:當Fork/Join框架創建一個新的線程時,同樣會用到線程創建工廠。只不過這個線程工廠不再需要實現ThreadFactory接口,而是需要實現ForkJoinWorkerThreadFactory接口。后者是一個函數式接口,只需要實現一個名叫newThread的方法。在Fork/Join框架中有一個默認的ForkJoinWorkerThreadFactory接口實現:DefaultForkJoinWorkerThreadFactory。
-
handler:異常捕獲處理器。當執行的任務中出現異常,並從任務中被拋出時,就會被handler捕獲。
-
asyncMode:這個參數也非常重要,從字面意思來看是指的異步模式,它並不是說Fork/Join框架是采用同步模式還是采用異步模式工作。Fork/Join框架中為每一個獨立工作的線程准備了對應的待執行任務隊列,這個任務隊列是使用數組進行組合的雙向隊列。即是說存在於隊列中的待執行任務,即可以使用先進先出的工作模式,也可以使用后進先出的工作模式。當asyncMode設置為ture的時候,隊列采用先進先出方式工作;反之則是采用后進先出的方式工作,該值默認為false.(WorkQueue)
ForkJoinPool還有另外兩個構造函數,一個構造函數只帶有parallelism參數,既是可以設定Fork/Join框架的最大並行任務數量;
另一個構造函數則不帶有任何參數,對於最大並行任務數量也只是一個默認值——當前操作系統可以使用的CPU內核數量(Runtime.getRuntime().availableProcessors())。
實際上ForkJoinPool還有一個私有的、原生構造函數,之上提到的三個構造函數都是對這個私有的、原生構造函數的調用。
//對於最大並行任務數量也只是一個默認值——當前操作系統可以使用的CPU內核數量 public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } //框架的最大並行任務數量 public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } //私有的、原生構造函數(被上面的構造函數 調用) private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
6.fork方法
fork方法用於將新創建的子任務放入當前線程的work queue隊列中,Fork/Join框架將根據當前正在並發執行ForkJoinTask任務的ForkJoinWorkerThread線程狀態,
決定是讓這個任務在隊列中等待,還是創建一個新的ForkJoinWorkerThread線程運行它,又或者是喚起其它正在等待任務的ForkJoinWorkerThread線程運行它。
fork方法,將當前任務入池 ; 當我們調用ForkJoinTask的fork方法時,程序會把任務放在ForkJoinWorkerThread的pushTask的workQueue中,異步地執行這個任務,然后立即返回結果。
代碼如下:
public final ForkJoinTask<V> fork() { Thread t;
//如果當前線程是ForkJoinWorkerThread,將任務壓入該線程的任務隊列 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else
//否則調用common池的externalPush方法入隊 ForkJoinPool.common.externalPush(this); return this; }
pushTask方法把當前任務存放在ForkJoinTask數組隊列里。然后再調用ForkJoinPool的signalWork()方法喚醒或創建一個工作線程來執行任務。代碼如下:
final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }
7.join方法
join方法用於讓當前線程阻塞,直到對應的子任務完成運行並返回執行結果。或者,如果這個子任務存在於當前線程的任務等待隊列(work queue)中,則取出這個子任務進行“遞歸”執行。
其目的是盡快得到當前子任務的運行結果,然后繼續執行。也就是讓子任務先執行的意思。
public final V join() { int s;
//調用doJoin方法阻塞等待的結果不是NORMAL,說明有異常或取消.報告異常 if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s);
//等於NORMAL,正常執行完畢,返回原始結果 return getRawResult(); }
它首先調用doJoin方法,通過doJoin()方法得到當前任務的狀態來判斷返回什么結果,任務狀態有4種:已完成(NORMAL)、被取消(CANCELLED)、信號(SIGNAL)和出現異常(EXCEPTIONAL)。
如果任務狀態是已完成,則直接返回任務結果。
如果任務狀態是被取消,則直接拋出CancellationException
如果任務狀態是拋出異常,則直接拋出對應的異常
如果沒有返回狀態,會否則使用當線程池所在的ForkJoinPool的awaitJoin方法等待.
讓我們分析一下doJoin方法的實現
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
//已完成,返回status,未完成再嘗試后續 return (s = status) < 0 ? s :
//未完成,當前線程是ForkJoinWorkerThread,從該線程中取出workQueue,並嘗試將當前task出隊然后執行,執行的結果是完成則返回狀態,否則使用當線程池所在的ForkJoinPool的awaitJoin方法等待 ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) :
//當前線程不是ForkJoinWorkerThread,調用externalAwaitDone方法. externalAwaitDone(); } final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
在doJoin()方法里,首先通過查看任務的狀態,看任務是否已經執行完成,如果執行完成,則直接返回任務狀態;如果沒有執行完,則從任務數組里取出任務並執行。
如果任務順利執行完成,則設置任務狀態為NORMAL,如果出現異常,則記錄異常,並將任務狀態設置為EXCEPTIONAL。
8.invoke方法
public final V invoke() { int s;
//先嘗試執行 if ((s = doInvoke() & DONE_MASK) != NORMAL)
//doInvoke方法的結果status只保留完成態位表示非NORMAL,則報告異常 reportException(s);
//正常完成,返回原始結果. return getRawResult(); } //ForkJoinPool::awaitJoin,在該方法中使用循環的方式進行internalWait,滿足了每次按截止時間或周期進行等待,同時也順便解決了虛假喚醒 private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); }
externalAwaitDone函數.它體現了ForkJoin框架的一個核心:外部幫助,
externalAwaitDone的邏輯不復雜,在當前task為ForkJoinPool.common的情況下可以在外部進行等待和嘗試幫助完成.
方法會首先根據ForkJoinTask的類型進行嘗試幫助,並返回當前的status,若發現未完成,則進入下面的等待喚醒邏輯.該方法的調用者為非worker線程.
//外部線程等待一個common池中的任務完成. private int externalAwaitDone() { int s = ((this instanceof CountedCompleter) ? //當前task是一個CountedCompleter,嘗試使用common ForkJoinPool去外部幫助完成,並將完成狀態返回. ForkJoinPool.common.externalHelpComplete( (CountedCompleter<?>)this, 0) : //當前task不是CountedCompleter,則調用common pool嘗試外部彈出該任務並進行執行, //status賦值doExec函數的結果,若彈出失敗(其他線程先行彈出)賦0. ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) { //檢查上一步的結果,即外部使用common池彈出並執行的結果(不是CountedCompleter的情況),或外部嘗試幫助CountedCompleter完成的結果 //status大於0表示嘗試幫助完成失敗. //擾動標識,初值false boolean interrupted = false; do { //循環嘗試,先給status標記SIGNAL標識,便於后續喚醒操作. if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { try { //CAS成功,進同步塊發現double check未完成,則等待. wait(0L); } catch (InterruptedException ie) { //若在等待過程中發生了擾動,不停止等待,標記擾動. interrupted = true; } } else //進同步塊發現已完成,則喚醒所有等待線程. notifyAll(); } } } while ((s = status) >= 0);//循環條件,task未完成. if (interrupted) //循環結束,若循環中間曾有擾動,則中斷當前線程. Thread.currentThread().interrupt(); } //返回status return s; }
參考:https://www.cnblogs.com/wzqjy/p/7921063.html
https://blog.csdn.net/tyrroo/article/details/81390202
https://segmentfault.com/a/1190000019549838
https://www.cnblogs.com/senlinyang/p/7885964.html