Fork/Join框架簡介


1 引子

Fork/Join框架是從Java1.7開始提供的一個並行處理任務的框架(本篇博客基於JDK1.8分析),它的基本思路是將一個大任務分解成若干個小任務,並行處理多個小任務,最后再匯總合並這些小任務的結果便可得到原來的大任務結果。

從字面意思來理解Fork/Join框架,"Fork"表示“分叉”,它把大任務分解成多個小任務並行處理,而“Join”表示“聯合”,它將這些小任務的結果合並在一起,最終得到大任務的結果。比如計算累加1+2+3+...+100000,可分解成100個子任務,每個子任務只對1000個自然數求和,最終合並這100個子任務的結果。

 Fork/Join框架會用到工作竊取(work-stealing)算法,那么什么是工作竊取算法呢?

工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務來執行。使用工作竊取算法的主要目的是為了並發提高性能、提高執行效率,減少無效的等待。當我們需要處理一個大任務時,我們會將這個任務分解成多個相對獨立的子任務,為了減少線程競爭,我們將這些子任務放入到不同的隊列中,分別為每個隊列新建一個線程,一個線程消費一個隊列。在處理任務過程中,操作系統可能給有某些線程分配的時間片較多或某些線程所處理任務的工作量較小,它們會先一步將自己所對應隊列中的任務處理完了,而此時其他線程卻還沒處理完自己所屬的任務。此時執行快的線程可以到其他線程對應隊列中去“竊取”任務來處理,這樣就避免了無意義的干等。此時就會出現多線程訪問同一個隊列,為了避免線程競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部取出任務執行,而竊取任務的線程永遠從雙端隊列的尾部“竊取”任務執行。

 工作竊取算法的優缺點

優點:充分利用線程進行並行計算,減少了線程間的競爭

缺點:在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時,並且該算法會消耗了更多的系統資源,如創建多個線程和多個雙端隊列。

2 使用Fork/Join框架

1) 步驟

如何使用“Fork/Join框架”?其實這還是比較簡單的,正如其名字一樣,先"Fork"分解任務,再“Join”合並結果。

第一步,切分任務 。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停地割,直到分割出的子任務足夠小。

第二步,執行任務並合並結果 。分割的子任務分別放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據。

Fork/Join框架要使用兩個類來完成上面的兩個步驟。

①ForkJoinTask : 使用Fork/Join框架,我們應當提供一個ForkJoinTask任務,若沒有任務,Fork/Join就無從談起。我它提供了fork()、join()這兩個API進行任務的分解、結果的合並,它有兩個主要的直接子類,我們一般繼承這個兩個子類。ForkJoinTask是實現Future的抽象類,之前在FutureTask源碼完整解讀中有過對Future的介紹,它表示一個異步任務的結果。

  • RecursiveTask :表示有返回結果的任務

  • RecursiveAction : 表示沒有結果的任務

②ForkJoinPool : ForkJoinTask任務需要執行器ForkJoinPool來執行。ForkJoinPool是ExecutorService的實現類,它代表一個可執行任務的執行器。

 

 任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。

2) 用例

實現需求:求出1+2+3+....1000的結果。這里的任務需要有結果,應該選擇使用RecursiveTask任務 。另外我們需要考慮如何分割任務,此時我計划讓每個子任務執行100個數的相加,因此將分割的閥值設為100,那么Fork/Join框架會將這個主任務分割成10個子任務,最終將10個子任務的結果合並在一起。

package juc;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo {
    public static void main(String[] args) {
        AccumulationTask task = new AccumulationTask(0, 1000);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future<Integer> future = forkJoinPool.submit(task);
        try {
            long start = System.currentTimeMillis();
            int r = future.get();
            System.out.println("執行‘0+1+2+3+...+1000'計算用時" + (System.currentTimeMillis() - start) + "毫秒,其結果是:" + r);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }
    static class AccumulationTask extends RecursiveTask<Integer> {
        private static final int FORK_THRESHOLD = 100;
        private final int start;
        private final int end;
        public AccumulationTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
        @Override
        protected Integer compute() {
            boolean isContinueFork = (end - start) > FORK_THRESHOLD;
            int sum = 0;
            if (isContinueFork) {   //大於閥值需要繼續分割任務
                //二分法,分成左右兩段
                int m = (start + end) / 2;
                AccumulationTask leftTask = new AccumulationTask(start, m);
                AccumulationTask rightTask = new AccumulationTask(m + 1, end);
                //fork方法:分別執行左右兩段任務(若任務不夠小,將遞歸調用compute)
                leftTask.fork();
                rightTask.fork();
                //等待左右兩段任務執行完,再獲取子任務的結果
                int leftResult = leftTask.join();
                int rightResult = rightTask.join();
                sum = leftResult + rightResult;
            } else {//任務足夠小了,可以直接計算
                for (int i = this.start; i <= this.end; i++) {
                    sum += i;
                }
            }
            return sum;
        }
    }
}

從上面的示例可看出使用Fork/Join框架的關鍵在於實現compute方法 。在compute方法中,我們首先要確定任務是否需要繼續分割,如果任務足夠小、滿足預先設定的閥值就可直接執行任務。如果任務仍然很大,就必須繼續分割成兩個子任務,每個子任務在調用fork方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行當前子任務並返回結果,使用join方法會等待子任務執行完並得到其結果。

3 Fork/Join框架的異常處理

ForkJoinTask任務在執行過程中可能會(在執行自身任務的線程中)拋出異常,我們無法在主線程中直接捕獲異常,但ForkJoinTask提供了isCompletedAbnormally方法來判定任務是否拋出過異常或任務被取消(isCompletedNormally方法返回任務是否正常完成的布爾值)。另外ForkJoinTask還提供了getException方法來獲取異常。

getException返回Throwable對象,如果任務被取消了就返回CancellationException,如果任務未完成或未拋出異常就返回null 。

  簡單地獲取異常

if (leftTask.isCompletedAbnormally()) {
    System.out.println(leftTask.getException());
}

   捕獲異常修改原來的用例

public class ForkJoinDemo {
    public static void main(String[] args) {
        AccumulationTask task = new AccumulationTask(0, 1000);
        if(task.isCompletedAbnormally()){
            System.out.println(task.getException());
        }
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future<Integer> future = forkJoinPool.submit(task);
        try {
            long start = System.currentTimeMillis();
            int r = future.get();
          System.out.println("執行‘0+1+2+3+...+1000'計算用時" + (System.currentTimeMillis() - start) + "毫秒,其結果是:" + r);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    static class AccumulationTask extends RecursiveTask<Integer> {
        private static final int FORK_THRESHOLD = 100;
        private final int start;
        private final int end;

        public AccumulationTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            boolean isContinueFork = (end - start) > FORK_THRESHOLD;
            int sum = 0;
            if (isContinueFork) {   //大於閥值進需要繼續分割任務
                //二分法,分成左右兩段
                int m = (start + end) / 2;
                AccumulationTask leftTask = new AccumulationTask(start, m);
                AccumulationTask rightTask = new AccumulationTask(m + 1, end);
                //fork方法:分別執行左右兩段任務(若任務不夠小,將遞歸調用compute)
                leftTask.fork();
                rightTask.fork();
                //等待左右兩段任務執行完,再獲取子任務的結果
                int leftResult = leftTask.join();
                int rightResult = rightTask.join();
                sum = leftResult + rightResult;
            } else {
                //任務足夠小了,可以直接計算
                    for (int i = this.start; i <= this.end; i++) {
                        sum += i;
                        if (i == 999) throw new IllegalStateException();
                    }
            }
            return sum;
        }
    }
}
捕獲異常后

4 Fork/Join框架的實現原理

ForkJoinPool中有一個重要的成員變量workQueues ,它是靜態內部類WorkQueue類型的數組。WorkQueue類中有一個ForkJoinTask類型數組array和一個ForkJoinWorkerThread成員變量owner, array數組負責將存放程序提交給ForkJoinPool的任務,而owner負責執行當前WorkQueue中的任務。

static final class WorkQueue{
    // Instance fields
    volatile int scanState;    // versioned, <0: inactive; odd:scanning
    int stackPred;             // pool stack (ctl) predecessor
    int nsteals;               // number of steals
    int hint;                  // randomization and stealer index hint
    int config;                // pool index and mode
    volatile int qlock;        // 1: locked, < 0: terminate; else 0
    volatile int base;         // index of next slot for poll
    int top;                   // index of next slot for push
    ForkJoinTask<?>[] array;   // the elements (initially unallocated)
    final ForkJoinPool pool;   // the containing pool (may be null)
    final ForkJoinWorkerThread owner; // owning thread or null if shared
    volatile Thread parker;    // == owner during call to park; else null
    volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
    volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
    //...省略
  }

1) ForkJoinTask的fork方法實現原理

當我們調用ForkJoinTask的fork方法時,程序首先判斷當前線程的類型,若是ForkJoinWorkerThread線程,它會調用ForkJoinWorkerThread的pushTask方法務提交到當前線程t的workQueue對應的隊列中;若是普通線程,它調用externalPush()方法將任務提交到隊列中;最后返回這任務本身

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

push方法是ForkJoinPool的靜態內部類WorkQueue的成員方法,它的基本邏輯是: 將當前任務放入到ForkJoinTask類型數組array中,然后調用signalWork執行任務,若數組容量不夠還將調用growArray對數組array擴容。

final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);//將task放入到成員變量ForkJoinTask類型數組array中
        U.putOrderedInt(this, QTOP, s + 1);//更新下次入隊位置的索引
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                //隊列中最多只有一個任務了,可以喚醒一個線程或創建一個新線程來執行任務
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m)//數組array容量不夠,需要擴容
            growArray();
    }
}

而ForkJoinPool的成員方法externalPush的基本邏輯與上面的push方法有些類似,但也有些不同:先確定WorkQueue的槽位,再將當前任務放到WorkQueue的成員變量ForkJoinTask數組array中,再調用signalWork執行任務。若workQueues是空的,將調用externalSubmit來初始化workQueues及相關屬性。

final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();//探針值,用於計算q在workQueues中的索引槽位
    int rs = runState; //運行狀態
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&  //workQueues非空,且workQueues可放入任務(長度大於1)
        //與HashMap類似,m&r是用來確定數組的索引(取余,這里的r相當於HashMap中Node的hash屬性),
        //SQMASK=Ob1111110,(SQMASK十進制為126,)它限制了槽位索引只能是0-126
        //而SQMASK的二進制最低位為0,又相當於強制將"m & r'的最低位設為0(二進制最低位為零時表示偶數),
        //因此"m & r & SQMASK"的結果取0-126之間的偶數(共有64個偶數)。
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {  //鎖定q,這里CAS更新成功后,q.qlock為1,其他線程就不能CAS更新q.qlock了
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);//將task放入到成員變量ForkJoinTask類型數組array中
            U.putOrderedInt(q, QTOP, s + 1);//更新下次入隊位置的索引
            U.putIntVolatile(q, QLOCK, 0);//無條件更新q.qlock,解除對q的鎖定
            if (n <= 1)  //隊列中最多只有一個任務了,可以喚醒一個線程或創建一個新線程來執行任務
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);//q.array無法容納新任務時,也要解除對q的鎖定
    }
    //完整版本的externalPush,可處理不常見的情況,並在向ForkJoinPoll中首次提交第一個任務時執行輔助初始化。 
    //它還會檢測外部線程的首次提交,如果索引處的WorkQueue為空或存在線程競爭,則會創建一個新的共享隊列。
    externalSubmit(task);// workQueues是空的,需要初始化workQueues及相關屬性,並提交任務
}

2) ForkJoinTask的join方法實現原理

Join方法的主要作用是阻塞當前線程並等待獲取結果。讓我們一起看看ForkJoinTask的join方法的實現:

首先調用doJoin方法來執行並獲取任務運行狀態,若是非正常完成,就報告異常,若正常完成就返回結果。

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

ForkJoinTask使用成員變量state表示任務狀態,它可能有四種狀態,已完成(NORMAL)、被取消(CANCELLED)、等待信號(SIGNAL)和出現異常(EXCEPTIONAL)。DONE_MASK是表示任務狀態的位(用來取state二進制形式的最高4位),SMASK任務的標簽數的掩碼(可調用compareAndSetForkJoinTaskTag將state置為1)。

    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits 
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags

若任務狀態是已完成,則可直接返回其結果;若任務被取消,則拋出CancellationException異常;若執行任務過程中拋出過異常,則直接拋出該異常。

private void reportException(int s) {
    if (s == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        rethrow(getThrowableException());
}
public final Void getRawResult() { return null; }//RecursiveAction不需要結果,返回null
public final V getRawResult() {//RecursiveAction
    //RecursiveAction在執行exec方法時主要執行"result = compute();"代碼,將計算結果賦值給成員result
    return result;
}

我們再來看看doJoin()方法是怎么做的。doJoin()方法使用了過多的三元運算符,不太容易理解,下面我將三元運算替換成if-else。將方法“翻譯”后可以很容易地看出doJoin方法的主要邏輯:首先需要查看任務的狀態,若任務已完成(可能是任務取消或拋出異常等非正常完成),則直接返回任務狀態;若任務還沒執行完,則從工作隊列中取出並執行此任務。若此任務能立刻執行完成(可能是任務取消或拋出異常等非正常完成)就返回此狀態,反之就調用ForkJoinPool.awaitJoin等待任務執行完成。

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
                ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                                tryUnpush(this) && (s = doExec()) < 0 ? s :
                                wt.pool.awaitJoin(w, this, 0L) :
                        externalAwaitDone();
    }
    //翻譯后的doJoin方法
    private int doJoin() {
        int s;Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        if ((s = status) < 0) {//任務已完成,直接返回state
            return s;
        } else {
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { //當前線程是ForkJoinWorkerThread線程時
                wt = (ForkJoinWorkerThread) t;
                w = wt.workQueue;
                //tryUnpush取出這個任務
                //doExec准備執行exec方法(exec又調用compute方法),並(若完成)記錄狀態,但是doExce方法不會等待任務執行完成
                if (w.tryUnpush(this) && (s = doExec()) < 0){ 
                    return s;
                }else{
                    return  wt.pool.awaitJoin(w, this, 0L)//等待任務執行完成
                }
            }else{
                return externalAwaitDone();//當前線程是普通線程,調用externalAwaitDone阻塞當前線程,等待任務完成
            }
        }
    }

參考:《Java並發編程的藝術》方騰飛

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM