1.Future
在執行多個任務的時候,使用Java標准庫提供的線程池是非常方便的。我們提交的任務只需要實現Runnable
接口,就可以讓線程池去執行:
class Task implements Runnable { public String result; public void run() { this.result = longTimeCalculation(); } }
Runnable
接口有個問題,它的方法沒有返回值。如果任務需要一個返回結果,那么只能保存到變量,還要提供額外的方法讀取,非常不便。所以,Java標准庫還提供了一個Callable
接口,和Runnable
接口比,它多了一個返回值:
class Task implements Callable<String> { public String call() throws Exception { return longTimeCalculation(); } }
並且Callable
接口是一個泛型接口,可以返回指定類型的結果。
現在的問題是,如何獲得異步執行的結果?
如果仔細看ExecutorService.submit()
方法,可以看到,它返回了一個Future
類型,一個Future
類型的實例代表一個未來能獲取結果的對象:
ExecutorService executor = Executors.newFixedThreadPool(4); // 定義任務: Callable<String> task = new Task(); // 提交任務並獲得Future: Future<String> future = executor.submit(task); // 從Future獲取異步執行返回的結果: String result = future.get(); // 可能阻塞
當我們提交一個Callable
任務后,我們會同時獲得一個Future
對象,然后,我們在主線程某個時刻調用Future
對象的get()
方法,就可以獲得異步執行的結果。在調用get()
時,如果異步任務已經完成,我們就直接獲得結果。如果異步任務還沒有完成,那么get()
會阻塞,直到任務完成后才返回結果。
一個Future<V>
接口表示一個未來可能會返回的結果,它定義的方法有:
get()
:獲取結果(可能會等待)get(long timeout, TimeUnit unit)
:獲取結果,但只等待指定的時間;cancel(boolean mayInterruptIfRunning)
:取消當前任務;isDone()
:判斷任務是否已完成。
小結
對線程池提交一個Callable
任務,可以獲得一個Future
對象;
可以用Future
在將來某個時刻獲取結果。
2.CompletableFuture
使用Future
獲得異步執行結果時,要么調用阻塞方法get()
,要么輪詢看isDone()
是否為true
,這兩種方法都不是很好,因為主線程也會被迫等待。
從Java 8開始引入了CompletableFuture
,它針對Future
做了改進,可以傳入回調對象,當異步任務完成或者發生異常時,自動調用回調對象的回調方法。
我們以獲取股票價格為例,看看如何使用CompletableFuture
:
import java.util.concurrent.CompletableFuture;
public class Main { public static void main(String[] args) throws Exception { // 創建異步執行任務: CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice); // 如果執行成功: cf.thenAccept((result) -> { System.out.println("price: " + result); }); // 如果執行異常: cf.exceptionally((e) -> { e.printStackTrace(); return null; }); // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉: Thread.sleep(200); } static Double fetchPrice() { try { Thread.sleep(100); } catch (InterruptedException e) { } if (Math.random() < 0.3) { throw new RuntimeException("fetch price failed!"); } return 5 + Math.random() * 20; } }
創建一個CompletableFuture
是通過CompletableFuture.supplyAsync()
實現的,它需要一個實現了Supplier
接口的對象:
public interface Supplier<T> { T get(); }
這里我們用lambda語法簡化了一下,直接傳入Main::fetchPrice
,因為Main.fetchPrice()
靜態方法的簽名符合Supplier
接口的定義(除了方法名外)。
緊接着,CompletableFuture
已經被提交給默認的線程池執行了,我們需要定義的是CompletableFuture
完成時和異常時需要回調的實例。完成時,CompletableFuture
會調用Consumer
對象:
public interface Consumer<T> { void accept(T t); }
異常時,CompletableFuture
會調用Function
對象:
public interface Function<T, R> { R apply(T t); }
這里我們都用lambda語法簡化了代碼。
可見CompletableFuture
的優點是:
- 異步任務結束時,會自動回調某個對象的方法;
- 異步任務出錯時,會自動回調某個對象的方法;
- 主線程設置好回調后,不再關心異步任務的執行。
如果只是實現了異步回調機制,我們還看不出CompletableFuture
相比Future
的優勢。CompletableFuture
更強大的功能是,多個CompletableFuture
可以串行執行,例如,定義兩個CompletableFuture
,第一個CompletableFuture
根據證券名稱查詢證券代碼,第二個CompletableFuture
根據證券代碼查詢證券價格,這兩個CompletableFuture
實現串行操作如下:
import java.util.concurrent.CompletableFuture;
public class Main { public static void main(String[] args) throws Exception { // 第一個任務: CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> { return queryCode("中國石油"); }); // cfQuery成功后繼續執行下一個任務: CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> { return fetchPrice(code); }); // cfFetch成功后打印結果: cfFetch.thenAccept((result) -> { System.out.println("price: " + result); }); // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉: Thread.sleep(2000); } static String queryCode(String name) { try { Thread.sleep(100); } catch (InterruptedException e) { } return "601857"; } static Double fetchPrice(String code) { try { Thread.sleep(100); } catch (InterruptedException e) { } return 5 + Math.random() * 20; } }
除了串行執行外,多個CompletableFuture
還可以並行執行。例如,我們考慮這樣的場景:
同時從新浪和網易查詢證券代碼,只要任意一個返回結果,就進行下一步查詢價格,查詢價格也同時從新浪和網易查詢,只要任意一個返回結果,就完成操作:
public class Main { public static void main(String[] args) throws Exception { // 兩個CompletableFuture執行異步查詢: CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> { return queryCode("中國石油", "https://finance.sina.com.cn/code/"); }); CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> { return queryCode("中國石油", "https://money.163.com/code/"); }); // 用anyOf合並為一個新的CompletableFuture: CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163); // 兩個CompletableFuture執行異步查詢: CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> { return fetchPrice((String) code, "https://finance.sina.com.cn/price/"); }); CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> { return fetchPrice((String) code, "https://money.163.com/price/"); }); // 用anyOf合並為一個新的CompletableFuture: CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163); // 最終結果: cfFetch.thenAccept((result) -> { System.out.println("price: " + result); }); // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉: Thread.sleep(200); } static String queryCode(String name, String url) { System.out.println("query code from " + url + "..."); try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { } return "601857"; } static Double fetchPrice(String code, String url) { System.out.println("query price from " + url + "..."); try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { } return 5 + Math.random() * 20; } }
query code from https://money.163.com/code/... query code from https://finance.sina.com.cn/code/... query price from https://finance.sina.com.cn/price/... query price from https://money.163.com/price/... price: 6.214906451395034
上述邏輯實現的異步查詢規則實際上是:
┌─────────────┐ ┌─────────────┐
│ Query Code │ │ Query Code │
│ from sina │ │ from 163 │
└─────────────┘ └─────────────┘
│ │
└───────┬───────┘
▼
┌─────────────┐
│ anyOf │
└─────────────┘
│
┌───────┴────────┐
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Query Price │ │ Query Price │
│ from sina │ │ from 163 │
└─────────────┘ └─────────────┘
│ │
└────────┬───────┘
▼
┌─────────────┐
│ anyOf │
└─────────────┘
│
▼
┌─────────────┐
│Display Price│
└─────────────┘
除了anyOf()
可以實現“任意個CompletableFuture
只要一個成功”,allOf()
可以實現“所有CompletableFuture
都必須成功”,這些組合操作可以實現非常復雜的異步流程控制。
最后我們注意CompletableFuture
的命名規則:
xxx()
:表示該方法將繼續在已有的線程中執行;xxxAsync()
:表示將異步在線程池中執行。
小結
CompletableFuture
可以指定異步處理流程:
thenAccept()
處理正常結果;exceptional()
處理異常結果;thenApplyAsync()
用於串行化另一個CompletableFuture
;anyOf()
和allOf()
用於並行化多個CompletableFuture
。