前言
先說結論,沒興趣了解原因的可以只看此處的結論
CompletableFuture是否使用默認線程池的依據,和機器的CPU核心數有關。當CPU核心數-1大於1時,才會使用默認的線程池,否則將會為每個CompletableFuture的任務創建一個新線程去執行。
即,CompletableFuture的默認線程池,只有在雙核以上的機器內才會使用。在雙核及以下的機器中,會為每個任務創建一個新線程,等於沒有使用線程池,且有資源耗盡的風險。
因此建議,在使用CompletableFuture時,務必要自定義線程池。因為即便是用到了默認線程池,池內的核心線程數,也為機器核心數-1。也就意味着假設你是4核機器,那最多也只有3個核心線程,對於CPU密集型的任務來說倒還好,但是我們平常寫業務代碼,更多的是IO密集型任務,對於IO密集型的任務來說,這其實遠遠不夠用的,會導致大量的IO任務在等待,導致吞吐率大幅度下降,即默認線程池比較適用於CPU密集型任務。
背景
最近接到一個工作任務,由於我們之前的下單接口速度過慢,光是下單接口需要1500ms左右,因此需要做一些優化,在梳理完業務邏輯后,發現有一些可以並行查詢或者異步執行的地方。於是打算采用CompletableFuture來做異步優化,提高執行速度。代碼示例如下
//查詢用戶信息
CompletableFuture<JSONObject> userInfoFuture = CompletableFuture
.supplyAsync(() -> proMemberService.queryUserByOpenIdInner(ordOrder.getOpenId()));
//查詢積分商品信息
CompletableFuture<JSONObject> integralProInfoFuture = CompletableFuture
.supplyAsync(() -> proInfoService
.getIntegralInfoCache(ordOrderIntegral.getProId()));
//查詢會員積分信息
CompletableFuture<Integer> integerFuture = CompletableFuture
.supplyAsync(() -> proMemberService
.getTotalIntegralByOpenId(ordOrder.getOpenId()));
經過
於是一頓操作,優化完畢,執行速度從1500ms下降到300ms左右,在經過本地和測試環境后,上線生產。眾所周知,CompletableFuture在沒有指定線程池的時候,會使用一個默認的ForkJoinPool線程池,也就是下面這個玩意。
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
等發了生產之后看日志打印的線程號,卻發現了一個極其詭異的事情。明明是同一套代碼,生產環境的沒有用到默認的線程池。而測試環境和本地環境都使用了默認的ForkJoinPool線程池,
這是測試和本地環境打印的線程日志
這是生產環境打印的線程日志
從日志打印的線程編號可以看到,測試和本地環境都是從ForkJoinPool中取工作線程,但是生產環境卻是為每個任務創建了一個全新的線程。這是一個很危險的行為,假如這是一個並發比較高的接口,並且該接口使用了比較多的CompletableFuture來並行的執行任務。在高並發的時候,為每個任務都創建一個子線程,就會存在線程資源被耗盡的可能性,從而導致服務器崩潰。
那這是為什么呢?明明是同一套代碼,在不同的機器上卻有不同的線程使用情況。
原因
在帶着疑問翻閱了CompletableFuture的源碼之后,終於找到了原因:【是否使用默認的ForkJoinPool線程池,和機器的配置有關】
我們點進supplyAsync方法的源碼
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
可以看到這里使用了默認使用了一個asyncPool,點進這個asyncPool
//是否使用默認線程池的判斷依據
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
//useCommonPool的來源
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
其實代碼看到這里就很清晰了,CompletableFuture是否使用默認線程池,是根據這個useCommonPool的boolean值來的,如果為true,就使用默認的ForkJoinPool,否則就為每個任務創建一個新線程,也就是這個ThreadPerTaskExecutor,見名知義。
那這個useCommonPool的布爾值什么情況下才為true,也就是什么時候才能使用到默認的線程池呢。即getCommonPoolParallelism()返回的值要大於1,我們繼續跟進這個getCommonPoolParallelism()方法
//類頂SMASK常量的值
static final int SMASK = 0xffff;
final int config;
static final ForkJoinPool common;
//該方法返回了一個commonParallelism的值
public static int getCommonPoolParallelism() {
return commonParallelism;
}
//而commonParallelism的值是在一個靜態代碼塊里被初始化的,也就是類加載的時候初始化
static {
//初始化common,這個common即ForkJoinPool自身
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
//根據par的值來初始化commonParallelism的值
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;
}
總結一下上面三部分代碼,結合在一起看,這部分代碼主要是初始化了commonParallelism的值,也就是getCommonPoolParallelism()方法的返回值,這個返回值也決定了是否使用默認線程池。而commonParallelism的值又是通過par的值來確定的,par的值是common來確定的,而common則是在makeCommonPool()這個方法中初始化的。
我們繼續跟進makeCommonPool()方法
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
if (parallelism < 0 && // default 1 less than #cores
//獲取機器的cpu核心數 將機器的核心數-1 賦值給parallelism 這一段是是否使用線程池的關鍵
//同時 parallelism也是ForkJoinPool的核心線程數
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
//上面的那個構造方法,可以看到把parallelism賦值給了config變量
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
總結一下上面兩段代碼,獲取機器核心數-1的值,賦值給parallelism變量,再通過構造方法把parallelism的值賦值給config變量。
然后初始化ForkJoinPool的時候。再將config的值賦值給par變量。如果par大於0則將par的值賦給commonParallelism,如果commonParallelism的值大於1的話,useCommonPool的值就為true,就使用默認的線程池,否則就為每個任務創建一個新線程。另外即便是用到了默認線程池,池內的核心線程數,也為機器核心數-1。也就意味着假設你是4核機器,那最多也只有3個核心線程,對於IO密集型的任務來說,這其實遠遠不夠的。
解釋
以上就是CompletableFuture中默認線程池使用依據的源碼分析了。看完這一系列源碼,就能解釋文章一開頭出現的那個問題。
因為我本地和測試環境機器的核心數是4核的,4減1大於1,所以在本地和測試環境的日志上可以看出,使用了默認的線程池ForkJoinPool,而我們生產環境是雙核的機器。2減1不大於1,所以從生產環境的日志看出,是為每個任務都創建了一個新線程。
總結
- 使用CompletableFuture一定要自定義線程池
- CompletableFuture是否使用默認線程池和機器核心數有關,當核心數減1大於1時才會使用默認線程池,否則將為每個任務創建一個新線程去處理
- 即便使用到了默認線程池,池內最大線程數也是核心數減1,對io密集型任務是遠遠不夠的,會令大量任務等待,降低吞吐率
- ForkJoinPool比較適用於CPU密集型的任務,比如說計算。