原文出處:http://www.yund.tech/zdetail.html?type=1&id=961cc31f7bb2409f3a401478dc2cc38e
作者:jstarseven
線程池優勢
在業務場景中, 如果一個對象創建銷毀開銷比較大, 那么此時建議池化對象進行管理.
例如線程, jdbc連接等等, 在高並發場景中, 如果可以復用之前銷毀的對象, 那么系統效率將大大提升.
另外一個好處是可以設定池化對象的上限, 例如預防創建線程數量過多導致系統崩潰的場景.
jdk中的線程池

下文主要從以下幾個角度講解:
- 創建線程池
- 提交任務
- 潛在宕機風險
- 線程池大小配置
- 自定義阻塞隊列BlockingQueue
- 回調接口
- 自定義拒絕策略
- 自定義ThreadFactory
- 關閉線程池
創建線程池
我們可以通過自定義ThreadPoolExecutor或者jdk內置的Executors來創建一系列的線程池
- newFixedThreadPool: 創建固定線程數量的線程池
- newSingleThreadExecutor: 創建單一線程的池
- newCachedThreadPool: 創建線程數量自動擴容, 自動銷毀的線程池
- newScheduledThreadPool: 創建支持計划任務的線程池
上述幾種都是通過new ThreadPoolExecutor()來實現的, 構造函數源碼如下:
1 /**
2 * @param corePoolSize 池內核心線程數量, 超出數量的線程會進入阻塞隊列
3 * @param maximumPoolSize 最大可創建線程數量
4 * @param keepAliveTime 線程存活時間
5 * @param unit 存活時間的單位
6 * @param workQueue 線程溢出后的阻塞隊列
7 */
8 public ThreadPoolExecutor(int corePoolSize,
9 int maximumPoolSize,
10 long keepAliveTime,
11 TimeUnit unit,
12 BlockingQueue<Runnable> workQueue) {
13 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
14 }
15
16 public static ExecutorService newFixedThreadPool(int nThreads) {
17 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
18 }
19
20 public static ExecutorService newSingleThreadExecutor() {
21 return new Executors.FinalizableDelegatedExecutorService
22 (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
23 }
24
25 public static ExecutorService newCachedThreadPool() {
26 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
27 }
28
29 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
30 return new ScheduledThreadPoolExecutor(corePoolSize);
31 }
32
33 public ScheduledThreadPoolExecutor(int corePoolSize) {
34 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
35 }
提交任務
直接調用executorService.execute(runnable)或者submit(runnable)即可,
execute和submit的區別在於submit會返回Future來獲取任何執行的結果.
我們看下newScheduledThreadPool的使用示例.
1 public class SchedulePoolDemo {
2
3 public static void main(String[] args){
4 ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
5 // 如果前面的任務沒有完成, 調度也不會啟動
6 service.scheduleAtFixedRate(new Runnable() {
7 @Override
8 public void run() {
9 try {
10 Thread.sleep(2000);
11 // 每兩秒打印一次.
12 System.out.println(System.currentTimeMillis()/1000);
13 } catch (InterruptedException e) {
14 e.printStackTrace();
15 }
16 }
17 }, 0, 2, TimeUnit.SECONDS);
18 }
19 }
潛在宕機風險
使用Executors來創建要注意潛在宕機風險.其返回的線程池對象的弊端如下:
- FixedThreadPool和SingleThreadPoolPool : 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM.
- CachedThreadPool和ScheduledThreadPool : 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM.
綜上所述, 在可能有大量請求的線程池場景中, 更推薦自定義ThreadPoolExecutor來創建線程池, 具體構造函數配置見下文.
線程池大小配置
一般根據任務類型進行區分, 假設CPU為N核
- CPU密集型任務需要減少線程數量, 降低線程之間切換造成的開銷, 可配置線程池大小為N + 1.
- IO密集型任務則可以加大線程數量, 可配置線程池大小為 N * 2.
- 混合型任務則可以拆分為CPU密集型與IO密集型, 獨立配置.
自定義阻塞隊列BlockingQueue
主要存放等待執行的線程, ThreadPoolExecutor中支持自定義該隊列來實現不同的排隊隊列.
- ArrayBlockingQueue:先進先出隊列,創建時指定大小, 有界;
- LinkedBlockingQueue:使用鏈表實現的先進先出隊列,默認大小為Integer.MAX_VALUE;
- SynchronousQueue:不保存提交的任務, 數據也不會緩存到隊列中, 用於生產者和消費者互等對方, 一起離開.
- PriorityBlockingQueue: 支持優先級的隊列
回調接口
線程池提供了一些回調方法, 具體使用如下所示.
1 ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
2
3 @Override
4 protected void beforeExecute(Thread t, Runnable r) {
5 System.out.println("准備執行任務: " + r.toString());
6 }
7
8 @Override
9 protected void afterExecute(Runnable r, Throwable t) {
10 System.out.println("結束任務: " + r.toString());
11 }
12
13 @Override
14 protected void terminated() {
15 System.out.println("線程池退出");
16 }
17 };
可以在回調接口中, 對線程池的狀態進行監控, 例如任務執行的最長時間, 平均時間, 最短時間等等, 還有一些其他的屬性如下:
- taskCount:線程池需要執行的任務數量.
- completedTaskCount:線程池在運行過程中已完成的任務數量.小於或等於taskCount.
- largestPoolSize:線程池曾經創建過的最大線程數量.通過這個數據可以知道線程池是否滿過.如等於線程池的最大大小,則表示線程池曾經滿了.
- getPoolSize:線程池的線程數量.如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減.
- getActiveCount:獲取活動的線程數.
自定義拒絕策略
線程池滿負荷運轉后, 因為時間空間的問題, 可能需要拒絕掉部分任務的執行.
jdk提供了RejectedExecutionHandler接口, 並內置了幾種線程拒絕策略
- AbortPolicy: 直接拒絕策略, 拋出異常.
- CallerRunsPolicy: 調用者自己執行任務策略.
- DiscardOldestPolicy: 舍棄最老的未執行任務策略.
使用方式也很簡單, 直接傳參給ThreadPool
1 ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
2 new SynchronousQueue<Runnable>(),
3 Executors.defaultThreadFactory(),
4 new RejectedExecutionHandler() {
5 @Override
6 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
7 System.out.println("reject task: " + r.toString());
8 }
9 });
自定義ThreadFactory
線程工廠用於創建池里的線程. 例如在工廠中都給線程setDaemon(true), 這樣程序退出的時候, 線程自動退出.
或者統一指定線程優先級, 設置名稱等等.
1 class NamedThreadFactory implements ThreadFactory {
2 private static final AtomicInteger threadIndex = new AtomicInteger(0);
3 private final String baseName;
4 private final boolean daemon;
5
6 public NamedThreadFactory(String baseName) {
7 this(baseName, true);
8 }
9
10 public NamedThreadFactory(String baseName, boolean daemon) {
11 this.baseName = baseName;
12 this.daemon = daemon;
13 }
14
15 public Thread newThread(Runnable runnable) {
16 Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement());
17 thread.setDaemon(this.daemon);
18 return thread;
19 }
20 }
關閉線程池
跟直接new Thread不一樣, 局部變量的線程池, 需要手動關閉, 不然會導致線程泄漏問題.
默認提供兩種方式關閉線程池.
- shutdown: 等所有任務, 包括阻塞隊列中的執行完, 才會終止, 但是不會接受新任務.
- shutdownNow: 立即終止線程池, 打斷正在執行的任務, 清空隊列.
-END-


