首先我們得先明白這兩個單詞是什么意思:
Fork:叉 ; 分岔處,分流處,岔口,岔路 ; 分岔 ;
Join:連接; 接合; 聯結; 結合; 聯合; 匯合; 加入;
好了,看到這倆的翻譯,是不是有所頓悟呢?請君繼續看......
1. Java並發的發展歷程
–Java 1 支持threads, locks, condition queues –Java 5 引入了 thread pools, blocking queues, concurrent collections –Java 7 加入了fork-join庫 –Java 8 加入了 parallel streams
2. 基本思想
Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。
分治法:把一個規模大的問題划分為規模較小的子問題,然后分而治之,最后合並子問題的解得到原問題的解。
步驟:
分割原問題;
求解子問題;
合並子問題的解為原問題的解
。
在分治法中,子問題一般是相互獨立的,因此,經常通過遞歸調用算法來求解子問題。
3. 分治法典型應用
(1)二分搜索
(2)大整數乘法
(3)Strassen矩陣乘法
(4)棋盤覆蓋
(5)合並排序
(6)快速排序
(7)線性時間選擇
(8)漢諾塔
4. Fork/Join框架的實現原理
4.1 ForkJoinPool
ForkJoinPool 是實現Fork,Join的線程池 - ThreadPool
ForkJoinTask 一個描述ForkJoin的抽象類 Runnable/Callable
RecursiveAction 無返回結果的ForkJoinTask實現Runnable RecursiveTask<V> 有返回結果的ForkJoinTask實現Callable CountedCompleter<T> 在任務完成執行后會觸發執行一個自定義的鈎子函數
ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinPool中的所有的工作線程均有一個自己的工作隊列WorkQueue:
- ForkJoinTask數組負責將存放程序提交給ForkJoinPool,ForkJoinTask封裝了數據及其相應的計算;
- ForkJoinWorkerThread負責執行這些任務;
ForkJoinTask主要包括兩個方法分別實現任務的分拆與合並:
1、fork()類似於Thread.start(),但是它並不立即執行任務,而是將任務放入工作隊列中;
2、跟Thread.join()不同,ForkJoinTask的join()方法並不簡單的阻塞線程;
4.2 Fork方法的實現原理
ForkJoinTask的Fork方法的實現原理:
當我們調用ForkJoinTask的fork方法時,程序會把任務放在ForkJoinWorkerThread的pushTask的workQueue中,異步地執行這個任務,然后立即返回結果,代碼如下:
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
pushTask方法把當前任務存放在ForkJoinTask數組隊列里。然后再調用ForkJoinPool的signalWork()方法喚醒或創建一個工作線程來執行任務。代碼如下:
final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }
4.3 join方法的實現原理
ForkJoinTask的join方法實現原理
Join方法的主要作用是阻塞當前線程並等待獲取結果。讓我們一起看看ForkJoinTask的join方法的實現,代碼如下:
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
它首先調用doJoin方法,通過doJoin()方法得到當前任務的狀態來判斷返回什么結果,任務狀態有4種:
- 已完成(NORMAL):直接返回任務結果;
- 被取消(CANCELLED):直接拋出CancellationException;
- 信號(SIGNAL)
- 出現異常(EXCEPTIONAL):直接拋出對應的異常
讓我們分析一下doJoin方法的實現:
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
在doJoin()方法里,首先通過查看任務的狀態,看任務是否已經執行完成,
- 如果執行完成,則直接返回任務狀態;
- 如果沒有執行完,則從任務數組里取出任務並執行;
- 如果任務順利執行完成,則設置任務狀態為NORMAL;
- 如果出現異常,則記錄異常,並將任務狀態設置為EXCEPTIONAL。
4.4 Fork/Join框架的異常處理
ForkJoinTask在執行的時候可能會拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,並且可以通過ForkJoinTask的getException方法獲取異常。使用如下代碼:
if(task.isCompletedAbnormally()){ System.out.println(task.getException()); }
5. 舉個例子
使用Fork/Join框架首先要考慮到的是如何分割任務,如果希望每個子任務最多執行兩個數的相加,那么我們設置分割閾值是2,由於是4個數字相加,所以Fork/Join框架會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,然后再join兩個子任務的結果。因為是有結果的任務,所以必須繼承RecursiveTask。
計算1+2+...+10000實現代碼如下:
public class CountTask extends RecursiveTask<Integer> { private static final int THREAD_HOLD = 2;//設置分割閾值 private int start; private int end; public CountTask(int start,int end){ this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //如果任務足夠小就計算 boolean canCompute = (end - start) <= THREAD_HOLD; if(canCompute){ for(int i=start;i<=end;i++){ sum += i; } }else{ int middle = (start + end) / 2; CountTask left = new CountTask(start,middle); CountTask right = new CountTask(middle+1,end); //執行子任務 left.fork(); right.fork(); //獲取子任務結果 int lResult = left.join(); int rResult = right.join(); sum = lResult + rResult; } return sum; } public static void main(String[] args){ ForkJoinPool pool = new ForkJoinPool(); CountTask task = new CountTask(1,10000); Future<Integer> result = pool.submit(task); try { System.out.println(result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
參考及致謝:
1.Fork/Join框架基本使用(里面有一個歸並排序的過程圖,很好)
2.多線程(五) Fork/Join框架介紹及實例講解(里面有任務竊取算法思想)
3.Fork-join原理解析(主要借鑒,特別感謝)
Over......