一、背景
雖然目前處理器核心數已經發展到很大數目,但是按任務並發處理並不能完全充分的利用處理器資源,因為一般的應用程序沒有那么多的並發處理任務。基於這種現狀,考慮把一個任務拆分成多個單元,每個單元分別得到執行,最后合並每個單元的結果。
Fork/Join框架是JAVA7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。

二、工作竊取算法
指的是某個線程從其他隊列里竊取任務來執行。使用的場景是一個大任務拆分成多個小任務,為了減少線程間的競爭,把這些子任務分別放到不同的隊列中,並且每個隊列都有單獨的線程來執行隊列里的任務,線程和隊列一一對應。但是會出現這樣一種情況:A線程處理完了自己隊列的任務,B線程的隊列里還有很多任務要處理。A是一個很熱情的線程,想過去幫忙,但是如果兩個線程訪問同一個隊列,會產生競爭,所以A想了一個辦法,從雙端隊列的尾部拿任務執行。而B線程永遠是從雙端隊列的頭部拿任務執行(任務是一個個獨立的小任務),這樣感覺A線程像是小偷在竊取B線程的東西一樣。

工作竊取算法的優點:
利用了線程進行並行計算,減少了線程間的競爭。
工作竊取算法的缺點:
1、如果雙端隊列中只有一個任務時,線程間會存在競爭。
2、竊取算法消耗了更多的系統資源,如會創建多個線程和多個雙端隊列。
三、框架設計
Fork/Join中兩個重要的類:
1、ForkJoinTask:使用該框架,需要創建一個ForkJoin任務,它提供在任務中執行fork和join操作的機制。一般情況下,我們並不需要直接繼承ForkJoinTask類,只需要繼承它的子類,它的子類有兩個:
a、RecursiveAction:用於沒有返回結果的任務。
b、RecursiveTask:用於有返回結果的任務。
2、ForkJoinPool:任務ForkJoinTask需要通過ForkJoinPool來執行。
1 package test; 2 3 import java.util.concurrent.ExecutionException; 4 import java.util.concurrent.ForkJoinPool; 5 import java.util.concurrent.Future; 6 import java.util.concurrent.RecursiveTask; 7 8 9 public class CountTask extends RecursiveTask<Integer> 10 { 11 private static final long serialVersionUID = 1L; 12 //閾值 13 private static final int THRESHOLD = 2; 14 private int start; 15 private int end; 16 17 public CountTask(int start, int end) 18 { 19 this.start = start; 20 this.end = end; 21 } 22 23 @Override 24 protected Integer compute() 25 { 26 int sum = 0; 27 //判斷任務是否足夠小 28 boolean canCompute = (end - start) <= THRESHOLD; 29 if(canCompute) 30 { 31 //如果小於閾值,就進行運算 32 for(int i=start; i<=end; i++) 33 { 34 sum += i; 35 } 36 } 37 else 38 { 39 //如果大於閾值,就再進行任務拆分 40 int middle = (start + end)/2; 41 CountTask leftTask = new CountTask(start,middle); 42 CountTask rightTask = new CountTask(middle+1,end); 43 //執行子任務 44 leftTask.fork(); 45 rightTask.fork(); 46 //等待子任務執行完,並得到執行結果 47 int leftResult = leftTask.join(); 48 int rightResult = rightTask.join(); 49 //合並子任務 50 sum = leftResult + rightResult; 51 52 } 53 return sum; 54 } 55 56 public static void main(String[] args) 57 { 58 ForkJoinPool forkJoinPool = new ForkJoinPool(); 59 CountTask task = new CountTask(1,6); 60 //執行一個任務 61 Future<Integer> result = forkJoinPool.submit(task); 62 try 63 { 64 System.out.println(result.get()); 65 } 66 catch (InterruptedException e) 67 { 68 e.printStackTrace(); 69 } 70 catch (ExecutionException e) 71 { 72 e.printStackTrace(); 73 } 74 75 } 76 77 }
這個程序是將1+2+3+4+5+6拆分成1+2;3+4;5+6三個部分進行子程序進行計算后合並。
四、源碼解讀
1、leftTask.fork();
1 public final ForkJoinTask<V> fork() { 2 Thread t; 3 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 4 ((ForkJoinWorkerThread)t).workQueue.push(this); 5 else 6 ForkJoinPool.common.externalPush(this); 7 return this; 8 }
fork方法內部會先判斷當前線程是否是ForkJoinWorkerThread的實例,如果滿足條件,則將task任務push到當前線程所維護的雙端隊列中。
1 final void push(ForkJoinTask<?> task) { 2 ForkJoinTask<?>[] a; ForkJoinPool p; 3 int b = base, s = top, n; 4 if ((a = array) != null) { // ignore if queue removed 5 int m = a.length - 1; // fenced write for task visibility 6 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); 7 U.putOrderedInt(this, QTOP, s + 1); 8 if ((n = s - b) <= 1) { 9 if ((p = pool) != null) 10 p.signalWork(p.workQueues, this); 11 } 12 else if (n >= m) 13 growArray(); 14 } 15 }
在push方法中,會調用ForkJoinPool的signalWork方法喚醒或創建一個工作線程來異步執行該task任務。
2、
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
通過doJoin方法返回的任務狀態來判斷,如果不是NORMAL,則拋異常:
private void reportException(int s) { if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL) rethrow(getThrowableException()); }
來看下doJoin方法:
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
先查看任務狀態,如果已經完成,則直接返回任務狀態;如果沒有完成,則從任務隊列中取出任務並執行。
