Saturation Policies in Thread Pools


ThreadPoolExecutor lets you supply a BlockingQueue to hold tasks that are waiting to be executed.

The BlockingQueue methods to focus on are offer(E), put(E), and take().
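To make the difference between these three methods concrete, here is a minimal sketch (the class and method names are illustrative and do not come from the original post). ThreadPoolExecutor.execute() enqueues tasks with the non-blocking offer(), which is why a full queue leads to rejection rather than blocking the caller, while idle worker threads wait for work with take() or a timed poll().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueMethodsSketch {

    // Offers two elements to a queue of capacity 1 and returns both results.
    static boolean[] offerTwice() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        boolean first = queue.offer("a");   // the queue has room: returns true
        boolean second = queue.offer("b");  // the queue is full: returns false immediately
        return new boolean[] {first, second};
    }

    // put() blocks while the queue is full; take() blocks while it is empty.
    static String putThenTake() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put("a");       // does not block here because the queue is empty
            return queue.take();  // removes and returns the element that was just put
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while using the queue", e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = offerTwice();
        System.out.println("first offer: " + results[0] + ", second offer: " + results[1]);
        System.out.println("taken: " + putThenTake());
    }
}
```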

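By default, newFixedThreadPool and newSingleThreadExecutor use an unbounded LinkedBlockingQueue. If all worker threads are busy, new tasks wait in the queue; if tasks arrive faster than the threads can execute them, the queue keeps growing without limit.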
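The backlog can be observed directly. The sketch below is illustrative only: the class name and the latch-based task are assumptions, and it relies on newFixedThreadPool returning a ThreadPoolExecutor, which is how the JDK implements it but is not part of the method's declared return type. The first task goes straight to the single worker, so every later task should end up waiting in the queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class UnboundedQueueSketch {

    // Submits `tasks` blocked tasks to a single-thread fixed pool and returns the backlog size.
    static int backlogAfterSubmitting(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy so later tasks pile up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Assumption: the returned executor is a ThreadPoolExecutor, so its queue can be inspected.
            return ((ThreadPoolExecutor) pool).getQueue().size();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + backlogAfterSubmitting(1000));
    }
}
```

Nothing is ever rejected here; the queue simply absorbs whatever is submitted, which is the memory risk described above.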

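A safer resource-management strategy is to use a bounded queue, such as an ArrayBlockingQueue or a bounded LinkedBlockingQueue. (PriorityBlockingQueue is often listed alongside them, but it is actually unbounded: its constructor argument is only an initial capacity.) A bounded queue prevents resource exhaustion but raises a new question: what should happen to new tasks once the queue is full? That is the job of the saturation (rejection) policy.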
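Which queues actually enforce a bound is easy to check. The following sketch (class and method names are illustrative) offers five elements to each queue type created with a capacity argument of 2 and counts how many are accepted. Note that the argument to PriorityBlockingQueue is only an initial capacity, so that queue accepts everything.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

class QueueBoundsSketch {

    // Offers `attempts` elements and returns how many the queue accepted.
    static int accepted(BlockingQueue<Integer> queue, int attempts) {
        int count = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("ArrayBlockingQueue(2):    "
                + accepted(new ArrayBlockingQueue<Integer>(2), 5));
        System.out.println("LinkedBlockingQueue(2):   "
                + accepted(new LinkedBlockingQueue<Integer>(2), 5));
        // The argument below is an initial capacity, not a limit; the queue grows as needed.
        System.out.println("PriorityBlockingQueue(2): "
                + accepted(new PriorityBlockingQueue<Integer>(2), 5));
    }
}
```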

The JDK provides several RejectedExecutionHandler implementations, each a different saturation policy: AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy.

AbortPolicy: when a task submitted to the pool is rejected, this policy throws a RejectedExecutionException. It is the default policy.
For example:
package cn.concurrent.executor;

import java.util.concurrent.*;

/**
 * Created by spark on 17-9-24.
 */
public class AbortPolicyDemo {

    public static void main(String[] args) {
        // Create a pool with corePoolSize 1, maximumPoolSize 1, and a blocking queue of capacity 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        // Use AbortPolicy as the saturation policy; the caller can catch the resulting exception
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // Submit the tasks
        for (int i = 0; i < 5; i++) {
            MyRunnable myRunnable = new MyRunnable();
            pool.execute(myRunnable);
        }
        pool.shutdown();
    }


    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.err.println(Thread.currentThread().getId() + ": is running");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Output:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task cn.concurrent.executor.AbortPolicyDemo$MyRunnable@1d44bcfa rejected from java.util.concurrent.ThreadPoolExecutor@266474c2[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2066)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at cn.concurrent.executor.AbortPolicyDemo.main(AbortPolicyDemo.java:19)
10: is running
10: is running

The RejectedExecutionException is thrown by the saturation policy. The first task runs on the single worker thread, the second waits in the one-slot queue, and the third is rejected. Because the exception escapes main, the remaining submissions and pool.shutdown() are never reached, so only two tasks run.

If we modify the code as follows:
package cn.concurrent.executor;

import java.util.concurrent.*;

/**
 * Created by spark on 17-9-24.
 */
public class AbortPolicyDemo {

    public static void main(String[] args) {
        // Create a pool with corePoolSize 1, maximumPoolSize 1, and a blocking queue of capacity 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        // Use AbortPolicy as the saturation policy; the caller can catch the resulting exception
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // Submit the tasks, pausing before each submission
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            MyRunnable myRunnable = new MyRunnable();
            pool.execute(myRunnable);
        }
        pool.shutdown();
    }


    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.err.println(Thread.currentThread().getId() + ": is running");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Output:

10: is running
10: is running
10: is running
10: is running
10: is running

The reason is that the main thread sleeps 500 ms before each submission while each task takes only about 300 ms, so every task finishes before the next one arrives. The bounded queue never fills up, and the program runs normally.
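Because AbortPolicy reports saturation as an exception, the submitting code can catch it and decide what to do instead of letting it terminate the thread. The sketch below is one hedged way to do that; the class name, the latch that keeps the worker busy, and the simple counter are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AbortPolicyCatchSketch {

    // Submits `tasks` blocked tasks to a 1-thread pool with a 1-slot queue
    // and returns how many submissions were rejected.
    static int countRejections(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejectedCount = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // hold the worker so the pool stays saturated
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejectedCount++; // a real caller might retry later, log, or shed load here
                }
            }
        } finally {
            release.countDown();
            pool.shutdown(); // reached even after a rejection, so the JVM can exit
        }
        return rejectedCount;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejections(5));
    }
}
```

Placing shutdown() in a finally block also avoids the situation in the first demo, where the uncaught exception skipped shutdown() and left the core worker thread alive.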

 
        
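DiscardOldestPolicy: when a task is rejected, the pool discards the oldest unprocessed task at the head of the queue (the one that would have been executed next) and then retries submitting the rejected task. If the work queue is a priority queue, "discard oldest" discards the highest-priority task, so the two should not be used together.
For example: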
package cn.concurrent.executor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by spark on 17-9-24.
 */
public class DiscardOledesrPolicy {

    public static void main(String[] args) {

        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        // Use DiscardOldestPolicy as the saturation policy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());


        for (int i = 0; i < 6; i++) {
            MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
            pool.submit(myRunnable);
        }
        pool.shutdown();
    }

    static class MyRunnable implements Runnable {

        private String name;

        public MyRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.err.println(this.name + ": is running.");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
 
        

Output:

this is 0 task: is running.
this is 5 task: is running. 

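As the output shows, tasks 1, 2, 3, and 4 were all discarded. Task 0 went straight to the worker thread, and each later submission evicted the task waiting in the one-slot queue, so only task 5 was left to run.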
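The eviction sequence can be made visible with a handler that follows the same steps as DiscardOldestPolicy (poll the head of the queue, then call execute() again) and also records what it dropped. This is a sketch, not the JDK class itself: NamedTask, the latch, and the returned list are hypothetical helpers added for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class LoggingDiscardOldestSketch {

    // A task with an id, so that evicted tasks can be identified (hypothetical helper).
    static final class NamedTask implements Runnable {
        final int id;
        final CountDownLatch release;

        NamedTask(int id, CountDownLatch release) {
            this.id = id;
            this.release = release;
        }

        @Override
        public void run() {
            try {
                release.await(); // keep the worker busy until all tasks are submitted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Submits tasks 0..tasks-1 and returns the ids evicted from the queue, in eviction order.
    static List<Integer> evictedIds(int tasks) {
        // The handler runs in the submitting thread, so a plain list is sufficient here.
        List<Integer> evicted = new ArrayList<>();
        RejectedExecutionHandler handler = (rejected, executor) -> {
            if (!executor.isShutdown()) {
                Runnable oldest = executor.getQueue().poll(); // drop the head of the queue
                if (oldest instanceof NamedTask) {
                    evicted.add(((NamedTask) oldest).id);
                }
                executor.execute(rejected); // retry the task that was just rejected
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(new NamedTask(i, release));
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return evicted;
    }

    public static void main(String[] args) {
        System.out.println("evicted task ids: " + evictedIds(6));
    }
}
```

The sketch uses execute() rather than submit() so that the queue holds the NamedTask objects themselves instead of FutureTask wrappers.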

 
        
DiscardPolicy: this policy silently discards the task that cannot be handled and takes no further action.
For example:
package cn.concurrent.executor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by spark on 17-9-24.
 */
public class DiscardPolicy {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        // Use DiscardPolicy as the saturation policy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 6; i++) {
            MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
            pool.submit(myRunnable);
        }
        pool.shutdown();
    }

    static class MyRunnable implements Runnable {

        private String name;

        public MyRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.err.println(this.name + ": is running.");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
 
        

Output:

this is 0 task: is running.
this is 1 task: is running.

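As the output shows, tasks 2, 3, 4, and 5 were all discarded.

The pool's core size and maximum size are both 1, so it can run at most one task at a time. Its work queue is an ArrayBlockingQueue, a bounded blocking queue, with a capacity of 1, so at most one task can wait in the queue. Following the logic of execute(), the pool ran two tasks in total: the first was handed directly to a Worker and run by its thread, and the second waited in the queue. All the other tasks were discarded.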

Now change the pool's queue to a PriorityBlockingQueue and see what happens:

 
        
package cn.concurrent.executor;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by spark on 17-9-24.
 */
public class DiscardPolicy2 {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new PriorityBlockingQueue<>(1));
        // Use DiscardPolicy as the saturation policy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 6; i++) {
            MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
            pool.submit(myRunnable);
        }
        pool.shutdown();
    }

    static class MyRunnable implements Runnable {

        private String name;

        public MyRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.err.println(this.name + ": is running.");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Output:
Exception in thread "main" java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
    at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at cn.concurrent.executor.DiscardPolicy2.main(DiscardPolicy2.java:23)
this is 0 task: is running.
 
        

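The error occurs because PriorityBlockingQueue has to compare its elements in order to rank them, and the queued tasks are not Comparable. Let's implement the Comparable interface and try again: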

package cn.concurrent.executor;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by spark on 17-9-24.
 * The pool's core size and maximum size are both 1, so it runs at most one task at a time.
 * The work queue is a PriorityBlockingQueue, so queued tasks must implement Comparable.
 */
public class DiscardPolicy2 {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new PriorityBlockingQueue<>(1));
        // Use DiscardPolicy as the saturation policy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 6; i++) {
            MyRunnable myRunnable = new MyRunnable("this is " + i + " task", i);
            pool.execute(myRunnable);
        }
        pool.shutdown();
    }

    static class MyRunnable implements Runnable, Comparable<MyRunnable> {

        private String name;
        private int num;

        public MyRunnable(String name, int num) {
            this.name = name;
            this.num = num;
        }

        @Override
        public void run() {
            System.err.println(this.name + ": is running.");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public int compareTo(MyRunnable o) {
            // All tasks compare as equal, so the queue imposes no meaningful order
            return 0;
        }
    }
}

Output:

this is 0 task: is running.
this is 1 task: is running.
this is 5 task: is running.
this is 4 task: is running.
this is 3 task: is running.
this is 2 task: is running.

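execute() is used here instead of submit() because submit() wraps the task in a FutureTask, and that class does not implement Comparable. Note also that all six tasks ran and none was discarded: PriorityBlockingQueue is unbounded (the constructor argument is only an initial capacity), so the pool never saturates and DiscardPolicy is never invoked. The order of tasks 5, 4, 3, 2 is a side effect of compareTo always returning 0.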
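For the queue to order tasks meaningfully, compareTo has to compare something. The sketch below is one possible approach, with assumed names (PriorityTask, executionOrder) and the assumed convention that a smaller number means higher priority. A latch holds the single worker until every task has been queued, so the execution order reflects the queue's ordering rather than submission timing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityTaskSketch {

    // A task that carries its own priority (hypothetical helper; smaller value runs first).
    static final class PriorityTask implements Runnable, Comparable<PriorityTask> {
        final int priority;
        final Runnable body;

        PriorityTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PriorityTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Queues one task per priority behind a blocked worker and returns the order in which they ran.
    static List<Integer> executionOrder(int... priorities) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new PriorityBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> order = new CopyOnWriteArrayList<>();
        try {
            // Occupy the single worker so that the following tasks accumulate in the queue.
            pool.execute(new PriorityTask(Integer.MIN_VALUE, () -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            for (int p : priorities) {
                pool.execute(new PriorityTask(p, () -> order.add(p)));
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // wait for the queued tasks to drain
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println("execution order: " + executionOrder(3, 1, 2));
    }
}
```

As in the demo above, tasks are passed to execute() directly, because submit() would wrap them in FutureTask objects that the queue cannot compare.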

 
        
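CallerRunsPolicy: as long as the pool has not been shut down, this policy runs the rejected task directly in the thread that called execute(). In plain terms, it neither discards the task nor throws an exception; it pushes the work back to the caller, which slows down the rate at which new tasks are submitted. This can lower the caller's throughput (QPS).
For example: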
package cn.concurrent.executor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by spark on 17-9-24.
 */
public class CallerRunsPolicyDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        // Use CallerRunsPolicy as the saturation policy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 6; i++) {
            MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
            pool.submit(myRunnable);
        }
        pool.shutdown();
    }

    static class MyRunnable implements Runnable {

        private String name;

        public MyRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.err.println(this.name + ": is running.");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
 
        

Output:

this is 2 task: is running.
this is 0 task: is running.
this is 3 task: is running.
this is 1 task: is running.
this is 5 task: is running.
this is 4 task: is running.

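The output alone does not show which thread ran each task. The following sketch (class and method names are illustrative) blocks the single worker, submits a few extra tasks, and counts how many of them were executed by the submitting thread itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CallerRunsSketch {

    // Returns how many of the `extra` tasks ended up running in the submitting thread.
    static int tasksRunByCaller(int extra) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        AtomicInteger ranInCaller = new AtomicInteger();
        try {
            // Occupy the single worker so that the pool saturates.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            for (int i = 0; i < extra; i++) {
                pool.execute(() -> {
                    if (Thread.currentThread() == caller) {
                        ranInCaller.incrementAndGet(); // rejected, so run synchronously by the caller
                    }
                });
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranInCaller.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run by the caller: " + tasksRunByCaller(4));
    }
}
```

The first extra task fits in the queue; the rest are rejected and run inside execute() on the caller's thread. While the caller is busy running a task it cannot submit new ones, which is the throttling effect described above.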

 

We can also define a custom saturation policy, for example:
package cn.concurrent;

import java.util.concurrent.*;

/**
 * Created by spark on 17-9-3.
 * Demonstrates RejectedExecutionHandler, the interface that rejection policies implement.
 */
public class RejectThreadPoolDemo {

    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask myTask = new MyTask();
        // Create a pool with 5 threads and a queue of capacity 10
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<Runnable>(10),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    // Custom rejection handling: report the task and drop it
                    @Override
                    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                        System.out.println(runnable.toString() + " is discard");
                    }
                });
        // Submit a task every 10 ms, faster than the pool can process them
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            es.submit(myTask);
            Thread.sleep(10);
        }
    }
}
 
        

Output:

1506244149620:Thread ID:10
1506244149630:Thread ID:11
1506244149640:Thread ID:12
1506244149651:Thread ID:13
1506244149661:Thread ID:14
1506244149720:Thread ID:10
1506244149730:Thread ID:11
1506244149740:Thread ID:12
1506244149751:Thread ID:13
1506244149761:Thread ID:14
1506244149821:Thread ID:10
1506244149830:Thread ID:11
1506244149841:Thread ID:12
1506244149851:Thread ID:13
1506244149862:Thread ID:14
java.util.concurrent.FutureTask@63947c6b is discard
java.util.concurrent.FutureTask@2b193f2d is discard
java.util.concurrent.FutureTask@355da254 is discard
java.util.concurrent.FutureTask@4dc63996 is discard
java.util.concurrent.FutureTask@d716361 is discard
1506244149921:Thread ID:10
1506244149930:Thread ID:11
1506244149941:Thread ID:12
1506244149951:Thread ID:13
1506244149962:Thread ID:14
java.util.concurrent.FutureTask@6ff3c5b5 is discard
java.util.concurrent.FutureTask@3764951d is discard
java.util.concurrent.FutureTask@4b1210ee is discard
java.util.concurrent.FutureTask@4d7e1886 is discard
java.util.concurrent.FutureTask@3cd1a2f1 is discard
1506244150023:Thread ID:10
1506244150031:Thread ID:11

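As we can see, the saturation policy took effect. In a real application, the handler is a good place to write a log entry, so that system load and task loss can be analyzed later.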
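As a hedged sketch of that idea, the handler below counts rejections and reports the pool's state each time a task is dropped. CountingDiscardHandler and rejectedOutOf are hypothetical names, and System.err stands in for whatever logging framework an application actually uses.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class CountingRejectionSketch {

    // Drops rejected tasks, but keeps a count and reports the pool state (hypothetical helper).
    static final class CountingDiscardHandler implements RejectedExecutionHandler {
        private final AtomicLong rejected = new AtomicLong();

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            long total = rejected.incrementAndGet();
            System.err.println("rejected #" + total + ": " + task
                    + " (pool size=" + executor.getPoolSize()
                    + ", queued=" + executor.getQueue().size() + ")");
        }

        long rejectedCount() {
            return rejected.get();
        }
    }

    // Submits `tasks` blocked tasks to a saturated 1-thread pool and returns the rejection count.
    static long rejectedOutOf(int tasks) {
        CountingDiscardHandler handler = new CountingDiscardHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy so the pool stays saturated
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        System.out.println("rejected tasks: " + rejectedOutOf(5));
    }
}
```

An AtomicLong is used because several threads may submit to the same pool, and the handler runs in whichever thread made the rejected call.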

I am recording what I learn bit by bit. There is still a lot to learn, and corrections and suggestions are welcome.
 

 

