java中CompletableFuture的使用
之前的文章中,我們講解了Future, 本文我們將會繼續講解java 8中引入的CompletableFuture的用法。
CompletableFuture首先是一個Future,它擁有Future所有的功能,包括獲取異步執行結果,取消正在執行的任務等。
除此之外,CompletableFuture還是一個CompletionStage。
我們看下CompletableFuture的定義:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
什么是CompletionStage呢?
在異步程序中,如果將每次的異步執行都看成是一個stage的話,我們通常很難控制異步程序的執行順序,在javascript中,我們需要在回調中執行回調。這就會形成傳說中的回調地獄。
好在在ES6中引入了promise的概念,可以將回調中的回調轉寫為鏈式調用,從而大大的提升了程序的可讀性和可寫性。
同樣的在java中,我們使用CompletionStage來實現異步調用的鏈式操作。
CompletionStage定義了一系列的then*** 操作來實現這一功能。
CompletableFuture作為Future使用
調用CompletableFuture.complete方法可以立馬返回結果,我們看下怎么使用這個方法來構建一個基本的Future:
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture
= new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
上面我們通過調動ExecutorService來提交一個任務從而得到一個Future。如果你知道執行的結果,那么可以使用CompletableFuture的completedFuture方法來直接返回一個Future。
public Future<String> useCompletableFuture(){
Future<String> completableFuture =
CompletableFuture.completedFuture("Hello");
return completableFuture;
}
CompletableFuture還提供了一個cancel方法來立馬取消任務的執行:
public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});
return completableFuture;
}
如果這個時候調用Future的get方法,將會報CancellationException異常。
Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException
異步執行code
CompletableFuture提供了runAsync和supplyAsync的方法,可以以異步的方式執行代碼。
我們看一個runAsync的基本應用,接收一個Runnable參數:
public void runAsync(){
CompletableFuture<Void> runAsync= CompletableFuture.runAsync(()->{
log.info("runAsync");
});
}
而supplyAsync接受一個Supplier:
public void supplyAsync(){
CompletableFuture<String> supplyAsync=CompletableFuture.supplyAsync(()->{
return "supplyAsync";
});
}
他們兩個的區別是一個沒有返回值,一個有返回值。
組合Futures
上面講到CompletableFuture的一個重大作用就是將回調改為鏈式調用,從而將Futures組合起來。
而鏈式調用的返回值還是CompletableFuture,我們看一個thenCompose的例子:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
thenCompose將前一個Future的返回結果作為后一個操作的輸入。
如果我們想合並兩個CompletableFuture的結果,則可以使用thenCombine:
public void thenCombine(){
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));
}
如果你不想返回結果,則可以使用thenAcceptBoth:
public void thenAcceptBoth(){
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
}
thenApply() 和 thenCompose()的區別
thenApply()和thenCompose()兩個方法都可以將CompletableFuture連接起來,但是兩個有點不一樣。
thenApply()接收的是前一個調用返回的結果,然后對該結果進行處理。
thenCompose()接收的是前一個調用的stage,返回flat之后的的CompletableFuture。
簡單點比較,兩者就像是map和flatMap的區別。
並行執行任務
當我們需要並行執行任務時,通常我們需要等待所有的任務都執行完畢再去處理其他的任務,那么我們可以用到CompletableFuture.allOf方法:
public void allOf(){
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
}
allOf只保證task全都執行,而並沒有返回值,如果希望帶有返回值,我們可以使用join:
public void join(){
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
}
上面的程序將會返回:“Hello Beautiful World”。
異常處理
如果在鏈式調用的時候拋出異常,則可以在最后使用handle來接收:
public void handleError(){
String name = null;
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
}
這和Promise中的catch方法使用類似。
本文的例子可以參考https://github.com/ddean2009/learn-java-concurrency/tree/master/CompletableFuture
更多教程請參考 flydean的博客