初識Callable and Future
在編碼時,我們可以通過繼承Thread或是實現Runnable接口來創建線程,但是這兩種方式都存在一個缺陷:在執行完任務之后無法獲取執行結果。如果需要獲取執行結果,就必須通過共享變量或者使用線程通信的方式來達到目的。Java5提供了Callable和Future,通過它們可以在任務執行完畢之后得到任務執行結果。
Callable and Future源碼:
(1)Callable接口:
public interface Callable<V> { V call() throws Exception; }
(2)Future接口:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
源碼解說:
Callable位於java.util.concurrent包下,它是一個接口,在它里面只聲明了一個call()方法。從上面的源碼可以看到,Callable是一個泛型接口,call()函數返回的類型就是傳遞進來的泛型實參類型。
Future類位於java.util.concurrent包下,Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果,其cancel()方法的參數mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設置為true,則表示可以取消正在執行過程中的任務;get()方法用來獲取執行結果,該方法會阻塞直到任務返回結果。
Callable and Future示例:
(1)下面的示例是一個Callable,它會采用最明顯的方式查找數組的一個分段中的最大值。
import java.util.concurrent.Callable; class FindMaxTask implements Callable<Integer> { private int[] data; private int start; private int end; FindMaxTask(int[] data, int start, int end) { this.data = data; this.start = start; this.end = end; } public Integer call() { int max = Integer.MIN_VALUE; for (int i = start; i < end; i++) { if (data[i] > max) max = data[i]; } return max; } }
(2)將Callable對象提交給一個Executor,它會為每個Callable對象創建一個線程,如下代碼段所示:
import java.util.concurrent.*; public class MultithreadedMaxFinder { public static int max(int[] data) throws InterruptedException, ExecutionException { if (data.length == 1) { return data[0]; } else if (data.length == 0) { throw new IllegalArgumentException(); } // split the job into 2 pieces FindMaxTask task1 = new FindMaxTask(data, 0, data.length/2); FindMaxTask task2 = new FindMaxTask(data, data.length/2, data.length); // spawn 2 threads ExecutorService service = Executors.newFixedThreadPool(2); Future<Integer> future1 = service.submit(task1); Future<Integer> future2 = service.submit(task2); return Math.max(future1.get(), future2.get()); } }
補充:
ExecutorService接口中聲明了若干個不同形式的submit()方法,各個方法的返回類型為Future類型,如下:
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
初識FutureTask
因為Future只是一個接口,所以是無法直接用來創建對象來使用的,因此就有了下面的FutureTask,FutureTask目前是Future接口的一個唯一實現類。在Java並發程序中FutureTask表示一個可以取消的異步運算。它有啟動和取消運算、查詢運算是否完成和取回運算結果等方法。只有當運算完成的時候結果才能取回,如果運算尚未完成get方法將會阻塞。
FutureTask實現了RunnableFuture接口,其聲明如下:
public class FutureTask<V> implements RunnableFuture<V>
RunnableFuture接口定義如下:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
解說:
因RunnableFuture接口繼承Runnable接口和Future接口,FutureTask實現了RunnableFuture接口,所以FutureTask既可以作為Runnable被線程執行(Thread接收Runnable類型的參數),又可以提交給Executor來執行以得到返回值(ExecutorService.submit(Runnable task))。
FutureTask構造函數:
FutureTask的構造函數接收不同形式的參數,如下:
public FutureTask(Callable<V> callable) { } public FutureTask(Runnable runnable, V result) { }
FutureTask示例
觀察下述兩個示例代碼中FutureTask的使用方式
示例一:
FutureTask將被作為Runnable被線程執行
(1)任務線程ThreadC:
package demo.thread; import java.util.concurrent.Callable; //實現Callable接口,call()方法可以有返回結果 public class ThreadC implements Callable<String> { @Override public String call() throws Exception { try {//模擬任務,執行了500毫秒; Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } return "thread B"; } }
(2)主線程ThreadMain:
package demo.thread; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class ThreadMain { public static void main(String[] args) { ThreadC threadc = new ThreadC(); FutureTask<String> faeature = new FutureTask<String>(threadc); new Thread(faeature).start();//注意啟動方式,FutureTask將被作為Runnable被線程執行 System.out.println("這是主線程;begin!"); //注意細細體會這個,只有主線程get了,主線程才會繼續往下執行 try { System.out.println("得到的返回結果是:"+faeature.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("這是主線程;end!"); } }
示例二:
FutureTask被提交給Executor執行以得到返回值
public class Test { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask);//FutureTask被提交給Executor執行以得到返回值 executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主線程在執行任務"); try { System.out.println("task運行結果"+futureTask.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任務執行完畢"); } } class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { Thread.sleep(3*1000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } }
FutureTask原理
前面提到了一條重要信息:ExecutorService接口中的submit()方法可以接收callable、Runnable類型的參數,方法的返回類型為Future類型。
ExecutorService的submit()方法的內部實現是根據參數構建了FutureTask對象,然后將FutureTask對象轉為Future類型返回,這也對應了下面這一條信息:
FutureTask間接繼承了Future接口,其構造函數可以接收callable、Runnable類型的參數。
仔細想一想,其實這個內部實現使用了適配器模式,使得不同接口的實現最終對外表現為一致
CompletionService
ExecutorCompletionService
ExecutorCompletionService實現了CompletionService,融合了線程池Executor和阻塞隊列BlockingQueue的功能,將計算部分委托給一個Executor。
(1)構造函數:
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
(2)任務提交:
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
從上述submit()方法可以看出,當提交某個任務時,該任務首先將被包裝為一個QueueingFuture
(3)QueueingFuture源碼:
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
參考資料:
(1)http://ifeve.com/futuretask-source/
(2)http://www.tuicool.com/articles/umyy6b