前言
Executors
Executors 是一個Java中的工具類。提供工廠方法來創建不同類型的線程池。
常用方法:
1.newSingleThreadExecutor
介紹:創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。
此線程池保證所有任務的執行順序按照任務的提交順序執行。
優點:單線程的線程池,保證線程的順序執行
缺點:不適合並發
2.newFixedThreadPool
介紹:創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
優點:固定大小線程池,超出的線程會在隊列中等待
缺點:不支持自定義拒絕策略,大小固定,難以擴展
3.newCachedThreadPool
介紹:創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。
優點:很靈活,彈性的線程池線程管理,用多少線程給多大的線程池,不用后及時回收,用則新建
缺點:一旦線程無限增長,會導致內存溢出
4.newScheduledThreadPool
介紹:創建一個定長線程池,支持定時及周期性任務執行
優點:一個固定大小線程池,可以定時或周期性的執行任務
缺點:任務是單線程方式執行,一旦一個任務失敗其他任務也受影響
總結
1)以上線程池都不支持自定義拒絕策略。
2)newFixedThreadPool 和 newSingleThreadExecutor:
主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至 OOM。
3)newCachedThreadPool 和 newScheduledThreadPool:
主要問題是線程數最大數是 Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至 OOM。
ThreadPoolExecutor
阿里巴巴的JAVA開發手冊推薦用ThreadPoolExecutor創建線程池。集以上優點於一身。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
參數解釋:
corePoolSize : 線程池核心池的大小。
maximumPoolSize : 線程池的最大線程數。
keepAliveTime : 當線程數大於核心時,此為終止前多余的空閑線程等待新任務的最長時間。
unit : keepAliveTime 的時間單位。
workQueue : 用來儲存等待執行任務的隊列。
threadFactory : 線程工廠。
handler 拒絕策略。
原理:
有請求時,創建線程執行任務,當線程數量等於corePoolSize時,請求加入阻塞隊列里,當隊列滿了時,接着創建線程,線程數等於maximumPoolSize。 當任務處理不過來的時候,線程池開始執行拒絕策略。
阻塞隊列:
ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
DelayQueue: 一個使用優先級隊列實現的無界阻塞隊列。
SynchronousQueue: 一個不存儲元素的阻塞隊列。
LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列。
LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。
拒絕策略:
ThreadPoolExecutor.AbortPolicy: 丟棄任務並拋出RejectedExecutionException異常。 (默認)
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務。(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務。
Demo
1 package com.xxx; 2 3 import com.google.common.util.concurrent.ThreadFactoryBuilder; 4 5 import java.util.concurrent.*; 6 7 /** 8 * 線程池 9 * @author xhq 10 */ 11 public class ThreadPoolService { 12 13 /** 14 * 自定義線程名稱,方便的出錯的時候溯源 15 */ 16 private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("test-pool-%d").build(); 17 18 /** 19 * corePoolSize 線程池核心池的大小 20 * maximumPoolSize 線程池中允許的最大線程數量 21 * keepAliveTime 當線程數大於核心時,此為終止前多余的空閑線程等待新任務的最長時間 22 * unit keepAliveTime 的時間單位 23 * workQueue 用來儲存等待執行任務的隊列 24 * threadFactory 創建線程的工廠類 25 * handler 拒絕策略類,當線程池數量達到上線並且workQueue隊列長度達到上限時就需要對到來的任務做拒絕處理 26 */ 27 private static ExecutorService service = new ThreadPoolExecutor( 28 4, 29 40, 30 0L, 31 TimeUnit.MILLISECONDS, 32 new LinkedBlockingQueue<>(1024), 33 namedThreadFactory, 34 new ThreadPoolExecutor.AbortPolicy() 35 ); 36 37 /** 38 * 獲取線程池 39 * @return 線程池 40 */ 41 public static ExecutorService getEs() { 42 return service; 43 } 44 45 /** 46 * 使用線程池創建線程並異步執行任務 47 * @param r 任務 48 */ 49 public static void newTask(Runnable r) { 50 service.execute(r); 51 } 52 }