- corePoolSize:核心線程數
-
- 核心線程會一直存活,及時沒有任務需要執行
- 當線程數小於核心線程數時,即使有線程空閑,線程池也會優先創建新線程處理
- 設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
- queueCapacity:任務隊列容量(阻塞隊列)
-
- 當核心線程數達到最大時,新任務會放在隊列中排隊等待執行
- maxPoolSize:最大線程數
-
- 當線程數>=corePoolSize,且任務隊列已滿時。線程池會創建新線程來處理任務
- 當線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常
- keepAliveTime:線程空閑時間
-
- 當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize
- 如果allowCoreThreadTimeout=true,則會直到線程數量=0
- allowCoreThreadTimeout:允許核心線程超時
- rejectedExecutionHandler:任務拒絕處理器
-
- 兩種情況會拒絕處理任務:
-
- 當線程數已經達到maxPoolSize,切隊列已滿,會拒絕新任務
- 當線程池被調用shutdown()后,會等待線程池里的任務執行完畢,再shutdown。如果在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務
- 線程池會調用rejectedExecutionHandler來處理這個任務。如果沒有設置默認是AbortPolicy,會拋出異常
- ThreadPoolExecutor類有幾個內部實現類來處理這類情況:
-
- AbortPolicy:直接拋出一個異常,默認策略
- DiscardPolicy: 直接丟棄任務
- DiscardOldestPolicy:拋棄下一個將要被執行的任務(最舊任務)
- CallerRunsPolicy:主線程中執行任務
- 實現RejectedExecutionHandler接口,可自定義處理器
- 當線程數小於核心線程數時,創建線程。
- 當線程數大於等於核心線程數,且任務隊列未滿時,將任務放入任務隊列。
- 當線程數大於等於核心線程數,且任務隊列已滿
- 若線程數小於最大線程數,創建線程
- 若線程數等於最大線程數,拋出異常,拒絕任務
- 默認值
-
- corePoolSize=1
- queueCapacity=Integer.MAX_VALUE
- maxPoolSize=Integer.MAX_VALUE
- keepAliveTime=60s
- allowCoreThreadTimeout=false
- rejectedExecutionHandler=AbortPolicy()
- 如何來設置
-
- 需要根據幾個值來決定
-
- tasks :每秒的任務數,假設為500~1000
- taskcost:每個任務花費時間,假設為0.1s
- responsetime:系統允許容忍的最大響應時間,假設為1s
- 做幾個計算
-
- corePoolSize = 每秒需要多少個線程處理?
-
- threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個線程。corePoolSize設置應該大於50
- 根據8020原則,如果80%的每秒任務數小於800,那么corePoolSize設置為80即可
- queueCapacity = (coreSizePool/taskcost)*responsetime
-
- 計算可得 queueCapacity = 80/0.1*1 = 80。意思是隊列里的線程可以等待1s,超過了的需要新開線程來執行
- 切記不能設置為Integer.MAX_VALUE,這樣隊列會很大,線程數只會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會隨之陡增。
- maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
-
- 計算可得 maxPoolSize = (1000-80)/10 = 92
- (最大任務數-隊列容量)/每個線程每秒處理能力 = 最大線程數
- rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩沖機制來處理
- keepAliveTime和allowCoreThreadTimeout采用默認通常能滿足
- 以上都是理想值,實際情況下要根據機器性能來決定。如果在未達到最大線程數的情況機器cpu load已經滿了,則需要通過升級硬件(呵呵)和優化代碼,降低taskcost來處理。
幾種典型的工作隊列
- ArrayBlockingQueue:使用數組實現的有界阻塞隊列,特性先進先出
- LinkedBlockingQueue:使用鏈表實現的阻塞隊列,特性先進先出,可以設置其容量,默認為Interger.MAX_VALUE,特性先進先出
- PriorityBlockingQueue:使用平衡二叉樹堆,實現的具有優先級的無界阻塞隊列
- DelayQueue:無界阻塞延遲隊列,隊列中每個元素均有過期時間,當從隊列獲取元素時,只有
- 過期元素才會出隊列。隊列頭元素是最塊要過期的元素。
- SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作,必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態
幾種典型的線程池
SingleThreadExecutor 單個線程線程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
創建單個線程。它適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個線程是活動的應用場景。
SingleThreadExecutor的corePoolSize和maximumPoolSize被設置為1,使用無界隊列LinkedBlockingQueue作為線程池的工作隊列。
- 當線程池中沒有線程時,會創建一個新線程來執行任務。
- 當前線程池中有一個線程后,將新任務加入LinkedBlockingQueue
- 線程執行完第一個任務后,會在一個無限循環中反復從LinkedBlockingQueue 獲取任務來執行。
**使用場景:**適用於串行執行任務場景
FixedThreadPool 創建一個線程數量固定的線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
corePoolSize等於maximumPoolSize,所以線程池中只有核心線程,使用無界阻塞隊列LinkedBlockingQueue作為工作隊列
FixedThreadPool是一種線程數量固定的線程池,當線程處於空閑狀態時,他們並不會被回收,除非線程池被關閉。當所有的線程都處於活動狀態時,新的任務都會處於等待狀態,直到有線程空閑出來。
- 如果當前運行的線程數少於corePoolSize,則創建新線程來執行任務。
- 在線程數目達到corePoolSize后,將新任務放到LinkedBlockingQueue阻塞隊列中。
- 線程執行完(1)中任務后,會在循環中反復從LinkedBlockingQueue獲取任務來執行。
使用場景:適用於處理CPU密集型的任務,確保CPU在長期被工作線程使用的情況下,盡可能的少的分配線程,即適用執行長期的任務。推薦:250期面試題匯總
CachedThreadPool 一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
核心線程數為0,總線程數量閾值為Integer.MAX_VALUE,即可以創建無限的非核心線程
執行流程
- 先執行SynchronousQueue的offer方法提交任務,並查詢線程池中是否有空閑線程來執行SynchronousQueue的poll方法來移除任務。如果有,則配對成功,將任務交給這個空閑線程
- 否則,配對失敗,創建新的線程去處理任務
- 當線程池中的線程空閑時,會執行SynchronousQueue的poll方法等待執行SynchronousQueue中新提交的任務。若等待超過60s,空閑線程就會終止
流程形象圖

結構形象圖
使用場景:執行大量短生命周期任務。因為maximumPoolSize是無界的,所以提交任務的速度 > 線程池中線程處理任務的速度就要不斷創建新線程;每次提交任務,都會立即有線程去處理,因此CachedThreadPool適用於處理大量、耗時少的任務。
ScheduledThreadPool 創建一個周期線程池,支持定時及周期性任務執行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
線程總數閾值為Integer.MAX_VALUE,工作隊列使用DelayedWorkQueue,非核心線程存活時間為0,所以線程池僅僅包含固定數目的核心線程。
兩種方式提交任務:
- scheduleAtFixedRate: 按照固定速率周期執行
- scheduleWithFixedDelay:上個任務延遲固定時間后執行
使用場景:周期性執行任務,並且需要限制線程數量的場景
使用無界隊列的線程池會導致內存飆升嗎?
答案 :會的,newFixedThreadPool使用了無界的阻塞隊列LinkedBlockingQueue,如果線程獲取一個任務后,任務的執行時間比較長,會導致隊列的任務越積越多,導致機器內存使用不停飆升, 最終導致OOM。
ExecutorService executorService = new ThreadPoolExecutor( 5, 15, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() );
- https://blog.csdn.net/liuchangjie0112/article/details/90698401
- https://zhuanlan.zhihu.com/p/73990200