你好呀,我是歪歪。
今天給大家分享一個經過擴展后的線程池,且我覺得擴展的思路非常好的。
放心,我標題黨來着,我覺得面試不會有人考這個玩意,但是工作中是有可能真的會遇到相應的場景。
為了引出這個線程池,我先給大家搞個場景,方便理解。
就拿下面這個表情包來做例子吧。

假設我們有兩個程序員,就叫富貴和旺財吧。
上面這個表情包就是這兩個程序員一天的工作寫照,用程序來表示是這樣的。
首先我們搞一個對象,表示程序員當時正在做的事兒:
public class CoderDoSomeThing {
private String name;
private String doSomeThing;
public CoderDoSomeThing(String name, String doSomeThing) {
this.name = name;
this.doSomeThing = doSomeThing;
}
}
然后,用代碼描述一下富貴和旺財做的事兒:
public class NbThreadPoolTest {
public static void main(String[] args) {
CoderDoSomeThing rich1 = new CoderDoSomeThing("富貴", "啟動Idea");
CoderDoSomeThing rich2 = new CoderDoSomeThing("富貴", "搞數據庫,連tomcat,crud一頓輸出");
CoderDoSomeThing rich3 = new CoderDoSomeThing("富貴", "嘴角瘋狂上揚");
CoderDoSomeThing rich4 = new CoderDoSomeThing("富貴", "接口訪問報錯");
CoderDoSomeThing rich5 = new CoderDoSomeThing("富貴", "心態崩了,卸載Idea");
CoderDoSomeThing www1 = new CoderDoSomeThing("旺財", "啟動Idea");
CoderDoSomeThing www2 = new CoderDoSomeThing("旺財", "搞數據庫,連tomcat,crud一頓輸出");
CoderDoSomeThing www3 = new CoderDoSomeThing("旺財", "嘴角瘋狂上揚");
CoderDoSomeThing www4 = new CoderDoSomeThing("旺財", "接口訪問報錯");
CoderDoSomeThing www5 = new CoderDoSomeThing("旺財", "心態崩了,卸載Idea");
}
}
簡單解釋一下變量的名稱,表明我還是經過深思熟慮了的。
富貴,就是有錢,所以變量名叫做 rich。
旺財,就是汪汪汪,所以變量名叫做 www。
你看我這個類的名稱,NbThreadPoolTest,就知道我是要用到線程池了。
實際情況中,富貴和旺財兩個人是可以各干各的事兒,互不干擾的,也就是他們應該是各自的線程。
各干各的事兒,互不干擾,這聽起來好像是可以用線程池的。
所以,我把程序修改成了下面這個樣子,把線程池用起來:
public class NbThreadPoolTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<CoderDoSomeThing> coderDoSomeThingList = new ArrayList<>();
coderDoSomeThingList.add(new CoderDoSomeThing("富貴", "啟動Idea"));
coderDoSomeThingList.add(new CoderDoSomeThing("富貴", "搞數據庫,連tomcat,crud一頓輸出"));
coderDoSomeThingList.add(new CoderDoSomeThing("富貴", "嘴角瘋狂上揚"));
coderDoSomeThingList.add(new CoderDoSomeThing("富貴", "接口訪問報錯"));
coderDoSomeThingList.add(new CoderDoSomeThing("富貴", "心態崩了,卸載Idea"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺財", "啟動Idea"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺財", "搞數據庫,連tomcat,crud一頓輸出"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺財", "嘴角瘋狂上揚"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺財", "接口訪問報錯"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺財", "心態崩了,卸載Idea"));
coderDoSomeThingList.forEach(coderDoSomeThing -> {
executorService.execute(() -> {
System.out.println(coderDoSomeThing.toString());
});
});
}
}
上面程序就是把富貴和旺財兩人做的事情都封裝到了 list 里面,然后遍歷這個 list 把里面的東西,即“做的事情”都扔到線程池里面去。
那么上面的程序執行后,一種可能的輸出是這樣的:

乍一看沒問題,富貴和旺財都在同時做事。
但是仔細一看,每個人做的事情的順序不對了啊。
比如旺財看起來有點“精神分裂”,剛剛啟動 Idea,嘴角就開始瘋狂上揚了。
所以,到這里可以引出我想要的東西了。
我想要的是什么樣的東西呢?
就是在保證富貴和旺財在同時做事的情況下,還要保證他們的做的事情是有一定順序的,即按照我投放到線程池里面的順序來執行。
用正式一點的話來描述是這樣的:
我需要這樣的一個線程池,它可以確保投遞進來的任務按某個維度划分出任務,然后按照任務提交的順序依次執行。這個線程池可以通過並行處理(多個線程)來提高吞吐量、又要保證一定范圍內的任務按照嚴格的先后順序來運行。
用我前面的例子,“按某個維度”就是人名,就是富貴和旺財這個維度。
請問你怎么做?

一頓分析
我會怎么做?
首先,我可以肯定的是 JDK 的線程池是干不成這個事兒的。
因為從線程池原理的角度來說,並行和先后順序它是不能同時滿足的。
你明白我意思吧?
比如我要用線程池來保證先后順序,那么它是這樣的:

只有一個線程的線程池,它可以保證先后順序。
但是這玩意有意義嗎?
有點意義,因為它並不占用主線程,但是意義不大,畢竟閹割了重要的“多線程”能力。
所以我們怎么在這個場景下把並行能力給提上去呢?
等等,我們好像已經有一個可以保證先后順序的線程池了。
那么我們把它橫向擴容,多搞幾個,不就具備了並行的能力了嗎?

然后前面提到的“按某個維度”,如果有多個只有一個線程的線程池了,那我也可以按照這個維度去映射“維度”和“每個線程池”呀。
用程序來說就是這樣的:

標號為 ① 的地方就是搞了多個只有一個線程的線程池,目的是為了保證消費的順序性。
標號為 ② 的地方就是通過一個 map 映射人名和線程池之間的關系。這里只是一個示意,比如我們還可以用用戶號取模的方式去定位對應的線程池,比如用戶號為奇數的用一個線程池,為偶數的用另外一個線程。
所以並不是“某個維度”里面有多少個數據就要定義多少個只有一個線程的線程池,它們也是可以復用的,這個地方有個小彎要轉過來。
標號為 ③ 的地方就是根據名稱去 map 里面去對應的線程池。
從輸出結果來看,也是沒有毛病的:

看到這里有的朋友就要說:你這不是作弊嗎?
不是說好一個線程池嗎,你這都弄了多個了。
你要這個角度看問題的話,那就把路走窄了。
你要想着有一個大的線程池,里面又放了很多個只有一個線程的線程池。
這樣格局就打開了。

我上面的寫法是一個非常簡陋的 Demo,主要是引出這個方案的思路。
我要介紹的,就是基於這個思路搞出的一個開源項目。
是一位大公司的大佬寫的,我看了一下源碼,拍案叫絕:寫的真他娘的好。
我先給你上一個使用案例和輸出結果:

從案例看起來,使用方式也是非常的簡單。
和 JDK 原生的用法的差異點就是我框起來的部分。
首先搞一個 KeyAffinityExecutor 的對象,來代替原生的線程池。
KeyAffinityExecutor 其中涉及到一個單詞,Affinity。
翻譯過來有類同的含義:

所以 KeyAffinityExecutor 翻譯過來就是 key 類同的線程池,當你明白它的功能和作用范圍后會覺得這個名字取的是針不戳。
接着是調用了 KeyAffinityExecutor 對象的 executeEx 方法,可以多傳入一個參數,這個參數就是區分某一類相同任務的維度,比如我這里就給的是 name 字段。
從使用案例上看來,可以說封裝的非常好,開箱即用。

KeyAffinityExecutor用法
先說說這個類的用法吧。
其對應的開源項目地址是這個:
https://github.com/PhantomThief/more-lambdas-java
如果你想把它用起來,得引入下面這個 maven 地址:
<dependency>
<groupId>com.github.phantomthief</groupId>
<artifactId>more-lambdas</artifactId>
<version>0.1.55</version>
</dependency>
其核心代碼是這個接口:
com.github.phantomthief.pool.KeyAffinityExecutor
這個接口里面有大量的注釋,大家可以拉下來看一下。
我這里主要給大家看一下接口上面,作者寫的注釋,他是這樣介紹自己的這個工具的。
這是一個按指定的 Key 親和順序消費的線程池。
KeyAffinityExecutor 是一個特殊的任務線程池。
它可以確保投遞進來的任務按 Key 相同的任務依照提交順序依次執行。在既要通過並行處理來提高吞吐量、又要保證一定范圍內的任務按照嚴格的先后順序來運行的場景下非常適用。
KeyAffinityExecutor 的內建實現方式,是將指定的 Key 映射到固定的單線程線程池上,它內部會維護多個(數量可配)這樣的單線程線程池,來保持一定的任務並行度。
需要注意的是,此接口定義的 KeyAffinityExecutor,並不要求 Key 相同的任務在相同的線程上運行,盡管實現類可以按照這種方式來實現,但它並非一個強制性的要求,因此在使用時也請不要依賴這樣的假定。
很多人問,這和自己使用一個線程池的數組,並通過簡單取模的方式來實現有什么區別?
事實上,大多數場景的確差異不大,但是當數據傾斜發生時,被散列到相同位置的數據可能會因為熱點傾斜數據被延誤。
本實現在並發度較低時(閾值可設置),會挑選最閑置的線程池投遞,盡最大可能隔離傾斜數據,減少對其它數據帶來的影響。
在作者的這段介紹里面,簡單的說明了該項目的應用場景和內部原理,和我們前面分析的差不多。
除此之外,還有兩個需要特別注意的地方。
第一個地方是這里:

作為區分的任務維度的對象,如果是自定義對象,那么一定要重寫其 hashCode、equals,以確保可以起到標識作用。
這一處的提醒就和 HashMap 的 key 如果是對象的話,應該要重寫 hashCode、equals 方法的原因是一樣一樣的。
編程基礎,只提一下,不多贅述。
第二個地方得好好說一下,屬於他的核心思想。
他沒有采用簡單取模的方式,因為在簡單取模的場景上,數據是有可能發生傾斜的。
我個人是這樣理解作者的思路的。
首先說明一下取模的數據傾斜是咋回事,舉個簡單的例子:

上面的代碼片段中,我加入了一個新角色“摸魚大師”。同時給對象新增了一個 id 字段。
假設,我們對 id 字段用 2 取余:

那么會出現的情況就是大師和富貴對應的 id 取余結果都是 1,它們將同用一個線程池。
很明顯,由於大師的頻繁操作,導致“摸魚”變成了熱點數據,從而導致編號為 0 的連接池發了傾斜,進而影響到了富貴的正常工作。
而 KeyAffinityExecutor 的策略是什么樣的呢?
它會挑選最閑置的線程池進行投遞。
怎么理解呢?
還是上面的例子,如果我們構建這樣的線程池:
KeyAffinityExecutor executorService =
KeyAffinityExecutor.newSerializingExecutor(3, 200, "MY-POOL-%d");
第一個參數 3,代表它會在這里線程池里面構建 3 個只有一個線程的線程池。
那么當用它來提交任務的時候,由於維度是 id 維度,我們剛好三個 id,所以剛好把這個線程池占滿:

這個時候是不存在數據傾斜的。
但是,如果我把前面構建線程池的參數從 3 變成 2 呢?
KeyAffinityExecutor executorService =
KeyAffinityExecutor.newSerializingExecutor(2, 200, "MY-POOL-%d");
提交方式不變,里面加上對 id 為 1 和 2 的任務延遲的邏輯,目的是觀察 id 為 3 的數據怎么處理:

毋庸置疑,當提交執行大師的摸魚操作的時候線程池肯定不夠用了,怎么辦?
這個時候,根據作者描述“會挑選最閑置的線程池投遞”。
我用這樣的數據來說明:

所以,當執行大師摸魚操作的時候,會去從僅有的兩個選項中選一個出來。
怎么選?
誰的並發度低,就選誰。
由於有延遲時間在任務里面,所以我們可以觀察到執行富貴的線程的並發度是 5,而執行旺財的線程的並發度是 6。
因此執行大師的摸魚操作的時候,會選擇並發度為 5 的線程進行處理。

這個場景下就出現了數據傾斜。但是傾斜的前提發生了變化,變成了當前已經沒有可用線程了。
所以,作者說“盡最大可能隔離傾斜數據”。
這兩個方案最大的差異就是對線程資源的利用程度,如果是單純的取模,那么有可能出現發生數據傾斜的時候,還有可用線程。
如果是 KeyAffinityExecutor 的方式,它可以保證發生數據傾斜的時候,線程池里面的線程一定是已經用完了。
然后,你再品一品這兩個方案之間的細微差異。
KeyAffinityExecutor源碼
源碼不算多,一共就這幾個類:

但是他的源碼里面絕大部分都是 lambdas 的寫法,基本上都是函數式編程,如果你對這方面比較薄弱的話那么看起來會比較吃力一點。
如果你想掌握其源碼的話,我建議是把項目拉到本地,然后從他的測試用例入手:
https://github.com/PhantomThief/more-lambdas-java

我給大家匯報一下我看到的一些關鍵的地方,方便大家自己去看的時候梳理思路。
首先肯定是從它的構造方法入手,每一個入參的含義作者都標注的非常清楚了:

假設我們的構造函數是這樣的,含義是構建 3 個只有一個線程的線程池,每個線程池的隊列大小是 200:
KeyAffinityExecutor executorService =
KeyAffinityExecutor.newSerializingExecutor(3, 200, "WHY-POOL-%d");
首先我們要找到構建“只有一個線程的線程池”的邏輯在哪。
就藏在構造函數里面的這個方法:
com.github.phantomthief.pool.KeyAffinityExecutorUtils#executor(java.lang.String, int)
在這里可以看到我們一直提到的“只有一個線程的線程池”,隊列的長度也可以指定:

該方法返回的是一個 Supplier 接口,等下就要用到。
接下來,我們要找到 “3” 這個數字是體現在哪兒的呢?
就藏在構造函數的 build 方法里面,該方法最終會調用到這個方法來:
com.github.phantomthief.pool.impl.KeyAffinityImpl#KeyAffinityImpl
你到時候在這個地方打個斷點,然后 Debug 看一眼,就非常明確了:

關於框起來的這部分的幾個關鍵參數,我解釋一下:

首先是 count 參數,就是我們定義的 3。那么 range(0,3),就是 0,1,2。
然后是 supplier,這玩意就是前面我們說的 executor 方法返回的 supplier 接口,可以看到里面封裝的就是個線程池。
接着是里面有一個非常關鍵的操作 :map(ValueRef::new)。
這個操作里面的 ValueRef 對象,很關鍵:
com.github.phantomthief.pool.impl.KeyAffinityImpl.ValueRef

關鍵的地方就是這個對象里面的 concurrency 變量。
還記得最前面說的“挑選最閑置的執行器(線程池)”這句話嗎?
怎么判斷是否閑置?
靠的就是 concurrency 變量。
其對應的代碼在這:
com.github.phantomthief.pool.impl.KeyAffinityImpl#select

能走到斷點的地方,說明當前這個 key 是之前沒有被映射過的,所以需要為其指定一個線程池。
而指定這個線程池的操作,就是循環這個 all 集合,集合里面裝的就是 ValueRef 對象:

所以,comparingInt(ValueRef::concurrency) 方法就是在選當前所有的線程池,並發度最小的一個。
如果這個線程池從來沒有用過或者目前沒有任務在使用,那么並發度必然是 0 ,所有會被選出來。
如果所有線程池正在被使用,就會選 concurrency 這個值最低的線程池。
我這里只是給大家說一個大概的思路,如果要深入了解的話,自己去翻源碼去。
如果你非常了解 lambdas 的用法的話,你會覺得寫的真的很優雅,看起來很舒服。
如果你不了解 lambdas 的話...
那你還不趕緊去學?
另外我還發現了兩個熟悉的東西。
朋友們,請看這是什么:

這難道不就是線程池參數的動態調整嗎?
第二個是這樣的:

RabbitMQ 里面的動態調整我也寫過啊,也是強調過這三處地方:
-
增加 {@link #setCapacity(int)} 和 {@link #getCapacity()} -
{@link #capacity} 判斷邊界從 == 改為 >= -
部分 signal() 信號觸發改為 signalAll()
另外作者還提到了 RabbitMQ 的版本里面會有導致 NPE 的 BUG 的問題。
這個就沒細研究了,有興趣的可以去對比一下代碼,就應該能知道問題出在哪里。
說說 Dubbo
為什么要說一下 Dubbo 呢?
因為我似乎在 Dubbo 里面也發現了 KeyAffinityExecutor 的蹤跡。
為什么說是似乎呢?
因為最終沒有被合並到代碼庫里面去。

其對應的鏈接是這里:
https://github.com/apache/dubbo/pull/8975
這一次提交一共提交了這么多文件:

里面是可以找到我們熟悉的東西:

其實思路都是一樣的,但是你會發現即使是思路一樣,但是兩個不同的人寫出來的代碼結構還是很不一樣的。
Dubbo 這里把代碼的層次分的更加明顯一點,比如定義了一個抽象的 AbstractKeyAffinity 對象,然后在去實現了隨機和最小並發兩種方案。
在這些細節處上是有不同的。
但是這個代碼的提供者最終沒有用這些代碼,而是拿出了一個替代方案:

https://github.com/apache/dubbo/pull/8999
在這一次提交里面,他主要提交了這個類:
org.apache.dubbo.common.threadpool.serial.SerializingExecutor
這個類從名字上你就知道了,它強調的是串行化。
帶大家看看它的測試用例,你就知道它是怎么用的了:

首先是它的構造方法入參是另外一個線程池。
然后提交任務的時候用 SerializingExecutor 的 execute 方法進行提交。
在任務內部,干的事就是從 map 里面取出 val 對應的 key ,然后進行加 1 操作再放回去。
大家都知道上面的這個操作在多線程的情況是線程不安全的,最終加出來的結果一定是小於循環次數的。
但是,如果是單線程的情況下,那肯定是沒問題的。
那么怎么把線程池映射為單線程呢?
SerializingExecutor 干得就是這事。
而且它的原理特別簡單,核心代碼就幾行。
首先它自己搞了個隊列:

提交進來的任務都扔到隊列里面去。
接下來再一個個的執行。
怎么保證一個個的執行呢?
方法有很多,它這里是搞了個 AtomicBoolean 對象來控制:

這樣就實現了把多線程任務搞成串行化的場景。
只是讓我奇怪的是 SerializingExecutor 這個類目前在 Dubbo 里面並沒有使用場景。
但是,如果你時候你就要實現這樣奇怪的功能,比如別人給你一個線程池,但是到你的流程里面出入某種考慮,需要把任務串行化,這個時候肯定是不能動別人的線程池的,那么你可以想起 Dubbo 這里有一個現成的,比較優雅的、逼格較高的解決方案。
最后說一句
好了,看到了這里了, 轉發、在看、點贊隨便安排一個吧,要是你都安排上我也不介意。寫文章很累的,需要一點正反饋。
給各位讀者朋友們磕一個了:

本文已收錄至個人博客,歡迎大家來玩。
https://www.whywhy.vip/