Fork/Join 框架框架使用


1、介紹

Fork/Join 框架是 Java7 提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。在多核計算機中正確使用可以很好的發揮cpu的作用,提高程序的執行效率。框架采用工作竊取算法,當有子任務線程處理完當前任務時,它會從其他線程執行的任務隊列里竊取任務來執行,從而提高整體的執行效率。為了減少線程間的任務資源競爭,隊列通常使用雙端隊列,別竊取任務線程永遠從啥UN廣大UN隊列的嘔吐不獲取任務執行,而竊取任務的線程永遠從雙端隊列的尾部獲取任務執行。

2、使用

根據業務場景來考慮是否需要使用Fork/Join框架來進行任務的拆分和匯總操作。當需要時,比如說需要執行一個很大的業務計算之類的,此時使用Fork/Join框架分以下兩步:

  • 對任務進行分割        把大任務分割成子任務,有可能子任務還是很大,所以還需要不停的分割,直到分割出的子任務足夠小
  • 執行分割的子任務並匯總結果       分割的子任務分別放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執行。子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合並這些數據

具體實現以上兩步:

  1. 創建ForkJoinTask         它提供在任務中執行 fork() 和 join() 操作的機制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,Fork/Join 框架提供了以下兩個子類:   RecursiveAction:用於沒有返回結果的任務   RecursiveTask :用於有返回結果的任務
  2. 使用ForkJoinPool執行ForkJoinTask       任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務

3、示例

3.1 使用fork

public class SumTask extends RecursiveTask <Integer>{
    private static final Integer THRESHOLD = 1000;
    private int start;
    private int end;
    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        Integer sum = 0;
        boolean isOk = (end - start) <= THRESHOLD;
        if(isOk) {
            for(int i = start; i <= end; i ++) {
                sum += i;
            }
            return sum;
        }

        int middle = (end + start) / 2;
        //子任務遞歸
        SumTask sumSubTask = new SumTask(start, middle);
        SumTask sumSubTask1 = new SumTask(middle + 1, end);

        //fork子任務
        sumSubTask.fork();
        sumSubTask1.fork();

        //join子任務
        Integer join = sumSubTask.join();
        Integer join1 = sumSubTask1.join();

        sum = join + join1;
        //計算結果
        return sum;
    }
}

3.2 使用invokeAll

public class SumTask2 extends RecursiveTask <Integer>{
    private static final Integer THRESHOLD = 1000;
    private int start;
    private int end;
    public SumTask2(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        Integer sum = 0;
        boolean isOk = end - start <= THRESHOLD;
        if(isOk) {
            for(int i = start; i <= end; i ++) {
                sum += i;
            }
//            System.out.println(String.format("compute %d-%d = %d", start, end, sum));
            return sum;
        }

        //除以2
        int middle = (end + start) / 2;
        //子任務遞歸
//        System.out.println(String.format("fork %d-%d => %d-%d&%d-%d", start, end, start, middle - 1, middle, end));
        SumTask2 sumSubTask = new SumTask2(start, middle - 1);
        SumTask2 sumSubTask1 = new SumTask2(middle, end);

        //fork子任務
        invokeAll(sumSubTask, sumSubTask1);

        //join子任務
        Integer join = sumSubTask.join();
        Integer join1 = sumSubTask1.join();

        sum = join + join1;
        //計算結果
        return sum;
    }
}

測試

     ForkJoinPool fjp2 = new ForkJoinPool();
        SumTask2 sumTask2 = new SumTask2(start, end);
        long begin3 = System.currentTimeMillis();
        Integer invoke = fjp2.invoke(sumTask2);
        long end3 = System.currentTimeMillis();
        System.out.println("計算結果3為 sum = " + invoke + ",計算時長為" + begin3 + "-" + end3 + "---  " + (end3 - begin3) + "ms");

        ForkJoinPool fjp = new ForkJoinPool();
        long begin2 = System.currentTimeMillis();
        SumTask sumTask = new SumTask(start, end);
        ForkJoinTask<Integer> submit = fjp.submit(sumTask);
        Integer join = submit.join();
        long end2 = System.currentTimeMillis();
        System.out.println("計算結果2為 sum = " + join + ",計算時長為" + begin2 + "-" + end2 + "---   " + (end2 - begin2) + "ms");
    

結果

從結果(可以多次運行測試)可以看出,使用invokeAll方式效率比使用單獨fork方式高,所以在使用時盡量采用invokeAll方式,這樣可以充分利用線程池中的線程去執行任務。

 

源碼參照Github


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM