1、概述
在Java中,我們一般通過集成Thread類和實現Runnnable接口,調用線程的start()方法實現線程的啟動。但如果並發的數量很多,而且每個線程都是執行很短的時間便結束了,那樣頻繁的創建線程和銷毀進程會大大的降低系統運行的效率。線程池正是為了解決多線程效率低的問題而產生的,他使得線程可以被復用,就是線程執行結束后不被銷毀,而是可以繼續執行其他任務。(這里可以用tomcat做例子進行思考)
很多人想問,線程池聽起來高大上,但在實際工作中卻很少使用。其實不然,在各種流行框架或者高性能的架構中,池化技術是無處不在的。所以有人就想問了,線程池有什么用呢?
一言以蔽之,就是提高系統效率和吞吐量。如果服務器對每個請求都分別創建一個線程的話,在很短時間內就會產生很多創建和銷毀的動作,然而服務器在創建和銷毀線程上花費的時間和消耗的系統資源都相當大。線程池就可以盡量減少這種情況的發生。
因此java.util.concurrent.ThreadPoolExecutor這個類(java5以后才出現,由大師 Doug Lea 完成的),我們就不能不講了,它就是今天的主菜
2、栗子
一、ThreadPoolExecutor的重要參數
corePoolSize:核心線程數
核心線程會一直存活,即使沒有任務需要執行
當線程數小於核心線程數時(還未滿,就會一直增),即使有線程空閑,線程池也會優先創建新線程處理
設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
queueCapacity:任務隊列容量(阻塞隊列)
當核心線程數達到最大時,新任務會放在隊列中排隊等待執行
maxPoolSize:最大線程數
當線程數>corePoolSize,且任務隊列已滿時。線程池會創建新線程來處理任務,直到線程數量達到maxPoolSize
當線程數已經=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常
keepAliveTime:線程空閑時間
當線程空閑時間達到keepAliveTime時,線程會被銷毀,直到線程數量=corePoolSize
如果allowCoreThreadTimeout=true,則會直到線程數量=0(這個特性需要注意)
allowCoreThreadTimeout:允許核心線程超時(如上,會影響keepAliveTime哦)
rejectedExecutionHandler:任務拒絕處理器(用戶可以自定義拒絕后的處理方式)
兩種情況會拒絕處理任務:
1、當線程數已經達到maxPoolSize,且任務隊列已滿時,會拒絕新任務
2、當線程池被調用shutdown()后,會等待線程池里的任務執行完畢,再shutdown。如果在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務(並不是立馬停止,而是執行完再停止)。
若拒絕后,此時,線程池會調用rejectedExecutionHandler來處理這個任務。如果沒有設置,默認值是AbortPolicy,會拋出異常
hreadPoolExecutor類有幾個內部實現類來處理這類情況:
1: AbortPolicy 丟棄任務,拋運行時異常
2:CallerRunsPolicy 執行任務(這個策略重試添加當前的任務,他會自動重復調用 execute() 方法,直到成功) 如果執行器已關閉,則丟棄.
3:DiscardPolicy 對拒絕任務直接無聲拋棄,沒有異常信息
4:DiscardOldestPolicy 對拒絕任務不拋棄,而是拋棄隊列里面等待最久的(隊列頭部的任務將被刪除)一個線程,然后把拒絕任務加到隊列(Queue是先進先出的任務調度算法,具體策略會咋下面有分析)(如果再次失敗,則重復此過程)
5:實現RejectedExecutionHandler接口,可自定義處理器(可以自己實現然后set進去)
二、ThreadPoolExecutor處理任務的順序、原理
一個任務通過 execute(Runnable) 方法被添加到線程池,任務就是一個 Runnable 類型的對象,任務的執行方法就是 Runnable 類型對象的 run() 方法。
當一個任務通過 execute(Runnable) 方法欲添加到線程池時,線程池采用的策略如下(即添加任務的策略):
如果此時線程池中的數量小於 corePoolSize ,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
如果此時線程池中的數量等於 corePoolSize ,但是緩沖隊列 workQueue 未滿,那么任務被放入緩沖隊列。
如果此時線程池中的數量大於 corePoolSize ,緩沖隊列 workQueue 滿,並且線程池中的數量小於maximumPoolSize ,建新的線程來處理被添加的任務。
如果此時線程池中的數量大於 corePoolSize ,緩沖隊列 workQueue 滿,並且線程池中的數量等於maximumPoolSize ,那么通過 handler 所指定的策略來處理此任務。
任務處理的優先級(順序)為:
核心線程 corePoolSize 、任務隊列 workQueue 、最大線程 maximumPoolSize ,如果三者都滿了,使用 handler處理被拒絕的任務。當線程池中的線程數量大於 corePoolSize 時,如果某線程空閑時間超過 keepAliveTime ,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
簡要概括如下:
當線程數小於核心線程數時,創建線程。
當線程數大於等於核心線程數,且任務隊列未滿時,將任務放入任務隊列。
當線程數大於等於核心線程數,且任務隊列已滿
若線程數小於最大線程數,創建線程
若線程數等於最大線程數,拋出異常,拒絕任務
線程池處理流程圖:
這里提醒各位:如果失敗處理策略選擇了DiscardOldestPolicy,你是有可能丟掉任務的哦。
另外 Executors 類里面還有幾個方法:newFixedThreadPool(),newCachedThreadPool() 等幾個方法,實際上也是間接調用了ThreadPoolExocutor ,不過是傳的不同的構造參數。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //創建容量為1的緩沖池
Executors.newFixedThreadPool(int); //創建固定容量大小的緩沖池
ThreadPoolExecutor 的繼承關系如下
Executor->ExecutorService->AbstractExecutorService->ThreadPoolExecutor
其中有一個構造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
發現我們可以指定workQueue和handler。當然還有其余的構造函數,有類似的效果
線程池構造函數7大參數解釋:
corePoolSize:核心線程數。
maximumPoolSize:最大線程數。表明線程中最多能夠創建的線程數量。
keepAliveTime:空閑的線程保留的時間。
unit:空閑線程的保留時間單位。
BlockingQueue workQueue:用於保存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列。
1、ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
2、LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列
3、SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
4、PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
threadFactory:用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程做些更有意義的事情,比如設置daemon和優先級等等
handler:飽和策略處理器。默認提供的4中策略上面已經有解釋了
強烈建議程序員使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)和Executors.newSingleThreadExecutor()(單個后台線程),它們均為大多數使用場景預定義了設置。
但是,但是,但是。。。用fix是有坑的。詳情請見我這篇博文(在生產環境的一個活生生的血案):
另外,此處我說一下ThreadPoolTaskExecutor:
ThreadPoolTaskExecutor是一個spring的線程池技術,其實,它的實現方式完全是使用ThreadPoolExecutor進行實現(有點類似於裝飾者模式。當然Spring提供的功能更加強大些,因為還有定時調度功能)。
三、如何設置線程池的參數:
系統默認值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
那我們如何來設置呢?需要根據幾個值來決定
tasks :每秒的任務數,假設為500~1000
taskcost:每個任務花費時間,假設為0.1s
responsetime:系統允許容忍的最大響應時間,假設為1s
做幾個計算
corePoolSize = 每秒需要多少個線程處理?
threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個線程。corePoolSize設置應該大於50
根據8020原則,如果80%的每秒任務數小於800,那么corePoolSize設置為80即可
queueCapacity = (coreSizePool/taskcost)*responsetime
計算可得 queueCapacity = 80/1 = 80。意思是隊列里的線程可以等待1s,超過了的需要新開線程來執行
切記不能設置為Integer.MAX_VALUE,這樣隊列會很大,線程數只會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會隨之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
計算可得 maxPoolSize = (1000-80)/10 = 92
(最大任務數-隊列容量)/每個線程每秒處理能力 = 最大線程數
rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩沖機制來處理
keepAliveTime和allowCoreThreadTimeout采用默認通常能滿足
以上都是理想值,實際情況下要根據機器性能來決定。如果在未達到最大線程數的情況機器cpu load已經滿了,則需要通過升級硬件(呵呵)和優化代碼,降低taskcost來處理。
JDK1.5 的線程池由 Executor 框架提供。 Executor 框架將處理請求任務的提交和它的執行解耦。可以制定執行策略。在線程池中執行線程可以重用已經存在的線程,而不是創建新的線程,可以在處理多請求時抵消線程創建、消亡產生的開銷。如果線程池過大,會導致內存的高使用量,還可能耗盡資源。如果過小,會由於存在很多的處理器資源未工作,對吞吐量造成損失。
如何合理配置線程池大小,一般需要根據任務的類型來配置線程池大小:
1、如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1(比如是4核心 就配置為5)
2、如果是IO密集型任務,參考值可以設置為2*NCPU
當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。
3、使用場景
1、當你的任務是非必要的時候。比如記錄操作日志、通知第三方服務非必要信息等,可以使用線程池處理非阻塞任務
2、當你的任務非常耗時時候,可以采用線程池技術
3、當請求並發很高時,可以采用線程池技術優化處理
可以通過Executors靜態工廠構建線程池,但一般不建議這樣使用。
提醒:能夠用線程池的時候,不要自己的去new線程start,在高並發環境下,系統資源是寶貴的,需要節約資源才能提高可用性。
小彩蛋
Executors工具類給我們提供了不少快捷創建線程池的方法,雖然我們不推薦使用。但是里面有個方法我覺得還是不錯的:Executors.newSingleThreadExecutor() 如果把這個當作全局線程池,可以很好實現異步,並且還能保證任務的順序執行,進而達到消峰的效果:
private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
private static AtomicInteger num = new AtomicInteger();
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
executorService.execute(() -> {
num.getAndIncrement();
System.out.println(Thread.currentThread().getName() + "-->>>>" + num);
});
}
executorService.shutdown();
}
輸出:
pool-1-thread-1-->>>>1
pool-1-thread-1-->>>>2
pool-1-thread-1-->>>>3
pool-1-thread-1-->>>>4
pool-1-thread-1-->>>>5
pool-1-thread-1-->>>>6
pool-1-thread-1-->>>>7
pool-1-thread-1-->>>>8
pool-1-thread-1-->>>>9
pool-1-thread-1-->>>>10
pool-1-thread-1-->>>>11
pool-1-thread-1-->>>>12
pool-1-thread-1-->>>>13
pool-1-thread-1-->>>>14
pool-1-thread-1-->>>>15
pool-1-thread-1-->>>>16
pool-1-thread-1-->>>>17
pool-1-thread-1-->>>>18
pool-1-thread-1-->>>>19
pool-1-thread-1-->>>>20
我們發現pool只有一個,且thread也只有一個,而且任務都是順序執行的。當我們用Fiexed的話,也是能夠達到順序執行的效果的。因為內部的阻塞隊列是FIFO的實現:
private static final ExecutorService executorService = Executors.newFixedThreadPool(5);
private static AtomicInteger num = new AtomicInteger();
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "-->>>>" + num.getAndIncrement());
});
}
executorService.shutdown();
}
輸出:
pool-1-thread-2-->>>>0
pool-1-thread-1-->>>>1
pool-1-thread-2-->>>>2
pool-1-thread-1-->>>>3
pool-1-thread-2-->>>>4
pool-1-thread-1-->>>>5
pool-1-thread-2-->>>>6
pool-1-thread-1-->>>>7
pool-1-thread-2-->>>>8
pool-1-thread-1-->>>>9
pool-1-thread-2-->>>>10
pool-1-thread-1-->>>>11
pool-1-thread-2-->>>>12
pool-1-thread-1-->>>>13
pool-1-thread-2-->>>>14
pool-1-thread-1-->>>>15
pool-1-thread-2-->>>>16
pool-1-thread-4-->>>>17
pool-1-thread-3-->>>>18
pool-1-thread-5-->>>>19
————————————————
轉載:https://blog.csdn.net/f641385712/article/details/80832636