為了防止無良網站的爬蟲抓取文章,特此標識,轉載請注明文章出處。LaplaceDemon/ShiJiaqi。
https://www.cnblogs.com/shijiaqi1066/p/10454237.html
CompletionService是Java8的新增接口,JDK為其提供了一個實現類ExecutorCompletionService。這個類是為線程池中Task的執行結果服務的,即為Executor中Task返回Future而服務的。CompletionService的實現目標是任務先完成可優先獲取到,即結果按照完成先后順序排序。
CompletionService的使用非常簡單。從源碼查看ExecutorCompletionService類,該類只有三個成員變量:
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
...
}
可以看到ExecutorCompletionService主要是增強executor線程池的。Task包裝后被塞入completionQueue,當Task結束,其Future就可以從completionQueue中獲取到。
其基本原理可以參看下圖:
CompletionService接口源碼:
public interface CompletionService<V> {
// 提交
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
// 獲取
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
例:向CompletionService中提交10個Task,當Task有任務返回則會優先從CompletionService內部的隊列中獲取到Task的Future。
package test;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestCompletionService {
public static void main(String[] args) {
Long start = System.currentTimeMillis();
//開啟3個線程
ExecutorService exs = Executors.newFixedThreadPool(5);
try {
int taskCount = 10;
// 結果集
List<Integer> list = new ArrayList<Integer>();
List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
// 1.定義CompletionService
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs);
// 2.添加任務
for(int i=0;i<taskCount;i++){
Future<Integer> future = completionService.submit(new Task(i+1));
futureList.add(future);
}
// 3.獲取結果
for(int i=0;i<taskCount;i++){
Integer result = completionService.take().get();
System.out.println("任務i=="+result+"完成!"+new Date());
list.add(result);
}
System.out.println("list="+list);
} catch (Exception e) {
e.printStackTrace();
} finally {
//關閉線程池
exs.shutdown();
}
}
static class Task implements Callable<Integer>{
Integer i;
public Task(Integer i) {
super();
this.i=i;
}
@Override
public Integer call() throws Exception {
if(i==5) {
Thread.sleep(5000);
}else{
Thread.sleep(1000);
}
System.out.println("線程:"+Thread.currentThread().getName()+"任務i="+i+",執行完成!");
return i;
}
}
}