生產問題之CompletableFuture默認線程池踩坑,請務必自定義線程池


前言

先說結論,沒興趣了解原因的可以只看此處的結論

CompletableFuture是否使用默認線程池的依據,和機器的CPU核心數有關。當CPU核心數-1大於1時,才會使用默認的線程池,否則將會為每個CompletableFuture的任務創建一個新線程去執行

即,CompletableFuture的默認線程池,只有在雙核以上的機器內才會使用。在雙核及以下的機器中,會為每個任務創建一個新線程,等於沒有使用線程池,且有資源耗盡的風險

因此建議,在使用CompletableFuture時,務必要自定義線程池。因為即便是用到了默認線程池,池內的核心線程數,也為機器核心數-1。也就意味着假設你是4核機器,那最多也只有3個核心線程,對於CPU密集型的任務來說倒還好,但是我們平常寫業務代碼,更多的是IO密集型任務,對於IO密集型的任務來說,這其實遠遠不夠用的,會導致大量的IO任務在等待,導致吞吐率大幅度下降,即默認線程池比較適用於CPU密集型任務。

背景

最近接到一個工作任務,由於我們之前的下單接口速度過慢,光是下單接口需要1500ms左右,因此需要做一些優化,在梳理完業務邏輯后,發現有一些可以並行查詢或者異步執行的地方。於是打算采用CompletableFuture來做異步優化,提高執行速度。代碼示例如下

 			//查詢用戶信息
            CompletableFuture<JSONObject> userInfoFuture = CompletableFuture
                .supplyAsync(() -> proMemberService.queryUserByOpenIdInner(ordOrder.getOpenId()));
            //查詢積分商品信息
            CompletableFuture<JSONObject> integralProInfoFuture = CompletableFuture
                .supplyAsync(() -> proInfoService
                    .getIntegralInfoCache(ordOrderIntegral.getProId()));

            //查詢會員積分信息
            CompletableFuture<Integer> integerFuture = CompletableFuture
                .supplyAsync(() -> proMemberService
                    .getTotalIntegralByOpenId(ordOrder.getOpenId()));
          

經過

於是一頓操作,優化完畢,執行速度從1500ms下降到300ms左右,在經過本地和測試環境后,上線生產。眾所周知,CompletableFuture在沒有指定線程池的時候,會使用一個默認的ForkJoinPool線程池,也就是下面這個玩意。

 	public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common;
    }

等發了生產之后看日志打印的線程號,卻發現了一個極其詭異的事情。明明是同一套代碼,生產環境的沒有用到默認的線程池。而測試環境和本地環境都使用了默認的ForkJoinPool線程池

這是測試和本地環境打印的線程日志

這是生產環境打印的線程日志

從日志打印的線程編號可以看到,測試和本地環境都是從ForkJoinPool中取工作線程,但是生產環境卻是為每個任務創建了一個全新的線程。這是一個很危險的行為,假如這是一個並發比較高的接口,並且該接口使用了比較多的CompletableFuture來並行的執行任務。在高並發的時候,為每個任務都創建一個子線程,就會存在線程資源被耗盡的可能性,從而導致服務器崩潰。

那這是為什么呢?明明是同一套代碼,在不同的機器上卻有不同的線程使用情況。

原因

在帶着疑問翻閱了CompletableFuture的源碼之后,終於找到了原因:【是否使用默認的ForkJoinPool線程池,和機器的配置有關】

我們點進supplyAsync方法的源碼

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    

可以看到這里使用了默認使用了一個asyncPool,點進這個asyncPool

  //是否使用默認線程池的判斷依據
private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
//useCommonPool的來源
 private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

其實代碼看到這里就很清晰了,CompletableFuture是否使用默認線程池,是根據這個useCommonPool的boolean值來的,如果為true,就使用默認的ForkJoinPool,否則就為每個任務創建一個新線程,也就是這個ThreadPerTaskExecutor,見名知義。

那這個useCommonPool的布爾值什么情況下才為true,也就是什么時候才能使用到默認的線程池呢。即getCommonPoolParallelism()返回的值要大於1,我們繼續跟進這個getCommonPoolParallelism()方法

//類頂SMASK常量的值
static final int SMASK  = 0xffff;   
final int config;
static final ForkJoinPool common;

//該方法返回了一個commonParallelism的值
public static int getCommonPoolParallelism() {
        return commonParallelism;
    }


    //而commonParallelism的值是在一個靜態代碼塊里被初始化的,也就是類加載的時候初始化
static {
    	//初始化common,這個common即ForkJoinPool自身
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
    //根據par的值來初始化commonParallelism的值
        int par = common.config & SMASK; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }

總結一下上面三部分代碼,結合在一起看,這部分代碼主要是初始化了commonParallelism的值,也就是getCommonPoolParallelism()方法的返回值,這個返回值也決定了是否使用默認線程池。而commonParallelism的值又是通過par的值來確定的,par的值是common來確定的,而common則是在makeCommonPool()這個方法中初始化的。

我們繼續跟進makeCommonPool()方法

private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
       
        if (parallelism < 0 && // default 1 less than #cores
            //獲取機器的cpu核心數 將機器的核心數-1 賦值給parallelism 這一段是是否使用線程池的關鍵
            //同時 parallelism也是ForkJoinPool的核心線程數
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

//上面的那個構造方法,可以看到把parallelism賦值給了config變量
private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

總結一下上面兩段代碼,獲取機器核心數-1的值,賦值給parallelism變量,再通過構造方法把parallelism的值賦值給config變量。

然后初始化ForkJoinPool的時候。再將config的值賦值給par變量。如果par大於0則將par的值賦給commonParallelism,如果commonParallelism的值大於1的話,useCommonPool的值就為true,就使用默認的線程池,否則就為每個任務創建一個新線程。另外即便是用到了默認線程池,池內的核心線程數,也為機器核心數-1。也就意味着假設你是4核機器,那最多也只有3個核心線程,對於IO密集型的任務來說,這其實遠遠不夠的

解釋

以上就是CompletableFuture中默認線程池使用依據的源碼分析了。看完這一系列源碼,就能解釋文章一開頭出現的那個問題。

因為我本地和測試環境機器的核心數是4核的,4減1大於1,所以在本地和測試環境的日志上可以看出,使用了默認的線程池ForkJoinPool,而我們生產環境是雙核的機器。2減1不大於1,所以從生產環境的日志看出,是為每個任務都創建了一個新線程。

總結

  • 使用CompletableFuture一定要自定義線程池
  • CompletableFuture是否使用默認線程池和機器核心數有關,當核心數減1大於1時才會使用默認線程池,否則將為每個任務創建一個新線程去處理
  • 即便使用到了默認線程池,池內最大線程數也是核心數減1,對io密集型任務是遠遠不夠的,會令大量任務等待,降低吞吐率
  • ForkJoinPool比較適用於CPU密集型的任務,比如說計算。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM