一、Executors創建線程池
二、ThreadPoolExecutor類
三、ThreadPoolExecutor類擴展
一、Executors創建線程池
Java中創建線程池很簡單,只需要調用Executors中相應的便捷方法即可,如Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()、Executors.newCachedThreadPool()等方法。這些方法雖然便捷,但是也有其局限性,如:OOM,線程耗盡。
小程序使用這些便捷方法沒什么問題,對於服務端需要長期運行的程序,創建線程池應該直接使用ThreadPoolExecutor進行創建。上述便捷方法的創建也是通過ThreadPoolExecutor實現的。
二、ThreadPoolExecutor類
1、線程池工作順序
線程池的工作順序為:corePoolSize -> 任務隊列 -> maximumPoolSize -> 拒絕策略。
即:核心線程 -> 任務隊列 -> 除核心線程外的其他線程 -> 拒絕策略
2、ThreadPoolExecutor構造函數
Executors中創建線程池的便捷方法,實際上是調用了ThreadPoolExecutor的構造方法,定時任務線程池便捷方法Executors.newScheduledThreadPool()內部使用的是ScheduledThreadPoolExecutor。
ThreadPoolExecutor構造函數參數列表如下:
1 public ThreadPoolExecutor(int corePoolSize, //線程池核心線程數量
2 int maximumPoolSize, //線程池最大線程數量
3 long keepAliveTime, //超過corePooleSize的空閑線程的存活時長
4 TimeUnit unit, //空閑線程存活時長單位
5 BlockingQueue<Runnable> workQueue, //任務的排隊隊列
6 ThreadFactory threadFactory, //新線程的線程工廠
7 RejectedExecutionHandler handler) //拒絕策略
比較容易出問題的參數有corePoolSize、maximumPoolSize、workQueue以及handler:
- corePoolSize和maximumPoolSize設置不當會影響效率,甚至耗盡線程
- workQueue設置不當容易導致OOM
- handler設置不當會導致提交任務時拋出異常
3、workQueue任務隊列
任務隊列一般分為直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列。
- 直接任務隊列:設置為SynchronousQueue隊列。SynchronousQueue是一個特殊的BlockingQueue,它沒有容量,每執行一個插入操作就會阻塞,需要再執行一個刪除操作才會被喚醒;反之,每一個刪除操作也要等待對應的插入操作。
1 public class SynchronousQueueTest { 2 3 private static ExecutorService pool; 4 5 public static void main(String[] args) { 6 7 //核心線程數設為1,最大線程數設為2,任務隊列為SynchronousQueue,拒絕策略為AbortPolicy,直接拋出異常 8 pool = new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); 9 for(int i = 0;i < 3;i++){ 10 pool.execute(new ThreadTask()); 11 } 12 } 13 } 14 15 class ThreadTask implements Runnable{ 16 17 @Override 18 public void run() { 19 System.out.println(Thread.currentThread().getName()); 20 } 21 }
執行結果如下:
pool-1-thread-2 pool-1-thread-1 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.aisino.threadPool.ThreadTask@2f0e140b rejected from java.util.concurrent.ThreadPoolExecutor@7440e464[Running, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at com.aisino.threadPool.SynchronousQueueTest.main(SynchronousQueueTest.java:18)
由執行結果可知,當任務隊列為SynchronousQueue,創建的線程數大於maximumPoolSize時,直接執行拒絕策略拋出異常。
使用SynchronousQueue隊列時,提交的任務不會被保存,總是會馬上提交執行。如果用於執行任務的線程數小於maximumPoolSize,則嘗試創建新的線程,如果達到maximumPoolSize設置的最大值,則根據設置的handler執行對應的拒絕策略。因此使用SynchronousQueue隊列時,任務不會被緩存起來,而是馬上執行,在這種情況下,需要對程序的並發量有個准確的評估,才能設置合適的maximumPoolSize數量,否則很容易執行拒絕策略。
- 有界任務隊列:可使用ArrayBlockingQueue實現,如下所示:
ExecutorService pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
使用有界任務隊列ArrayBlockingQueue時,如果有新的任務需要執行,線程池會創建新的線程,直到創建的線程數量達到corePoolSize,之后新的任務會被加入到等待隊列中。若等待隊列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續創建線程,直到線程數量達到maximumPoolSize設置的最大線程數量,若大於maximumPoolSize,則執行拒絕策略。在這種情況下,線程數量的上限與有界任務隊列的狀態有直接關系,如果有界任務隊列初始量較大或者沒有達到超負荷的狀態,線程數將一直維持在corePoolSize及以下;反之,當任務隊列已滿時,則會以maximumPoolSize為最大線程數上限。
- 無界的任務隊列:可使用LinkedBlockingQueue實現,如下所示:
ExecutorService pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用無界任務隊列LinkedBlockingQueue時,線程池的任務隊列可以無限制的添加新的任務,而線程池創建的最大線程數量就是corePoolSize。在這種情況下maximumPoolSize參數是無效的,哪怕任務隊列中緩存了很多未執行的任務,當線程池的線程數達到corePoolSize后,線程數也不會再增加了。若后續有新的任務加入,則直接進入隊列等待。使用這種任務隊列模式時,要注意任務提交與處理之間的協調控制,不然會出現隊列中的任務由於無法及時處理導致的一直增長,直到最后出現資源耗盡的問題。
- 優先任務隊列:通過PriorityBlockingQueue實現,如下所示:
1 public class PriorityBlockingQueueTest { 2 3 private static ExecutorService pool; 4 public static void main(String[] args) { 5 6 //使用優先任務隊列 7 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); 8 for(int i = 0;i < 20;i++){ 9 pool.execute(new PriorityThreadTask(i)); 10 } 11 } 12 13 } 14 15 class PriorityThreadTask implements Runnable, Comparable<PriorityThreadTask>{ 16 17 private int priority; 18 19 public int getPriority(){ 20 return priority; 21 } 22 public void setPriority(int priority){ 23 this.priority = priority; 24 } 25 26 public PriorityThreadTask(){} 27 28 public PriorityThreadTask(int priority){ 29 this.priority = priority; 30 } 31 32 @Override 33 public void run() { 34 35 try{ 36 //讓線程阻塞,使后續任務進入緩存隊列 37 Thread.sleep(1000); 38 39 System.out.println("priority:" + this.priority + ", ThreadName:" + Thread.currentThread().getName()); 40 }catch(InterruptedException e){ 41 e.printStackTrace(); 42 } 43 } 44 45 //當前對象和其他對象作比較,當前優先級大就返回-1,當前優先級小就返回1,值越小優先級越高 46 @Override 47 public int compareTo(PriorityThreadTask o) { 48 return this.priority > o.priority ? -1 : 1; 49 } 50 }
執行結果如下:
priority:0, ThreadName:pool-1-thread-1 priority:19, ThreadName:pool-1-thread-1 priority:18, ThreadName:pool-1-thread-1 priority:17, ThreadName:pool-1-thread-1 priority:16, ThreadName:pool-1-thread-1 priority:15, ThreadName:pool-1-thread-1 priority:14, ThreadName:pool-1-thread-1 priority:13, ThreadName:pool-1-thread-1 priority:12, ThreadName:pool-1-thread-1 priority:11, ThreadName:pool-1-thread-1 priority:10, ThreadName:pool-1-thread-1 priority:9, ThreadName:pool-1-thread-1 priority:8, ThreadName:pool-1-thread-1 priority:7, ThreadName:pool-1-thread-1 priority:6, ThreadName:pool-1-thread-1 priority:5, ThreadName:pool-1-thread-1 priority:4, ThreadName:pool-1-thread-1 priority:3, ThreadName:pool-1-thread-1 priority:2, ThreadName:pool-1-thread-1 priority:1, ThreadName:pool-1-thread-1
由執行結果可看出,除了第一個任務直接創建線程執行外,其他的任務都被放入了優先任務隊列PriorityBlockingQueue中,按優先級進行了重新排列執行,且線程池的線程數一直為corePoolSize,在本例中corePoolSize為1,即線程數一直為1。
PriorityBlockingQueue其實是一個特殊的無界隊列,它其中無論添加了多少個任務,線程池創建的線程數量也不會超過corePoolSize。其他隊列一般是按照先進先出的規則處理任務,而PriorityBlockingQueue隊列可以自定義規則根據任務的優先級順序先后執行。
4、handler拒絕策略
在創建線程池時,為防止資源被耗盡,任務隊列都會選擇創建有界任務隊列。在創建有界任務隊列模式下,當任務隊列已滿且線程池創建的線程數達到最大線程數時,需要指定ThreadPoolExecutor的RejectedExecutionHandler參數來處理線程池"超載"的情況。ThreadPoolExecutor自帶的拒絕策略如下:
- AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作
- DiscardPolicy策略:該策略會默默丟棄無法處理的任務,不予任何處理。使用此策略時,業務場景中需允許任務的丟失
- DiscardOldestPolicy策略:該策略會丟棄任務隊列中最老的一個任務,即任務隊列中最先被添加進去的、馬上要被執行的任務,並嘗試再次提交任務(重復此過程)
- CallerRunsPolicy策略:如果線程池的線程數量達到上限,該策略會把任務隊列中的任務放在調用者線程當中運行
以上內置的拒絕策略均實現了RejectedExecutionHandler接口,也可自己擴展RejectedExecutionHandler接口,定義自己的拒絕策略。示例代碼如下:
1 /** 2 * 自定義拒絕策略 3 */ 4 public class CustomRejectedExecutionHandlerTest { 5 6 private static ExecutorService pool; 7 8 public static void main(String[] args) { 9 10 //自定義拒絕策略 11 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, 12 new ArrayBlockingQueue<>(5), 13 Executors.defaultThreadFactory(), 14 new RejectedExecutionHandler() { 15 @Override 16 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 17 System.out.println(r.toString() + " 執行了拒絕策略"); 18 } 19 }); 20 21 for (int i = 0; i < 10; i++) { 22 pool.execute(new CustomRejectedExecutionHandlerThreadTask()); 23 } 24 } 25 } 26 27 class CustomRejectedExecutionHandlerThreadTask implements Runnable { 28 29 @Override 30 public void run() { 31 try { 32 //讓線程阻塞,使后續任務機進入緩存隊列 33 Thread.sleep(1000); 34 System.out.println("線程名稱:" + Thread.currentThread().getName()); 35 } catch (InterruptedException e) { 36 e.printStackTrace(); 37 } 38 } 39 }
執行結果如下:
com.aisino.threadPool.CustomRejectedExecutionHandlerThreadTask@3cd1a2f1 執行了拒絕策略 com.aisino.threadPool.CustomRejectedExecutionHandlerThreadTask@2f0e140b 執行了拒絕策略 com.aisino.threadPool.CustomRejectedExecutionHandlerThreadTask@7440e464 執行了拒絕策略 線程名稱:pool-1-thread-2 線程名稱:pool-1-thread-1 線程名稱:pool-1-thread-2 線程名稱:pool-1-thread-1 線程名稱:pool-1-thread-2 線程名稱:pool-1-thread-1 線程名稱:pool-1-thread-2
由執行結果可看出,由於任務添加了休眠阻塞,執行任務需要花費一定時間,導致有一定數量的任務被丟棄,從而執行自定義的拒絕策略。
5、ThreadFactory自定義線程創建
線程池中的線程是通過ThreadPoolExecutor中的線程工廠ThreadFactory創建的。可自定義ThreadFactory對線程池中的線程進行一些特殊的設置(命名、設置優先級等)。示例代碼如下:
1 public class CustomThreadFactoryTest { 2 3 private static ExecutorService pool; 4 5 public static void main(String[] args) { 6 //自定義線程工廠 7 pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() { 8 @Override 9 public Thread newThread(Runnable r) { 10 System.out.println("創建線程:" + r.hashCode()); 11 //線程名稱 12 Thread th = new Thread(r, "threadPool-" + r.hashCode()); 13 return th; 14 } 15 }, new ThreadPoolExecutor.CallerRunsPolicy()); 16 17 for(int i = 0;i < 10;i++){ 18 pool.execute(new CustomThreadFactoryThreadTask()); 19 } 20 } 21 } 22 23 class CustomThreadFactoryThreadTask implements Runnable{ 24 25 @Override 26 public void run(){ 27 //輸出執行線程的名稱 28 System.out.println("線程名稱:" + Thread.currentThread().getName()); 29 } 30 }
執行結果如下:
創建線程:1259475182 創建線程:1300109446 創建線程:1020371697 線程名稱:threadPool-1259475182 線程名稱:threadPool-1300109446 線程名稱:threadPool-1259475182 創建線程:789451787 線程名稱:threadPool-1020371697 線程名稱:threadPool-1259475182 線程名稱:threadPool-1300109446 線程名稱:threadPool-1259475182 線程名稱:threadPool-1020371697 線程名稱:threadPool-789451787 線程名稱:threadPool-1300109446
由執行結果可看出,每個線程的創建都進行了記錄輸出與命名。
6、正確構造線程池
int poolSize = Runtime.getRuntime().availableProcessors() * 2; BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512); RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); executorService = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, queue, policy);
三、ThreadPoolExecutor類擴展
ThreadPoolExecutor類擴展主要是圍繞beforeExecute()、afterExecute()和terminated()三個接口。
- beforeExecute:線程池中任務運行前執行
- afterExecute:線程池中任務運行完畢后執行
- terminated:線程池退出后執行
通過這三個接口可以監控每個任務的開始時間和結束時間,或者其他功能。示例代碼如下:
1 public class ThreadPoolExecutorExtensionTest { 2 3 private static ExecutorService pool; 4 5 public static void main(String[] args) { 6 7 //自定義線程,為線程重命名 8 pool = new ThreadPoolExecutor(1, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() { 9 @Override 10 public Thread newThread(Runnable r) { 11 System.out.println("創建線程:" + r.hashCode()); 12 Thread th = new Thread(r, "ThreadPool-" + r.hashCode()); 13 return th; 14 } 15 }, new ThreadPoolExecutor.CallerRunsPolicy()){ 16 17 protected void beforeExecute(Thread t,Runnable r) { 18 System.out.println("准備執行的任務名稱:"+ ((ThreadPoolExecutorExtensionThreadTask)r).getTaskName()); 19 } 20 21 protected void afterExecute(Runnable r,Throwable t) { 22 System.out.println("執行完畢的任務名稱:"+((ThreadPoolExecutorExtensionThreadTask)r).getTaskName()); 23 } 24 25 protected void terminated() { 26 System.out.println("線程池退出"); 27 } 28 }; 29 30 for(int i = 0;i < 10;i++){ 31 pool.execute(new ThreadPoolExecutorExtensionThreadTask("Task-" + i)); 32 } 33 pool.shutdown(); 34 } 35 } 36 37 class ThreadPoolExecutorExtensionThreadTask implements Runnable{ 38 39 private String taskName; 40 41 public String getTaskName(){ 42 return taskName; 43 } 44 45 public void setTaskName(String taskName){ 46 this.taskName = taskName; 47 } 48 49 public ThreadPoolExecutorExtensionThreadTask(){} 50 51 public ThreadPoolExecutorExtensionThreadTask(String taskName){ 52 this.taskName = taskName; 53 } 54 55 @Override 56 public void run() { 57 //輸出任務名稱以及對應的執行線程名稱 58 System.out.println("任務名稱:" + this.taskName + ", 執行線程名稱:" + Thread.currentThread().getName()); 59 } 60 }
執行結果如下:
創建線程:1259475182 創建線程:1300109446 創建線程:1020371697 准備執行的任務名稱:Task-0 創建線程:789451787 任務名稱:Task-0, 執行線程名稱:ThreadPool-1259475182 准備執行的任務名稱:Task-6 任務名稱:Task-9, 執行線程名稱:main 執行完畢的任務名稱:Task-0 准備執行的任務名稱:Task-7 准備執行的任務名稱:Task-8 任務名稱:Task-6, 執行線程名稱:ThreadPool-1300109446 任務名稱:Task-8, 執行線程名稱:ThreadPool-789451787 執行完畢的任務名稱:Task-8 准備執行的任務名稱:Task-1 任務名稱:Task-7, 執行線程名稱:ThreadPool-1020371697 執行完畢的任務名稱:Task-7 任務名稱:Task-1, 執行線程名稱:ThreadPool-1259475182 執行完畢的任務名稱:Task-1 准備執行的任務名稱:Task-2 任務名稱:Task-2, 執行線程名稱:ThreadPool-789451787 執行完畢的任務名稱:Task-2 執行完畢的任務名稱:Task-6 准備執行的任務名稱:Task-5 任務名稱:Task-5, 執行線程名稱:ThreadPool-789451787 執行完畢的任務名稱:Task-5 准備執行的任務名稱:Task-4 准備執行的任務名稱:Task-3 任務名稱:Task-4, 執行線程名稱:ThreadPool-1259475182 執行完畢的任務名稱:Task-4 任務名稱:Task-3, 執行線程名稱:ThreadPool-1020371697 執行完畢的任務名稱:Task-3 線程池退出
由執行結果可看出,通過對beforeExecute()、afterExecute()和terminated()的實現,可以對線程池中線程的狀態進行監控,在線程執行前后輸出了相關的打印信息。另外,使用shutdown()方法可以比較安全的關閉線程池,當線程池調用該方法后,線程池將不再接受后續添加的任務。但是,線程池不會立刻退出,而是等到添加到線程池中的任務都處理完成,才會退出。