Executors框架介紹
Executors框架其內部采用了線程池機制,他在java.util.cocurrent包下,通過該框架來控制線程的啟動、執行、關閉,可以簡化並發編程的操作。因此,通過Executors來啟動線程比使用Thread的start方法更好,而且更容易管理,效率更好,還有關鍵的一點:有助於避免this溢出。
Executors框架包括:線程池、Executor,Executors,ExecutorService、CompletionServince,Future、Callable。
Executor類
Executor接口中定義了方法execute(Runable able)接口,該方法接受一個Runable實例,他來執行一個任務,任務即實現一個Runable接口的類。
ExecutorService
ExecutorService繼承於Executor接口,他提供了更為豐富的線程實現方法,比如ExecutorService提供關閉自己的方法,以及為跟蹤一個或多個異步任務執行狀況而生成Future的方法。
ExecutorService有三種狀態:運行、關閉、終止。創建后便進入運行狀態,當調用了shutdown()方法時,便進入了關閉狀態,此時意味着ExecutorService不再接受新的任務,但是他還是會執行已經提交的任務,當所有已經提交了的任務執行完后,便達到終止狀態。如果不調用shutdown方法,ExecutorService方法會一直運行下去,系統一般不會主動關閉。
Executors
executor提供了工廠的方法來創建線程池,返回的線程池都實現了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int n)
- 創建固定數目的線程的線程池。
- newFixedThreadPool於cacheThreadPool差不多。也是能reuse就用,但是不能隨時創建新的線程;
- 任意時間點,只能夠最多有固定的數目的線程存在,此時如果有新的線程的創建,只能放在等待隊列中,只有當線程池中的一些線程終止被扔出線程池后,才進入線程池進行運行。
- 和cacheThreadPool不同,FixedThreadPool沒有IDLE機制,所以FixedThreadPool多數針對於很穩定的線程開發,多用於服務器。
- fixedThreadPool和cacheThreadPool一樣,同用一個底層池,只不過參數不同,fix線程固定,並且是0sIDLE無IDLE;cache線程支持0-Integer.MAX_VALUE,60s的IDLE。
public static ExecutorService newCacheThreadPool()
- 創建一個可緩存池,調用execute將重用以前構造的線程(如果能夠使用的話)。如果沒有線程可用,那么創建一個線程到線程池中。終止並移除線程池中超過60s沒有被使用過的線程。
- 緩存池一般用於運行生存期很短的異步線程任務。
- 放入cacheThreadPool中的線程不用擔心其結束,超時后會被自動終止。
- 缺省值timeout=60s
public static ExecutorService newSingleThreadExecutor()
- 創建一個單線程化的Executor。
- 用的是和cache和fixed池子相同的底層池,無IDLE。
public static ScheduleExecutorService newScheduleThreadPool(int corePoolSize)
創建一個支持定時及周期性的任務執行的線程池,多數情況可以代替Timer類。Timer存在以下缺陷
- Timer類不管啟動多少定時器,但它只會啟動一條線程,當有多個定時任務時,就會產生延遲。如:我們要求一個任務每隔3S執行,且執行大約需要10S,第二個任務每隔5S執行,兩個任務同時啟動。若使用Timer我們會發現,第而個任務是在第一個任務執行結束后的5S才開始執行。這就是多任務的延時問題。
- 若多個定時任務中有一個任務拋異常,那所有任務都無法執行。
- Timer執行周期任務時依賴系統時間。若系統時間發生變化,那Timer執行結果可能也會發生變化。而ScheduledExecutorService基於時間的延遲,並非時間,因此不會由於系統時間的改變發生執行變化。 綜上所述,定時任務要使用ScheduledExecutorService取代Timer。
Executor執行
任務分為兩類:實現Runable和實現Callable的類。兩者都可以被ExecutorService執行,但是Runable任務是沒有返回值,而Callable任務有返回值。而且Callable的Call方法只能通過ExecutorService的submit方法執行,並返回一個Future,表示任務等待完成的Future。
Callable接口類似於Runnable,兩者都是為那些其實例可能被另一個線程執行的類設計的。但是 Runnable 不會返回結果,並且無法拋出經過檢查的異常而Callable又返回結果,而且當獲取返回結果時可能會拋出異常。Callable中的call()方法類似Runnable的run()方法,區別同樣是有返回值,后者沒有。
當將一個Callable的對象傳遞給ExecutorService的submit方法,則該call方法自動在一個線程上執行,並且會返回執行結果Future對象。同樣,將Runnable的對象傳遞給ExecutorService的submit方法,則該run方法自動在一個線程上執行,並且會返回執行結果Future對象,但是在該Future對象上調用get方法,將返回null。
實例
1 public static void main(String[] args) throws InterruptedException, ExecutionException { 2 ExecutorService executorService=Executors.newCachedThreadPool(); 3 4 executorService.execute(new Runnable() { 5 6 @Override 7 public void run() { 8 // TODO Auto-generated method stub 9 10 } 11 }); 12 13 ExecutorService service=Executors.newCachedThreadPool(); 14 Future<String> future=service.submit(new Callable<String>() { 15 public String call() { 16 return ""; 17 } 18 }); 19 20 if(future.isDone()) { 21 String reString=future.get(); 22 } 23 24 executorService.shutdown(); 25 service.shutdown();
}
獲得Callable的多條數據的方式
- 自己封裝一個Callable 的結果集,但是這個有弊端
- 使用ExecutorService.invokeAll函數。
- 使用CompletionService獲取,BlockingQueue、LinkBlockingQueue。
1 ExecutorService service=Executors.newFixedThreadPool(10); 2 3 ArrayList<Future<String>> list=new ArrayList<Future<String>>(); 4 5 for(int i=0;i<10;i++) { 6 Future<String> rFuture=service.submit(new Callable<String>() { 7 8 @Override 9 public String call() throws Exception { 10 int sleepTime=new Random().nextInt(1000); 11 Thread.sleep(sleepTime); 12 return "線程"+"睡了"+sleepTime+"秒"; 13 } 14 }); 15 list.add(rFuture); 16 } 17 for(int i=0;i<10;i++) { 18 Future<String> future=list.get(i); 19 String reString=future.get(); 20 System.out.println(reString); 21 } 22 //這種方式處理Callable所得結果集的弊端 23 /* 24 * 需要自己創建容器,而且維護所有的返回結果,比較麻煩 25 * 從list遍歷的每個Future並不一定處於完成狀態,這時調用get方法會被阻塞住,如果系統是設計成每個線程完成后根據結果繼續執行后面的事,這樣的 26 * 處於list后面的但是先完成的線程會增加無用的時間。 27 */ 28 ExecutorService executorService=Executors.newFixedThreadPool(10); 29 List<Callable<String>> list2=new ArrayList<Callable<String>>(); 30 for(int i=0;i<10;i++) { 31 Callable<String> task=new Callable<String>() { 32 @Override 33 public String call() throws Exception { 34 int sleep=new Random().nextInt(1000); 35 Thread.sleep(sleep); 36 return "線程"+"睡了"+sleep+"秒"; 37 } 38 }; 39 executorService.submit(task); 40 list2.add(task); 41 } 42 ArrayList<Future<String>> res=(ArrayList<Future<String>>) executorService.invokeAll(list2); 43 44 for(int i=0;i<10;i++) { 45 Future<String> future=res.get(i); 46 System.out.println(future.get()); 47 } 48 /* 49 * 這個方法避免了1方法的不足 50 * CompletionService內部維護了一個阻塞隊列,只有執行完成的任務結果才會被放入該隊列,這樣就確保執行時間較短的任務率先被存入阻塞隊列中。 51 */ 52 53 ExecutorService exe=Executors.newFixedThreadPool(10); 54 final BlockingDeque<Future<Integer>> queue=new LinkedBlockingDeque<>(); 55 final CompletionService<Integer> completionService=new ExecutorCompletionService<Integer>(exe,queue); 56 57 for(int i=0;i<10;i++) { 58 exe.submit(new Callable<String>() { 59 60 @Override 61 public String call() throws Exception { 62 int sleep=new Random().nextInt(1000); 63 Thread.sleep(sleep); 64 return "線程"+"睡了"+sleep+"秒"; 65 } 66 }); 67 } 68 for(int i=0;i<10;i++) { 69 Future<Integer> future=completionService.take(); 70 System.out.println(future.get()); 71 }