並發編程 06—— CompletionService :Executor 和 BlockingQueue


 

 

概述

第1部分 問題引入及實例

第2部分 實例

 

第1部分 問題引入

《Java並發編程實踐》一書6.3.5節CompletionService:Executor和BlockingQueue,有這樣一段話:

  "如果向Executor提交了一組計算任務,並且希望在計算完成后獲得結果,那么可以保留與每個任務關聯的Future,然后反復使用get方法,同時將參數timeout指定為0,從而通過輪詢來判斷任務是否完成。這種方法雖然可行,但卻有些繁瑣。幸運的是,還有一種更好的方法:完成服務CompletionService。"

這是什么意思呢?通過一個例子,分別使用繁瑣的做法和CompletionService來完成,清晰的對比能讓我們更好的理解上面的一段話和CompletionService這個API提供的初衷。

第2部分 實例

考慮這樣的場景,有5個Callable任務分別返回5個整數,然后我們在main方法中按照各個任務完成的先后順序,在控制台打印返回結果。

 

 1 package com.concurrency.TaskExecution_6;
 2 
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.TimeUnit;
 5 
 6 /**
 7  * 
 8  * @ClassName: ReturnAfterSleepCallable
 9  * TODO
10  * @author Xingle
11  * @date 2014-9-16 上午9:20:34
12  */
13 public class ReturnAfterSleepCallable implements Callable<Integer>{
14     
15     private int sleepSeconds;    
16     private int returnValue;
17     
18     public ReturnAfterSleepCallable(int sleepSeconds,int returnValue){
19         this.sleepSeconds = sleepSeconds;
20         this.returnValue = returnValue;
21     }
22     
23 
24     @Override
25     public Integer call() throws Exception {
26         System.out.println("begin to execute ");
27         
28         TimeUnit.SECONDS.sleep(sleepSeconds);
29         return returnValue;
30     }
31 
32 }

 

1.繁瑣的做法

  通過一個List來保存每個任務返回的Future,然后輪詢這些Future,直到每個Future都已完成。我們不希望出現因為排在前面的任務阻塞導致后面先完成的任務的結果沒有及時獲取的情況,所以在調用get方式時,需要將超時時間設置為0。

 1 package com.concurrency.TaskExecution_6;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.ExecutionException;
 6 import java.util.concurrent.ExecutorService;
 7 import java.util.concurrent.Executors;
 8 import java.util.concurrent.Future;
 9 
10 /**
11  * 傳統的繁瑣做法
12  * @ClassName: TraditionalTest
13  * TODO
14  * @author Xingle
15  * @date 2014-9-16 上午10:06:21
16  */
17 public class TraditionalTest {
18     
19     public static void main(String[] args){
20         int taskSize = 5;
21         ExecutorService executor = Executors.newFixedThreadPool(taskSize);
22         List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
23         
24         for(int i= 1; i<=taskSize; i++){
25             int sleep = taskSize -1;
26             int value = i;
27             //向線程池提交任務
28             Future<Integer> future = executor.submit(new ReturnAfterSleepCallable(sleep, value));
29             //保留每個任務的Future
30             futureList.add(future);
31         }
32         // 輪詢,獲取完成任務的返回結果
33         while(taskSize > 0){
34             for (Future<Integer> future : futureList){
35                 Integer result = null;
36                 try {
37                     result = future.get();
38                 } catch (InterruptedException e) {
39                     e.printStackTrace();
40                 } catch (ExecutionException e) {
41                     e.printStackTrace();
42                 } 
43                 //任務已經完成
44                 if(result!=null){
45                     System.out.println("result = "+result);
46                     //從future列表中刪除已經完成的任務
47                     futureList.remove(future);
48                     taskSize --;
49                     break;                    
50                 }
51             }
52         }
53         // 所有任務已經完成,關閉線程池  
54         System.out.println("all over ");
55         executor.shutdown();
56     }
57 
58 }

 

執行結果:

 

2.使用CompletionService

 1 package com.concurrency.TaskExecution_6;
 2 
 3 import java.util.concurrent.CompletionService;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.ExecutorCompletionService;
 6 import java.util.concurrent.ExecutorService;
 7 import java.util.concurrent.Executors;
 8 
 9 /**
10  * 使用CompletionService
11  * @ClassName: CompletionServiceTest
12  * TODO
13  * @author Xingle
14  * @date 2014-9-16 上午11:32:45
15  */
16 public class CompletionServiceTest {
17     
18     public static void main(String[] args){
19         int taskSize = 5;
20         ExecutorService executor = Executors.newFixedThreadPool(taskSize);
21         // 構建完成服務 
22         CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
23         
24         for (int i=1;i<= taskSize; i++){
25             // 睡眠時間 
26             int sleep = taskSize - i;
27             // 返回結果  
28             int value = i;
29             //向線程池提交任務
30             completionService.submit(new ReturnAfterSleepCallable(sleep, value));
31             try {
32                 System.out.println("result:"+completionService.take().get());
33             } catch (InterruptedException e) {
34                 e.printStackTrace();
35             } catch (ExecutionException e) {
36                 e.printStackTrace();
37             }
38         }
39         
40         System.out.println("all over. ");
41         executor.shutdown();
42         
43     }
44 
45 }

 

執行結果:

 

3.CompletionService和ExecutorCompletionService的實現

 JDK源碼中CompletionService的javadoc說明如下:

/**
 * A service that decouples the production of new asynchronous tasks
 * from the consumption of the results of completed tasks.  Producers
 * <tt>submit</tt> tasks for execution. Consumers <tt>take</tt>
 * completed tasks and process their results in the order they
 * complete. 
 */

 

也就是說,CompletionService實現了生產者提交任務和消費者獲取結果的解耦,生產者和消費者都不用關心任務的完成順序,由CompletionService來保證,消費者一定是按照任務完成的先后順序來獲取執行結果。

ExecutorCompletionService是CompletionService的實現,融合了線程池Executor和阻塞隊列BlockingQueue的功能。
 public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
到這里可以推測,按照任務的完成順序獲取結果,就是通過阻塞隊列實現的,阻塞隊列剛好具有這樣的性質:阻塞和有序。
 
ExecutorCompletionService任務的提交和執行都是委托給Executor來完成。當提交某個任務時,該任務首先將被包裝為一個QueueingFuture
public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
}

 

QueueingFuture是FutureTask的一個子類,通過改寫FutureTask類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。

private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

 

FutureTask.done(),這個方法默認什么都不做,就是一個回調,當提交的線程池中的任務完成時,會被自動調用。這也就說時候,當任務完成的時候,會自動執行QueueingFuture.done()方法,將返回結果加入到阻塞隊列中,加入的順序就是任務完成的先后順序。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM