JAVA並發 Executors框架


Executors框架介紹

Executors框架其內部采用了線程池機制,他在java.util.cocurrent包下,通過該框架來控制線程的啟動、執行、關閉,可以簡化並發編程的操作。因此,通過Executors來啟動線程比使用Thread的start方法更好,而且更容易管理,效率更好,還有關鍵的一點:有助於避免this溢出。

Executors框架包括:線程池、Executor,Executors,ExecutorService、CompletionServince,Future、Callable。

Executor類

Executor接口中定義了方法execute(Runable able)接口,該方法接受一個Runable實例,他來執行一個任務,任務即實現一個Runable接口的類。

ExecutorService

ExecutorService繼承於Executor接口,他提供了更為豐富的線程實現方法,比如ExecutorService提供關閉自己的方法,以及為跟蹤一個或多個異步任務執行狀況而生成Future的方法。

ExecutorService有三種狀態:運行、關閉、終止。創建后便進入運行狀態,當調用了shutdown()方法時,便進入了關閉狀態,此時意味着ExecutorService不再接受新的任務,但是他還是會執行已經提交的任務,當所有已經提交了的任務執行完后,便達到終止狀態。如果不調用shutdown方法,ExecutorService方法會一直運行下去,系統一般不會主動關閉。

Executors

executor提供了工廠的方法來創建線程池,返回的線程池都實現了ExecutorService接口。

public static ExecutorService newFixedThreadPool(int n)

  • 創建固定數目的線程的線程池。
  • newFixedThreadPool於cacheThreadPool差不多。也是能reuse就用,但是不能隨時創建新的線程;
  • 任意時間點,只能夠最多有固定的數目的線程存在,此時如果有新的線程的創建,只能放在等待隊列中,只有當線程池中的一些線程終止被扔出線程池后,才進入線程池進行運行。
  • 和cacheThreadPool不同,FixedThreadPool沒有IDLE機制,所以FixedThreadPool多數針對於很穩定的線程開發,多用於服務器。
  • fixedThreadPool和cacheThreadPool一樣,同用一個底層池,只不過參數不同,fix線程固定,並且是0sIDLE無IDLE;cache線程支持0-Integer.MAX_VALUE,60s的IDLE。

public static ExecutorService newCacheThreadPool()

  • 創建一個可緩存池,調用execute將重用以前構造的線程(如果能夠使用的話)。如果沒有線程可用,那么創建一個線程到線程池中。終止並移除線程池中超過60s沒有被使用過的線程。
  • 緩存池一般用於運行生存期很短的異步線程任務。
  • 放入cacheThreadPool中的線程不用擔心其結束,超時后會被自動終止。
  • 缺省值timeout=60s

public static ExecutorService newSingleThreadExecutor()

  • 創建一個單線程化的Executor。
  • 用的是和cache和fixed池子相同的底層池,無IDLE。

public static ScheduleExecutorService newScheduleThreadPool(int corePoolSize)

創建一個支持定時及周期性的任務執行的線程池,多數情況可以代替Timer類。Timer存在以下缺陷

  • Timer類不管啟動多少定時器,但它只會啟動一條線程,當有多個定時任務時,就會產生延遲。如:我們要求一個任務每隔3S執行,且執行大約需要10S,第二個任務每隔5S執行,兩個任務同時啟動。若使用Timer我們會發現,第而個任務是在第一個任務執行結束后的5S才開始執行。這就是多任務的延時問題。
  • 若多個定時任務中有一個任務拋異常,那所有任務都無法執行。
  • Timer執行周期任務時依賴系統時間。若系統時間發生變化,那Timer執行結果可能也會發生變化。而ScheduledExecutorService基於時間的延遲,並非時間,因此不會由於系統時間的改變發生執行變化。 綜上所述,定時任務要使用ScheduledExecutorService取代Timer。

Executor執行

任務分為兩類:實現Runable和實現Callable的類。兩者都可以被ExecutorService執行,但是Runable任務是沒有返回值,而Callable任務有返回值。而且Callable的Call方法只能通過ExecutorService的submit方法執行,並返回一個Future,表示任務等待完成的Future。

Callable接口類似於Runnable,兩者都是為那些其實例可能被另一個線程執行的類設計的。但是 Runnable 不會返回結果,並且無法拋出經過檢查的異常而Callable又返回結果,而且當獲取返回結果時可能會拋出異常。Callable中的call()方法類似Runnable的run()方法,區別同樣是有返回值,后者沒有。

當將一個Callable的對象傳遞給ExecutorService的submit方法,則該call方法自動在一個線程上執行,並且會返回執行結果Future對象。同樣,將Runnable的對象傳遞給ExecutorService的submit方法,則該run方法自動在一個線程上執行,並且會返回執行結果Future對象,但是在該Future對象上調用get方法,將返回null。

實例

 1     public static void main(String[] args) throws InterruptedException, ExecutionException {
 2         ExecutorService executorService=Executors.newCachedThreadPool();
 3         
 4         executorService.execute(new Runnable() {
 5             
 6             @Override
 7             public void run() {
 8                 // TODO Auto-generated method stub
 9                 
10             }
11         });
12         
13         ExecutorService service=Executors.newCachedThreadPool();
14         Future<String> future=service.submit(new Callable<String>() {
15             public String call() {
16                 return "";
17             }
18         });
19         
20         if(future.isDone()) {
21             String reString=future.get();
22         }
23         
24         executorService.shutdown();
25         service.shutdown();
}

 

獲得Callable的多條數據的方式

  1. 自己封裝一個Callable 的結果集,但是這個有弊端
  2. 使用ExecutorService.invokeAll函數。
  3. 使用CompletionService獲取,BlockingQueue、LinkBlockingQueue。
 1 ExecutorService service=Executors.newFixedThreadPool(10);
 2         
 3         ArrayList<Future<String>> list=new ArrayList<Future<String>>();
 4         
 5         for(int i=0;i<10;i++) {
 6             Future<String> rFuture=service.submit(new Callable<String>() {
 7 
 8                 @Override
 9                 public String call() throws Exception {
10                     int sleepTime=new Random().nextInt(1000);
11                     Thread.sleep(sleepTime);
12                     return "線程"+"睡了"+sleepTime+"秒";
13                 }
14             });
15             list.add(rFuture);
16         }
17         for(int i=0;i<10;i++) {
18             Future<String> future=list.get(i);
19             String reString=future.get();
20             System.out.println(reString);
21         }
22         //這種方式處理Callable所得結果集的弊端
23         /*
24          * 需要自己創建容器,而且維護所有的返回結果,比較麻煩
25          * 從list遍歷的每個Future並不一定處於完成狀態,這時調用get方法會被阻塞住,如果系統是設計成每個線程完成后根據結果繼續執行后面的事,這樣的
26          * 處於list后面的但是先完成的線程會增加無用的時間。
27          */
28         ExecutorService executorService=Executors.newFixedThreadPool(10);
29         List<Callable<String>> list2=new ArrayList<Callable<String>>();
30         for(int i=0;i<10;i++) {
31             Callable<String> task=new Callable<String>() {
32                 @Override
33                 public String call() throws Exception {
34                     int sleep=new Random().nextInt(1000);
35                     Thread.sleep(sleep);
36                     return "線程"+"睡了"+sleep+"秒";
37                 }
38             };
39             executorService.submit(task);
40             list2.add(task);
41         }
42         ArrayList<Future<String>> res=(ArrayList<Future<String>>) executorService.invokeAll(list2);
43         
44         for(int i=0;i<10;i++) {
45             Future<String> future=res.get(i);
46             System.out.println(future.get());
47         }
48         /*
49          * 這個方法避免了1方法的不足
50          * CompletionService內部維護了一個阻塞隊列,只有執行完成的任務結果才會被放入該隊列,這樣就確保執行時間較短的任務率先被存入阻塞隊列中。
51          */
52         
53         ExecutorService exe=Executors.newFixedThreadPool(10);
54         final BlockingDeque<Future<Integer>> queue=new LinkedBlockingDeque<>();
55         final CompletionService<Integer> completionService=new ExecutorCompletionService<Integer>(exe,queue);
56         
57         for(int i=0;i<10;i++) {
58             exe.submit(new Callable<String>() {
59 
60                 @Override
61                 public String call() throws Exception {
62                     int sleep=new Random().nextInt(1000);
63                     Thread.sleep(sleep);
64                     return "線程"+"睡了"+sleep+"秒";
65                 }
66             });
67         }
68         for(int i=0;i<10;i++) {
69             Future<Integer> future=completionService.take();
70             System.out.println(future.get());
71         }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM