ForkJoinPool及並行流解析


parallelStream原理。

parallelStream是並行流,依賴jdk1.7出現的Fork/Join框架。

Fork/Join框架的核心是工作竊取(work-stealing)算法。那么什么是工作竊取算法呢?假如我們有一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,於是把這些子任務分別放到不同的隊列里,並為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等着,不如去幫其他線程干活,於是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

ForkJoinPool是一個運行ForkJoinTask的線程池,同ThreadPoolExecutor一樣,也繼承了AbstractExecutorService。ForkJoinPool的每個工作線程都維護着一個工作隊列,這是一個雙端隊列Deque,里面存放着任務ForkJoinTask。每個工作線程在運行過程中,產生的新任務會放到工作隊列的隊尾。工作線程在處理自己工作隊列任務時,每次是從隊尾取任務。當自己的工作隊列清空后,會嘗試去竊取其他工作隊列的任務,且是從隊首竊取。

並行流處理過程中,用的ForkJoinTask是CountedCompleter的幾個子類,如forEach()操作對應的是ForEachTask,forEachOrdered()操作對應的是ForEachOrderedTask,reduce()操作對應的是ReduceTask。

以Lists.newArrayList(1, 2, 3).parallelStream().forEach(System.out::println);為例,用的ForkJoinPool實例是

跟到ForEachTask的compute()方法,ForEachTask 第283行,AbstractTask.suggestTargetSize(sizeEstimate);

AbstractTask的suggestTargetSize()方法實現是:

    public static long suggestTargetSize(long sizeEstimate) {
        long est = sizeEstimate / LEAF_TARGET;
        return est > 0L ? est : 1L;
    }

其中,LEAF_TARGET值定義是

static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

這里就調用了ForkJoinPool的getCommonPoolParallelism()靜態方法。ForkJoinPool有一個static塊,里面調用ForkJoinPool的makeCommonPool()靜態方法給靜態的ForkJoinPool實例common賦值。makeCommonPool()方法內部調用了ForkJoinPool的private的構造方法,其中第一個參數並行度的值是CPU核心數-1。取CPU核心數的代碼是Runtime.getRuntime().availableProcessors()。

        if (parallelism < 0 &&
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

我們如果想用ForkJoinPool實現自己的業務,則需要繼承ForkJoinTask。更簡單點,只需繼承ForkJoinTask的子類RecursiveTask或者RecursiveAction,重寫compute()方法即可。

案例見:https://blog.csdn.net/niyuelin1990/article/details/78658251


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM