在我們日常使用線程池的時候,經常會有需要獲得線程處理結果的時候。此時我們通常有兩種做法。
1. 使用並發容器將callable.call() 的返回Future存儲起來。然后使用一個消費者線程去遍歷這個並發容器,調用Future.isDone()去判斷各個任務是否處理完畢。然后再處理響應的業務。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; public class ExecutorResultManager { public static void main(String[] args) { // 隊列 BlockingQueue<Future<String>> futures = new LinkedBlockingQueue<>(); // 生產者 new Thread() { @Override public void run() { ExecutorService pool = Executors.newCachedThreadPool(); for (int i=0; i< 10; i++) { int index = i; Future<String> submit = pool.submit(new Callable<String>() { @Override public String call() throws Exception { return "task done" + index; } }); try { futures.put(submit); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); // 消費者 new Thread() { @Override public void run() { while(true) { for (Future<String> future : futures) { if(future.isDone()) { // 處理業務 // ............. }; } } } }.start(); } }
2. 使用jdk 自帶線程池結果管理器:ExecutorCompletionService。它將BlockingQueue 和Executor 封裝起來。然后使用ExecutorCompletionService.submit()方法提交任務。
submit 方法如下:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); // RunnableFuture封裝了任務,使得任務既能run 也能get() RunnableFuture<V> f = newTaskFor(task); // 使用一個繼承Runnable類的QueueingFutue 再次封裝了我們的任務 executor.execute(new QueueingFuture(f)); return f; }
我們再來看看QueueingFuture:
// 繼承自FutureTask, FutureTask 也是一個Runnable的子類 private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } // 實現了FutureTask 的done 方法,在任務處理完畢或者拋異常后將封裝成Future的任務加入到隊列。這樣我們就能在隊列中取到處處理完的任務,並通過Future.get()方法去取得處理完后的結果。不用自己去判斷任務是否處理完畢了 protected void done() { completionQueue.add(task); } private final Future<V> task; }
簡單實現:
import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorCompletionServiceManager { public static void main(String[] args) { ExecutorCompletionService<String> service = new ExecutorCompletionService<String>( Executors.newCachedThreadPool()); // 生產者 new Thread() { @Override public void run() { for (int i = 0; i < 10; i++) { int index = i; service.submit(new Callable<String>() { @Override public String call() throws Exception { return "task done" + index; } }); } } }.start(); // 消費者 new Thread() { @Override public void run() { try { Future<String> take = service.take(); // do some thing........ } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); } }
相對於原始的自造輪子處理方式,jdk 自帶的工具類處理方式顯得優雅了許多。