如果你向Executor提交了一個批處理任務,並且希望在它們完成后獲得結果,怎么辦呢?
為此你可以保存與每個任務相關聯的Future,然后不斷地調用 timeout為零的get,來檢驗Future是否完成。這樣做固然可以,但卻相當乏味。幸運的是,還有一個更好的方法:完成服務 (Completion service)。
CompletionService整合了Executor和BlockingQueue的功能。
你可以將Callable任務提交給它去執行,然 后使用類似於隊列中的take和poll方法,在結果完整可用時獲得這個結果,像一個打包的Future。 ExecutorCompletionService是實現CompletionService接口的一個類,並將計算任務委托給一個Executor。
ExecutorCompletionService的實現相當直觀。它在構造函數中創建一個BlockingQueue,用它去保持完成的結果。 計算完成時會調用FutureTask中的done方法。
當提交一個任務后,首先把這個任務包裝為一個QueueingFuture,它是 FutureTask的一個子類,然后覆寫done方法,將結果置入BlockingQueue中,take和poll方法委托給了 BlockingQueue,它會在結果不可用時阻塞。
直接看demo:
package javademo;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
/***
* 兩鍾方式出來線程運行結果
* @author think
*
*/
public class CompletionServiceTest {
public static void main(String[] args) throws Exception {
CompletionServiceTest cst = new CompletionServiceTest();
cst.count1();
cst.count2();
}
/***
* 使用阻塞容器保存每次Executor處理的結果,在后面進行統一處理
* @throws Exception
*/
public void count1() throws Exception{
ExecutorService exec = Executors.newCachedThreadPool();
BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
for(int i=0; i<10; i++){
Future<Integer> future =exec.submit(getTask());
queue.add(future);
}
int sum = 0;
int queueSize = queue.size();
for(int i=0; i<queueSize; i++){
sum += queue.take().get();
}
System.out.println("總數為:"+sum);
exec.shutdown();
}
/***
* 使用CompletionService(完成服務)保持Executor處理的結果
* @throws InterruptedException
* @throws ExecutionException
*/
public void count2() throws InterruptedException, ExecutionException{
ExecutorService exec = Executors.newCachedThreadPool();
CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);
for(int i=0; i<10; i++){
execcomp.submit(getTask());
}
int sum = 0;
for(int i=0; i<10; i++){
//檢索並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。
Future<Integer> future = execcomp.take();
sum += future.get();
}
System.out.println("總數為:"+sum);
exec.shutdown();
}
/**
* 得到一個任務
* @return Callable
*/
public Callable<Integer> getTask(){
final Random rand = new Random();
Callable<Integer> task = new Callable<Integer>(){
@Override
public Integer call() throws Exception {
int i = rand.nextInt(10);
int j = rand.nextInt(10);
int sum = i*j;
System.out.print(sum+"\t");
return sum;
}
};
return task;
}
}
- import java.util.Random;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.Callable;
- import java.util.concurrent.CompletionService;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorCompletionService;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.LinkedBlockingQueue;
- public class Test17 {
- public static void main(String[] args) throws Exception {
- Test17 t = new Test17();
- t.count1();
- t.count2();
- }
- //使用阻塞容器保存每次Executor處理的結果,在后面進行統一處理
- public void count1() throws Exception{
- ExecutorService exec = Executors.newCachedThreadPool();
- BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
- for(int i=0; i<10; i++){
- Future<Integer> future =exec.submit(getTask());
- queue.add(future);
- }
- int sum = 0;
- int queueSize = queue.size();
- for(int i=0; i<queueSize; i++){
- sum += queue.take().get();
- }
- System.out.println("總數為:"+sum);
- exec.shutdown();
- }
- //使用CompletionService(完成服務)保持Executor處理的結果
- public void count2() throws InterruptedException, ExecutionException{
- ExecutorService exec = Executors.newCachedThreadPool();
- CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);
- for(int i=0; i<10; i++){
- execcomp.submit(getTask());
- }
- int sum = 0;
- for(int i=0; i<10; i++){
- //檢索並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。
- Future<Integer> future = execcomp.take();
- sum += future.get();
- }
- System.out.println("總數為:"+sum);
- exec.shutdown();
- }
- //得到一個任務
- public Callable<Integer> getTask(){
- final Random rand = new Random();
- Callable<Integer> task = new Callable<Integer>(){
- @Override
- public Integer call() throws Exception {
- int i = rand.nextInt(10);
- int j = rand.nextInt(10);
- int sum = i*j;
- System.out.print(sum+"\t");
- return sum;
- }
- };
- return task;
- }
- /**
- * 執行結果:
- 6 6 14 40 40 0 4 7 0 0 總數為:106
- 12 6 12 54 81 18 14 35 45 35 總數為:312
- */
- }