多線程編程學習七( Fork/Join 框架).


一、介紹

使用 java8 lambda 表達式大半年了,一直都知道底層使用的是 Fork/Join 框架,今天終於有機會來學學 Fork/Join 框架了。

Fork/Join 框架是 Java 7 提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。

Fork/Join 的運行流程示意圖:

比如,一個 1+2+3+...+100 的工作任務,我們可以把它 Fork 成 10 個子任務,分別計算這 10 個子任務的運行結果。最后再把 10 個子任務的結果 Join 起來,匯總成最后的結果。

為了減少線程間的競爭,通常把這些子任務分別放到不同的隊列里,並為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應。但是,有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等着,不如去幫其它線程干活,於是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。線程的這種執行方式,我們稱之為“工作竊取”算法。

二、設計

實現 Fork/Join 框架的設計,大抵需要兩步:

1. 分割任務

首先我們需要創建一個 ForkJoin 任務,把大任務分割成子任務,如果子任務不夠小,則繼續往下分,直到分割出的子任務足夠小。

在 Java 中我們可以使用 ForkJoinTask 類,它提供在任務中執行 fork() 和 join() 操作的機制,通常情況下,我們只需要繼承它的子類:

  • RecursiveAction — 用於沒有返回結果的任務
  • RecursiveTask — 用於有返回結果的任務

2. 任務執行並返回結果

分割的子任務分別放在雙端隊列里,然后啟動幾個線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據。

在 Java 中任務的執行需要通過 ForkJoinPool 來執行。

三、示例

來一個阿里面試題:百萬級 Integer 數據量的一個 array 求和。

public class ArrayCountTask extends RecursiveTask<Long> {
    /**
     * 閾值
     */
    private static final Integer THRESHOLD = 10000;

    private Integer[] array;
    private Integer start;
    private Integer end;

    public ArrayCountTask(Integer[] array, Integer start, Integer end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        // 最小子任務計算
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
        } else {
            // 把大於閾值的任務繼續往下拆分,有點類似遞歸的思維。 recursive 就是遞歸的意思。
            int middle = (start + end) >>> 1;
            ArrayCountTask leftArrayCountTask = new ArrayCountTask(array, start, middle);
            ArrayCountTask rightArrayCountTask = new ArrayCountTask(array, middle, end);
            // 執行子任務
            //leftArrayCountTask.fork();
            //rightArrayCountTask.fork();

            // invokeAll 方法使用
            invokeAll(leftArrayCountTask, rightArrayCountTask);

            //等待子任務執行完,並得到其結果
            Long leftJoin = leftArrayCountTask.join();
            Long rightJoin = rightArrayCountTask.join();
            // 合並子任務的結果
            sum = leftJoin + rightJoin;
        }

        return sum;
    }
}
    public static void main(String[] args) {
        // 1. 造一個 int 類型的百萬級別數組
        Integer[] array = new Integer[150000000];
        for (int i = 0; i < array.length; i++) {
            array[i] = new Random().nextInt(100);
        }
        // 2.普通方式計算結果
        long start = System.currentTimeMillis();
        long sum = 0;
        for (int i = 0; i < array.length; i++) {
            sum += array[i];
        }
        long end = System.currentTimeMillis();
        System.out.println("普通方式計算結果:" + sum + ",耗時:" + (end - start));
        long start2 = System.currentTimeMillis();
        // 3.fork/join 框架方式計算結果
        ArrayCountTask arrayCountTask = new ArrayCountTask(array, 0, array.length);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        sum = forkJoinPool.invoke(arrayCountTask);
        long end2 = System.currentTimeMillis();
        System.out.println("fork/join 框架方式計算結果:" + sum + ",耗時:" + (end2 - start2));

        // 結論:
        // 1. 電腦 i5-4300m,雙核四線程
        // 2. 數組量少的時候,fork/join 框架要進行線程創建/切換的操作,性能不明顯。
        // 3. 數組量超過 100000000,fork/join 框架的性能才開始體現。

    }

ForkJoinTask 與一般任務的主要區別在於它需要實現 compute 方法,在這個方法里,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用 fork 方法時,又會進入 compute 方法,看看當前子任務是否需要繼續分割成子任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用 join 方法會等待子任務執行完並得到其結果。

在執行子任務時調用 fork 方法並不是最佳的選擇,最佳的選擇是 invokeAll 方法。因為執行 compute() 方法的線程本身也是一個 worker 線程,當對兩個子任務調用 fork() 時,這個worker 線程就會把任務分配給另外兩個 worker,但是它自己卻停下來等待不干活了!這樣就白白浪費了 Fork/Join 線程池中的一個 worker 線程,導致了4個子任務至少需要7個線程才能並發執行。

比如甲把 400 分成兩個 200 后,fork() 寫法相當於甲把一個 200 分給乙,把另一個 200 分給丙,然后,甲成了監工,不干活,等乙和丙干完了他直接匯報工作。乙和丙在把 200 分拆成兩個 100 的過程中,他倆又成了監工,這樣,本來只需要 4 個工人的活,現在需要 7 個工人才能完成,其中有3個是不干活的。

 

ForkJoinPool 由 ForkJoinTask 數組和 ForkJoinWorkerThread 數組組成。ForkJoinTask 數組負責將存放程序提交給 ForkJoinPool 的任務;而 ForkJoinWorkerThread 數組負責執行這些任務,ForkJoinWorkerThread 體現的就是“工作竊取”算法。

  • 當我們調用 ForkJoinTask 的 fork 方法時,程序會調用 ForkJoinWorkerThread 的 pushTask 方法異步地執行這個任務,然后立即返回結果。
  • 當我們調用 ForkJoinTask 的 join 方法時,程序會阻塞當前線程並等待獲取結果。

ForkJoinPool 使用 submit 或 invoke 提交的區別:invoke 同步執行,調用之后需要等待任務完成,才能執行后面的代碼;submit 是異步執行,只有在 Future 調用 get 的時候會阻塞。

ForkJoinPool 繼承自 AbstractExecutorService, 不是為了替代 ExecutorService,而是它的補充,在某些應用場景下性能比 ExecutorService 更好。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM