本文總結一下線程池是怎么回事,分以下幾個部分,對哪個部分感興趣,可以直接跳到對應的章節
第一部分:線程池類的結構
線程池存在於Java的並發包J.U.C中,線程池可以根據項目靈活控制並發的數目,避免頻繁的創建和銷毀線程,達到線程對象的重用。
下面是線程池的類圖:
1、 接口Executor
接口Executor中,只有一個方法,為execute()
2、 接口ExecutorService,繼承自Executor
幾個重要的方法:
(1) 關閉線程池的方法,有兩種
一個ExecutorService(J.U.C)可以關閉,這將導致它拒絕新的任務。 ExecutorService的兩種關閉線程池的方式,shutdown和shutdownNow方法:
① shutdown():拒收新的任務,立馬關閉正在執行的任務,可能會引起報錯,需要異常捕獲
② shutdownNow():拒收新的任務,等待任務執行完畢,要確保任務里不會有永久等待阻塞的邏輯,否則會導致線程關閉不了
③ 不是馬上關閉,要想等待線程池關閉,還需要調用waitFermination來阻塞等待
④ 還有一些業務場景下,需要知道線程池中的任務是否全部執行完成,當我們關閉線程池之后,可以用isTerminated來判斷所有的線程是否執行完成,千萬不要用isShutdown, 它只是返回你是否調用過shutdown的結果
(2) submit()方法
方法submit延伸的方法Executor.execute(Runnable)通過創建並返回一個Future可用於取消執行和/或等待完成。submit()與execute()的一個區別是submit()有返回值,並且能夠處理異常
3、 Executors(J.U.C),提供了6個靜態方法,分別創建6種不同的線程池,六大靜態方法 內部都是直接或間接調用ThreadPoolExecutor類的構造方法創建線程池對象,這六個靜態方法本身是沒有技術含量的。
Executors(類) |
Executors靜態方法 |
實現類 |
newCachedThreadPool |
ThreadPoolExecutor |
|
newFixedThreadPool |
ThreadPoolExecutor |
|
newSingleThreadExecutor |
ThreadPoolExecutor |
|
newScheduledThreadPool |
ScheduledThreadPoolExecutor |
|
newSingleThreadScheduledExecutor |
ScheduledThreadPoolExecutor |
|
|
newWorkStealingPool |
ForkJoinPool |
Executor(接口):只有一個方法execute() |
下面介紹常用的四種:
(1)FixedThreadPool
FixedThreadPool的特點:固定池子中線程的個數。使用靜態方法newFixedThreadPool()創建線程池的時候指定線程池個數。
(2)CachedThreadPool(彈性緩存線程池)
CachedThreadPool的特點:用newCachedThreadPool()方法創建該線程池對象, 創建之初里面一個線程都沒有,當execute方法或submit方法向線程池提交任務時, 會自動新建 線程;如果線程池中有空余線程,則不會新建;這種線程池一般最多情況可 以容納幾萬個線程,里面的線程空余60s會被回收。
(3)SingleThreadPool(單線程線程池)
SingleThreadPool的特點:池中只有一個線程,如果扔5個任務進來,那么有4個任務將排隊;作用是保證任務的順序執行。
(4)ScheduledThreadpool(定時器線程池)
注意:要用ScheduledExecutorService去創建ScheduledThreadpool,如果用Executor去引用,就只能調用Executor接口中定義的方法;如果用ExecutorService接 口去引用,就只能調用ExecutorService接口中定義的方法,無法使用ScheduledExecutorService接口中新增的方法,那么也就失去了這種線程池的意義
第二部分:線程池的使用
第一種方式,構建一個線程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);
第二種方式,使用ThreadPoolExecutor構建一個線程池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class test {
public static void main(String args[]) {
ExecutorService executorService = new ThreadPoolExecutor(5,10,
10,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(5));
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("開始執行線程池中的任務");
}
});
}
}
如果只是簡單的想要改變線程名稱的前綴的話可以自定義ThreadFactory來實現,在Executors.new…中有一個ThreadFactory的參數,如果沒有指定則用的是DefaultThreadFactory。
第三種方式,使用工具來創建線程池,Apache的guava中ThreadFactoryBuilder()來創建線程池,不僅可以避免OOM問題,還可以自定義線程名稱,方便出錯時溯源
第三部分:線程池的流程梳理
1、線程池的參數
(1) corePoolSize:核心線程數的大小
(2) maximumPoolSize:最大線程數的大小
(3) keepAliveTime:線程的空閑時間
(4) TimeUnit:空閑時間的單位
(5) workQueue:阻塞隊列
(6) threadFactory:線程工廠
(7) Handler:拒絕策略
參數的詳細說明:
(1) corePoolSize:池子里的線程數的大小,設置allowCoreThreadTimeOut(true)使核心線程數內的線程也可以被回收
(2) maximumPoolSize:當池子里的線程數達到核心線程數的大小,隊列也滿了,可以繼續創建的線程,直到線程數達到maximumPoolSize
(3) keepAliveTime:線程的空閑時間,是跟核心線程數和最大線程數之間的線程相關,這部分線程,當到達規定的空閑時間還沒有獲取到任務,則會被回收
(4) TimeUnit:空閑時間的單位
(5) workQueue:默認支持4種阻塞隊列
①ArrayBlockingQueue,基於數組的有界阻塞隊列,按FIFO排序。新任務進來后,會放到該隊列的隊尾,有界的數組可以防止資源耗盡問題。
②LinkedBlockingQuene,基於鏈表的無界阻塞隊列(其實最大容量為Interger.MAX),按照FIFO排序。由於該隊列的近似無界性,當線程池中線程數量達到corePoolSize 后,再有新任務進來,會一直存入該隊列,而不會去創建新線程直到maxPoolSize,因此使用該工作隊列時,參數maxPoolSize其實是不起作用的。
③SynchronousQuene,一個不緩存任務的阻塞隊列,生產者放入一個任務必須等到消費者取出這個任務。也就是說新任務進來時,不會緩存,而是直接被調度執行該任務, 如果沒有可用線程,則創建新線程,如果線程數量達到maxPoolSize,則執行拒絕策略。
④PriorityBlockingQueue,具有優先級的無界阻塞隊列,優先級通過參數Comparator實現。
(6) threadFactory:線程工廠,用來創建一個新線程時使用的工廠,可以用來設定線程名,是否為daemon線程等
(7) Handler:拒絕策略
①CallerRunsPolicy(直接拒絕任務),該策略下,在調用者線程中直接執行被拒絕任務的run方法,除非線程池已經shutdown,則直接拋棄任務。
②AbortPolicy(直接丟棄任務,並拋異常),該策略下,直接丟棄任務,並拋出RejectedExecutionException異常。
③DiscardPolicy(直接丟棄任務,什么都不做),該策略下,直接丟棄任務,什么都不做。
④DiscardOldestPolicy(嘗試添加新任務),該策略下,拋棄進入隊列最早的那個任務,然后嘗試把這次拒絕的任務放入隊列
2、線程池的運行過程
(1) 剛開始運行時,線程池是空的
(2) 一個任務進來,檢查池中的線程數量,是否達到corePoolSize,如果沒有達到,則創建線程,執行任務
(3) 任務執行完成之后,線程不會銷毀,而是阻塞的等待下一個任務
(4) 又進來一個任務,不是直接使用阻塞的線程,而是檢查線程池中的線程數大小,是否達到corePoolSize,如果沒有達到,則繼續創建新的線程,來執行新的任務,如此往復, 直到線程池中的線程數達到corePoolSize,此時停止創建新的線程
(5) 此時,又來新的任務,會選擇線程池中阻塞等待的線程來執行任務,有一個任務進來,喚醒一個線程來執行這個任務,處理完之后,再次阻塞,嘗試在workQueue上獲取下一 個任務,如果線程池中沒有可喚醒的線程,則任務進入workQueue,排隊等待
(6) 如果隊列是無界隊列,比如LinkedBlockingQueue,默認最大容量為Integer.MAX,接近於無界,可用無限制的接收任務,如果隊列是有界隊列,比如ArrayBlockingQueue,可限定隊列大小,當線程池中的線程來不及處理,然后,所有的任務都進入隊列,隊列的任務數也達到限定大小,此時,再來新的任務,就會入隊失敗,然后,就會再次嘗試在線程池里創建線程,直到線程數達到maximumPoolSize,停止創建線程
(7)此時,隊列滿了,新的任務無法入隊,創建的線程數也達到了maximumPoolSize,無法再創建新的線程,此時,就會reject掉,使用拒絕策略RejectedExecutionHandler,不讓繼續提交任務,默認的是AbortPolicy策略,拒絕,並拋出異常
(8) 超出corePoolSize數創建的那部分線程,是跟空閑時間keepAliveTime相關的,如果超過keepAliveTime時間還獲取不到任務,線程會被銷毀,自動釋放掉
第四部分:線程池的應用場景
1、newSingleThreadExecutor:一個單線程的線程池,可以用於需要保證順序執行的場景,並且只有一個線程在執行。
2、newFixedThreadPool:一個固定大小的線程池,可以用於已知並發壓力的情況下,對線程數做限制。
3、newCachedThreadPool:一個可以無限擴大的線程池,比較適合處理執行時間比較小的任務。
4、newScheduledThreadPool:可以延時啟動,定時啟動的線程池,適用於需要多個后台線程執行周期任務的場景。
5、newWorkStealingPool:一個擁有多個任務隊列的線程池,可以減少連接數,創建當前可用cpu數量的線程來並行執行。
線程池的實際業務場景:線程池適合單系統的大量的異步任務處理,比如發送短信、保存日志。
第五部分:線程池相關的面試題
1、為什么使用線程池,線程池有什么作用?
線程池,就是一個池子,存放指定數量的線程來執行任務,處理完任務的線程不進行回收,而是阻塞等待下一個任務,避免了頻繁的創建和銷毀線程,達到了線程的重用。
2、如何創建一個線程池?
最常用的,使用ThreadPoolExecutor實現類來創建線程池
3、如何關閉一個線程池?
ExecutorService提供了兩種方法來關閉線程池,shutdown()和shutdownNow()
(1) shutdown:拒收新的任務,立馬關閉正在執行的任務,可能會引起報錯,需要異常捕獲
(2) shutdownNow:拒收新的任務,等待任務執行完畢,要確保任務里不會有永久等待阻塞的邏輯,否則會導致線程關閉不了
不是馬上關閉,要想等待線程池關閉,還需要調用waitFermination來阻塞等待
還有一些業務場景下,需要知道線程池中的任務是否全部執行完成,當我們關閉線程池之后,可以用isTerminated來判斷所有的線程是否執行完成,千萬不要用isShutdown,它只 是返回你是否調用過shutdown的結果
4、submit()和execute()方法的區別?
execute()方法在Executor()接口中,且是接口中唯一的方法
submit()方法在ExecutorService中,ExecutorService接口繼承Executor 接口
execute()方法,開啟線程執行池中的任務
submit()方法,也可以做到execute()的作用,它還可以返回執行結果,它的 功能是提交指定的任務去執行並且返回Future對象(即執行的結果)
submit()和execute()的區別:
(1) 接收的參數不一樣
(2) submit()方法有返回值Future,而execute()方法沒有返回值
(3) submit()方法方便處理Exception異常,意思就是,你在task里會拋出checked或者unchecked exception, 而又希望外面的調用者能夠感知這些exception並作出及時的處理, 用 submit,通過捕獲Future.get拋出的異常
5、為什么不建議使用Executors創建線程,而使用ThreadPoolExecutor實現類來創建線程?
Executors中FixedThreadPool使用的是LinkedBlockingQueue隊列,近乎於無界,隊列大小默認為Integer.MAX_VALUE,幾乎可以無限制的放任務到隊列中,線程池中數量是固定的,當線程池中線程數量達到corePoolSize,不會再創建新的線程,所有任務都會入隊到workQueue中,線程從workQueue中獲取任務,但這個隊列幾乎永遠不會滿,只要隊列不滿,就不會再去創建新的線程,就跟maximumPoolSize和keepAliveTime沒有關系,此時,如果線程池中的線程處理任務的時間特別長,導致無法處理新的任務,隊列中的任務就會不斷的積壓,這個過程,會導致機器的內存使用不停的飆升,極端情況下會導致JVM OOM,系統就掛了。
總結:Executors中FixedThreadPool指定使用無界隊列LinkedBlockingQueue會導致內存溢出,所以,最好使用ThreadPoolExecutor自定義線程池
換一種問法:線程池中,無界隊列導致的內存飆升問題,同上
6、線程池如何調優
(1)首先,根據不同的需求選擇線程池,如果需要單線程順序執行,使用SingleThreadExecutor,如果已知並發壓力,使用FixedThreadPool,固定線程數的大小,執行時間小的任務,可以使用CachedThreadPool,創建可緩存的線程池,可以無限擴大線程池,可以靈活回收空閑線程,最多可容納幾萬個線程,線程空余60s會被回收,需要后台執行周期任務的,可以使用ScheduledThreadPool,可以延時啟動和定時啟動線程池,
(2)如何確認線程池的最大線程數目,分CPU密集型和IO密集型,如果是CPU密集型或計算密集型,因為CPU的利用率高,核心線程數可設置為n(核數)+1,如果是IO密集型,CPU利用率不高,可多給幾個線程數,來進行工作,核心線程數可設置為2n(核數)
線程池的使用分析:線程池,就是一個池子,存放大量創建好的線程,快速的執行任務,並且有隊列可用存放待處理的任務,那么,我們處理批量任務的時候,需要執行大量的任務,同時我們也希望任務執行的越快越好,這時考慮使用多線程,而線程池幫我們封裝好了池子,我們拿過來用就可用,避免了自己去創建回收等管理線程的動作,但是也要注意,線程池的一些參數,對實際的執行影響很大,所以,需要根據具體的場景,調整參數來獲得較高的吞吐量,減少上下文切換,並且,線程池執行的情況和任務類型相關性較大,比如IO密集型和CPU密集型的任務運行起來的情況差異非常大,需要格外注意。
線程池適合單系統的大量的異步任務處理,比如發送短信、保存日志等。
1、幾個真實的場景中如何選擇線程池?
(1) 高並發、任務執行時間短,此類任務可用充分利用CPU,盡可能的減少上下 文切換,線程池的線程數可用設置為CPU核數+1
(2)並發不高、任務執行時間長
此種類型的任務分兩種情況:
① IO密集型的任務,業務長時間集中在IO操作上,因為IO操作並不占用 CPU,所以盡可能的不要讓所有的CPU閑下來,可用加大線程池中的線 程數目,讓CPU處理更多的業務,如設置線程池的線程數為2 * CPU核 數
② 計算密集型的任務,業務長時間集中在計算操作上,和(1)一樣,線程數 可設置為CPU核數+1,減少一下線程數,以便減少線程的上下文切換
(3) 並發高、業務執行時間長,這種類型的任務就不單單要關注線程池了,而是要 從整體架構上來考慮,看能否使用中間件對任務進行拆分和解耦,部分數據做 緩存處理,以及增加服務器等
2、續上👆 線程池參數設置的一些分析 (參考:https://blog.csdn.net/qq_17045385/article/details/79820847)
1) 幾個參數:
tasks:每秒的任務數,假設為500~1000
taskcost:每個任務花費的時間,假設為0.1s
responsetime:系統允許容忍的最大響應時間,假設為1s
2) 做幾個計算:
① corePoolSize:每秒需要多少個線程處理
threadcount = tasks/(1/taskcount) = (500~1000)*0.1 = 50~100
線程數應該設置為大於50個,根據8020原則,如果80%的每秒任務數 小於800,那么corePoolSize設置為80即可
② queueCapacity = (coreSizePool/taskcost)*responsetime = 80/0.1*1 = 80
③ 注意阻塞隊列的大小,LinkedBlockingQueue的大小為
Integer.MAX_VALUE,接近於無界,會導致內存溢出,因為當任務徒增 時,都會進入隊列中,不能開新的線程來執行
④ maxPoolSize = (max(tasks) - queueCapacity)/(1/taskcount)=(最大 任務數-隊列容量)/每個線程每秒處理能力 = 最大線程數
計算可得,最大線程數maxPoolSize = (1000-80)/10 = 92
⑤ rejectedExecutionHandler:根據具體情況來決定,任務不重 要可丟棄,任務重要則要利用一些緩沖機制來處理
⑥ keepAliveTime和allowCoreThreadTimeout:采用默認通常 能滿足
3、續上👆 幾個具體場景的分析(8核CPU為例) (參考:https://www.jianshu.com/p/71b5e40f94e0)
(1) 任務數多但資源占用不大,電商平台的消息推送或短信通知,該場景需要被處理的消息對象內容簡單占用資源非常少,通常為百字節量級,但在高並發訪問下,可能瞬間產生大量的任務數,而此類任務的處理通常效率非常高,因此處理的重點在於控制並發線程數,不要以為大量的線程啟用及線程的上下文頻繁切換而導致內存使用率過高,CPU的內核態使用率過高等不良情況發生,通常可以在創建線程池時設置較長的任務隊列,並以CPU內核數2-4倍(經驗值)的值設置核心線程與擴展線程數,合理固定的線程數使得CPU的使用率更加平滑,如:
BlockingQueue queue = newArrayBlockingQueue<>(4096);
ThreadPoolExecutor executor = newThreadPoolExecutor(16, 16, 0, TimeUnit.SECONDS, queue);
(2) 任務數不多但資源占用大,非社交流媒體的使用場景下,該情況多發生於文件流、長文本對象或批量數據加工的處理,如日志收集、圖片流壓縮或批量訂單處理等場景,而此類場景下的單個資源處理,往往會發生較大的資源消耗,因此為使系統達到較強處理能力,同時又可以控制任務資源對內存過大的使用,通常可以在創建線程池時適當加大擴展線程數量,同時設置相對較小的任務隊列長度,如此,當遇到任務數突增的情況,可以有更多的並發線程來應對,此外需要合理設置擴展線程空閑回收的等待時長以節省不必要的開銷,如:
BlockingQueue queue = newArrayBlockingQueue<>(512);
ThreadPoolExecutor executor =newThreadPoolExecutor(16, 64, 30, TimeUnit.SECONDS, queue);
(3) 極端場景的情況,如遇到任務資源較大,任務數較多,同時處理效率不高的場景,首先需要考慮任務的產生發起需要限流,理論上講為保障系統的可用性及穩定運行,任務的發起能力應當略小於任務的處理能力,其次,對於類似場景可以采用以時間換取空間的思想,充分利用系統計算資源,當遇到任務處理能力不足的情況,任務發起方的作業將被阻塞,從而充分保護系統的資源開銷邊界,但可能會導致CPU核心態的使用率高,如:
BlockingQueue queue = newSynchronousQueue<>();
ThreadPoolExecutor executor =newThreadPoolExecutor(64, 64, 0, TimeUnit.SECONDS, queue);
4、線程池中的參數,有corePoolSize和maximumPoolSize,假如隊列能放下所有的任務,線程池里的線程數只會創建corePoolSize的線程,而Tomcat不同,它里面的線程池的運行過程就是先把最大線程數用完,然后再提交任務到隊列里面去的。Tomcat中自帶的線程池,其 maxThreads 默認值是 200(假定 BIO 模式),maxThreads 用完了之后,進隊列。隊列長度(acceptCount)默認是 100,在 BIO 的模式下,Tomcat 的默認配置,最多可以接受到 300 (200+100)個請求。再多就是連接拒絕,connection refused。
有寫的不足的地方,或者需要補充的地方,期待交流,會及時補充。希望聽到意見和建議,互相學習,共同進步。