帶着問題閱讀
1、什么是池化,池化能帶來什么好處
2、如何設計一個資源池
3、Java的線程池如何使用,Java提供了哪些內置線程池
4、線程池使用有哪些注意事項
池化技術
池化思想介紹
池化思想是將重量級資源預先准備好,在使用時可重復使用這些預先准備好的資源。
池化思想的核心概念有:
- 資源創建/銷毀開銷大
- 提前創建,集中管理
- 重復利用,資源可回收
例如大街上的共享單車,用戶掃碼開鎖,使用完后歸還到停放點,下一個用戶可以繼續使用,共享單車由廠商統一管理,為用戶節省了購買單車的開銷。
池化技術的應用
常見的池化技術應用有:資源池、連接池、線程池等。
-
資源池
在各種電商平台大促活動時,平台需要支撐平時幾十倍的流量,因此各大平台在需要提前准備大量服務器進行擴容,在活動完畢以后,擴容的服務器資源又白白浪費。將計算資源池化,在業務高峰前進行分配,高峰結束后提供給其他業務或用戶使用,即可節省大量消耗,資源池化也是雲計算的核心技術之一。
-
連接池
網絡連接的建立和釋放也是一個開銷較大的過程,提前在服務器之間建立好連接,在需要使用的時候從連接池中獲取,使用完畢后歸還連接池,以供其他請求使用,以此可節省掉大量的網絡連接時間,如數據庫連接池、HttpClient連接池。
-
線程池
線程的建立銷毀都涉及到內核態切換,提前創建若干數量的線程提供給客戶端復用,可節約大量的CPU消耗以便處理業務邏輯。線程池也是接下來重點要講的內容。
如何設計一個線程池
設計一個線程池,至少需要提供的核心能力有:
- 線程池容器:用於容納初始化時預先創建的線程。
- 線程狀態管理:管理池內線程的生命周期,記錄每個線程當前的可服務狀態。
- 線程請求管理:對調用端提供獲取和歸還線程的接口。
- 線程耗盡策略:提供策略以處理線程耗盡問題,如拒絕服務、擴容線程池、排隊等待等。
基於以上角度,我們來分析Java是如何設計線程池功能的。
Java線程池解析
ThreadPoolExecutor使用介紹
大象裝冰箱總共分幾步
// 1.創建線程池
ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
// 2.提交任務
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println("task running");
}
}});
// 3.關閉線程池
threadPool.shutDown();
Java通過ThreadPoolExecutor
提供線程池的實現,如示例代碼,初始化一個容量為1的線程池、然后提交任務、最后關閉線程池。
ThreadPoolExecutor
的核心方法主要有
-
構造函數:
ThreadPoolExecutor
提供了多個構造函數,以下對基礎構造函數進行說明。public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
-
corePoolSize:線程池的核心線程數。池內線程數小於
corePoolSize
時,線程池會創建新線程執行任務。 -
maximumPoolSize:線程池的最大線程數。池內線程數大於
corePoolSize
且workQueue
任務等待隊列已滿時,線程池會創建新線程執行隊列中的任務,直到線程數達到maximumPoolSize
為止。 -
keepAliveTime:非核心線程的存活時長。池內超過
corePoolSize
數量的線程可存活的時長。 -
unit:非核心線程存活時長單位。與
keepAliveTime
取值配合,如示例代碼表示1分鍾。 -
workQueue:任務提交隊列。當無空閑核心線程時,存儲待執行任務。
類型 作用 ArrayBlockingQueue 數組結構的有界阻塞隊列 LinkedBlockingQueue 鏈表結構的阻塞隊列,可設定是否有界 SynchronousQueue 不存儲元素的阻塞隊列,直接將任務提交給線程池執行 PriorityBlockingQueue 支持優先級的無界阻塞隊列 DelayQueue 支持延時執行的無界阻塞隊列 -
threadFactory:線程工廠。用於創建線程對象。
-
handler:拒絕策略。線程池線程數量達到
maximumPoolSize
且workQueue
已滿時的處理策略。類型 作用 AbortPolicy 拒絕並拋出異常。默認 CallerRunsPolicy 由提交任務的線程執行任務 DiscardOldestPolicy 拋棄隊列頭部任務 DiscardPolicy 拋棄該任務
-
-
執行函數:
execute
和submit
,主要分別用於執行Runnable
和Callable
。// 提交Runnable void execute(Runnable command); // 提交Callable並返回Future <T> Future<T> submit(Callable<T> task); // 提交Runnable,執行結束后Future.get會返回result <T> Future<T> submit(Runnable task, T result); // 提交Runnable,執行結束后Future.get會返回null Future<?> submit(Runnable task);
-
停止函數:
shutDown
和shutDownNow
。// 不再接收新任務,等待剩余任務執行完畢后停止線程池 void shutdown(); // 不再接收新任務,並嘗試中斷執行中的任務,返回還在等待隊列中的任務列表 List<Runnable> shutdownNow();
內置線程池使用
To be useful across a wide range of contexts, this class provieds many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient {@link Executors} factory methods {@link Executors#newCachedThreadPool} (unbounded thread poll, with automatic thread reclamation), {@link Executors#newFixedThreadPool} (fixed size thread pool) and {@link Executors#newSingleThreadExecutor}(single background thread), that preconfigure settings for the most common usage scenarios.
由於ThreadPoolExecutor
參數復雜,Java提供了三種內置線程池newCachedThreadPool
、newFixedThreadPool
和newSingleThreadExecutor
應對大多數場景。
-
Executors.newCachedThreadPool()
無界線程池,核心線程池大小為0,最大為Integer.MAX_VALUE
,因此嚴格來講並不算無界。采用SynchronousQueue
作workQueue
,意味着任務不會被阻塞保存在隊列,而是直接遞交到線程池,如線程池無可用線程,則創建新線程執行。public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
Executors.newFixedThreadPool(int nThreads)
固定大小線程池,其中coreSize
和maxSize
相等,且過期時間為0,表示經過一定數量任務提交后,線程池將始終維持在nThreads
數量大小,不會新增也不會回收線程。public static ExecutorService new FixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
-
Executors.newSingleThreadExecutor()
單線程池,參數與fixedThreadPool
類似,只是將數量限制在1,單線程池主要避免重復創建銷毀線程對象,也可用於串行化執行任務。不同與其他線程池,單線程池采用FinallizableDelegatedExecutorService
對ThreadPoolExecutor
對象進行包裝,感興趣的同學可以看下源碼,其方法實現僅僅是對被包裝對象方法的直接調用。包裝對象主要用於避免用戶將線程池強制轉換為ThreadPoolExecutor
來修改線程池大小。public static ExecutorService newSingleThreadExecutor() { return new FinallizableDelegatedExecutorService( (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockQueue<Runnable>())) ); }
ThreadPoolExecutor解析
整體設計

ThreadPoolExecutor
基於ExecutorService
接口實現提交任務,未采取常規資源池獲取/歸還資源的形式,整個線程池和線程的生命周期都由ThreadPoolExecutor
進行管理,線程對象不對外暴露;ThreadPoolExecutor
的任務管理機制類似於生產者消費者模型,其內部維護一個任務隊列和消費者,一般情況下,任務被提交到隊列中,消費線程從隊列中拉取任務並將其執行。
線程池生命周期
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static int runStateOf(int c) { return c & ~CAPACITY; } //計算當前運行狀態
private static int workerCountOf(int c) { return c & CAPACITY; } //計算當前線程數量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通過狀態和線程數生成ctl
TreadPoolExecutor
通過ctl
維護線程池的狀態和線程數量,其中高3位存儲運行狀態,低29位存儲線程數量。
位運算操作推薦參考第三篇文章。
線程池設定了RUNNING
、SHUTDOWN
、STOP
、TIDYING
和TERMINATED
五種狀態,其轉移圖如下:
在這5種狀態中,只有RUNNING
時線程池可接收新任務,其余4種狀態在調用shutDown
或shutDownNow
后觸發轉換,且在這4種狀態時,線程池均不再接收新任務。
任務管理解析
// 用於存放提交任務的隊列
private final BlockingQueue<Runnable> workQueue;
// 用於保存池內的工作線程,Java將Thread包裝成Worker存儲
private final HashSet<Worder> workers = new HashSet<Worker>();
ThreadPoolExecutor
主要通過workQueue
和workers
兩個字段用於管理和執行任務。

線程池任務執行流程如圖,結合ThreadPoolExecutor.execute
源碼,對任務執行流程進行說明:
-
當任務提交到線程池時,如果當前線程數量小於核心線程數,則會將為該任務直接創建一個
worker
並將任務交由worker
執行。if (workerCountOf(c) < corePoolSize) { // 創建新worker執行任務,true表示核心線程 if (addWorker(command, true)) return; c = ctl.get(); }
-
當已經達到核心線程數后,任務會提交到隊列保存;
// 放入workQueue隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 這里采用double check再次檢測線程池狀態 if (! isRunning(recheck) && remove(command)) reject(command); // 避免加入隊列后,所有worker都已被回收無可用線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); }
-
如果隊列已滿,則依據最大線程數量創建新
worker
執行。如果新增worker
失敗,則依據設定策略拒絕任務。// 接上,放入隊列失敗 // 添加新worker執行任務,false表示非核心線程 else if (!addWorker(command, false)) // 如添加失敗,執行拒絕策略 reject(command);
woker對象
ThreadPoolExecutor
沒有直接使用Thread
記錄線程,而是定義了worker
用於包裝線程對象。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
...
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// worker對象被創建后就會執行
public void run() {
runWorker(this);
}
}
worker
對象通過addWorker
方法創建,一般會為其指定一個初始任務firstTask
,當worker
執行完畢以后,worker
會從阻塞隊列中讀取任務,如果沒有任務,則該worker
會陷入阻塞狀態給出worker
的核心邏輯代碼:
private boolean addWorker(Runnable firstTask, boolean core) {
...
// 指定firstTask,可能為null
w = new Worker(firstTask);
...
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
}
...
// 執行新添加的worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
final void runWorker(Worker w) {
// 等待workQueue的任務
while (task != null || (task = getTask()) != null) {
...
}
}
private Runnable getTask() {
...
for (;;) {
...
// 如果是普通工作線程,則根據線程存活時間讀取阻塞隊列
// 如果是核心工作線程,則直接陷入阻塞狀態,等待workQueue獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
...
}
}
如下圖,任務提交后觸發addWorker
創建worker
對象,該對象執行任務完畢后,則循環獲取隊列中任務等待執行。
Java線程池實踐建議
不建議使用Exectuors
線程池不允許使用
Executors
去創建,而是通過ThreadPoolExecutor
的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。《阿里巴巴開發手冊》
雖然Java推薦開發者直接使用Executors
提供的線程池,但實際開發中通常不使用。主要考慮問題有:
-
潛在的OOM問題
CachedThreadPool
將最大數量設置為Integer.MAX_VALUE
,如果一直提交任務,可能造成Thread
對象過多引起OOM
。FixedThreadPool
和SingleThreadPoo
的隊列LinkedBlockingQueue
無容量限制,阻塞任務過多也可能造成OOM
。 -
線程問題定位不便
由於未指定
ThreadFactory
,線程名稱默認為pool-poolNumber-thread-thredNumber
,線程出現問題后不便定位具體線程池。 -
線程池分散
通常在完善的項目中,由於線程是重量資源,因此線程池由統一模塊管理,重復創建線程池容易造成資源分散,難以管理。
線程池大小設置
通常按照IO繁忙型和CPU繁忙型任務分別采用以下兩個普遍公式。
在理論場景中,如一個任務IO耗時40ms,CPU耗時10ms,那么在IO處理期間,CPU是空閑的,此時還可以處理4個任務(40/10),因此理論上可以按照IO和CPU的時間消耗比設定線程池大小。
《JAVA並發編程實踐》中還考慮數量乘以目標CPU的利用率
在實際場景中,我們通常無法准確測算IO和CPU的耗時占比,並且隨着流量變化,任務的耗時占比也不能固定。因此可根據業務需求,開設線程池運維接口,根據線上指標動態調整線程池參數。
推薦參考第二篇美團線程池應用
線程池監控
ThreadPoolExecutor
提供以下方法監控線程池:
-
getTaskCount()
返回被調度過的任務數量 -
getCompletedTaskCount()
返回完成的任務數量 -
getPoolSize()
返回當前線程池線程數量 -
getActiveCount()
返回活躍線程數量 -
getQueue()
獲取隊列,一般用於監控阻塞任務數量和隊列空間大小
參考
-
《Java並發編程實踐》