java線程池優化


ThreadPoolExecutor機制
一、概述
1、ThreadPoolExecutor作為java.util.concurrent包對外提供基礎實現,以內部線程池的形式對外提供管理任務執行,線程調度,線程池管理等等服務;
2、Executors方法提供的線程服務,都是通過參數設置來實現不同的線程池機制。
3、先來了解其線程池管理的機制,有助於正確使用,避免錯誤使用導致嚴重故障。同時可以根據自己的需求實現自己的線程池

二、核心構造方法講解
下面是ThreadPoolExecutor最核心的構造方法
Java代碼
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

構造方法參數講解
參數名 作用
corePoolSize 核心線程池大小
maximumPoolSize 最大線程池大小
keepAliveTime 線程池中超過corePoolSize數目的空閑線程最大存活時間;可以allowCoreThreadTimeOut(true)使得核心線程有效時間
TimeUnit keepAliveTime時間單位
workQueue 阻塞任務隊列
threadFactory 新建線程工廠
RejectedExecutionHandler 當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理

一、ThreadPoolExecutor的重要參數

corePoolSize:核心線程數
核心線程會一直存活,及時沒有任務需要執行
當線程數小於核心線程數時,即使有線程空閑,線程池也會優先創建新線程處理
設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
queueCapacity:任務隊列容量(阻塞隊列)
當核心線程數達到最大時,新任務會放在隊列中排隊等待執行
maxPoolSize:最大線程數
當線程數>=corePoolSize,且任務隊列已滿時。線程池會創建新線程來處理任務
當線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常
keepAliveTime:線程空閑時間
當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize
如果allowCoreThreadTimeout=true,則會直到線程數量=0
allowCoreThreadTimeout:允許核心線程超時
rejectedExecutionHandler:任務拒絕處理器
兩種情況會拒絕處理任務:
當線程數已經達到maxPoolSize,切隊列已滿,會拒絕新任務
當線程池被調用shutdown()后,會等待線程池里的任務執行完畢,再shutdown。如果在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務
線程池會調用rejectedExecutionHandler來處理這個任務。如果沒有設置默認是AbortPolicy,會拋出異常
ThreadPoolExecutor類有幾個內部實現類來處理這類情況:
AbortPolicy 丟棄任務,拋運行時異常
CallerRunsPolicy 執行任務
DiscardPolicy 忽視,什么都不會發生
DiscardOldestPolicy 從隊列中踢出最先進入隊列(最后一個執行)的任務
實現RejectedExecutionHandler接口,可自定義處理器

二、ThreadPoolExecutor執行順序:
線程池按以下行為執行任務

當線程數小於核心線程數時,創建線程。
當線程數大於等於核心線程數,且任務隊列未滿時,將任務放入任務隊列。
當線程數大於等於核心線程數,且任務隊列已滿
若線程數小於最大線程數,創建線程
若線程數等於最大線程數,拋出異常,拒絕任務

三、如何設置參數

默認值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
如何來設置
需要根據幾個值來決定
tasks :每秒的任務數,假設為500~1000
taskcost:每個任務花費時間,假設為0.1s
responsetime:系統允許容忍的最大響應時間,假設為1s
做幾個計算
corePoolSize = 每秒需要多少個線程處理?
threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個線程。corePoolSize設置應該大於50
根據8020原則,如果80%的每秒任務數小於800,那么corePoolSize設置為80即可
queueCapacity = (coreSizePool/taskcost)*responsetime
計算可得 queueCapacity = 80/0.1*1 = 80。意思是隊列里的線程可以等待1s,超過了的需要新開線程來執行
切記不能設置為Integer.MAX_VALUE,這樣隊列會很大,線程數只會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會隨之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
計算可得 maxPoolSize = (1000-80)/10 = 92
(最大任務數-隊列容量)/每個線程每秒處理能力 = 最大線程數
rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩沖機制來處理
keepAliveTime和allowCoreThreadTimeout采用默認通常能滿足
以上都是理想值,實際情況下要根據機器性能來決定。如果在未達到最大線程數的情況機器cpu load已經滿了,則需要通過升級硬件(呵呵)和優化代碼,降低taskcost來處理。
————————————————

重點講解:
其中比較容易讓人誤解的是:corePoolSize,maximumPoolSize,workQueue之間關系。

1.當線程池小於corePoolSize時,新提交任務將創建一個新線程執行任務,即使此時線程池中存在空閑線程。
2.當線程池達到corePoolSize時,新提交任務將被放入workQueue中,等待線程池中任務調度執行
3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會創建新線程執行任務
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理
5.當線程池中超過corePoolSize線程,空閑時間達到keepAliveTime時,關閉空閑線程
6.當設置allowCoreThreadTimeOut(true)時,線程池中corePoolSize線程空閑時間達到keepAliveTime也將關閉

線程管理機制圖示:


三、Executors提供的線程池配置方案

1、構造一個固定線程數目的線程池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界LinkedBlockingQueue存放阻塞任務,因此多余的任務將存在再阻塞隊列,不會由RejectedExecutionHandler處理
Java代碼
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

2、構造一個緩沖功能的線程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一個無容量的阻塞隊列 SynchronousQueue,因此任務提交之后,將會創建新的線程執行;線程空閑超過60s將會銷毀
Java代碼
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

3、構造一個只支持一個線程的線程池,配置corePoolSize=maximumPoolSize=1,無界阻塞隊列LinkedBlockingQueue;保證任務由一個線程串行執行
Java代碼
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

4、構造有定時功能的線程池,配置corePoolSize,無界延遲阻塞隊列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由於DelayedWorkQueue是無界隊列,所以這個值是沒有意義的
Java代碼
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}

————————————————

總結:
1、用ThreadPoolExecutor自定義線程池,看線程是的用途,如果任務量不大,可以用無界隊列,如果任務量非常大,要用有界隊列,防止OOM
2、如果任務量很大,還要求每個任務都處理成功,要對提交的任務進行阻塞提交,重寫拒絕機制,改為阻塞提交。保證不拋棄一個任務
3、最大線程數一般設為2N+1最好,N是CPU核數
4、核心線程數,看應用,如果是任務,一天跑一次,設置為0,合適,因為跑完就停掉了,如果是常用線程池,看任務量,是保留一個核心還是幾個核心線程數
5、如果要獲取任務執行結果,用CompletionService,但是注意,獲取任務的結果的要重新開一個線程獲取,如果在主線程獲取,就要等任務都提交后才獲取,就會阻塞大量任務結果,隊列過大OOM,所以最好異步開個線程獲取結果


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM