CompletableFuture實戰
簡介
在Java8中,CompletableFuture提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復雜性,並且提供了函數式編程的能力,可以通過回調的方式處理計算結果,也提供了轉換和組合 CompletableFuture 的方法。
Java中的異步計算
異步計算很難推理。通常,我們希望將任何計算都視為一系列步驟,但是在異步計算的情況下,以回調表示的動作往往分散在代碼中或彼此深深地嵌套在一起。當我們需要處理其中一個步驟中可能發生的錯誤時,情況變得更加糟糕。 Future是Java 5中添加作為異步計算的結果,但它沒有任何方法處理計算可能出現的錯誤。
Java 8引入了CompletableFuture類。除Future接口外,它還實現了CompletionStage接口。該接口為異步計算步驟定義了合同,我們可以將其與其他步驟結合使用。
將CompletableFuture用作簡單的Future
首先,CompletableFuture類實現了Future接口,因此我們可以將其用作將來的實現,但需要附加完成邏輯。 例如,我們可以用一個無參數構造函數創建這個類的實例來表示Future的結果,將它分發給使用者,並在將來的某個時候使用complete方法完成它。使用者可以使用get方法阻塞當前線程,直到提供此結果。在下面的示例中,我們有一個方法,它創建一個CompletableFuture實例,然后在另一個線程中派生一些計算,並立即返回Future。計算完成后,該方法通過向完整方法提供結果來完成Future:
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
我們使用executorapi。這種創建和完成CompletableFuture的方法可以與任何並發機制或API(包括原始線程)一起使用。請注意,calculateAsync方法將返回一個Future的實例。我們只需調用該方法,接收Future實例,並在准備阻塞結果時對其調用get方法。
還要注意get方法拋出一些已檢查的異常,即ExecutionException(封裝計算期間發生的異常)和interruptedeexception(表示執行方法的線程被中斷的異常):
Future<String> completableFuture = calculateAsync();
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
如果我們已經知道計算的結果,我們可以使用靜態completedFuture方法,並使用一個參數,該參數表示此計算的結果。因此,將來的get方法永遠不會阻塞,立即返回此結果,而不是:
Future<String> completableFuture =
CompletableFuture.completedFuture("Hello");
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
封裝計算邏輯的CompletableFuture
上面的代碼允許我們選擇任何並發執行的機制,但是如果我們想跳過這個樣板文件,簡單地異步執行一些代碼呢?
靜態方法runAsync和supplyAsync允許我們相應地使用Runnable和Supplier函數類型創建一個可完成的未來實例。
Runnable和Supplier都是函數接口,由於新的java8特性,它們允許將實例作為lambda表達式傳遞。
Runnable接口與線程中使用的舊接口相同,不允許返回值。
Supplier接口是一個通用函數接口,它有一個方法,該方法沒有參數,並且返回一個參數化類型的值。
這允許我們提供一個供應商實例作為lambda表達式來執行計算並返回結果。簡單到:
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");
// ...
assertEquals("Hello", future.get());
異步計算的處理結果
處理計算結果的最通用的方法是將其提供給函數。thenApply方法正是這樣做的;它接受一個函數實例,用它來處理結果,並返回一個包含函數返回值的Future:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
如果我們不需要在Future中返回值,我們可以使用Consumer函數接口的實例。它的單個方法接受一個參數並返回void。
在可完成的將來,有一種方法可以解決這個用例。thenAccept方法接收使用者並將計算結果傳遞給它。最后一個future.get()調用返回Void類型的實例:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: " + s));
future.get();
最后,如果我們既不需要計算的值,也不想返回值,那么我們可以將一個可運行的lambda傳遞給thenRun方法。在下面的示例中,我們只需在調用future.get()后在控制台中打印一行:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));
future.get();
組合CompletableFuture
CompletableFuture API最好的部分是能夠在一系列計算步驟中組合CompletableFuture實例。
這種鏈接的結果本身就是一個完整的Future,允許進一步的鏈接和組合。這種方法在函數語言中普遍存在,通常被稱為享元模式。
在下面的示例中,我們使用thenCompose方法按順序鏈接兩個Future。
請注意,此方法接受一個返回CompletableFuture實例的函數。此函數的參數是上一計算步驟的結果。這允許我們在下一個CompletableFuture的lambda中使用此值:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
thenCompose方法與thenApply一起實現了享元模式的基本構建塊。它們與流的map和flatMap方法以及java8中的可選類密切相關。
兩個方法都接收一個函數並將其應用於計算結果,但是thencomose(flatMap)方法接收一個返回另一個相同類型對象的函數。這種功能結構允許將這些類的實例組合為構建塊。
如果我們想執行兩個獨立的未來,並對它們的結果進行處理,我們可以使用thenCombine方法,該方法接受一個未來和一個具有兩個參數的函數來處理這兩個結果:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));
assertEquals("Hello World", completableFuture.get());
一個簡單的例子是,當我們想處理兩個CompletableFuture的結果時,但不需要將任何結果值傳遞給CompletableFuture的鏈。thenAcceptBoth方法可以幫助:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
thenApply()和thenCompose()方法之間的區別
在前面的部分中,我們展示了有關thenApply()和thenCompose()的示例。兩個api都有助於鏈接不同的CompletableFuture調用,但這兩個函數的用法不同。
thenApply()
我們可以使用此方法處理上一次調用的結果。但是,需要記住的一點是,返回類型將由所有調用組合而成。
因此,當我們要轉換CompletableFuture調用的結果時,此方法非常有用:
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);
thenCompose()
thenCompose()方法與thenApply()類似,因為兩者都返回一個新的完成階段。但是,thencose()使用前一階段作為參數。它將展平並直接返回一個帶有結果的CompletableFuture,而不是我們在thenApply()中觀察到的嵌套CompletableFuture:
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
因此,如果要鏈接可完成的CompletableFuture方法,那么最好使用thenCompose()。
另外,請注意,這兩個方法之間的差異類似於map()和flatMap()之間的差異。
並行運行多個CompletableFuture
當我們需要並行執行多個期貨時,我們通常希望等待所有Supplier執行,然后處理它們的組合結果。
CompletableFuture.allOf靜態方法允許等待的所有Supplier的完成:
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
注意CompletableFuture.allOf()的返回類型是CompletableFuture
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
join()方法類似於get方法,但是如果Future不能正常完成,它會拋出一個未檢查的異常。這樣就可以將其用作Stream.map()方法中的方法引用。
異步方法
CompletableFuture類中fluentapi的大多數方法都有另外兩個帶有異步后綴的變體。這些方法通常用於在另一個線程中運行相應的執行步驟。
沒有異步后綴的方法使用調用線程運行下一個執行階段。相反,不帶Executor參數的Async方法使用Executor的公共fork/join池實現運行一個步驟,該實現通過ForkJoinPool.commonPool()方法訪問。最后,帶有Executor參數的Async方法使用傳遞的Executor運行步驟。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
END
歡迎關注公眾號! 公眾號回復:
入群
,掃碼加入我們交流群!