線程池ForkJoinPool簡介


ForkJoinPool線程池最大的特點就是分叉(fork)合並(join),將一個大任務拆分成多個小任務,並行執行,再結合工作竊取模式(worksteal)提高整體的執行效率,充分利用CPU資源。

一. 應用場景

ForkJoinPool使用分治算法,用相對少的線程處理大量的任務,將一個大任務一拆為二,以此類推,每個子任務再拆分一半,直到達到最細顆粒度為止,即設置的閾值停止拆分,然后從最底層的任務開始計算,往上一層一層合並結果,簡單的流程如下圖:

從圖中可以看出ForkJoinPool要先執行完子任務才能執行上一層任務,所以ForkJoinPool適合在有限的線程數下完成有父子關系的任務場景,比如:快速排序,二分查找,矩陣乘法,線性時間選擇等場景,以及數組和集合的運算。

下面是個簡單的代碼示例計算從1到1億之間所有數字之和:

package com.javakk;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
 * ForkJoinPool求和
 * @author 老K
 */
public class ForkJoinPoolTest {

    private static ForkJoinPool forkJoinPool;

    /**
     * 求和任務類繼承RecursiveTask
     * ForkJoinTask一共有3個實現:
     * RecursiveTask:有返回值
     * RecursiveAction:無返回值
     * CountedCompleter:無返回值任務,完成任務后可以觸發回調
     */
    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        /**
         * ForkJoin執行任務的核心方法
         * @return
         */
        @Override
        protected Long compute() {
            if (to - from < 10) { // 設置拆分的最細粒度,即閾值,如果滿足條件就不再拆分,執行計算任務
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            } else { // 否則繼續拆分,遞歸調用
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public static void main(String[] args) {
        // 也可以jdk8提供的通用線程池ForkJoinPool.commonPool
        // 可以在構造函數內指定線程數
        forkJoinPool = new ForkJoinPool();
        long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();
        // 這里可以調用submit方法返回的future,通過future.get獲取結果
        Long result = forkJoinPool.invoke(new SumTask(numbers, 0, numbers.length - 1));
        forkJoinPool.shutdown();
        System.out.println("最終結果:"+result);
        System.out.println("活躍線程數:"+forkJoinPool.getActiveThreadCount());
        System.out.println("竊取任務數:"+forkJoinPool.getStealCount());
    }
}

輸出結果(活躍線程數和竊取任務會根據本地環境和任務執行情況變化):

最終結果:5000000050000000
活躍線程數:4
竊取任務數:12

上例中在compute方法里拆分的最小粒度是10個元素,大家可以改成其他的值試下,會發現執行的效率差別很大,所以要注意拆分粒度對性能的影響。

ForkJoinPool內部的隊列能夠保證執行任務的順序,至於為什么它能夠在有限的線程數量下完成非常多的任務,后面會講到。

二. 與ThreadPoolExecutor原生線程池的區別

ForkJoinPool和ThreadPoolExecutor都實現了ExecutorExecutorService接口,都可以通過構造函數設置線程數,threadFactory,可以查看ForkJoinPool.makeCommonPool()方法的源碼查看通用線程池的構造細節。

在內部結構上我覺得兩個線程池最大的區別是在工作隊列的設計上,如下圖

ThreadPoolExecutor:

ForkJoinPool:

圖上細節畫的不嚴謹,但大致能看出區別:

  • ForkJoinPool每個線程都有自己的隊列
  • ThreadPoolExecutor共用一個隊列

通過上面的代碼示例可以看到使用ForkJoinPool可以在有限的線程數下來完成非常多的具有父子關系的任務,比如使用4個線程來完成超過2000萬個任務。但是使用ThreadPoolExecutor是不可能的,因為ThreadPoolExecutor中的線程無法選擇優先執行子任務,要完成2000萬個具有父子關系的任務時,就需要2000萬個線程,這樣會導致ThreadPoolExecutor的任務隊列撐滿或創建的最大線程數把內存撐爆直接gg。

ForkJoinPool最適合計算密集型任務,而且最好是非阻塞任務,之前的一篇文章:Java踩坑記系列之線程池 也說了線程池的不同使用場景和注意事項。

所以ForkJoinPool是ThreadPoolExecutor線程池的一種補充,是對計算密集型場景的加強。

三. 工作竊取的實現原理

第一節的代碼示例輸出結果顯示活躍線程是4個,但卻完成了2000萬個子任務,竊取任務是12個(竊取數跟拆分層級和計算復雜度有關),這是work steal工作竊取的作用。

ForkJoinPool類中的WorkQueue正是實現工作竊取的隊列,javadoc中的注釋如下:

大意是大多數操作都發生在工作竊取隊列中(在嵌套類工作隊列中)。這些是特殊形式的Deques,主要有pushpoppoll操作。

Deque是雙端隊列(double ended queue縮寫),頭部和尾部任何一端都可以進行插入,刪除,獲取的操作,即支持FIFO(隊列)也支持LIFO(棧)順序。

Deque接口的實現最常見的是LinkedList,除此還有ArrayDequeConcurrentLinkedDeque

工作竊取模式主要分以下幾個步驟:

  1. 每個線程都有自己的雙端隊列
  2. 當調用fork方法時,將任務放進隊列頭部,線程以LIFO順序,使用push/pop方式處理隊列中的任務
  3. 如果自己隊列里的任務處理完后,會從其他線程維護的隊列尾部使用poll的方式竊取任務,以達到充分利用CPU資源的目的
  4. 從尾部竊取可以減少同原線程的競爭
  5. 當隊列中剩最后一個任務時,通過cas解決原線程和竊取線程的競爭

流程大致如下所示:

工作竊取便是ForkJoinPool線程池的優勢所在,在一般的線程池比如ThreadPoolExecutor中,如果一個線程正在執行的任務由於某種原因無法繼續運行,那么該線程會處於等待狀態,包括singleThreadPoolfixedThreadPoolcachedThreadPool這幾種線程池。

而在ForkJoinPool中,那么線程會主動尋找其他尚未被執行的任務然后竊取過來執行,減少線程等待時間。

JDK8中的並行流(parallelStream)功能是基於ForkJoinPool實現的,另外還有java.util.concurrent.CompletableFuture異步回調future,內部使用的線程池也是ForkJoinPool。

文章來源:http://javakk.com/215.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM