我們現在在Java中使用多線程通常不會直接用Thread對象了,而是會用到java.util.concurrent包下的ExecutorService類來初始化一個線程池供我們使用。
之前我一直習慣自己維護一個list保存submit的callable task所返回的Future對象。
在主線程中遍歷這個list並調用Future的get()方法取到Task的返回值。
public class CompletionServiceTest {
static class Task implements Callable<String>{
private int i;
public Task(int i){
this.i = i;
}
@Override
public String call() throws Exception {
Thread.sleep(10000);
return Thread.currentThread().getName() + "執行完任務:" + i;
}
}
public static void main(String[] args){
testUseFuture();
}
private static void testUseFuture(){
int numThread = 5;
ExecutorService executor = Executors.newFixedThreadPool(numThread);
List<Future<String>> futureList = new ArrayList<Future<String>>();
for(int i = 0;i<numThread;i++ ){
Future<String> future = executor.submit(new CompletionServiceTest.Task(i));
futureList.add(future);
}
while(numThread > 0){
for(Future<String> future : futureList){
String result = null;
try {
result = future.get(0, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
//超時異常直接忽略
}
if(null != result){
futureList.remove(future);
numThread--;
System.out.println(result);
//此處必須break,否則會拋出並發修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決)
break;
}
}
}
}
}
但是,我在很多地方會看到一些代碼通過CompletionService包裝ExecutorService,然后調用其take()方法去取Future對象。
public class CompletionServiceTest {
static class Task implements Callable<String>{
private int i;
public Task(int i){
this.i = i;
}
@Override
public String call() throws Exception {
Thread.sleep(10000);
return Thread.currentThread().getName() + "執行完任務:" + i;
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException{
testExecutorCompletionService();
}
private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{
int numThread = 5;
ExecutorService executor = Executors.newFixedThreadPool(numThread);
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
for(int i = 0;i<numThread;i++ ){
completionService.submit(new CompletionServiceTest.Task(i));
}
}
for(int i = 0;i<numThread;i++ ){
System.out.println(completionService.take().get());
}
}
以前沒研究過這兩者之間的區別。今天看了源代碼之后就明白了。
這兩者最主要的區別在於submit的task不一定是按照加入自己維護的list順序完成的。
從list中遍歷的每個Future對象並不一定處於完成狀態,這時調用get()方法就會被阻塞住,如果系統是設計成每個線程完成后就能根據其結果繼續做后面的事,這樣對於處於list后面的但是先完成的線程就會增加了額外的等待時間。
而CompletionService的實現是維護一個保存Future對象的BlockingQueue。只有當這個Future對象狀態是結束的時候,才會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future對象,如果Queue是空的,就會阻塞在那里,直到有完成的Future對象加入到Queue中。
所以,先完成的必定先被取出。這樣就減少了不必要的等待時間
