ForkJoinPool線程池


介紹

分而治之是一個有效的處理大數據的方法,著名的MapReduce就是采用這種分而治之的思路。簡單的說,如果要處理1000個數據,但是我們不具備處理1000個數據的能力,只可以處理10個數據。我們可以將這個任務分成100份,每份處理10個,並將最后的結果進行合成,形成1000個數據的處理結果。

把一個大任務調用fork()方法分解為若干小的任務,把小的任務處理結果進行join()合並為大任務的結果。

image-20210309161832822

ForkJoinPool線程池最常用的方法:

//向線程池提交一個ForkJoinTask任務,
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

ForkJoinTask支持fork()分解與join()等待的任務。

它有兩個重要子類:RecursiveAction和RecursiveTask。它們區別在於RecursiveAction任務沒有返回值,RecursiveTask任務帶有返回值。

ForkJoinPool 的工作特點 是“工作竊取”,何為工作竊取,ForkJoinPool底層維護着一個雙端隊列,當一個線程的任務隊列執行完畢后,其他線程的任務隊列還沒有執行完畢,這時,已經執行完畢的線程就會到另一個線程的雙端任務隊列的尾部去偷取任務執行。

image-20210309171014650

基本使用

以等查數列求和為例:

public class PoolDemo {

    private static class CountTask extends RecursiveTask<Long> {

        //定義數據規模的閾值,允許計算10000個數內的和,超過該閾值需要分解
        //如果閾值太小,1.會導致系統內線程數量會越積越多,導致性能下降
        //2.分解次數過多,方法調用過多,可能會導致棧溢出
        private static final int THRESHOLD = 10000;

        //每次把大任務分解為100個小任務
        private static final int TASKNUM = 100;

        private long start;

        private long end;

        public CountTask(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            long sum = 0;

            if(end - start < THRESHOLD){
               for(long i =start; i<=end;i++){
                   sum += i;
               }
            }else{
                //數值超過閾值,需要分解
                //約定每次分解成100個任務,計算每個任務的計算量
                long step = (start + end) /100;
                ArrayList<CountTask> subList = new ArrayList<>();
                long pos = start;
                for(long i =0; i < TASKNUM; i++){
                    long lastOne = pos + step;
                    //調整最后一個任務的結束位置
                    if(lastOne > end){
                        lastOne = end;
                    }
                    //創建子任務
                    CountTask task = new CountTask(pos, lastOne);
                    subList.add(task);
                    //提交子任務
                    task.fork();
                    //調整下個任務的起始位置
                    pos += step + 1;
                }

                //合並計算結果
                for (CountTask countTask : subList) {
                    sum += countTask.join();
                }
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        long end = 200000;
        CountTask task = new CountTask(0, end);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try {
            Long aLong = result.get();
            System.out.println(aLong);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM