並發編程之Fork/Join


並發與並行

並發:多個進程交替執行。

並行:多個進程同時進行,不存在線程的上下文切換。

並發與並行的目的都是使CPU的利用率達到最大。Fork/Join就是為了盡可能提高硬件的使用率而應運而生的。

計算密集型與IO密集型

計算密集型:也稱之為CPU密集型,此時系統的硬盤,內存性能相對於CPU要很多。系統在運作的時候CPU是處於100% loading的狀態,在系統完成磁盤的讀寫(I/O)以后,程序就會進行計算,在進行計算的時候CPU占用率是很高的。計算密集型任務最大的特點就是進行大量的計算,消耗CPU資源,比如說高清解碼,計算圓周率啥的,都是靠CPU的運算能力。這種類型的任務雖然也支持多任務,但是花費在任務切換的時間越多,執行效率就越低,要最高效的利用cpu,建議任務數小於核心線程數。代碼運行效率也很關鍵,一般使用C語言來寫。線程數的設置:CPU核數+1(現代CPU支持超線程)。

IO密集型:CPU性能要比硬盤,內存性能好很多。這時候,大部分的情況是CPU在等I/O的讀寫操作,此時CPU loading並不是很高。I/O bound的程序一般在達到極限的時候,CPU利用率仍然比較低。對於IO密集型的任務主要涉及到網絡,磁盤IO.特點是CPU消耗很少,任務的大部分時間都是在等待IO操作完成(磁盤IO的速度遠遠低於cpu與內存的速度)。對於這種任務,任務越多,CPU的效率越高。對於這種任務適合使用開發效率最高的腳本語言,C語言基本上沒啥用。線程數的設置:(線程等待時間+線程CPU時間)/線程CPU時間)*CPU數目。

如何利用多核CPU,計算很大數組中所有整數進行排序?

當數據量小的時候使用快速排序快,快速排序顯著的特征是用遞歸的方法去排序的。當數據量大的時候歸遞排序。遞歸排序的思想就是在數組中取一個中間值,將一個數組分為2個,一個比中間值大,一個比中間值小,如此反復拆分排序,直到最后無法再進行拆分,然后將結果合並。因此遞歸方法除了空間復雜度增加了,還可能會產生棧溢出。(程序計數器是唯一不會發生棧溢出的),虛擬機棧默認最大空間是1M.   

分治思想:就是將一個規模大的問題划分為規模較小的子問題,然后逐步解決小問題,最后合並子問題的解就得到了原問題的解。即分割原問題--求解子問題--合並子問題的解。

子問題一般都是相互獨立的,因此,通常通過遞歸調用算法來求解子問題。

Fork/Join框架

  Fork/Join 是一個用於並行執行任務的框架,是一個把大任務拆分成小任務,執行小任務,最后匯總小任務的結果得到大任務的結果的框架。整體框架如下:

 

 

 

Fork/Join 特征:

1、ForkJoinPool是ExecutorService的補充 ,適用於一些特定的場景,適合於計算密集型場景。如果存在I/O,線程間同步,sleep()等會造成線程長時間阻塞的情況,此時可以配合ManagedBlocker使用。

2、ForkJoinPool主要是實現分治法,分治之后遞歸調用函數。

 

ForkJoinPool 框架主要類

ForkJoinPool 實現ForkJoin的線程池 - ThreadPool

ForkJoinWorkerThread 實現ForkJoin的線程

ForkJoinTask<V> 一個描述ForkJoin的抽象類 Runnable/Callable

RecursiveAction 無返回結果的ForkJoinTask實現Runnable

RecursiveTask<V> 有返回結果的ForkJoinTask實現Callable

CountedCompleter<T> 在任務完成執行后會觸發執行一個自定義的鈎子函數

提交任務:

 

 

fork()類似於Thread.start(),但是它並不立即執行任務,而是將任務放入工作隊列中, 跟Thread.join()不同,ForkJoinTask的join()方法並不簡單的阻塞線程 利用工作線程運行其他任務, 當一個工作線程中中調用join(),它將處理其他任務,直到注意到目標子任務已經完成。

 ForkJoinPool中的所有工作線程都有一個自己的工作隊列WorkQueue,是一個雙端隊列Deque,從隊頭取任務,先進后出,線程私有,不共享。

如下圖所示:

 

 

線程竊取

工作竊取就是指某個線程從其他隊列里竊取任務來執行。在ForkJoinPool中就是將一個大任務分成n個互不依賴的子任務,為了減少線程之間的競爭,於是把這些子任務放到不同的隊列當中去,並為每一個對列創建一個線程來執行隊列中的任務,A隊列的任務由A線程來執行。但是有的線程執行得比較快,很快就把自己隊列當中的任務執行完成了,但是A隊列里還有待執行的任務,這時候這個線程(假設是B線程)就會去竊取他的隊列當中的任務來執行。為了減少竊取任務線程與被竊取任務線程之間的競爭,采用雙端隊列,竊取任務是從隊尾竊取,被竊取任務線程從隊頭獲取任務來執行。

為了盡可能的提高CPU的利用率,空閑的線程將從其他線程的隊列中竊取任務來執行,從workQueue的隊尾竊取任務,從而減少競爭,任務的竊取是遵從FIFO順序進行的,因為先放入的任務往往表示更大的工作量,竊取來的任務支持進一步的遞歸分解。

WorkQueue雙端隊列最小化任務“竊取”的競爭, push()/pop()僅在其所有者工作線程中調用 ,這些操作都是通過CAS來實現的,是Wait-free的 。

poll() 則由其他工作線程來調用“竊取”任務 可能不是wait-free。任務竊取的好處就是充分利用了資源,但是也有缺點,當隊列當中只有一個任務的時候,就會出現競爭,並且系統會耗費更多的資源,比如創建多個線程和多個雙端隊列。

 

 

 總結一下就是:

1. ForkJoinPool 的每個工作線程都維護着一個工作隊列(WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務(ForkJoinTask)。
2. 每個工作線程在運行中產生新的任務(通常是因為調用了 fork())時,會放入工作隊列的隊頭(左為隊尾,右側為隊頭),並且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊頭取出任務來執行。(ForkJoinTask的fork()的子任務,將放入運行該任務的工作線程的隊頭,工作線程以LIFO的順序來處理隊列中的任務)
3. 每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其他工作線程的工作隊列),竊取的任務位於其他線程的工作隊列的隊尾,也就是說工作線程在竊取其他工作線程的任務時,使用的是FIFO 方式。
4. 在遇到 join() 時,如果需要 join 的任務尚未完成,則會先處理其他任務,並等待其完成。
5. 在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。

代碼如下:

 public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

為了測試ForkJoinPool的好處,我們來看以下兩段代碼,來對比一下結果:

首先我們來看一下,就用自己寫的分任務執行,來計算

package com.test.executor.arrsum;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.test.executor.arrsum.utils.Utils;

public class SumRecursiveMT {
    public static class RecursiveSumTask implements Callable<Long> {
        public static final int SEQUENTIAL_CUTOFF = 1;
        int lo;
        int hi;
        int[] arr; // arguments
        ExecutorService executorService;

        RecursiveSumTask( ExecutorService executorService, int[] a, int l, int h) {
            this.executorService = executorService;
            this.arr = a;
            this.lo = l;
            this.hi = h;
        }

        public Long call() throws Exception { // override
            System.out.format("%s range [%d-%d] begin to compute %n",
                    Thread.currentThread().getName(), lo, hi);
            long result = 0;
            if (hi - lo <= SEQUENTIAL_CUTOFF) {
                for (int i = lo; i < hi; i++)
                    result += arr[i];

                System.out.format("%s range [%d-%d] begin to finished %n",
                        Thread.currentThread().getName(), lo, hi);
            }
            else {
                RecursiveSumTask left = new RecursiveSumTask(executorService, arr, lo, (hi + lo) / 2);
                RecursiveSumTask right = new RecursiveSumTask(executorService, arr, (hi + lo) / 2, hi);
                Future<Long> lr = executorService.submit(left);
                Future<Long> rr = executorService.submit(right);

                result = lr.get() + rr.get();
                System.out.format("%s range [%d-%d] finished to compute %n",
                        Thread.currentThread().getName(), lo, hi);
            }

            return result;
        }
    }


    public static long sum(int[] arr) throws Exception {
        int nofProcessors = Runtime.getRuntime().availableProcessors();
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        //ExecutorService executorService = Executors.newCachedThreadPool();

        RecursiveSumTask task = new RecursiveSumTask(executorService, arr, 0, arr.length);
        long result =  executorService.submit(task).get();
        return result;
    }

  //執行該方法,看看測試結果
public static void main(String[] args) throws Exception { int[] arr = Utils.buildRandomIntArray(20); System.out.printf("The array length is: %d\n", arr.length); long result = sum(arr); System.out.printf("The result is: %d\n", result); } } package com.test.executor.arrsum.utils; import java.util.Random; public class Utils { public static int[] buildRandomIntArray(int size) { int[] array = new int[size]; for (int i = 0; i < size; i++) { array[i] = new Random().nextInt(100); } return array; } public static int[] buildRandomIntArray() { int size = new Random().nextInt(100); int[] array = new int[size]; for (int i = 0; i < size; i++) { array[i] = new Random().nextInt(100); } return array; } public static void main(String[] args) { int[] ints = Utils.buildRandomIntArray(20); for (int i = 0; i < ints.length; i++) { System.out.println(ints[i]); } } }
package com.test.executor.arrsum;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.test.executor.arrsum.utils.Utils;

public class SumRecursiveMT {
    public static class RecursiveSumTask implements Callable<Long> {
        public static final int SEQUENTIAL_CUTOFF = 1;
        int lo;
        int hi;
        int[] arr; // arguments
        ExecutorService executorService;

        RecursiveSumTask( ExecutorService executorService, int[] a, int l, int h) {
            this.executorService = executorService;
            this.arr = a;
            this.lo = l;
            this.hi = h;
        }

        public Long call() throws Exception { // override
            System.out.format("%s range [%d-%d] begin to compute %n",
                    Thread.currentThread().getName(), lo, hi);
            long result = 0;
            if (hi - lo <= SEQUENTIAL_CUTOFF) {
                for (int i = lo; i < hi; i++)
                    result += arr[i];

                System.out.format("%s range [%d-%d] begin to finished %n",
                        Thread.currentThread().getName(), lo, hi);
            }
            else {
                RecursiveSumTask left = new RecursiveSumTask(executorService, arr, lo, (hi + lo) / 2);
                RecursiveSumTask right = new RecursiveSumTask(executorService, arr, (hi + lo) / 2, hi);
                Future<Long> lr = executorService.submit(left);
                Future<Long> rr = executorService.submit(right);

                result = lr.get() + rr.get();
                System.out.format("%s range [%d-%d] finished to compute %n",
                        Thread.currentThread().getName(), lo, hi);
            }

            return result;
        }
    }


    public static long sum(int[] arr) throws Exception {
        int nofProcessors = Runtime.getRuntime().availableProcessors();
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        //ExecutorService executorService = Executors.newCachedThreadPool();

        RecursiveSumTask task = new RecursiveSumTask(executorService, arr, 0, arr.length);
        long result =  executorService.submit(task).get();
        return result;
    }

    public static void main(String[] args) throws Exception {
        int[] arr = Utils.buildRandomIntArray(20);
        System.out.printf("The array length is: %d\n", arr.length);
        
        long result = sum(arr);

        System.out.printf("The result is: %d\n", result);

    }
}

 

運行該代碼的結果如下:

 

 結果一直沒有出來,就說明一直在計算。因為線程在遞歸計算,開的線程太多,然后計算時間比較長。

ForkJoin的使用

ForkJoinTask:我們要使用ForkJoin框架,就要創建一個ForkJoin 任務,創建ForkJoin任務的話,不需要直接繼承ForkJoinTask類,而是繼承他的子類.ForkJoin框架有兩個子類RecursiveAction和RecursiveTask<V>。

  1、RecursiveAction:用於返回沒有結果的任務。(比如寫數據到磁盤以后就退出。一個RecursiveAction可以把工作分割成若干小塊,由獨立的線程或者CPU執行,通過繼承實現RecursiveAction)

  2、RecursiveTask<V> :用於執行有返回結果的任務。(將一個任務分割成若干的子任務,每個子任務返回的值合並到一個集體結果,可以水平的分割和合並。)

 ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。任務分割出來的子任務會添加到當前工作線程的雙端隊列當中,進入隊列的頭部。當一個工作線程的隊列中沒有任務的時候它會從其他隊列的尾部獲取任務來執行。

接下來來看看用ForkJoinPool來計算的代碼:

package com.test.executor.arrsum;

import java.util.concurrent.RecursiveTask;

/**
 * RecursiveTask 並行計算,同步有返回值
 * ForkJoin框架處理的任務基本都能使用遞歸處理,比如求斐波那契數列等,但遞歸算法的缺陷是:
 *    一只會只用單線程處理,
 *    二是遞歸次數過多時會導致堆棧溢出;
 * ForkJoin解決了這兩個問題,使用多線程並發處理,充分利用計算資源來提高效率,同時避免堆棧溢出發生。
 * 當然像求斐波那契數列這種小問題直接使用線性算法搞定可能更簡單,實際應用中完全沒必要使用ForkJoin框架,
 * 所以ForkJoin是核彈,是用來對付大家伙的,比如超大數組排序。
 * 最佳應用場景:多核、多內存、可以分割計算再合並的計算密集型任務
 */
class LongSum extends RecursiveTask<Long> {

    static final int SEQUENTIAL_THRESHOLD = 1000;
    static final long NPS = (1000L * 1000 * 1000);
    static final boolean extraWork = true; // change to add more than just a sum


    int low;
    int high;
    int[] array;

    LongSum(int[] arr, int lo, int hi) {
        array = arr;
        low = lo;
        high = hi;
    }

    /**
     * fork()方法:將任務放入隊列並安排異步執行,一個任務應該只調用一次fork()函數,除非已經執行完畢並重新初始化。
     * tryUnfork()方法:嘗試把任務從隊列中拿出單獨處理,但不一定成功。
     * join()方法:等待計算完成並返回計算結果。
     * isCompletedAbnormally()方法:用於判斷任務計算是否發生異常。
     */
    protected Long compute() {

        if (high - low <= SEQUENTIAL_THRESHOLD) {
            long sum = 0;
            for (int i = low; i < high; ++i) {
                sum += array[i];
            }
            return sum;

        } else {
            int mid = low + (high - low) / 2;
            LongSum left = new LongSum(array, low, mid);
            LongSum right = new LongSum(array, mid, high);
            left.fork();
            right.fork();
            long rightAns = right.join();
            long leftAns = left.join();
            return leftAns + rightAns;
        }
    }
}

       package com.test.executor.arrsum;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

import com.test.executor.arrsum.utils.Utils;

public class LongSumMain {

    //獲取邏輯處理器數量
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** for time conversion */
    static final long NPS = (1000L * 1000 * 1000);

    static long calcSum;

    static final boolean reportSteals = true;

    public static void main(String[] args) throws Exception {
        int[] array = Utils.buildRandomIntArray(20000000);
        System.out.println("cpu-num:"+NCPU);
        //單線程下計算數組數據總和
         calcSum = seqSum(array);
        System.out.println("seq sum=" + calcSum);

        //采用fork/join方式將數組求和任務進行拆分執行,最后合並結果
        LongSum ls = new LongSum(array, 0, array.length);
          ForkJoinPool fjp  = new ForkJoinPool(NCPU); //使用的線程數
        ForkJoinTask<Long> task = fjp.submit(ls);
        System.out.println("forkjoin sum=" + task.get());

        if(task.isCompletedAbnormally()){
            System.out.println(task.getException());
        }

        fjp.shutdown();

    }


    static long seqSum(int[] array) {
        long sum = 0;
        for (int i = 0; i < array.length; ++i)
            sum += array[i];
        return sum;
    }

}

以上的運行結果就很快:

cpu-num:4
seq sum=989877234
forkjoin sum=989877234

 Fork/Join框架原理

異常處理

   ForkJoinTask在執行任務的時候可能會拋異常,此時我們沒有辦法從主線程里面獲取異常,所以我們使用以下幾種方法來判斷以及獲取異常:

  1、isCompletedAbnormally()方法來判斷任務有沒有拋出異常或者被取消。

  2、getException()可以獲取到異常。

  3、isCompletedNormally()這個方法是看任務是否正常執行完成且沒有任何異常。

  示例:

if(task.isCompletedAbnormally())
   System.out.print(task.getException());

ForkJoinPool構造方法

 public ForkJoinPool() {
        this(Runtime.getRuntime().availableProcessors(),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
 public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        Thread.UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        checkPermission();
        if (factory == null)
            throw new NullPointerException();
        if (parallelism <= 0 || parallelism > MAX_ID)
            throw new IllegalArgumentException();
        this.parallelism = parallelism;
        this.factory = factory;
        this.ueh = handler;
        this.locallyFifo = asyncMode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
        this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
        // initialize workers array with room for 2*parallelism if possible
        int n = parallelism << 1;
        if (n >= MAX_ID)
            n = MAX_ID;
        else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
        }
        workers = new ForkJoinWorkerThread[n + 1];
        this.submissionLock = new ReentrantLock();
        this.termination = submissionLock.newCondition();
        StringBuilder sb = new StringBuilder("ForkJoinPool-");
        sb.append(poolNumberGenerator.incrementAndGet());
        sb.append("-worker-");
        this.workerNamePrefix = sb.toString();
    }

重要參數說明:

1、parallelism:並行數。一般跟CPU個數保持一致。通過Runtime.getRuntime().availableProcessors()可以獲取到當前機器的CPU個數。

2、ForkJoinWorkerThreadFactory factory:創建線程的工廠

3、Handler  :線程異常處理器,Thread.UncaughtExceptionHandler ,該處理器在線程執行任務時由於某些無法預料到的錯誤而導致任務線程中斷時進行一些處理,默認情況為null。

 4、boolean asyncMode: 表示工作線程內的任務隊列是采用何種方式進行調度,可以是先進先出FIFO,也可以是先進后出FILO.如果為true,則表示線程池中的線程使用先進先出的方式進行調度,默認為false.

ForkJoinTask fork()/join()方法

1、fork():這個方法的作用就是將任務放到當前線程的工作隊列當中去;

public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

2、join()的方法我們先看一下代碼:

 private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < 0)
                return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }

  */
    private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        return getRawResult();
    }
public final V join() {
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }

1、檢查調用Join()方法的線程是否是ForkJoinWorkerThread,如果不是的話就阻塞當前線程,等待任務完成,如果是則不阻塞;

2、判斷任務的狀態,是否已經完成,如果已經完成,則返回結果;

3、任務沒有完成,判斷任務是否處於自己的隊列當中,如果是,就取出執行完任務;

4、任務沒有在自己隊列當中,則說明任務被偷走,找到偷走任務的小偷,竊取小偷工作隊列中的任務,並執行,幫助小偷快點完成待join的任務;

5、若小偷偷走的任務已經完成,則找到小偷的小偷,幫助他完成任務;

6、遞歸執行5;

總體歸納起來的流程如下:

 

 

 ForkJoinPool 之submit()方法

  public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        forkOrSubmit(task);
        return task;
    }
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&
            (w = (ForkJoinWorkerThread)t).pool == this)
            w.pushTask(task);
        else
            addSubmission(task);
    }

ForkJoinPool有自己的工作隊列,這些工作對列是用來接收由外部線程(非ForkJoinThread)提交過來的任務,這個對列稱為submittingQueue。submit()和fork()沒有本質的區別,只是提交對象是submittingQueue.submittingQueue也是工作線程竊取對象,當其中的任務被工作線程竊取成功的時候,代表提交任務正式進入執行階段。

 

Fork/Join框架執行流程

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM