ThreadPoolExecutor線程池參數設置技巧


一、ThreadPoolExecutor的重要參數
  • corePoolSize:核心線程數
    • 核心線程會一直存活,及時沒有任務需要執行
    • 當線程數小於核心線程數時,即使有線程空閑,線程池也會優先創建新線程處理
    • 設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
  • queueCapacity:任務隊列容量(阻塞隊列)
    • 當核心線程數達到最大時,新任務會放在隊列中排隊等待執行
  • maxPoolSize:最大線程數
    • 當線程數>=corePoolSize,且任務隊列已滿時。線程池會創建新線程來處理任務
    • 當線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常
  • keepAliveTime:線程空閑時間
    • 當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize
    • 如果allowCoreThreadTimeout=true,則會直到線程數量=0
  • allowCoreThreadTimeout:允許核心線程超時
  • rejectedExecutionHandler:任務拒絕處理器
    • 兩種情況會拒絕處理任務:
      • 當線程數已經達到maxPoolSize,切隊列已滿,會拒絕新任務
      • 當線程池被調用shutdown()后,會等待線程池里的任務執行完畢,再shutdown。如果在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務
    • 線程池會調用rejectedExecutionHandler來處理這個任務。如果沒有設置默認是AbortPolicy,會拋出異常
    • ThreadPoolExecutor類有幾個內部實現類來處理這類情況:
      • AbortPolicy:直接拋出一個異常,默認策略
      • DiscardPolicy: 直接丟棄任務
      • DiscardOldestPolicy:拋棄下一個將要被執行的任務(最舊任務)
      • CallerRunsPolicy:主線程中執行任務
    • 實現RejectedExecutionHandler接口,可自定義處理器
二、ThreadPoolExecutor執行順序:
     線程池按以下行為執行任務
  1. 當線程數小於核心線程數時,創建線程。
  2. 當線程數大於等於核心線程數,且任務隊列未滿時,將任務放入任務隊列。
  3. 當線程數大於等於核心線程數,且任務隊列已滿
    1. 若線程數小於最大線程數,創建線程
    2. 若線程數等於最大線程數,拋出異常,拒絕任務
 
三、如何 設置 參數
 
  • 默認值
    • corePoolSize=1
    • queueCapacity=Integer.MAX_VALUE
    • maxPoolSize=Integer.MAX_VALUE
    • keepAliveTime=60s
    • allowCoreThreadTimeout=false
    • rejectedExecutionHandler=AbortPolicy()
  • 如何來設置
    • 需要根據幾個值來決定
      • tasks :每秒的任務數,假設為500~1000
      • taskcost:每個任務花費時間,假設為0.1s
      • responsetime:系統允許容忍的最大響應時間,假設為1s
    • 做幾個計算
      • corePoolSize = 每秒需要多少個線程處理? 
        • threadcount = tasks/(1/taskcost) =tasks*taskcout =  (500~1000)*0.1 = 50~100 個線程。corePoolSize設置應該大於50
        • 根據8020原則,如果80%的每秒任務數小於800,那么corePoolSize設置為80即可
      • queueCapacity = (coreSizePool/taskcost)*responsetime
        • 計算可得 queueCapacity = 80/0.1*1 = 80。意思是隊列里的線程可以等待1s,超過了的需要新開線程來執行
        • 切記不能設置為Integer.MAX_VALUE,這樣隊列會很大,線程數只會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會隨之陡增。
      • maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
        • 計算可得 maxPoolSize = (1000-80)/10 = 92
        • (最大任務數-隊列容量)/每個線程每秒處理能力 = 最大線程數
      • rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩沖機制來處理
      • keepAliveTime和allowCoreThreadTimeout采用默認通常能滿足
  • 以上都是理想值,實際情況下要根據機器性能來決定。如果在未達到最大線程數的情況機器cpu load已經滿了,則需要通過升級硬件(呵呵)和優化代碼,降低taskcost來處理。
 

幾種典型的工作隊列

  • ArrayBlockingQueue:使用數組實現的有界阻塞隊列,特性先進先出
  • LinkedBlockingQueue:使用鏈表實現的阻塞隊列,特性先進先出,可以設置其容量,默認為Interger.MAX_VALUE,特性先進先出
  • PriorityBlockingQueue:使用平衡二叉樹堆,實現的具有優先級的無界阻塞隊列
  • DelayQueue:無界阻塞延遲隊列,隊列中每個元素均有過期時間,當從隊列獲取元素時,只有
  • 過期元素才會出隊列。隊列頭元素是最塊要過期的元素。
  • SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作,必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態
 

幾種典型的線程池

SingleThreadExecutor 單個線程線程池

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
創建單個線程。它適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個線程是活動的應用場景。

SingleThreadExecutor的corePoolSize和maximumPoolSize被設置為1,使用無界隊列LinkedBlockingQueue作為線程池的工作隊列。

【260期】Java線程池,這篇能讓你和面試官聊了半小時

  • 當線程池中沒有線程時,會創建一個新線程來執行任務。
  • 當前線程池中有一個線程后,將新任務加入LinkedBlockingQueue
  • 線程執行完第一個任務后,會在一個無限循環中反復從LinkedBlockingQueue 獲取任務來執行。

**使用場景:**適用於串行執行任務場景

FixedThreadPool 創建一個線程數量固定的線程池,可控制線程最大並發數,超出的線程會在隊列中等待。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

corePoolSize等於maximumPoolSize,所以線程池中只有核心線程,使用無界阻塞隊列LinkedBlockingQueue作為工作隊列

FixedThreadPool是一種線程數量固定的線程池,當線程處於空閑狀態時,他們並不會被回收,除非線程池被關閉。當所有的線程都處於活動狀態時,新的任務都會處於等待狀態,直到有線程空閑出來。

【260期】Java線程池,這篇能讓你和面試官聊了半小時

  • 如果當前運行的線程數少於corePoolSize,則創建新線程來執行任務。
  • 在線程數目達到corePoolSize后,將新任務放到LinkedBlockingQueue阻塞隊列中。
  • 線程執行完(1)中任務后,會在循環中反復從LinkedBlockingQueue獲取任務來執行。

使用場景:適用於處理CPU密集型的任務,確保CPU在長期被工作線程使用的情況下,盡可能的少的分配線程,即適用執行長期的任務。推薦:250期面試題匯總

CachedThreadPool 一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

核心線程數為0,總線程數量閾值為Integer.MAX_VALUE,即可以創建無限的非核心線程

執行流程

  • 先執行SynchronousQueue的offer方法提交任務,並查詢線程池中是否有空閑線程來執行SynchronousQueue的poll方法來移除任務。如果有,則配對成功,將任務交給這個空閑線程
  • 否則,配對失敗,創建新的線程去處理任務
  • 當線程池中的線程空閑時,會執行SynchronousQueue的poll方法等待執行SynchronousQueue中新提交的任務。若等待超過60s,空閑線程就會終止

流程形象圖

【260期】Java線程池,這篇能讓你和面試官聊了半小時

結構形象圖

【260期】Java線程池,這篇能讓你和面試官聊了半小時

使用場景:執行大量短生命周期任務。因為maximumPoolSize是無界的,所以提交任務的速度 > 線程池中線程處理任務的速度就要不斷創建新線程;每次提交任務,都會立即有線程去處理,因此CachedThreadPool適用於處理大量、耗時少的任務。

ScheduledThreadPool 創建一個周期線程池,支持定時及周期性任務執行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

線程總數閾值為Integer.MAX_VALUE,工作隊列使用DelayedWorkQueue,非核心線程存活時間為0,所以線程池僅僅包含固定數目的核心線程。

兩種方式提交任務:

  • scheduleAtFixedRate: 按照固定速率周期執行
  • scheduleWithFixedDelay:上個任務延遲固定時間后執行

使用場景:周期性執行任務,並且需要限制線程數量的場景

使用無界隊列的線程池會導致內存飆升嗎?

答案 :會的,newFixedThreadPool使用了無界的阻塞隊列LinkedBlockingQueue,如果線程獲取一個任務后,任務的執行時間比較長,會導致隊列的任務越積越多,導致機器內存使用不停飆升, 最終導致OOM。

 

     ExecutorService executorService = new ThreadPoolExecutor(
                5,
                15,
                30L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()
        );

 

參考:
https://www.javazhiyin.com/86333.html
  • https://blog.csdn.net/liuchangjie0112/article/details/90698401
  • https://zhuanlan.zhihu.com/p/73990200


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM