並發編程(十三):Fork-Join框架



1.Fork/Join框架簡介

Java7提供的一個並行任務執行,把大任務分割成小任務,最終匯總每個小任務結果后得到大任務結果的框架

Fork/Join運行流程圖:


2.工作竊取算法

某個線程從其他隊列竊取任務來執行

大任務分割為子任務,將子任務放在不同的任務隊列中,每個隊列創建一個單獨線程來執行隊列里的任務,線程和隊列一一對應

工作竊取:有的線程把自己隊列中的任務干完之后,會去其他線程的隊列中竊取一個任務執行,通常使用雙端隊列,被竊取任務的線程從雙端隊列頭部拿任務執行,竊取任務的線程從雙端隊列尾部拿任務執行

工作竊取算法優缺點:

  • 優點:充分利用線程進行並行計算,減少線程間的競爭
  • 缺點:消耗更多系統資源,只有一個任務時會出現競爭

3.Fork/Join框架設計

主要步驟:分割任務,執行任務,合並結果

  • 分割任務:需要一個fork類把大任務拆成子任務,子任務太大可能還要分割
  • 執行任務並合並結果:子任務在雙端隊列中執行,結果統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據

需要使用兩個類:

  • ForkJoinTask:創建fork-join任務,提供了fork()和join()實現機制,通常情況下繼承它的子類即可:RecursiveAction(無返回值),RecursiveTask(有返回值)
  • ForkJoinPool:執行fork-join任務

4.使用Fork/Join框架

創建任務:重寫了RecursiveTask的compute()方法

// 如果任務大於閾值,就分裂成兩個子任務計算 
int middle = (start + end) / 2; 
CountTask leftTask = new CountTask(start, middle); 
CountTask rightTask = new CountTask(middle + 1, end); 
// 執行子任務 
leftTask.fork(); 
rightTask.fork(); 
// 等待子任務執行完,並得到其結果 
int leftResult=leftTask.join(); 
int rightResult=rightTask.join(); 
// 合並子任務 
sum = leftResult + rightResult; 

使用Fork-Join池執行:

ForkJoinPool forkJoinPool = new ForkJoinPool(); 
// 生成一個計算任務,負責計算1+2+3+4 
CountTask task = new CountTask(1, 4); 
// 執行一個任務 ,返回一個Future
Future<Integer> result = forkJoinPool.submit(task); 
try { 
    System.out.println(result.get()); 
} catch (InterruptedException e) { 
} catch (ExecutionException e) { 
} 

5.Fork/Join框架異常處理

ForkJoinTask提供了isCompletedAbnormally()方法檢查任務是否拋出異常或被取消,通過getException()方法獲取異常:

if(task.isCompletedAbnormally()) { 
	System.out.println(task.getException()); 
} 

6.Fork/Join框架實現原理

ForkJoinPool由任務數組ForkJoinTask和工作線程數組ForkJoinWorkerThread組成

  • ForkJoinTask負責存放提交的任務
  • ForkJoinWorkerThread負責執行這些任務

6.1 fork()方法的實現

public final ForkJoinTask<V> fork() {
    //調用pushTask執行任務
   ((ForkJoinWorkerThread) Thread.currentThread())
        .pushTask(this);
   return this;
}

pushTask把當前任務存放在ForkJoinTask數組中,調用signalWork()喚醒或創建一個工作線程來執行任務:

final void pushTask(ForkJoinTask<> t) {
    ForkJoinTask<>[] q; 
    //s:隊頭,m:隊尾
    int s, m;
    if ((q = queue) != null) { // ignore if queue removed 
        long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
        UNSAFE.putOrderedObject(q, u, t);
        queueTop = s + 1; // or use putOrderedInt 
        if ((s -= queueBase) <= 2)
            //喚醒或創建線程處理
            pool.signalWork();
        else if (s == m)
            //擴展隊列
            growQueue();
   }
} 

6.2 join()方法的實現

主要作用是阻塞當前線程並等待獲取結果

public final V join() {
   //通過doJoin得到任務狀態
   if (doJoin() != NORMAL)
        return reportResult();
   else
        return getRawResult();
}
private V reportResult() {
   int s; Throwable ex;
   if ((s = status) == CANCELLED)
        throw new CancellationException();
   if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        UNSAFE.throwException(ex);
   return getRawResult();
} 

四種任務狀態:

  • 已完成(NORMAL):直接返回任務結果
  • 被取消(CANCELLED):直接拋出CancllationException
  • 信號(SIGNAL)
  • 出現異常(EXCEPTIONAL):直接拋出對應異常

6.3 doJoin()實現

private int doJoin() {
    Thread t; ForkJoinWorkerThread w; int s; boolean completed;
    //線程是ForkJoinWorkerThread
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        //任務執行完,直接返回
        if ((s = status) < 0)
        	return s;
        //沒有執行完,取出任務執行
        if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
            try {
                //執行
            	completed = exec();
            } catch (Throwable rex) {
                //記錄異常信息,並設置狀態為EXCEPTIONAL
           		return setExceptionalCompletion(rex);
            }
            //順利執行完
            if (completed)
            	return setCompletion(NORMAL);
        }
        return w.joinTask(this);
    }else
      //等待
      return externalAwaitDone();
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM