原文出處:http://www.yund.tech/zdetail.html?type=1&id=961cc31f7bb2409f3a401478dc2cc38e
作者:jstarseven
線程池優勢
在業務場景中, 如果一個對象創建銷毀開銷比較大, 那么此時建議池化對象進行管理.
例如線程, jdbc連接等等, 在高並發場景中, 如果可以復用之前銷毀的對象, 那么系統效率將大大提升.
另外一個好處是可以設定池化對象的上限, 例如預防創建線程數量過多導致系統崩潰的場景.
jdk中的線程池
下文主要從以下幾個角度講解:
- 創建線程池
- 提交任務
- 潛在宕機風險
- 線程池大小配置
- 自定義阻塞隊列BlockingQueue
- 回調接口
- 自定義拒絕策略
- 自定義ThreadFactory
- 關閉線程池
創建線程池
我們可以通過自定義ThreadPoolExecutor或者jdk內置的Executors來創建一系列的線程池
- newFixedThreadPool: 創建固定線程數量的線程池
- newSingleThreadExecutor: 創建單一線程的池
- newCachedThreadPool: 創建線程數量自動擴容, 自動銷毀的線程池
- newScheduledThreadPool: 創建支持計划任務的線程池
上述幾種都是通過new ThreadPoolExecutor()來實現的, 構造函數源碼如下:
1 /** 2 * @param corePoolSize 池內核心線程數量, 超出數量的線程會進入阻塞隊列 3 * @param maximumPoolSize 最大可創建線程數量 4 * @param keepAliveTime 線程存活時間 5 * @param unit 存活時間的單位 6 * @param workQueue 線程溢出后的阻塞隊列 7 */ 8 public ThreadPoolExecutor(int corePoolSize, 9 int maximumPoolSize, 10 long keepAliveTime, 11 TimeUnit unit, 12 BlockingQueue<Runnable> workQueue) { 13 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); 14 } 15 16 public static ExecutorService newFixedThreadPool(int nThreads) { 17 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 18 } 19 20 public static ExecutorService newSingleThreadExecutor() { 21 return new Executors.FinalizableDelegatedExecutorService 22 (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); 23 } 24 25 public static ExecutorService newCachedThreadPool() { 26 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 27 } 28 29 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 30 return new ScheduledThreadPoolExecutor(corePoolSize); 31 } 32 33 public ScheduledThreadPoolExecutor(int corePoolSize) { 34 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue()); 35 }
提交任務
直接調用executorService.execute(runnable)或者submit(runnable)即可,
execute和submit的區別在於submit會返回Future來獲取任何執行的結果.
我們看下newScheduledThreadPool的使用示例.
1 public class SchedulePoolDemo { 2 3 public static void main(String[] args){ 4 ScheduledExecutorService service = Executors.newScheduledThreadPool(10); 5 // 如果前面的任務沒有完成, 調度也不會啟動 6 service.scheduleAtFixedRate(new Runnable() { 7 @Override 8 public void run() { 9 try { 10 Thread.sleep(2000); 11 // 每兩秒打印一次. 12 System.out.println(System.currentTimeMillis()/1000); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 } 17 }, 0, 2, TimeUnit.SECONDS); 18 } 19 }
潛在宕機風險
使用Executors來創建要注意潛在宕機風險.其返回的線程池對象的弊端如下:
- FixedThreadPool和SingleThreadPoolPool : 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM.
- CachedThreadPool和ScheduledThreadPool : 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM.
綜上所述, 在可能有大量請求的線程池場景中, 更推薦自定義ThreadPoolExecutor來創建線程池, 具體構造函數配置見下文.
線程池大小配置
一般根據任務類型進行區分, 假設CPU為N核
- CPU密集型任務需要減少線程數量, 降低線程之間切換造成的開銷, 可配置線程池大小為N + 1.
- IO密集型任務則可以加大線程數量, 可配置線程池大小為 N * 2.
- 混合型任務則可以拆分為CPU密集型與IO密集型, 獨立配置.
自定義阻塞隊列BlockingQueue
主要存放等待執行的線程, ThreadPoolExecutor中支持自定義該隊列來實現不同的排隊隊列.
- ArrayBlockingQueue:先進先出隊列,創建時指定大小, 有界;
- LinkedBlockingQueue:使用鏈表實現的先進先出隊列,默認大小為Integer.MAX_VALUE;
- SynchronousQueue:不保存提交的任務, 數據也不會緩存到隊列中, 用於生產者和消費者互等對方, 一起離開.
- PriorityBlockingQueue: 支持優先級的隊列
回調接口
線程池提供了一些回調方法, 具體使用如下所示.
1 ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) { 2 3 @Override 4 protected void beforeExecute(Thread t, Runnable r) { 5 System.out.println("准備執行任務: " + r.toString()); 6 } 7 8 @Override 9 protected void afterExecute(Runnable r, Throwable t) { 10 System.out.println("結束任務: " + r.toString()); 11 } 12 13 @Override 14 protected void terminated() { 15 System.out.println("線程池退出"); 16 } 17 };
可以在回調接口中, 對線程池的狀態進行監控, 例如任務執行的最長時間, 平均時間, 最短時間等等, 還有一些其他的屬性如下:
- taskCount:線程池需要執行的任務數量.
- completedTaskCount:線程池在運行過程中已完成的任務數量.小於或等於taskCount.
- largestPoolSize:線程池曾經創建過的最大線程數量.通過這個數據可以知道線程池是否滿過.如等於線程池的最大大小,則表示線程池曾經滿了.
- getPoolSize:線程池的線程數量.如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減.
- getActiveCount:獲取活動的線程數.
自定義拒絕策略
線程池滿負荷運轉后, 因為時間空間的問題, 可能需要拒絕掉部分任務的執行.
jdk提供了RejectedExecutionHandler接口, 並內置了幾種線程拒絕策略
- AbortPolicy: 直接拒絕策略, 拋出異常.
- CallerRunsPolicy: 調用者自己執行任務策略.
- DiscardOldestPolicy: 舍棄最老的未執行任務策略.
使用方式也很簡單, 直接傳參給ThreadPool
1 ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 2 new SynchronousQueue<Runnable>(), 3 Executors.defaultThreadFactory(), 4 new RejectedExecutionHandler() { 5 @Override 6 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 7 System.out.println("reject task: " + r.toString()); 8 } 9 });
自定義ThreadFactory
線程工廠用於創建池里的線程. 例如在工廠中都給線程setDaemon(true), 這樣程序退出的時候, 線程自動退出.
或者統一指定線程優先級, 設置名稱等等.
1 class NamedThreadFactory implements ThreadFactory { 2 private static final AtomicInteger threadIndex = new AtomicInteger(0); 3 private final String baseName; 4 private final boolean daemon; 5 6 public NamedThreadFactory(String baseName) { 7 this(baseName, true); 8 } 9 10 public NamedThreadFactory(String baseName, boolean daemon) { 11 this.baseName = baseName; 12 this.daemon = daemon; 13 } 14 15 public Thread newThread(Runnable runnable) { 16 Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement()); 17 thread.setDaemon(this.daemon); 18 return thread; 19 } 20 }
關閉線程池
跟直接new Thread不一樣, 局部變量的線程池, 需要手動關閉, 不然會導致線程泄漏問題.
默認提供兩種方式關閉線程池.
- shutdown: 等所有任務, 包括阻塞隊列中的執行完, 才會終止, 但是不會接受新任務.
- shutdownNow: 立即終止線程池, 打斷正在執行的任務, 清空隊列.
-END-