CompletableFuture 使用介紹


來自:https://www.cnblogs.com/hansc-blog/p/10645748.html

一、幾個 static 方法,它們使用任務來實例化一個 CompletableFuture 實例。

上面的代碼確實沒什么用,下面介紹幾個 static 方法,它們使用任務來實例化一個 CompletableFuture 實例。

CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier supplier);
CompletableFuture.supplyAsync(Supplier supplier, Executor executor)
runAsync 方法接收的是 Runnable 的實例,意味着它沒有返回值
supplyAsync 方法對應的是有返回值的情況
這兩個方法的帶 executor 的變種,表示讓任務在指定的線程池中執行,不指定的話,通常任務是在 ForkJoinPool.commonPool() 線程池中執行的。
好的,現在我們已經有了第一個 CompletableFuture 實例了,我們來看接下來的內容

二、任務執行順序

我們先來看執行兩個任務的情況,首先執行任務 A,然后將任務 A 的結果傳遞給任務 B。

其實這里有很多種情況,任務 A 是否有返回值,任務 B 是否需要任務 A 的返回值,任務 B 是否有返回值,等等。有個明確的就是,肯定是任務 A 執行完后再執行任務 B。

我們用下面的 6 行代碼來說:

CompletableFuture.runAsync(() -> {}).thenRun(() -> {});
CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {});
CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");

CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
前面 3 行代碼演示的是,任務 A 無返回值,所以對應的,第 2 行和第 3 行代碼中,resultA 其實是 null。

第 4 行用的是 thenRun(Runnable runnable),任務 A 執行完執行 B,並且 B 不需要 A 的結果。

第 5 行用的是 thenAccept(Consumer action),任務 A 執行完執行 B,B 需要 A 的結果,但是任務 B 不返回值。

第 6 行用的是 thenApply(Function fn),任務 A 執行完執行 B,B 需要 A 的結果,同時任務 B 有返回值。

這一小節說完了,如果任務 B 后面還有任務 C,往下繼續調用 .thenXxx() 即可。

三、異常處理

說到這里,我們順便來說下 CompletableFuture 的異常處理。這里我們要介紹兩個方法:

public CompletableFuture exceptionally(Function<Throwable, ? extends T> fn);
public CompletionStage handle(BiFunction<? super T, Throwable, ? extends U> fn);
看下面的代碼:

CompletableFuture.supplyAsync(() -> "resultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
.thenApply(resultC -> resultC + " resultD");
上面的代碼中,任務 A、B、C、D 依次執行,如果任務 A 拋出異常(當然上面的代碼不會拋出異常),那么后面的任務都得不到執行。如果任務 C 拋出異常,那么任務 D 得不到執行。

那么我們怎么處理異常呢?看下面的代碼,我們在任務 A 中拋出異常,並對其進行處理:

CompletableFuture future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException();
})
.exceptionally(ex -> "errorResultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
.thenApply(resultC -> resultC + " resultD");

System.out.println(future.join());
上面的代碼中,任務 A 拋出異常,然后通過 .exceptionally() 方法處理了異常,並返回新的結果,這個新的結果將傳遞給任務 B。所以最終的輸出結果是:

errorResultA resultB resultC resultD
再看下面的代碼,我們來看下另一種處理方式,使用 handle(BiFunction fn) 來處理異常:

CompletableFuture future = CompletableFuture.supplyAsync(() -> "resultA")
.thenApply(resultA -> resultA + " resultB")
// 任務 C 拋出異常
.thenApply(resultB -> {throw new RuntimeException();})
// 處理任務 C 的返回值或異常
.handle(new BiFunction<Object, Throwable, Object>() {
@Override
public Object apply(Object re, Throwable throwable) {
if (throwable != null) {
return "errorResultC";
}
return re;
}
})
.thenApply(resultC -> resultC + " resultD");

System.out.println(future.join());
上面的代碼使用了 handle 方法來處理任務 C 的執行結果,上面的代碼中,re 和 throwable 必然有一個是 null,它們分別代表正常的執行結果和異常的情況。

當然,它們也可以都為 null,因為如果它作用的那個 CompletableFuture 實例沒有返回值的時候,re 就是 null。

四、取消任務結果

上面一節,我們說的是,任務 A 執行完 -> 任務 B 執行完 -> 執行任務 C,它們之間有先后執行關系,因為后面的任務依賴於前面的任務的結果。

這節我們來看怎么讓任務 A 和任務 B 同時執行,然后取它們的結果進行后續操作。這里強調的是任務之間的並行工作,沒有先后執行順序。

如果使用 Future 的話,我們通常是這么寫的:

ExecutorService executorService = Executors.newCachedThreadPool();

Future futureA = executorService.submit(() -> "resultA");
Future futureB = executorService.submit(() -> "resultB");

String resultA = futureA.get();
String resultB = futureB.get();
接下來,我們看看 CompletableFuture 中是怎么寫的,看下面的幾行代碼:

CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture cfB = CompletableFuture.supplyAsync(() -> "resultB");

cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});
cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
cfA.runAfterBoth(cfB, () -> {});
第 3 行代碼和第 4 行代碼演示了怎么使用兩個任務的結果 resultA 和 resultB,它們的區別在於,thenAcceptBoth 表示后續的處理不需要返回值,而 thenCombine 表示需要返回值。

如果你不需要 resultA 和 resultB,那么還可以使用第 5 行描述的 runAfterBoth 方法。

注意,上面的寫法和下面的寫法是沒有區別的:

CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA");

cfA.thenAcceptBoth(CompletableFuture.supplyAsync(() -> "resultB"),
(resultA, resultB) -> {});
千萬不要以為這種寫法任務 A 執行完了以后再執行任務 B。

五、領取多個任務

接下來,我們將介紹兩個非常簡單的靜態方法:allOf() 和 anyOf() 方法。

public static CompletableFuture allOf(CompletableFuture<?>... cfs){...}
public static CompletableFuture


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM