Java並發編程實踐 目錄
並發編程 04—— 閉鎖CountDownLatch 與 柵欄CyclicBarrier
並發編程 06—— CompletionService : Executor 和 BlockingQueue
並發編程 10—— 任務取消 之 關閉 ExecutorService
並發編程 12—— 任務取消與關閉 之 shutdownNow 的局限性
並發編程 13—— 線程池的使用 之 配置ThreadPoolExecutor 和 飽和策略
概述
第1部分 問題引入
《Java並發編程實踐》一書6.3.5節CompletionService:Executor和BlockingQueue,有這樣一段話:
"如果向Executor提交了一組計算任務,並且希望在計算完成后獲得結果,那么可以保留與每個任務關聯的Future,然后反復使用get方法,同時將參數timeout指定為0,從而通過輪詢來判斷任務是否完成。這種方法雖然可行,但卻有些繁瑣。幸運的是,還有一種更好的方法:完成服務CompletionService。"
這是什么意思呢?通過一個例子,分別使用繁瑣的做法和CompletionService來完成,清晰的對比能讓我們更好的理解上面的一段話和CompletionService這個API提供的初衷。
第2部分 實例
考慮這樣的場景,有5個Callable任務分別返回5個整數,然后我們在main方法中按照各個任務完成的先后順序,在控制台打印返回結果。
1 package com.concurrency.TaskExecution_6; 2 3 import java.util.concurrent.Callable; 4 import java.util.concurrent.TimeUnit; 5 6 /** 7 * 8 * @ClassName: ReturnAfterSleepCallable 9 * TODO 10 * @author Xingle 11 * @date 2014-9-16 上午9:20:34 12 */ 13 public class ReturnAfterSleepCallable implements Callable<Integer>{ 14 15 private int sleepSeconds; 16 private int returnValue; 17 18 public ReturnAfterSleepCallable(int sleepSeconds,int returnValue){ 19 this.sleepSeconds = sleepSeconds; 20 this.returnValue = returnValue; 21 } 22 23 24 @Override 25 public Integer call() throws Exception { 26 System.out.println("begin to execute "); 27 28 TimeUnit.SECONDS.sleep(sleepSeconds); 29 return returnValue; 30 } 31 32 }
1.繁瑣的做法
通過一個List來保存每個任務返回的Future,然后輪詢這些Future,直到每個Future都已完成。我們不希望出現因為排在前面的任務阻塞導致后面先完成的任務的結果沒有及時獲取的情況,所以在調用get方式時,需要將超時時間設置為0。
1 package com.concurrency.TaskExecution_6; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.ExecutionException; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.Future; 9 10 /** 11 * 傳統的繁瑣做法 12 * @ClassName: TraditionalTest 13 * TODO 14 * @author Xingle 15 * @date 2014-9-16 上午10:06:21 16 */ 17 public class TraditionalTest { 18 19 public static void main(String[] args){ 20 int taskSize = 5; 21 ExecutorService executor = Executors.newFixedThreadPool(taskSize); 22 List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); 23 24 for(int i= 1; i<=taskSize; i++){ 25 int sleep = taskSize -1; 26 int value = i; 27 //向線程池提交任務 28 Future<Integer> future = executor.submit(new ReturnAfterSleepCallable(sleep, value)); 29 //保留每個任務的Future 30 futureList.add(future); 31 } 32 // 輪詢,獲取完成任務的返回結果 33 while(taskSize > 0){ 34 for (Future<Integer> future : futureList){ 35 Integer result = null; 36 try { 37 result = future.get(); 38 } catch (InterruptedException e) { 39 e.printStackTrace(); 40 } catch (ExecutionException e) { 41 e.printStackTrace(); 42 } 43 //任務已經完成 44 if(result!=null){ 45 System.out.println("result = "+result); 46 //從future列表中刪除已經完成的任務 47 futureList.remove(future); 48 taskSize --; 49 break; 50 } 51 } 52 } 53 // 所有任務已經完成,關閉線程池 54 System.out.println("all over "); 55 executor.shutdown(); 56 } 57 58 }
執行結果:

2.使用CompletionService
1 package com.concurrency.TaskExecution_6; 2 3 import java.util.concurrent.CompletionService; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.ExecutorCompletionService; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 9 /** 10 * 使用CompletionService 11 * @ClassName: CompletionServiceTest 12 * TODO 13 * @author Xingle 14 * @date 2014-9-16 上午11:32:45 15 */ 16 public class CompletionServiceTest { 17 18 public static void main(String[] args){ 19 int taskSize = 5; 20 ExecutorService executor = Executors.newFixedThreadPool(taskSize); 21 // 構建完成服務 22 CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor); 23 24 for (int i=1;i<= taskSize; i++){ 25 // 睡眠時間 26 int sleep = taskSize - i; 27 // 返回結果 28 int value = i; 29 //向線程池提交任務 30 completionService.submit(new ReturnAfterSleepCallable(sleep, value)); 31 try { 32 System.out.println("result:"+completionService.take().get()); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } catch (ExecutionException e) { 36 e.printStackTrace(); 37 } 38 } 39 40 System.out.println("all over. "); 41 executor.shutdown(); 42 43 } 44 45 }
執行結果:

3.CompletionService和ExecutorCompletionService的實現
JDK源碼中CompletionService的javadoc說明如下:
/** * A service that decouples the production of new asynchronous tasks * from the consumption of the results of completed tasks. Producers * <tt>submit</tt> tasks for execution. Consumers <tt>take</tt> * completed tasks and process their results in the order they * complete. */
ExecutorCompletionService是CompletionService的實現,融合了線程池Executor和阻塞隊列BlockingQueue的功能。
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
QueueingFuture是FutureTask的一個子類,通過改寫FutureTask類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
FutureTask.done(),這個方法默認什么都不做,就是一個回調,當提交的線程池中的任務完成時,會被自動調用。這也就說時候,當任務完成的時候,會自動執行QueueingFuture.done()方法,將返回結果加入到阻塞隊列中,加入的順序就是任務完成的先后順序。
