並發編程 13—— 線程池的使用 之 配置ThreadPoolExecutor 和 飽和策略


 
 
概述
   1.1 創建
 

第1 部分 配置ThreadPoolExecutor

1.1 創建

ThreadPoolExecutor為一些Executor提供了基本的實現,比如,newCachedThreadPool,newFixedThreadPool等等。ThreadPoolExecutor允許各種定制

它的構造函數如下:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {}

最常用的newCachedThreadPool。

1.2 管理隊列任務

據線程池的大小來選擇合適的隊列有利於充分利用資源和防止耗盡資源。

基本的排隊方法有3種:

  • 無界隊列
  • 有界隊列
  • 同步移交
對於Executor,newCachedThreadPool工廠方法是一種很好的默認選擇,它能提供比固定大小的線程池更好的排隊性能。
 

1.3 飽和策略

我們在ThreadPoolExecutor的構造函數中看到了最后一個參數。  RejectedExecutionHandler handler。這個就是飽和策略。

JDK提供了幾種不同的實現:

  • DiscardOldestPolicy
  • AbortPolicy
  • CallerRunsPolicy
  • discardPolicy

AbortPolicy是默認的飽和策略,就是中止任務,該策略將拋出RejectedExecutionException。調用者可以捕獲這個異常然后去編寫代碼處理異常。

當新提交的任務無法保存到隊列中等待執行時,DiscardPolicy會稍稍的拋棄該任務,DiscardOldestPolicy則會拋棄最舊的(下一個將被執行的任務),然后嘗試重新提交新的任務。如果工作隊列是那個優先級隊列時,搭配DiscardOldestPolicy飽和策略會導致優先級最高的那個任務被拋棄,所以兩者不要組合使用。

CallerRunsPolicy是“調用者運行”策略,實現了一種調節機制 。它不會拋棄任務,也不會拋出異常。 而是將任務回退到調用者。它不會在線程池中執行任務,而是在一個調用了Executor的線程中執行該任務。

 1 /**
 2  * 調用者運行的飽和策略
 3  * @ClassName: ThreadDeadlock2
 4  * TODO
 5  * @author xingle
 6  * @date 2014-11-20 下午4:18:11
 7  */
 8 public class ThreadDeadlock2 {
 9     ExecutorService exec = new ThreadPoolExecutor(0, 2, 60L, TimeUnit.SECONDS,  
10             new SynchronousQueue<Runnable>(),new ThreadPoolExecutor.CallerRunsPolicy());
11 
12     private void putrunnable() {
13         for (int i = 0; i < 4; i++) {
14             exec.submit(new Runnable() {
15 
16                 @Override
17                 public void run() {
18                     // TODO Auto-generated method stub
19                     while (true) {
20                         System.out.println(Thread.currentThread().getName());
21                         try {
22                             Thread.sleep(500);
23                         } catch (InterruptedException e) {
24                             // TODO Auto-generated catch block
25                             e.printStackTrace();
26                         }
27                     }
28                 }
29             });
30         }
31     }
32     public static void main(String[] args) {
33         new ThreadDeadlock2().putrunnable();
34     }
35 }

 

執行結果:

 

 當工作隊列被填滿之后,沒有預定義的飽和策略來阻塞execute。通過使用 Semaphore (信號量)來限制任務的到達率,就可以實現這個功能。在下面的BoundedExecutor 中給出了這種方法,該方法使用了一個無界隊列,並設置信號量的上界設置為線程池的大小加上可排隊任務的數量,這是因為信號量需要控制正在執行的和正在等待執行的任務數量。

 1 /**
 2  * 8.4 使用Semaphore來控制任務的提交速率
 3  * @ClassName: BoundedExecutor
 4  * TODO
 5  * @author xingle
 6  * @date 2014-11-20 下午2:46:19
 7  */
 8 public class BoundedExecutor {
 9     private final Executor exec;
10     private final Semaphore semaphore;
11     int bound;
12 
13     public BoundedExecutor(Executor exec,int bound){
14         this.exec = exec;
15         this.semaphore = new Semaphore(bound);
16         this.bound = bound;
17     }
18     
19     public void submitTask(final Runnable command) throws InterruptedException{
20         //通過 acquire() 獲取一個許可
21         semaphore.acquire();
22         System.out.println("----------當前可用的信號量個數:"+semaphore.availablePermits());
23         try {
24             exec.execute(new Runnable() {
25 
26                 @Override
27                 public void run() {
28                     try {
29                         System.out.println("線程" + Thread.currentThread().getName() +"進入,當前已有" + (bound-semaphore.availablePermits()) + "個並發");
30                         command.run();
31                     } finally {
32                         //release() 釋放一個許可
33                         semaphore.release();
34                          System.out.println("線程" + Thread.currentThread().getName() +   
35                                     "已離開,當前已有" + (bound-semaphore.availablePermits()) + "個並發");  
36                     }
37                 }
38             });
39         } catch (RejectedExecutionException e) {
40             semaphore.release();
41         }        
42     }
43 }

 

測試程序:

 1 public class BoundedExecutor_main {
 2     public static void main(String[] args) throws InterruptedException{
 3         ExecutorService exec = Executors.newCachedThreadPool();    
 4         BoundedExecutor e = new BoundedExecutor(exec, 3);
 5         
 6         for(int i = 0;i<5;i++){
 7             final int c = i;
 8             e.submitTask(new Runnable() {
 9 
10                 @Override
11                 public void run() {
12                     System.out.println("執行任務:" +c);
13                 }
14             });
15         }
16     }
17 }

 

執行結果:

 

 


參考:
1.《java 並發編程》 8.2-8.3


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM