目錄
1.Fork/Join框架簡介
Java7提供的一個並行任務執行,把大任務分割成小任務,最終匯總每個小任務結果后得到大任務結果的框架
Fork/Join運行流程圖:
2.工作竊取算法
某個線程從其他隊列竊取任務來執行
大任務分割為子任務,將子任務放在不同的任務隊列中,每個隊列創建一個單獨線程來執行隊列里的任務,線程和隊列一一對應
工作竊取:有的線程把自己隊列中的任務干完之后,會去其他線程的隊列中竊取一個任務執行,通常使用雙端隊列,被竊取任務的線程從雙端隊列頭部拿任務執行,竊取任務的線程從雙端隊列尾部拿任務執行
工作竊取算法優缺點:
- 優點:充分利用線程進行並行計算,減少線程間的競爭
- 缺點:消耗更多系統資源,只有一個任務時會出現競爭
3.Fork/Join框架設計
主要步驟:分割任務,執行任務,合並結果
- 分割任務:需要一個fork類把大任務拆成子任務,子任務太大可能還要分割
- 執行任務並合並結果:子任務在雙端隊列中執行,結果統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據
需要使用兩個類:
ForkJoinTask
:創建fork-join任務,提供了fork()和join()實現機制,通常情況下繼承它的子類即可:RecursiveAction
(無返回值),RecursiveTask
(有返回值)ForkJoinPool
:執行fork-join任務
4.使用Fork/Join框架
創建任務:重寫了RecursiveTask的compute()
方法
// 如果任務大於閾值,就分裂成兩個子任務計算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 執行子任務
leftTask.fork();
rightTask.fork();
// 等待子任務執行完,並得到其結果
int leftResult=leftTask.join();
int rightResult=rightTask.join();
// 合並子任務
sum = leftResult + rightResult;
使用Fork-Join池執行:
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一個計算任務,負責計算1+2+3+4
CountTask task = new CountTask(1, 4);
// 執行一個任務 ,返回一個Future
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
5.Fork/Join框架異常處理
ForkJoinTask提供了isCompletedAbnormally()
方法檢查任務是否拋出異常或被取消,通過getException()
方法獲取異常:
if(task.isCompletedAbnormally()) {
System.out.println(task.getException());
}
6.Fork/Join框架實現原理
ForkJoinPool由任務數組ForkJoinTask和工作線程數組ForkJoinWorkerThread組成
- ForkJoinTask負責存放提交的任務
- ForkJoinWorkerThread負責執行這些任務
6.1 fork()方法的實現
public final ForkJoinTask<V> fork() {
//調用pushTask執行任務
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}
pushTask把當前任務存放在ForkJoinTask數組中,調用signalWork()
喚醒或創建一個工作線程來執行任務:
final void pushTask(ForkJoinTask<> t) {
ForkJoinTask<>[] q;
//s:隊頭,m:隊尾
int s, m;
if ((q = queue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
//喚醒或創建線程處理
pool.signalWork();
else if (s == m)
//擴展隊列
growQueue();
}
}
6.2 join()方法的實現
主要作用是阻塞當前線程並等待獲取結果
public final V join() {
//通過doJoin得到任務狀態
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
}
private V reportResult() {
int s; Throwable ex;
if ((s = status) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
UNSAFE.throwException(ex);
return getRawResult();
}
四種任務狀態:
- 已完成(NORMAL):直接返回任務結果
- 被取消(CANCELLED):直接拋出CancllationException
- 信號(SIGNAL)
- 出現異常(EXCEPTIONAL):直接拋出對應異常
6.3 doJoin()實現
private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
//線程是ForkJoinWorkerThread
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
//任務執行完,直接返回
if ((s = status) < 0)
return s;
//沒有執行完,取出任務執行
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
try {
//執行
completed = exec();
} catch (Throwable rex) {
//記錄異常信息,並設置狀態為EXCEPTIONAL
return setExceptionalCompletion(rex);
}
//順利執行完
if (completed)
return setCompletion(NORMAL);
}
return w.joinTask(this);
}else
//等待
return externalAwaitDone();
}