Java高並發之線程池詳解


原文出處http://www.yund.tech/zdetail.html?type=1&id=961cc31f7bb2409f3a401478dc2cc38e    

作者:jstarseven   


  

線程池優勢

在業務場景中, 如果一個對象創建銷毀開銷比較大, 那么此時建議池化對象進行管理.

例如線程, jdbc連接等等, 在高並發場景中, 如果可以復用之前銷毀的對象, 那么系統效率將大大提升.

另外一個好處是可以設定池化對象的上限, 例如預防創建線程數量過多導致系統崩潰的場景.

jdk中的線程池

下文主要從以下幾個角度講解:

  • 創建線程池
  • 提交任務
  • 潛在宕機風險
  • 線程池大小配置
  • 自定義阻塞隊列BlockingQueue
  • 回調接口
  • 自定義拒絕策略
  • 自定義ThreadFactory
  • 關閉線程池

創建線程池

我們可以通過自定義ThreadPoolExecutor或者jdk內置的Executors來創建一系列的線程池

  • newFixedThreadPool: 創建固定線程數量的線程池
  • newSingleThreadExecutor: 創建單一線程的池
  • newCachedThreadPool: 創建線程數量自動擴容, 自動銷毀的線程池
  • newScheduledThreadPool: 創建支持計划任務的線程池

 上述幾種都是通過new ThreadPoolExecutor()來實現的, 構造函數源碼如下: 

復制代碼
 1     /**
 2      * @param corePoolSize 池內核心線程數量, 超出數量的線程會進入阻塞隊列
 3      * @param maximumPoolSize 最大可創建線程數量
 4      * @param keepAliveTime 線程存活時間
 5      * @param unit 存活時間的單位
 6      * @param workQueue 線程溢出后的阻塞隊列
 7      */
 8     public ThreadPoolExecutor(int corePoolSize,
 9                               int maximumPoolSize,
10                               long keepAliveTime,
11                               TimeUnit unit,
12                               BlockingQueue<Runnable> workQueue) {
13         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
14     }
15 
16     public static ExecutorService newFixedThreadPool(int nThreads) {
17         return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
18     }
19 
20     public static ExecutorService newSingleThreadExecutor() {
21         return new Executors.FinalizableDelegatedExecutorService
22                 (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
23     }
24 
25     public static ExecutorService newCachedThreadPool() {
26         return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
27     }
28 
29     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
30         return new ScheduledThreadPoolExecutor(corePoolSize);
31     }
32 
33     public ScheduledThreadPoolExecutor(int corePoolSize) {
34         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
35     }
復制代碼

提交任務

直接調用executorService.execute(runnable)或者submit(runnable)即可,

execute和submit的區別在於submit會返回Future來獲取任何執行的結果.

我們看下newScheduledThreadPool的使用示例.

復制代碼
 1 public class SchedulePoolDemo {
 2 
 3     public static void main(String[] args){
 4         ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
 5         // 如果前面的任務沒有完成, 調度也不會啟動
 6         service.scheduleAtFixedRate(new Runnable() {
 7             @Override
 8             public void run() {
 9                 try {
10                     Thread.sleep(2000);
11                     // 每兩秒打印一次.
12                     System.out.println(System.currentTimeMillis()/1000);
13                 } catch (InterruptedException e) {
14                     e.printStackTrace();
15                 }
16             }
17         }, 0, 2, TimeUnit.SECONDS);
18     }
19 }
復制代碼

潛在宕機風險

使用Executors來創建要注意潛在宕機風險.其返回的線程池對象的弊端如下:

  • FixedThreadPool和SingleThreadPoolPool : 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM.
  • CachedThreadPool和ScheduledThreadPool : 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM.

綜上所述, 在可能有大量請求的線程池場景中, 更推薦自定義ThreadPoolExecutor來創建線程池, 具體構造函數配置見下文.

線程池大小配置

一般根據任務類型進行區分, 假設CPU為N核

  • CPU密集型任務需要減少線程數量, 降低線程之間切換造成的開銷, 可配置線程池大小為N + 1.
  • IO密集型任務則可以加大線程數量, 可配置線程池大小為 N * 2.
  • 混合型任務則可以拆分為CPU密集型與IO密集型, 獨立配置.

自定義阻塞隊列BlockingQueue

主要存放等待執行的線程, ThreadPoolExecutor中支持自定義該隊列來實現不同的排隊隊列.

  • ArrayBlockingQueue:先進先出隊列,創建時指定大小, 有界;
  • LinkedBlockingQueue:使用鏈表實現的先進先出隊列,默認大小為Integer.MAX_VALUE;
  • SynchronousQueue:不保存提交的任務, 數據也不會緩存到隊列中, 用於生產者和消費者互等對方, 一起離開.
  • PriorityBlockingQueue: 支持優先級的隊列

回調接口

線程池提供了一些回調方法, 具體使用如下所示.

復制代碼
 1         ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
 2 
 3             @Override
 4             protected void beforeExecute(Thread t, Runnable r) {
 5                 System.out.println("准備執行任務: " + r.toString());
 6             }
 7 
 8             @Override
 9             protected void afterExecute(Runnable r, Throwable t) {
10                 System.out.println("結束任務: " + r.toString());
11             }
12 
13             @Override
14             protected void terminated() {
15                 System.out.println("線程池退出");
16             }
17         };
復制代碼

可以在回調接口中, 對線程池的狀態進行監控, 例如任務執行的最長時間, 平均時間, 最短時間等等, 還有一些其他的屬性如下:

  • taskCount:線程池需要執行的任務數量.
  • completedTaskCount:線程池在運行過程中已完成的任務數量.小於或等於taskCount.
  • largestPoolSize:線程池曾經創建過的最大線程數量.通過這個數據可以知道線程池是否滿過.如等於線程池的最大大小,則表示線程池曾經滿了.
  • getPoolSize:線程池的線程數量.如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減.
  • getActiveCount:獲取活動的線程數.

自定義拒絕策略

線程池滿負荷運轉后, 因為時間空間的問題, 可能需要拒絕掉部分任務的執行.

jdk提供了RejectedExecutionHandler接口, 並內置了幾種線程拒絕策略

  • AbortPolicy: 直接拒絕策略, 拋出異常.
  • CallerRunsPolicy: 調用者自己執行任務策略.
  • DiscardOldestPolicy: 舍棄最老的未執行任務策略.

使用方式也很簡單, 直接傳參給ThreadPool

復制代碼
1         ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 
2                 new SynchronousQueue<Runnable>(),
3                 Executors.defaultThreadFactory(),
4                 new RejectedExecutionHandler() {
5                     @Override
6                     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
7                         System.out.println("reject task: " + r.toString());
8                     }
9                 });
復制代碼

自定義ThreadFactory

線程工廠用於創建池里的線程. 例如在工廠中都給線程setDaemon(true), 這樣程序退出的時候, 線程自動退出.

或者統一指定線程優先級, 設置名稱等等.

復制代碼
 1 class NamedThreadFactory implements ThreadFactory {
 2     private static final AtomicInteger threadIndex = new AtomicInteger(0);
 3     private final String baseName;
 4     private final boolean daemon;
 5 
 6     public NamedThreadFactory(String baseName) {
 7         this(baseName, true);
 8     }
 9 
10     public NamedThreadFactory(String baseName, boolean daemon) {
11         this.baseName = baseName;
12         this.daemon = daemon;
13     }
14 
15     public Thread newThread(Runnable runnable) {
16         Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement());
17         thread.setDaemon(this.daemon);
18         return thread;
19     }
20 }
復制代碼

關閉線程池

跟直接new Thread不一樣, 局部變量的線程池, 需要手動關閉, 不然會導致線程泄漏問題.

默認提供兩種方式關閉線程池.

  • shutdown: 等所有任務, 包括阻塞隊列中的執行完, 才會終止, 但是不會接受新任務.
  • shutdownNow: 立即終止線程池, 打斷正在執行的任務, 清空隊列. 

  


 -END-


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM