Java線程池ThreadPoolExecutor類使用詳解


一、Executors創建線程池

二、ThreadPoolExecutor類

三、ThreadPoolExecutor類擴展

 

一、Executors創建線程池

  Java中創建線程池很簡單,只需要調用Executors中相應的便捷方法即可,如Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()、Executors.newCachedThreadPool()等方法。這些方法雖然便捷,但是也有其局限性,如:OOM,線程耗盡。

  小程序使用這些便捷方法沒什么問題,對於服務端需要長期運行的程序,創建線程池應該直接使用ThreadPoolExecutor進行創建。上述便捷方法的創建也是通過ThreadPoolExecutor實現的。

二、ThreadPoolExecutor類

1、線程池工作順序

  線程池的工作順序為:corePoolSize -> 任務隊列 -> maximumPoolSize -> 拒絕策略。

  即:核心線程 -> 任務隊列 -> 除核心線程外的其他線程 -> 拒絕策略

2、ThreadPoolExecutor構造函數

  Executors中創建線程池的便捷方法,實際上是調用了ThreadPoolExecutor的構造方法,定時任務線程池便捷方法Executors.newScheduledThreadPool()內部使用的是ScheduledThreadPoolExecutor。

  ThreadPoolExecutor構造函數參數列表如下:

1 public ThreadPoolExecutor(int corePoolSize,                     //線程池核心線程數量
2                           int maximumPoolSize,                  //線程池最大線程數量
3                           long keepAliveTime,                   //超過corePooleSize的空閑線程的存活時長
4                           TimeUnit unit,                        //空閑線程存活時長單位
5                           BlockingQueue<Runnable> workQueue,    //任務的排隊隊列
6                           ThreadFactory threadFactory,          //新線程的線程工廠
7                           RejectedExecutionHandler handler)     //拒絕策略

  比較容易出問題的參數有corePoolSize、maximumPoolSize、workQueue以及handler:

  • corePoolSize和maximumPoolSize設置不當會影響效率,甚至耗盡線程
  • workQueue設置不當容易導致OOM
  • handler設置不當會導致提交任務時拋出異常

3、workQueue任務隊列

  任務隊列一般分為直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列。

  • 直接任務隊列:設置為SynchronousQueue隊列。SynchronousQueue是一個特殊的BlockingQueue,它沒有容量,每執行一個插入操作就會阻塞,需要再執行一個刪除操作才會被喚醒;反之,每一個刪除操作也要等待對應的插入操作。
 1 public class SynchronousQueueTest {
 2 
 3     private static ExecutorService pool;
 4     
 5     public static void main(String[] args) {
 6         
 7         //核心線程數設為1,最大線程數設為2,任務隊列為SynchronousQueue,拒絕策略為AbortPolicy,直接拋出異常
 8         pool = new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
 9         for(int i = 0;i < 3;i++){
10             pool.execute(new ThreadTask());
11         }
12     }
13 }
14 
15 class ThreadTask implements Runnable{
16     
17     @Override
18     public void run() {
19         System.out.println(Thread.currentThread().getName());
20     }
21 }

  執行結果如下:

pool-1-thread-2
pool-1-thread-1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.aisino.threadPool.ThreadTask@2f0e140b rejected from java.util.concurrent.ThreadPoolExecutor@7440e464[Running, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.aisino.threadPool.SynchronousQueueTest.main(SynchronousQueueTest.java:18)

   由執行結果可知,當任務隊列為SynchronousQueue,創建的線程數大於maximumPoolSize時,直接執行拒絕策略拋出異常。

  使用SynchronousQueue隊列時,提交的任務不會被保存,總是會馬上提交執行。如果用於執行任務的線程數小於maximumPoolSize,則嘗試創建新的線程,如果達到maximumPoolSize設置的最大值,則根據設置的handler執行對應的拒絕策略。因此使用SynchronousQueue隊列時,任務不會被緩存起來,而是馬上執行,在這種情況下,需要對程序的並發量有個准確的評估,才能設置合適的maximumPoolSize數量,否則很容易執行拒絕策略。

  • 有界任務隊列:可使用ArrayBlockingQueue實現,如下所示:
ExecutorService pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

  使用有界任務隊列ArrayBlockingQueue時,如果有新的任務需要執行,線程池會創建新的線程,直到創建的線程數量達到corePoolSize,之后新的任務會被加入到等待隊列中。若等待隊列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續創建線程,直到線程數量達到maximumPoolSize設置的最大線程數量,若大於maximumPoolSize,則執行拒絕策略。在這種情況下,線程數量的上限與有界任務隊列的狀態有直接關系,如果有界任務隊列初始量較大或者沒有達到超負荷的狀態,線程數將一直維持在corePoolSize及以下;反之,當任務隊列已滿時,則會以maximumPoolSize為最大線程數上限。

  • 無界的任務隊列:可使用LinkedBlockingQueue實現,如下所示:
ExecutorService pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

   使用無界任務隊列LinkedBlockingQueue時,線程池的任務隊列可以無限制的添加新的任務,而線程池創建的最大線程數量就是corePoolSize。在這種情況下maximumPoolSize參數是無效的,哪怕任務隊列中緩存了很多未執行的任務,當線程池的線程數達到corePoolSize后,線程數也不會再增加了。若后續有新的任務加入,則直接進入隊列等待。使用這種任務隊列模式時,要注意任務提交與處理之間的協調控制,不然會出現隊列中的任務由於無法及時處理導致的一直增長,直到最后出現資源耗盡的問題。

  • 優先任務隊列:通過PriorityBlockingQueue實現,如下所示:
 1 public class PriorityBlockingQueueTest {
 2 
 3     private static ExecutorService pool;
 4     public static void main(String[] args) {
 5         
 6         //使用優先任務隊列
 7         pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
 8         for(int i = 0;i < 20;i++){
 9             pool.execute(new PriorityThreadTask(i));
10         }
11     }
12     
13 }
14 
15 class PriorityThreadTask implements Runnable, Comparable<PriorityThreadTask>{
16 
17     private int priority;
18     
19     public int getPriority(){
20         return priority;
21     }
22     public void setPriority(int priority){
23         this.priority = priority;
24     }
25     
26     public PriorityThreadTask(){}
27     
28     public PriorityThreadTask(int priority){
29         this.priority = priority;
30     }
31 
32     @Override
33     public void run() {
34         
35         try{
36             //讓線程阻塞,使后續任務進入緩存隊列
37             Thread.sleep(1000);
38             
39             System.out.println("priority:" + this.priority + ", ThreadName:" + Thread.currentThread().getName());
40         }catch(InterruptedException e){
41             e.printStackTrace();
42         }
43     }
44 
45     //當前對象和其他對象作比較,當前優先級大就返回-1,當前優先級小就返回1,值越小優先級越高
46     @Override
47     public int compareTo(PriorityThreadTask o) {
48         return this.priority > o.priority ? -1 : 1;
49     }
50 }

  執行結果如下:

priority:0, ThreadName:pool-1-thread-1
priority:19, ThreadName:pool-1-thread-1
priority:18, ThreadName:pool-1-thread-1
priority:17, ThreadName:pool-1-thread-1
priority:16, ThreadName:pool-1-thread-1
priority:15, ThreadName:pool-1-thread-1
priority:14, ThreadName:pool-1-thread-1
priority:13, ThreadName:pool-1-thread-1
priority:12, ThreadName:pool-1-thread-1
priority:11, ThreadName:pool-1-thread-1
priority:10, ThreadName:pool-1-thread-1
priority:9, ThreadName:pool-1-thread-1
priority:8, ThreadName:pool-1-thread-1
priority:7, ThreadName:pool-1-thread-1
priority:6, ThreadName:pool-1-thread-1
priority:5, ThreadName:pool-1-thread-1
priority:4, ThreadName:pool-1-thread-1
priority:3, ThreadName:pool-1-thread-1
priority:2, ThreadName:pool-1-thread-1
priority:1, ThreadName:pool-1-thread-1

   由執行結果可看出,除了第一個任務直接創建線程執行外,其他的任務都被放入了優先任務隊列PriorityBlockingQueue中,按優先級進行了重新排列執行,且線程池的線程數一直為corePoolSize,在本例中corePoolSize為1,即線程數一直為1。

  PriorityBlockingQueue其實是一個特殊的無界隊列,它其中無論添加了多少個任務,線程池創建的線程數量也不會超過corePoolSize。其他隊列一般是按照先進先出的規則處理任務,而PriorityBlockingQueue隊列可以自定義規則根據任務的優先級順序先后執行。

4、handler拒絕策略

  在創建線程池時,為防止資源被耗盡,任務隊列都會選擇創建有界任務隊列。在創建有界任務隊列模式下,當任務隊列已滿且線程池創建的線程數達到最大線程數時,需要指定ThreadPoolExecutor的RejectedExecutionHandler參數來處理線程池"超載"的情況。ThreadPoolExecutor自帶的拒絕策略如下:

  • AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作
  • DiscardPolicy策略:該策略會默默丟棄無法處理的任務,不予任何處理。使用此策略時,業務場景中需允許任務的丟失
  • DiscardOldestPolicy策略:該策略會丟棄任務隊列中最老的一個任務,即任務隊列中最先被添加進去的、馬上要被執行的任務,並嘗試再次提交任務(重復此過程)
  • CallerRunsPolicy策略:如果線程池的線程數量達到上限,該策略會把任務隊列中的任務放在調用者線程當中運行

  以上內置的拒絕策略均實現了RejectedExecutionHandler接口,也可自己擴展RejectedExecutionHandler接口,定義自己的拒絕策略。示例代碼如下:

 1 /**
 2  * 自定義拒絕策略
 3  */
 4 public class CustomRejectedExecutionHandlerTest {
 5 
 6     private static ExecutorService pool;
 7 
 8     public static void main(String[] args) {
 9 
10         //自定義拒絕策略
11         pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
12                 new ArrayBlockingQueue<>(5),
13                 Executors.defaultThreadFactory(),
14                 new RejectedExecutionHandler() {
15                     @Override
16                     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
17                         System.out.println(r.toString() + " 執行了拒絕策略");
18                     }
19                 });
20 
21         for (int i = 0; i < 10; i++) {
22             pool.execute(new CustomRejectedExecutionHandlerThreadTask());
23         }
24     }
25 }
26 
27 class CustomRejectedExecutionHandlerThreadTask implements Runnable {
28 
29     @Override
30     public void run() {
31         try {
32             //讓線程阻塞,使后續任務機進入緩存隊列
33             Thread.sleep(1000);
34             System.out.println("線程名稱:" + Thread.currentThread().getName());
35         } catch (InterruptedException e) {
36             e.printStackTrace();
37         }
38     }
39 }

  執行結果如下:

com.aisino.threadPool.CustomRejectedExecutionHandlerThreadTask@3cd1a2f1 執行了拒絕策略
com.aisino.threadPool.CustomRejectedExecutionHandlerThreadTask@2f0e140b 執行了拒絕策略
com.aisino.threadPool.CustomRejectedExecutionHandlerThreadTask@7440e464 執行了拒絕策略
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2

   由執行結果可看出,由於任務添加了休眠阻塞,執行任務需要花費一定時間,導致有一定數量的任務被丟棄,從而執行自定義的拒絕策略。

5、ThreadFactory自定義線程創建

  線程池中的線程是通過ThreadPoolExecutor中的線程工廠ThreadFactory創建的。可自定義ThreadFactory對線程池中的線程進行一些特殊的設置(命名、設置優先級等)。示例代碼如下:

 1 public class CustomThreadFactoryTest {
 2     
 3     private static ExecutorService pool;
 4 
 5     public static void main(String[] args) {
 6         //自定義線程工廠
 7         pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() {
 8             @Override
 9             public Thread newThread(Runnable r) {
10                 System.out.println("創建線程:" + r.hashCode());
11                 //線程名稱
12                 Thread th = new Thread(r, "threadPool-" + r.hashCode());
13                 return th;
14             }
15         }, new ThreadPoolExecutor.CallerRunsPolicy());
16         
17         for(int i = 0;i < 10;i++){
18             pool.execute(new CustomThreadFactoryThreadTask());
19         }
20     }
21 }
22 
23 class CustomThreadFactoryThreadTask implements Runnable{
24     
25     @Override
26     public void run(){
27         //輸出執行線程的名稱
28         System.out.println("線程名稱:" + Thread.currentThread().getName());
29     }
30 }

  執行結果如下:

創建線程:1259475182
創建線程:1300109446
創建線程:1020371697
線程名稱:threadPool-1259475182
線程名稱:threadPool-1300109446
線程名稱:threadPool-1259475182
創建線程:789451787
線程名稱:threadPool-1020371697
線程名稱:threadPool-1259475182
線程名稱:threadPool-1300109446
線程名稱:threadPool-1259475182
線程名稱:threadPool-1020371697
線程名稱:threadPool-789451787
線程名稱:threadPool-1300109446

   由執行結果可看出,每個線程的創建都進行了記錄輸出與命名。

6、正確構造線程池

int poolSize = Runtime.getRuntime().availableProcessors() * 2;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
executorService = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, queue, policy);

 

三、ThreadPoolExecutor類擴展

  ThreadPoolExecutor類擴展主要是圍繞beforeExecute()、afterExecute()和terminated()三個接口。

  • beforeExecute:線程池中任務運行前執行
  • afterExecute:線程池中任務運行完畢后執行
  • terminated:線程池退出后執行

  通過這三個接口可以監控每個任務的開始時間和結束時間,或者其他功能。示例代碼如下:

 1 public class ThreadPoolExecutorExtensionTest {
 2     
 3     private static ExecutorService pool;
 4 
 5     public static void main(String[] args) {
 6         
 7         //自定義線程,為線程重命名
 8         pool = new ThreadPoolExecutor(1, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadFactory() {
 9             @Override
10             public Thread newThread(Runnable r) {
11                 System.out.println("創建線程:" + r.hashCode());
12                 Thread th = new Thread(r, "ThreadPool-" + r.hashCode());
13                 return th;
14             }
15         }, new ThreadPoolExecutor.CallerRunsPolicy()){
16             
17             protected void beforeExecute(Thread t,Runnable r) {
18                 System.out.println("准備執行的任務名稱:"+ ((ThreadPoolExecutorExtensionThreadTask)r).getTaskName());
19             }
20 
21             protected void afterExecute(Runnable r,Throwable t) {
22                 System.out.println("執行完畢的任務名稱:"+((ThreadPoolExecutorExtensionThreadTask)r).getTaskName());
23             }
24 
25             protected void terminated() {
26                 System.out.println("線程池退出");
27             }
28         };
29         
30         for(int i = 0;i < 10;i++){
31             pool.execute(new ThreadPoolExecutorExtensionThreadTask("Task-" + i));
32         }
33         pool.shutdown();
34     }
35 }
36 
37 class ThreadPoolExecutorExtensionThreadTask implements Runnable{
38     
39     private String taskName;
40     
41     public String getTaskName(){
42         return taskName;
43     }
44     
45     public void setTaskName(String taskName){
46         this.taskName = taskName;
47     }
48     
49     public ThreadPoolExecutorExtensionThreadTask(){}
50     
51     public ThreadPoolExecutorExtensionThreadTask(String taskName){
52         this.taskName = taskName;
53     }
54 
55     @Override
56     public void run() {
57         //輸出任務名稱以及對應的執行線程名稱
58         System.out.println("任務名稱:" + this.taskName + ", 執行線程名稱:" + Thread.currentThread().getName());
59     }
60 }

  執行結果如下:

創建線程:1259475182
創建線程:1300109446
創建線程:1020371697
准備執行的任務名稱:Task-0
創建線程:789451787
任務名稱:Task-0, 執行線程名稱:ThreadPool-1259475182
准備執行的任務名稱:Task-6
任務名稱:Task-9, 執行線程名稱:main
執行完畢的任務名稱:Task-0
准備執行的任務名稱:Task-7
准備執行的任務名稱:Task-8
任務名稱:Task-6, 執行線程名稱:ThreadPool-1300109446
任務名稱:Task-8, 執行線程名稱:ThreadPool-789451787
執行完畢的任務名稱:Task-8
准備執行的任務名稱:Task-1
任務名稱:Task-7, 執行線程名稱:ThreadPool-1020371697
執行完畢的任務名稱:Task-7
任務名稱:Task-1, 執行線程名稱:ThreadPool-1259475182
執行完畢的任務名稱:Task-1
准備執行的任務名稱:Task-2
任務名稱:Task-2, 執行線程名稱:ThreadPool-789451787
執行完畢的任務名稱:Task-2
執行完畢的任務名稱:Task-6
准備執行的任務名稱:Task-5
任務名稱:Task-5, 執行線程名稱:ThreadPool-789451787
執行完畢的任務名稱:Task-5
准備執行的任務名稱:Task-4
准備執行的任務名稱:Task-3
任務名稱:Task-4, 執行線程名稱:ThreadPool-1259475182
執行完畢的任務名稱:Task-4
任務名稱:Task-3, 執行線程名稱:ThreadPool-1020371697
執行完畢的任務名稱:Task-3
線程池退出

   由執行結果可看出,通過對beforeExecute()、afterExecute()和terminated()的實現,可以對線程池中線程的狀態進行監控,在線程執行前后輸出了相關的打印信息。另外,使用shutdown()方法可以比較安全的關閉線程池,當線程池調用該方法后,線程池將不再接受后續添加的任務。但是,線程池不會立刻退出,而是等到添加到線程池中的任務都處理完成,才會退出。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM