多線程:Fork和Join詳解(帶例子)


首先我們得先明白這兩個單詞是什么意思:

Fork:叉 ; 分岔處,分流處,岔口,岔路 ; 分岔 ;

Join:連接;   接合;   聯結;   結合;   聯合;   匯合;  加入;

好了,看到這倆的翻譯,是不是有所頓悟呢?請君繼續看......

1. Java並發的發展歷程

–Java 1 支持threads, locks, condition queues

–Java 5 引入了 thread pools, blocking queues, concurrent collections

–Java 7 加入了fork-join庫

–Java 8 加入了 parallel streams

2. 基本思想

Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。

分治法:把一個規模大的問題划分為規模較小的子問題,然后分而治之,最后合並子問題的解得到原問題的解。

步驟:

  1. 分割原問題;
  2. 求解子問題;
  3. 合並子問題的解為原問題的解

在分治法中,子問題一般是相互獨立的,因此,經常通過遞歸調用算法來求解子問題。

3. 分治法典型應用

(1)二分搜索

(2)大整數乘法

(3)Strassen矩陣乘法

(4)棋盤覆蓋

(5)合並排序

(6)快速排序

(7)線性時間選擇

(8)漢諾塔

4. Fork/Join框架的實現原理

4.1 ForkJoinPool

ForkJoinPool 是實現Fork,Join的線程池 - ThreadPool

ForkJoinTask 一個描述ForkJoin的抽象類 Runnable/Callable

RecursiveAction 返回結果的ForkJoinTask實現Runnable

RecursiveTask<V>返回結果的ForkJoinTask實現Callable

CountedCompleter<T> 在任務完成執行后會觸發執行一個自定義的鈎子函數

ForkJoinPool由ForkJoinTask數組和ForkJoinWorkerThread數組組成,ForkJoinPool中的所有的工作線程均有一個自己的工作隊列WorkQueue:

  • ForkJoinTask數組負責將存放程序提交給ForkJoinPool,ForkJoinTask封裝了數據及其相應的計算;
  • ForkJoinWorkerThread負責執行這些任務;

ForkJoinTask主要包括兩個方法分別實現任務的分拆與合並:

  1、fork()類似於Thread.start(),但是它並不立即執行任務,而是將任務放入工作隊列中;

  2、跟Thread.join()不同,ForkJoinTask的join()方法並不簡單的阻塞線程;

4.2 Fork方法的實現原理

ForkJoinTask的Fork方法的實現原理:

當我們調用ForkJoinTask的fork方法時,程序會把任務放在ForkJoinWorkerThread的pushTask的workQueue中,異步地執行這個任務,然后立即返回結果,代碼如下:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

pushTask方法把當前任務存放在ForkJoinTask數組隊列里。然后再調用ForkJoinPool的signalWork()方法喚醒或創建一個工作線程來執行任務。代碼如下:

final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m)
            growArray();
    }
}

4.3 join方法的實現原理

ForkJoinTask的join方法實現原理

Join方法的主要作用是阻塞當前線程等待獲取結果。讓我們一起看看ForkJoinTask的join方法的實現,代碼如下:

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

它首先調用doJoin方法,通過doJoin()方法得到當前任務的狀態來判斷返回什么結果,任務狀態有4種:

  • 已完成(NORMAL):直接返回任務結果;
  • 被取消(CANCELLED):直接拋出CancellationException;
  • 信號(SIGNAL)
  • 出現異常(EXCEPTIONAL):直接拋出對應的異常

讓我們分析一下doJoin方法的實現:

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}


final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

在doJoin()方法里,首先通過查看任務的狀態,看任務是否已經執行完成,

  • 如果執行完成,則直接返回任務狀態;
  • 如果沒有執行完,則從任務數組里取出任務並執行;
  • 如果任務順利執行完成,則設置任務狀態為NORMAL;
  • 如果出現異常,則記錄異常,並將任務狀態設置為EXCEPTIONAL。

4.4 Fork/Join框架的異常處理

ForkJoinTask在執行的時候可能會拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,並且可以通過ForkJoinTask的getException方法獲取異常。使用如下代碼:

if(task.isCompletedAbnormally()){
    System.out.println(task.getException());
}

5. 舉個例子

使用Fork/Join框架首先要考慮到的是如何分割任務,如果希望每個子任務最多執行兩個數的相加,那么我們設置分割閾值是2,由於是4個數字相加,所以Fork/Join框架會把這個任務fork成兩個子任務,子任務一負責計算1+2,子任務二負責計算3+4,然后再join兩個子任務的結果。因為是有結果的任務,所以必須繼承RecursiveTask

計算1+2+...+10000實現代碼如下:

public class CountTask extends RecursiveTask<Integer> {
    private static final int THREAD_HOLD = 2;//設置分割閾值

    private int start;
    private int end;

    public CountTask(int start,int end){
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任務足夠小就計算
        boolean canCompute = (end - start) <= THREAD_HOLD;
        if(canCompute){
            for(int i=start;i<=end;i++){
                sum += i;
            }
        }else{
            int middle = (start + end) / 2;
            CountTask left = new CountTask(start,middle);
            CountTask right = new CountTask(middle+1,end);
            //執行子任務
            left.fork();
            right.fork();
            //獲取子任務結果
            int lResult = left.join();
            int rResult = right.join();
            sum = lResult + rResult;
        }
        return sum;
    }

    public static void main(String[] args){
        ForkJoinPool pool = new ForkJoinPool();
        CountTask task = new CountTask(1,10000);
        Future<Integer> result = pool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

 

 

 

 參考及致謝:

1.Fork/Join框架基本使用(里面有一個歸並排序的過程圖,很好)

2.多線程(五) Fork/Join框架介紹及實例講解(里面有任務竊取算法思想)

3.Fork-join原理解析(主要借鑒,特別感謝)

 

Over......


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM