你好呀,我是歪歪。
前幾天和一個大佬聊天的時候他說自己最近在做線程池的監控,剛剛把動態調整的功能開發完成。
想起我之前寫過這方面的文章,就找出來看了一下:《如何設置線程池參數?美團給出了一個讓面試官虎軀一震的回答。》
然后給我指出了一個問題,我仔細思考了一下,好像確實是留了一個坑。
為了更好的描述這個坑,我先給大家回顧一下線程池動態調整的幾個關鍵點。
首先,為什么需要對線程池的參數進行動態調整呢?
因為隨着業務的發展,有可能出現一個線程池開始夠用,但是漸漸的被塞滿的情況。
這樣就會導致后續提交過來的任務被拒絕。
沒有一勞永逸的配置方案,相關的參數應該是隨着系統的浮動而浮動的。
所以,我們可以對線程池進行多維度的監控,比如其中的一個維度就是隊列使用度的監控。
當隊列使用度超過 80% 的時候就發送預警短信,提醒相應的負責人提高警惕,可以到對應的管理后台頁面進行線程池參數的調整,防止出現任務被拒絕的情況。
以后有人問你線程池的各個參數怎么配置的時候,你先把分為 IO 密集型和 CPU 密集型的這個八股文答案背完之后。
加上一個:但是,除了這些方案外,我在實際解決問題的時候用的是另外一套方案”。
然后把上面的話復述一遍。

那么線程池可以修改的參數有哪些呢?
正常來說是可以調整核心線程數和最大線程數的。
線程池也直接提供了其對應的 set 方法:

但是其實還有一個關鍵參數也是需要調整的,那就是隊列的長度。
哦,對了,說明一下,本文默認使用的隊列是 LinkedBlockingQueue
。
其容量是 final 修飾的,也就是說指定之后就不能修改:

所以隊列的長度調整起來稍微要動點腦筋。
至於怎么繞過 final 這個限制,等下就說,先先給大家上個代碼。
我一般是不會貼大段的代碼的,但是這次為什么貼了呢?
因為我發現我之前的那篇文章就沒有貼,之前寫的代碼也早就不知道去哪里了。
所以,我又苦哈哈的敲了一遍...

import cn.hutool.core.thread.NamedThreadFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadChangeDemo {
public static void main(String[] args) {
dynamicModifyExecutor();
}
private static ThreadPoolExecutor buildThreadPoolExecutor() {
return new ThreadPoolExecutor(2,
5,
60,
TimeUnit.SECONDS,
new ResizeableCapacityLinkedBlockingQueue<>(10),
new NamedThreadFactory("why技術", false));
}
private static void dynamicModifyExecutor() {
ThreadPoolExecutor executor = buildThreadPoolExecutor();
for (int i = 0; i < 15; i++) {
executor.execute(() -> {
threadPoolStatus(executor,"創建任務");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPoolStatus(executor,"改變之前");
executor.setCorePoolSize(10);
executor.setMaximumPoolSize(10);
ResizeableCapacityLinkedBlockingQueue<Runnable> queue = (ResizeableCapacityLinkedBlockingQueue)executor.getQueue();
queue.setCapacity(100);
threadPoolStatus(executor,"改變之后");
}
/**
* 打印線程池狀態
*
* @param executor
* @param name
*/
private static void threadPoolStatus(ThreadPoolExecutor executor, String name) {
BlockingQueue<Runnable> queue = executor.getQueue();
System.out.println(Thread.currentThread().getName() + "-" + name + "-:" +
"核心線程數:" + executor.getCorePoolSize() +
" 活動線程數:" + executor.getActiveCount() +
" 最大線程數:" + executor.getMaximumPoolSize() +
" 線程池活躍度:" +
divide(executor.getActiveCount(), executor.getMaximumPoolSize()) +
" 任務完成數:" + executor.getCompletedTaskCount() +
" 隊列大小:" + (queue.size() + queue.remainingCapacity()) +
" 當前排隊線程數:" + queue.size() +
" 隊列剩余大小:" + queue.remainingCapacity() +
" 隊列使用度:" + divide(queue.size(), queue.size() + queue.remainingCapacity()));
}
private static String divide(int num1, int num2) {
return String.format("%1.2f%%", Double.parseDouble(num1 + "") / Double.parseDouble(num2 + "") * 100);
}
}
當你把這個代碼粘過去之后,你會發現你沒有 NamedThreadFactory
這個類。
沒有關系,我用的是 hutool 工具包里面的,你要是沒有,可以自定義一個,也可以在構造函數里面不傳,這不是重點,問題不大。
問題大的是 ResizeableCapacityLinkedBlockingQueue
這個玩意。
它是怎么來的呢?
在之前的文章里面提到過:
就是把 LinkedBlockingQueue 粘貼一份出來,修改個名字,然后把 Capacity 參數的 final 修飾符去掉,並提供其對應的 get/set 方法。

感覺非常的簡單,就能實現 capacity 參數的動態變更。
但是,我當時寫的時候就感覺是有坑的。
畢竟這么簡單的話,為什么官方要把它給設計為 final 呢?

坑在哪里?
關於 LinkedBlockingQueue
的工作原理就不在這里說了,都是屬於必背八股文的內容。
主要說一下前面提到的場景中,如果我直接把 final 修飾符去掉,並提供其對應的 get/set 方法,這樣的做法坑在哪里。
先說一下,如果沒有特殊說明,本文中的源碼都是 JDK 8 版本。
我們看一下這個 put 方法:

主要看這個被框起來的部分。
while 條件里面的 capacity 我們知道代表的是當前容量。
那么 count.get 是個什么玩意呢?

就是當前隊列里面有多少個元素。
count.get == capacity 就是說隊列已經滿了,然后執行 notFull.await()
把當前的這個 put 操作掛起來。
來個簡單的例子驗證一下:

申請一個長度為 5 的隊列,然后在循環里面調用 put 方法,當隊列滿了之后,程序就阻塞住了。
通過 dump 當前線程可以知道主線程確實是阻塞在了我們前面分析的地方:

所以,你想想。如果我把隊列的 capacity 修改為了另外的值,這地方會感知到嗎?
它感知不到啊,它在等着別人喚醒呢。

現在我們把隊列換成我修改后的隊列驗證一下。
下面驗證程序的思路就是在一個子線程中執行隊列的 put 操作,直到容量滿了,被阻塞。
然后主線程把容量修改為 100。

上面的程序其實我想要達到的效果是當容量擴大之后,子線程不應該繼續阻塞。
但是經過前面的分析,我們知道這里並不會去喚醒子線程。
所以,輸出結果是這樣的:

子線程還是阻塞着,所以並沒有達到預期。
所以這個時候我們應該怎么辦呢?
當然是去主動喚醒一下啦。
也就是修改一下 setCapacity 的邏輯:
public void setCapacity(int capacity) {
final int oldCapacity = this.capacity;
this.capacity = capacity;
final int size = count.get();
if (capacity > size && size >= oldCapacity) {
signalNotFull();
}
}
核心邏輯就是發現如果容量擴大了,那么就調用一下 signalNotFull
方法:

喚醒一下被 park 起來的線程。
如果看到這里你覺得你有點懵,不知道 LinkedBlockingQueue 的這幾個玩意是干啥的:

趕緊去花一小時時間補充一下 LinkedBlockingQueue 相關的知識點。這樣玩意,面試也經常考的。
好了,我們說回來。
修改完我們自定義的 setCapacity 方法后,再次執行程序,就出現了我們預期的輸出:

除了改 setCapacity 方法之外,我在寫文章的時候不經意間還觸發了另外一個答案:

在調用完 setCapacity 方法之后,再次調用 put 方法,也能得到預期的輸出:

我們觀察 put 方法就能發現其實道理是一樣的:

當調用完 setCapacity 方法之后,再次調用 put 方法,由於不滿足標號為 ① 的代碼的條件,所以就不會被阻塞。
於是可以順利走到標號為 ② 的地方喚醒被阻塞的線程。
所以也就變相的達到了改變隊列長度,喚醒被阻塞的任務目的。
而究根結底,就是需要執行一次喚醒的操作。
那么那一種優雅一點呢?
那肯定是第一種把邏輯封裝在 setCapacity 方法里面操作起來更加優雅。
第二種方式,大多適用於那種“你也不知道為什么,反正這樣寫程序就是正常了”的情況。

現在我們知道在線程池里面動態調整隊列長度的坑是什么了。
那就是隊列滿了之后,調用 put 方法的線程就會被阻塞住,即使此時另外的線程調用了 setCapacity 方法,改變了隊列長度,如果沒有線程再次觸發 put 操作,被阻塞的線程也不會被喚醒。
是不是?
了不了解?
對不對?
這是不對的,朋友們。

看到前面內容,頻頻點頭的朋友,要注意了。
這地方要開始轉彎了。
開始轉彎
線程池里面往隊列里面添加對象的時候,用的是 offer 命令,並沒有用 put 命令:

我們看看 offer 命令在干啥事兒:

隊列滿了之后,直接返回 false,不會出現阻塞的情況。
也就是說,線程池中根本就不會出現我前面說的需要喚醒的情況,因為根本就沒有阻塞中的線程。
在和大佬交流的過程中,他提到了一個 VariableLinkedBlockingQueue
的東西。
這個類位於 MQ 包里面,我前面提到的 setCapacity 方法的修改方式就是在它這里學來的:

同時,項目里面也用到了它的 put 方法:

所以,它是有可能出現我們前面分析的情況,有需要被喚醒的線程。
但是,你想想,線程池里面並沒有使用 put 方法,是不是就剛好避免這樣的情況?
是的,確實是。
但是,不夠嚴謹,如果知道有問題了的話,為什么要留個坑在這里呢?
你學 MQ 的 VariableLinkedBlockingQueue 考慮的周全一點,就算 put 方法阻塞的時候也能用,它不香嗎?
寫到這里其實好像除了讓你熟悉一下 LinkedBlockingQueue 外,似乎是一個沒啥卵用的知識點,
但是,我能讓這個沒有卵用的知識點起到大作用。
因為這其實是一個小細節。
假設我出去面試,在面試的時候提到動態調整方法的時候,在不經意間拿捏一下這個小細節,即使我沒有真的落地過動態調整,但是我提到這樣的一個小細節,就顯得很真實。
面試官一聽:很不錯,有整體,有局部,應該是假不了。

在 VariableLinkedBlockingQueue 里面還有幾處細節,拿 put 方法來說:

判斷條件從 count.get() >= capacity
變成了 count.get() = capacity
,目的是為了支持 capacity 由大變小的場景。
這樣的地方還有好幾處,就不一一列舉了。
魔鬼,都在細節里面。
同學們得好好的拿捏一下。
JDK bug
其實原計划寫到前面,就打算收尾了,因為我本來就只是想補充一下我之前沒有注意到的細節。
但是,我手賤,跑到 JDK bug 列表里面去搜索了一下 LinkedBlockingQueue,想看看還有沒有什么其他的收獲。
我是萬萬沒想到,確實是有一點意外收獲的。
首先是這一個 bug ,它是在 2019-12-29 被提出來的:
https://bugs.openjdk.java.net/browse/JDK-8236580

看標題的意思也是想要給 LinkedBlockingQueue 賦能,可以讓它的容量進行修改。
加上他下面的場景描述,應該也想要和線程池配合,找到隊列的抓手,下鑽到底層邏輯,聯動監控系統,拉通配置頁面,打出一套動態適應的組合拳。

但是官方並沒有采納這個建議。

回復里面說寫 concurrent 包的這些哥們對於在並發類里面加東西是非常謹慎的。他們覺得給 ThreadPoolExecutor 提供可動態修改的特性會帶來或者已經帶來眾多的 bug 了。
我理解就是簡單一句話:建議還是不錯的,但是我不敢動。並發這塊,牽一發動全身,不知道會出些什么幺蛾子。
所以要實現這個功能,還是得自己想辦法。
這里也就解釋了為什么用 final 去修飾了隊列的容量,畢竟把功能縮減一下,出現 bug 的幾率也少了很多。

第二個 bug 就有意思了,和我們動態調整線程池的需求非常匹配:
https://bugs.openjdk.java.net/browse/JDK-8241094

這是一個 2020 年 3 月份提出的 bug,描述的是說在更新線程池的核心線程數的時候,會拋出一個拒絕異常。
在 bug 描述的那部分他貼了很多代碼,但是他給的代碼寫的很復雜,不太好理解。
好在 Martin 大佬寫了一個簡化版,一目了然,就好理解的多:

這段代碼是干了個啥事兒呢,簡單給大家匯報一下。
首先 main 方法里面有個循環,循環里面是調用了 test 方法,當 test 方法拋出異常的時候循環結束。
然后 test 方法里面是每次都搞一個新的線程池,接着往線程池里面提交隊列長度加最大線程數個任務,最后關閉這個線程池。
同時還有另外一個線程把線程池的核心線程數從 1 修改為 5。
你可以打開前面提到的 bug 鏈接,把這段代碼貼出來跑一下,非常的匪夷所思。
Martin 大佬他也認為這是一個 BUG.
說實在的,我跑了一下案例,我覺得這應該算是一個 bug,但是經過 Doug Lea 老爺子的親自認證,他並不覺得這是一個 Bug。
主要是這個 bug 確實也有點超出我的認知,而且在鏈接中並沒有明確的說具體原因是什么,導致我定位的時間非常的長,甚至一度想要放棄。
但是最終定位到問題之后也是長嘆一口:害,就這?沒啥意思。
先看一下問題的表現是怎么樣的:

上面的程序運行起來后,會拋出 RejectedExecutionException,也就是線程池拒絕執行該任務。
但是我們前面分析了,for 循環的次數是線程池剛好能容納的任務數:

按理來說不應該有問題啊?
這也就是提問的哥們納悶的地方:

他說:我很費解啊,我提交的任務數量根本就不會超過 queueCapacity+maxThreads,為什么線程池還拋出了一個 RejectedExecutionException?而且這個問題非常的難以調試,因為在任務中添加任何形式的延遲,這個問題都不會復現。
他的言外之意就是:這個問題非常的莫名其妙,但是我可以穩定復現,只是每次復現出現問題的時機都非常的隨機,我搞不定了,我覺得是一個 bug,你們幫忙看看吧。
我先不說我定位到的 Bug 的主要原因是啥吧。
先看看老爺子是怎么說的:

老爺子的觀點簡單來說就是四個字:

老爺子說他沒有說服自己上面的這段程序應該被正常運行成功。
意思就是他覺得拋出異常也是正常的事情。但是他沒有說為什么。
一天之后,他又補了一句話:

我先給大家翻譯一下:
他說當線程池的 submit 方法和 setCorePoolSize 或者 prestartAllCoreThreads 同時存在,且在不同的線程中運行的時候,它們之間會有競爭的關系。
在新線程處於預啟動但還沒完全就緒接受隊列中的任務的時候,會有一個短暫的窗口。在這個窗口中隊列還是處於滿的狀態。
解決方案其實也很簡單,比如可以在 setCorePoolSize 方法中把預啟動線程的邏輯拿掉,但是如果是用 prestartAllCoreThreads 方法,那么還是會出現前面的問題。
但是,不管是什么情況吧,我還是不確定這是一個需要被修復的問題。
怎么樣,老爺子的話看起來是不是很懵?
是的,這段話我最開始的時候讀了 10 遍,都是懵的,但是當我理解到這個問題出現的原因之后,我還是不得不感嘆一句:
還是老爺子總結到位,沒有一句廢話。
到底啥原因?
首先我們看一下示例代碼里面操作線程池的這兩個地方:

修改核心線程數的是一個線程,即 CompletableFuture 的默認線程池 ForkJoinPool 中的一個線程。
往線程池里面提交任務是另外一個線程,即主線程。
老爺子的第一句話,說的就是這回事:

racing,就是開車,就是開快車,就是與...比賽的意思。
這是一個多線程的場景,主線程和 ForkJoinPool 中的線程正在 race,即可能出現誰先誰后的問題。
接着我們看看 setCorePoolSize 方法干了啥事:

標號為 ① 的地方是計算新設置的核心線程數與原核心線程數之間的差值。
得出的差值,在標號為 ② 的地方進行使用。
也就是取差值和當前隊列中正在排隊的任務數中小的那一個。
比如當前的核心線程數配置就是 2,這個時候我要把它修改為 5。隊列里面有 10 個任務在排隊。
那么差值就是 5-2=3,即標號為 ① 處的 delta=3。
workQueue.size 就是正在排隊的那 10 個任務。
也就是 Math.min(3,10),所以標號為 ② 處的 k=3。
含義為需要新增 3 個核心線程數,去幫忙把排隊的任務給處理一下。
但是,你想新增 3 個就一定是對的嗎?
會不會在新增的過程中,隊列中的任務已經被處理完了,有可能根本就不需要 3 個這么多了?
所以,循環終止的條件除了老老實實的循環 k 次外,還有什么?
就是隊列為空的時候:

同時,你去看代碼上面的那一大段注釋,你就知道,其實它描述的和我是一回事。
好,我們接着看 addWorker 里面,我想要讓你看到地方:

在這個方法里面經過一系列判斷后,會走入到 new Worker() 的邏輯,即工作線程。
然后把這個線程加入到 workers 里面。
workers 就是一個存放工作線程的 HashSet 集合:

你看我框起來的這兩局代碼,從 workers.add(w)
到 t.start()
。
從加入到集合到真正的啟動,中間還有一些邏輯。
執行中間的邏輯的這一小段時間,就是老爺子說的 “window”。
there's a window while new threads are in the process of being prestarted but not yet taking tasks。
就是在新線程處於預啟動,但尚未接受任務時,會有一個窗口。
這個窗口會發生啥事兒呢?
就是下面這句話:
the queue may remain (transiently) full。
隊列有可能還是滿的,但是只是暫時的。
接下來我們連起來看:

所以怎么理解上面被划線的這句話呢?
帶入一個實際的場景,也就是前面的示例代碼,只是調整一下參數:

這個線程池核心線程數是 1,最大線程數是 2,隊列長度是 5,最多能容納的任務數是 7。
另外有一個線程在執行把核心線程池從 1 修改為 2 的操作。
假設我們記線程池 submit 提交了 6 個任務,正在提交第 7 個任務的時間點為 T1。
為什么是要強調這個時間點呢?
因為當提交第 7 個任務的時候,就需要去啟用非核心線程數了。
具體的源碼在這里:
java.util.concurrent.ThreadPoolExecutor#execute

也就是說此時隊列滿了, workQueue.offer(command)
返回的是 fasle。因此要走到 addWorker(command, false)
方法中去了。
代碼走到 1378 行這個時間點,是 T1。
如果 1378 行的 addWorker 方法返回 false,說明添加工作線程失敗,拋出拒絕異常。
前面示例程序拋出拒絕異常就是因為這里返回了 fasle。
那么問題就變成了:為什么 1378 行中的 addWorker 執行后返回了 false 呢?
因為當前不滿足這個條件了 wc >= (core ? corePoolSize : maximumPoolSize)
:

wc 就是當前線程池,正在工作的線程數。
把我們前面的條件帶進去,就是這樣的 wc >=(false?2:2)
。
即 wc=2。
為什么會等於 2,不應該是 1 嗎?
多的哪一個是哪里來的呢?
真相只有一個:恰好此時 setCorePoolSize 方法中的 addWorker 也執行到了 workers.add(w)
,導致 wc 從 1 變成了 2。
撞車了,所以拋出拒絕異常。
那么為什么大多數情況下不會拋出異常呢?
因為從 workers.add(w)
到 t.start()
這個時間窗口,非常的短暫。
大多數情況下,setCorePoolSize 方法中的 addWorker 執行了后,就會理解從隊列里面拿一個任務出來執行。
而這個情況下,另外的任務通過線程池提交進來后,發現隊列還有位子,就放到隊列里面去了,根本不會去執行 addWorker 方法。
道理,就是這樣一個道理。
這個多線程問題確實是比較難復現,我是怎么定位到的呢?
加日志。
源碼里面怎么加日志呢?
我不僅搞了一個自定義隊列,還把線程池的源碼粘出來了一份,這樣就可以加日志了:

另外,其實我這個定位方案也是很不嚴謹的。
調試多線程的時候,最好是不要使用 System.out.println,有坑!

場景
我們再回頭看看老爺子給出的方案:

其實它給了兩個。
第一個是拿掉 setCorePoolSize 方法中的 addworker 的邏輯。
第二個是說原程序中,即提問者給的程序中,使用的是 prestartAllCoreThreads 方法,這個里面必須要調用 addWorker 方法,所以還是有一定的幾率出現前面的問題。

但是,老爺子不明白為什么會這樣寫?
我想也許他是沒有想到什么合適的場景?
其實前面提到的這個 Bug,其實在動態調整的這個場景下,還是有可能會出現的。
雖然,出現的概率非常低,條件也非常苛刻。
但是,還是有幾率出現的。
萬一出現了,當同事都在摳腦殼的時候,你就說:這個嘛,我見過,是個 Bug。不一定每次都出現的。
這又是一個你可以拿捏的小細節。
但是,如果你在面試的時候遇到這個問題了,這屬於一個傻逼問題。
毫無意義。
屬於,面試官不知道在哪看到了一個感覺很厲害的觀點,一定要展現出自己很厲害的樣子。
但是他不知道的是,這個題:

最后說一句
好了,看到了這里了,安排一個點贊吧。寫文章很累的,需要一點正反饋。
給各位讀者朋友們磕一個了:

本文已收錄自個人博客,歡迎大家來玩:
https://www.whywhy.vip/