我們現在在Java中使用多線程通常不會直接用Thread對象了,而是會用到java.util.concurrent包下的ExecutorService類來初始化一個線程池供我們使用。
之前我一直習慣自己維護一個list保存submit的callable task所返回的Future對象。
在主線程中遍歷這個list並調用Future的get()方法取到Task的返回值。
public class CompletionServiceTest { static class Task implements Callable<String>{ private int i; public Task(int i){ this.i = i; } @Override public String call() throws Exception { Thread.sleep(10000); return Thread.currentThread().getName() + "執行完任務:" + i; } } public static void main(String[] args){ testUseFuture(); } private static void testUseFuture(){ int numThread = 5; ExecutorService executor = Executors.newFixedThreadPool(numThread); List<Future<String>> futureList = new ArrayList<Future<String>>(); for(int i = 0;i<numThread;i++ ){ Future<String> future = executor.submit(new CompletionServiceTest.Task(i)); futureList.add(future); } while(numThread > 0){ for(Future<String> future : futureList){ String result = null; try { result = future.get(0, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { //超時異常直接忽略 } if(null != result){ futureList.remove(future); numThread--; System.out.println(result); //此處必須break,否則會拋出並發修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決) break; } } } } }
但是,我在很多地方會看到一些代碼通過CompletionService包裝ExecutorService,然后調用其take()方法去取Future對象。
public class CompletionServiceTest { static class Task implements Callable<String>{ private int i; public Task(int i){ this.i = i; } @Override public String call() throws Exception { Thread.sleep(10000); return Thread.currentThread().getName() + "執行完任務:" + i; } } public static void main(String[] args) throws InterruptedException, ExecutionException{ testExecutorCompletionService(); } private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{ int numThread = 5; ExecutorService executor = Executors.newFixedThreadPool(numThread); CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); for(int i = 0;i<numThread;i++ ){ completionService.submit(new CompletionServiceTest.Task(i)); } } for(int i = 0;i<numThread;i++ ){ System.out.println(completionService.take().get()); } }
以前沒研究過這兩者之間的區別。今天看了源代碼之后就明白了。
這兩者最主要的區別在於submit的task不一定是按照加入自己維護的list順序完成的。
從list中遍歷的每個Future對象並不一定處於完成狀態,這時調用get()方法就會被阻塞住,如果系統是設計成每個線程完成后就能根據其結果繼續做后面的事,這樣對於處於list后面的但是先完成的線程就會增加了額外的等待時間。
而CompletionService的實現是維護一個保存Future對象的BlockingQueue。只有當這個Future對象狀態是結束的時候,才會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future對象,如果Queue是空的,就會阻塞在那里,直到有完成的Future對象加入到Queue中。
所以,先完成的必定先被取出。這樣就減少了不必要的等待時間