1、什么是ExecutorService,為什么要使用線程池?
許多服務器應用程序都面向處理來自某些遠程來源的大量短小的任務,每當一個請求到達就創建一個新線程,然后在新線程中為請求服務,但是頻繁創建新線程、銷毀新線程、線程切換既花費較多的時間,影響相應速度,又消耗大量的系統資源,且有時服務器無法處理過多請求導致崩潰。一種情形:假設一個服務器完成一項任務所需時間為:T1 創建線程時間,T2 在線程中執行任務的時間,T3 銷毀線程時間。 如果:T1 + T3 遠大於 T2,則可以采用線程池,以提高服務器性能。ExecutorService是一個線程池,請求到達時,線程已經存在,響應延遲低,多個任務復用線程,避免了線程的重復創建和銷毀,並且可以規定線程數目,請求數目超過閾值時強制其等待直到有空閑線程。
當我們有任務需要多線程來完成時,將任務(實現Runnable、callable接口、繼承Thread類的對象)提交給ExecutorService。
創建方式如下:
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler)
corePoolSize : 核心線程數,一旦創建將不會再釋放。如果創建的線程數還沒有達到指定的核心線程數量,將會繼續創建新的核心線程,直到達到最大核心線程數后,核心線程數將不在增加;如果沒有空閑的核心線程,同時又未達到最大線程數,則將繼續創建非核心線程;如果核心線程數等於最大線程數,則當核心線程都處於激活狀態時,任務將被掛起,等待有空閑線程時再執行。
maximumPoolSize : 最大線程數,允許創建的最大線程數量。如果最大線程數等於核心線程數,則無法創建非核心線程;如果非核心線程處於空閑時,超過設置的空閑時間,則將被回收,釋放占用的資源。
keepAliveTime : 也就是當線程空閑時,所允許保存的最大時間,超過這個時間,線程將被釋放銷毀,但只針對於非核心線程。
unit : 時間單位,TimeUnit.SECONDS等。
workQueue : 任務隊列,用於保存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列。
- ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,必須設置容量。此隊列按 FIFO(先進先出)原則對元素進行排序。
- LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,可以設置容量,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入offer操作必須等到另一個線程調用移除poll操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue。
- PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
threadFactory : 線程工廠,用於創建線程。
handler : 當線程邊界和隊列容量已經達到最大時,用於處理阻塞時的程序。
2、ExecutorService的類型
一共有四種線程池:
CachedThreadPool可緩存線程池、SingleThreadExecutor單線程池、FixedThreadPool固定線程數線程池、ScheduledThreadPool 固定線程數,支持定時和周期性任務線程池。
ThreadPoolExecutor(該類就是線程池類)繼承AbstractExecutorService類,該抽象類實現ExecutorService接口,Executors是一個工廠類,包含很多靜態方法,包括newCachedThreadPool、newSingleThreadExecutor、newFixedThreadPool等,這些靜態方法中調用了ThreadPoolExecutor的構造函數,並且不同的線程池調用構造方法時傳入不同的參數。
2.1 CachedThreadPool可緩存線程池 (無界線程池)
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
通過它的創建方式可以知道,創建的都是非核心線程,而且最大線程數為Interge的最大值,空閑線程存活時間是1分鍾。SynchronousQueue隊列,一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作。所以,當我們提交第一個任務的時候,是加入不了隊列的,這就滿足了,一個線程池條件“當無法加入隊列的時候,且任務沒有達到maxsize時,我們將新開啟一個線程任務”。即當線程不夠用的時候會不斷創建新線程,如果線程無限增長,會導致內存溢出。所以我們的maxsize是big big。時間是60s,當一個線程沒有任務執行會暫時保存60s超時時間,如果沒有的新的任務的話,會從cache中remove掉。因此長時間不提交任務的CachedThreadPool不會占用系統資源。就是緩沖區為1的生產者消費者模式。
2.2 SingleThreadExecutor單線程池
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
只用一個線程來執行任務,保證任務按FIFO順序一個個執行。
2.3 FixedThreadPool固定線程數線程池
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
coresize和maxmumsize相同,超時時間為0,隊列用的LinkedBlockingQueue無界的FIFO隊列,如果隊列里面有線程任務的話就從隊列里面取出線程,然后開啟一個新的線程開始執行。 很明顯,這個線程池始終只有size
的線程在運行,大小固定,難以擴展。
2.4 ScheduledThreadPool 固定線程數,支持定時和周期性任務線程池
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
3、創建線程的時機(線程池的工作策略)
- 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。(即如果當前運行的線程小於corePoolSize,則任務根本不會添加到workQueue中)
- 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入工作隊列,而不添加新的線程。
- 如果無法將請求加入workQueue(但是隊列已滿),則創建新的線程,除非創建此線程超出 maximumPoolSize,如果超過,在這種情況下,新的任務將被拒絕。
3、代碼例子
1 public class ExecutorServicepool { 2 public static void main(String[] args) throws InterruptedException { 3 int[] a = new int[1]; 4 //創建一個容量為5的線程池 5 ExecutorService executorService = Executors.newFixedThreadPool(5); 6 for(int i = 0;i<15;i++){ 7 //向線程池提交一個任務(其實就是通過線程池來啟動一個線程) 8 executorService.execute(new TestRunnable(a)); 9 System.out.println("============ "+i);
10 Thread.sleep(millis:1000);
System.out.printlin("主線程休眠了1秒鍾") 11 } 12 } 13 } 14 15 class TestRunnable extends Thread{ 16 public int[] count; 17 TestRunnable(int[] a){ 18 this.count = a; 19 } 20 @Override 21 public void run(){ 22 try { 23 if(Thread.currentThread().getName().equals("pool-1-thread-1")) 24 Thread.sleep(2000); 25 } catch (Exception e) { 26 e.printStackTrace(); 27 } 28 System.out.println(Thread.currentThread().getName()+"-線程被調用了"); 29 System.out.println("count值為:"+(++count[0])); 30 } 31 }
得到的輸出結果如下:可知,固定線程數線程池,線程數為5個,提交15個任務給線程池,也就是使用5個線程完成對a[0]++的工作,5個線程之間是異步的,線程池中線程與主線程也是異步的。
============ 0
============ 1
============ 2 ============ 3 ============ 4 ============ 5 ============ 6 ============ 7 ============ 8 ============ 9 ============ 10 ============ 11 ============ 12 ============ 13 ============ 14 pool-1-thread-3-線程被調用了 pool-1-thread-2-線程被調用了 pool-1-thread-5-線程被調用了 pool-1-thread-4-線程被調用了 count值為:2 count值為:3 count值為:1 pool-1-thread-2-線程被調用了 count值為:4 count值為:5 pool-1-thread-3-線程被調用了 pool-1-thread-2-線程被調用了 pool-1-thread-4-線程被調用了 count值為:7 count值為:6 pool-1-thread-2-線程被調用了 count值為:9 count值為:8 pool-1-thread-4-線程被調用了 count值為:10 pool-1-thread-4-線程被調用了 count值為:11 pool-1-thread-2-線程被調用了 count值為:12 pool-1-thread-5-線程被調用了 count值為:13 pool-1-thread-3-線程被調用了 count值為:14
主線程休眠了1秒鍾
pool-1-thread-1-線程被調用了 count值為:15
4、怎么關閉線程池?
執行程序時發現,所有線程執行完畢后,JVM並未結束運行,也就說明線程池沒有正常結束。怎樣正確關閉線程池呢?
調用 Executor 的 shutdown() 方法會等待線程都執行完畢之后再關閉,但是如果調用的是 shutdownNow() 方法,則相當於調用每個線程的 interrupt() 方法。
讓線程池在指定時間內立即關閉:
public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(5); final long waitTime = 8 * 1000; final long awaitTime = 2 * 1000; Runnable task1 = new Runnable(){ public void run(){ try { System.out.println("task1 start"); Thread.sleep(waitTime); System.out.println("task1 end"); } catch (InterruptedException e) { System.out.println("task1 interrupted: " + e); } } }; Runnable task2 = new Runnable(){ public void run(){ try { System.out.println("task2 start"); Thread.sleep(1000); System.out.println("task2 end"); } catch (InterruptedException e) { System.out.println("task2 interrupted: " + e); } } }; //消耗時間很長的任務 8秒 pool.execute(task1); //消耗時間1秒 for(int i=0; i<1000; ++i){ pool.execute(task2); } try { // 告訴線程池,如果所有任務執行完畢則關閉線程池 pool.shutdown(); // 判斷線程池是否在限定時間內,或者線程池內線程全部結束 if(!pool.awaitTermination(awaitTime, TimeUnit.MILLISECONDS)){ // 超時的時候向線程池中所有的線程發出中斷(interrupted)。 pool.shutdownNow(); } } catch (InterruptedException e) { System.out.println("awaitTermination interrupted: " + e); } System.out.println("end"); }
task1 start
task2 start
task2 start
task2 start
task2 start
task2 end
task2 end
task2 end
task2 start
task2 end
task2 start
task2 start
task2 start
task2 end
task2 end
task2 end
task2 end
end
task2 interrupted: java.lang.InterruptedException: sleep interrupted
task2 interrupted: java.lang.InterruptedException: sleep interrupted
task1 interrupted: java.lang.InterruptedException: sleep interrupted
task2 interrupted: java.lang.InterruptedException: sleep interrupted
task2 interrupted: java.lang.InterruptedException: sleep interrupted
Process finished with exit code 0
如果只執行shutdown,線程池會等待所有線程全部結束才終止線程池。
且!執行shutdown()后,就不能再繼續使用ExecutorService來追加新的任務了,如果繼續調用execute/submit方法執行新的任務的話,就會拋出RejectedExecutionException異常。
所以一般的調用順序為:
shutdown 方法 ,停止接收新的任務
awaitTermination 方法, 判斷任務是否執行完畢或者是否在指定時間內
shutdownNow方法
參考文獻:https://www.cnblogs.com/zhncnblogs/p/10894271.html https://blog.csdn.net/fwt336/article/details/81530581