ForkJoin之ForkJoinTask框架學習筆記


1.Fork/Join框架:(分治算法思想)

在必要的情況下,將一個大任務,進行拆分(fork) 成若干個子任務(拆到不能再拆,這里就是指我們制定的拆分的臨界值),再將一個個小任務的結果進行join匯總。

2.Fork/Join工作方式:

ForkJoinTask需要通過ForkJoinPool來執行。

ForkJoinTask可以理解為類線程但比線程輕量的實體, 在ForkJoinPool中運行的少量ForkJoinWorkerThread可以持有大量的ForkJoinTask它的子任務.

ForkJoinTask同時也是一個輕量的Future,使用時應避免較長阻塞和io.

 

ForkJoinPoolForkJoinTask數組ForkJoinWorkerThread數組組成,ForkJoinTask數組負責將存放程序提交給ForkJoinPool,而ForkJoinWorkerThread負責執行這些任務

任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部

當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務(工作竊取算法)。

 

也就是說Fork/Join采用“工作竊取模式”,當執行新的任務時他可以將其拆分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨即線程中偷一個並把它加入自己的隊列中。

就比如兩個CPU上有不同的任務,這時候A已經執行完,B還有任務等待執行,這時候A就會將B隊尾的任務偷過來加入自己的隊列中,對於傳統的線程,ForkJoin更有效的利用的CPU資源!

 

ForkJoinWorkerThread線程是一種在Fork/Join框架中運行的特性線程,它除了具有普通線程的特性外,最主要的特點是每一個ForkJoinWorkerThread線程都具有一個獨立的任務等待隊列(work queue),這個任務隊列用於存儲在本線程中被拆分的若干子任務。

 

 

3.Fork/Join框架實現

實現這個框架需要繼承RecursiveTask 或者 RecursiveAction ,

RecursiveTask是有返回值的,RecursiveAction 則沒有。

下面是計算一個計算數據和的示例:

public class ForkJoinWork extends RecursiveTask<Long> {
    private Long start;//起始值
    private Long end;//結束值
    public static final Long critical = 100000L;//臨界值

    public ForkJoinWork(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // return null;
        //判斷是否是拆分完畢
        Long lenth = end - start;   //起始值差值
        if (lenth <= critical) {
            //如果拆分完畢就相加
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            //沒有拆分完畢就開始拆分
            Long middle = (end + start) / 2;//計算的兩個值的中間值
            ForkJoinWork right = new ForkJoinWork(start, middle);
            right.fork();//拆分,並壓入線程隊列
            ForkJoinWork left = new ForkJoinWork(middle + 1, end);
            left.fork();//拆分,並壓入線程隊列

            //合並
            return right.join() + left.join();
        }

    }
}

測試: 

public class ForkJoinWorkTest {

@Test public void test() { //ForkJoin實現 long l = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool();//實現ForkJoin 就必須有ForkJoinPool的支持 ForkJoinTask<Long> task = new ForkJoinWork(0L, 10000000000L);//參數為起始值與結束值 Long invoke = forkJoinPool.invoke(task); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + invoke + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 56418 //ForkJoinWork forkJoinWork = new ForkJoinWork(0L, 10000000000L); } @Test public void test2() { //普通線程實現 Long x = 0L; Long y = 10000000000L; long l = System.currentTimeMillis(); for (Long i = 0L; i <= y; i++) { x += i; } long l1 = System.currentTimeMillis(); System.out.println("invoke = " + x + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 64069 } @Test public void test3() { //Java 8 並行流的實現 long l = System.currentTimeMillis(); long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, Long::sum); //long reduce = LongStream.rangeClosed(0, 10000000000L).parallel().reduce(0, (a, b) -> a+b); long l1 = System.currentTimeMillis(); System.out.println("invoke = " + reduce + " time: " + (l1 - l)); //invoke = -5340232216128654848 time: 2152 } }

 

4.分析:

我們觀察上面可以看出來執行10000000000L的相加操作各自執行完畢的時間不同。觀察到當數據很大的時候ForkJoin比普通線程實現有效的多,但是相比之下ForkJoin的實現實在是有點麻煩,這時候Java 8 就為我們提供了一個並行流來實現ForkJoin實現的功能。可以看到並行流比自己實現ForkJoin還要快。

Java 8 中將並行流進行了優化,我們可以很容易的對數據進行並行流的操作,Stream API可以聲明性的通過parallel()與sequential()在並行流與串行流中隨意切換

5.ForkJoinPool

ForkJoinTask需要通過ForkJoinPool來執行。位於java.util.concurrent包下,構造函數:

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
  • parallelism:可並行級別,Fork/Join框架將依據這個並行級別的設定決定框架內並行執行的線程數量。並行的每一個任務都會有一個線程進行處理,但是千萬不要將這個屬性理解成Fork/Join框架中最多存在的線程數量,也不要將這個屬性和ThreadPoolExecutor線程池中的corePoolSize、maximumPoolSize屬性進行比較,因為ForkJoinPool的組織結構和工作方式與后者完全不一樣。而后續的討論中,還可以發現Fork/Join框架中可存在的線程數量和這個參數值的關系並不是絕對的關聯(有依據但並不全由它決定)。

  • factory:當Fork/Join框架創建一個新的線程時,同樣會用到線程創建工廠。只不過這個線程工廠不再需要實現ThreadFactory接口,而是需要實現ForkJoinWorkerThreadFactory接口。后者是一個函數式接口,只需要實現一個名叫newThread的方法。在Fork/Join框架中有一個默認的ForkJoinWorkerThreadFactory接口實現:DefaultForkJoinWorkerThreadFactory。

  • handler:異常捕獲處理器。當執行的任務中出現異常,並從任務中被拋出時,就會被handler捕獲。

  • asyncMode:這個參數也非常重要,從字面意思來看是指的異步模式,它並不是說Fork/Join框架是采用同步模式還是采用異步模式工作。Fork/Join框架中為每一個獨立工作的線程准備了對應的待執行任務隊列,這個任務隊列是使用數組進行組合的雙向隊列。即是說存在於隊列中的待執行任務,即可以使用先進先出的工作模式,也可以使用后進先出的工作模式。當asyncMode設置為ture的時候,隊列采用先進先出方式工作;反之則是采用后進先出的方式工作,該值默認為false.(WorkQueue)

 

 ForkJoinPool還有另外兩個構造函數,一個構造函數只帶有parallelism參數,既是可以設定Fork/Join框架的最大並行任務數量;

另一個構造函數則不帶有任何參數,對於最大並行任務數量也只是一個默認值——當前操作系統可以使用的CPU內核數量(Runtime.getRuntime().availableProcessors())。

實際上ForkJoinPool還有一個私有的、原生構造函數,之上提到的三個構造函數都是對這個私有的、原生構造函數的調用。

  //對於最大並行任務數量也只是一個默認值——當前操作系統可以使用的CPU內核數量
 public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }

  //框架的最大並行任務數量
 public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }

  //私有的、原生構造函數(被上面的構造函數 調用)
 private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

 

6.fork方法 

fork方法用於將新創建的子任務放入當前線程的work queue隊列中,Fork/Join框架將根據當前正在並發執行ForkJoinTask任務的ForkJoinWorkerThread線程狀態,

決定是讓這個任務在隊列中等待,還是創建一個新的ForkJoinWorkerThread線程運行它,又或者是喚起其它正在等待任務的ForkJoinWorkerThread線程運行它。

fork方法,將當前任務入池 ; 當我們調用ForkJoinTask的fork方法時,程序會把任務放在ForkJoinWorkerThreadpushTaskworkQueue中,異步地執行這個任務,然后立即返回結果。

代碼如下:

   public final ForkJoinTask<V> fork() {
        Thread t;
    //如果當前線程是ForkJoinWorkerThread,將任務壓入該線程的任務隊列
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else
      //否則調用common池的externalPush方法入隊 ForkJoinPool.common.externalPush(this); return this; }

 pushTask方法把當前任務存放在ForkJoinTask數組隊列里。然后再調用ForkJoinPool的signalWork()方法喚醒或創建一個工作線程來執行任務。代碼如下:

   final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int b = base, s = top, n;
            if ((a = array) != null) {    // ignore if queue removed
                int m = a.length - 1;     // fenced write for task visibility
                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                U.putOrderedInt(this, QTOP, s + 1);
                if ((n = s - b) <= 1) {
                    if ((p = pool) != null)
                        p.signalWork(p.workQueues, this);
                }
                else if (n >= m)
                    growArray();
            }
        }

 

7.join方法

join方法用於讓當前線程阻塞,直到對應的子任務完成運行並返回執行結果。或者,如果這個子任務存在於當前線程的任務等待隊列(work queue)中,則取出這個子任務進行“遞歸”執行。

其目的是盡快得到當前子任務的運行結果,然后繼續執行。也就是讓子任務先執行的意思。

   public final V join() {
        int s;
      //調用doJoin方法阻塞等待的結果不是NORMAL,說明有異常或取消.報告異常
if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s);
    //等於NORMAL,正常執行完畢,返回原始結果
return getRawResult(); }

它首先調用doJoin方法,通過doJoin()方法得到當前任務的狀態來判斷返回什么結果,任務狀態有4種:已完成(NORMAL)、被取消(CANCELLED)、信號(SIGNAL)和出現異常(EXCEPTIONAL)。

  如果任務狀態是已完成,則直接返回任務結果。

  如果任務狀態是被取消,則直接拋出CancellationException

  如果任務狀態是拋出異常,則直接拋出對應的異常

   如果沒有返回狀態,會否則使用當線程池所在的ForkJoinPool的awaitJoin方法等待.

  讓我們分析一下doJoin方法的實現

 private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
      //已完成,返回status,未完成再嘗試后續
return (s = status) < 0 ? s :
      //未完成,當前線程是ForkJoinWorkerThread,從該線程中取出workQueue,並嘗試將當前task出隊然后執行,執行的結果是完成則返回狀態,否則使用當線程池所在的ForkJoinPool的awaitJoin方法等待 ((t
= Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) :
        //當前線程不是ForkJoinWorkerThread,調用externalAwaitDone方法. externalAwaitDone(); }
final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }

 在doJoin()方法里,首先通過查看任務的狀態,看任務是否已經執行完成,如果執行完成,則直接返回任務狀態;如果沒有執行完,則從任務數組里取出任務並執行。

 如果任務順利執行完成,則設置任務狀態為NORMAL,如果出現異常,則記錄異常,並將任務狀態設置為EXCEPTIONAL。

 

 8.invoke方法

  public final V invoke() {
        int s;
      //先嘗試執行
if ((s = doInvoke() & DONE_MASK) != NORMAL)
      //doInvoke方法的結果status只保留完成態位表示非NORMAL,則報告異常 reportException(s);
     //正常完成,返回原始結果.
return getRawResult(); } //ForkJoinPool::awaitJoin,在該方法中使用循環的方式進行internalWait,滿足了每次按截止時間或周期進行等待,同時也順便解決了虛假喚醒 private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); }

externalAwaitDone函數.它體現了ForkJoin框架的一個核心:外部幫助,

externalAwaitDone的邏輯不復雜,在當前task為ForkJoinPool.common的情況下可以在外部進行等待和嘗試幫助完成.

方法會首先根據ForkJoinTask的類型進行嘗試幫助,並返回當前的status,若發現未完成,則進入下面的等待喚醒邏輯.該方法的調用者為非worker線程.

 //外部線程等待一個common池中的任務完成.
    private int externalAwaitDone() {
        int s = ((this instanceof CountedCompleter) ?
                //當前task是一個CountedCompleter,嘗試使用common ForkJoinPool去外部幫助完成,並將完成狀態返回.
                ForkJoinPool.common.externalHelpComplete(
                        (CountedCompleter<?>)this, 0) :
                //當前task不是CountedCompleter,則調用common pool嘗試外部彈出該任務並進行執行,
                //status賦值doExec函數的結果,若彈出失敗(其他線程先行彈出)賦0.
                ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        if (s >= 0 && (s = status) >= 0) {
            //檢查上一步的結果,即外部使用common池彈出並執行的結果(不是CountedCompleter的情況),或外部嘗試幫助CountedCompleter完成的結果
            //status大於0表示嘗試幫助完成失敗.
            //擾動標識,初值false
            boolean interrupted = false;
            do {
                //循環嘗試,先給status標記SIGNAL標識,便於后續喚醒操作.
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0) {
                            try {
                                //CAS成功,進同步塊發現double check未完成,則等待.
                                wait(0L);
                            } catch (InterruptedException ie) {
                                //若在等待過程中發生了擾動,不停止等待,標記擾動.
                                interrupted = true;
                            }
                        }
                        else
                            //進同步塊發現已完成,則喚醒所有等待線程.
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);//循環條件,task未完成.
            if (interrupted)
                //循環結束,若循環中間曾有擾動,則中斷當前線程.
                Thread.currentThread().interrupt();
        }
        //返回status
        return s;
    }

 

  

 

參考:https://www.cnblogs.com/wzqjy/p/7921063.html

https://blog.csdn.net/tyrroo/article/details/81390202

 https://segmentfault.com/a/1190000019549838

https://www.cnblogs.com/senlinyang/p/7885964.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM