CompletableFuture 使用介紹


一、幾個 static 方法,它們使用任務來實例化一個 CompletableFuture 實例。

上面的代碼確實沒什么用,下面介紹幾個 static 方法,它們使用任務來實例化一個 CompletableFuture 實例。

CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
  • runAsync 方法接收的是 Runnable 的實例,意味着它沒有返回值
  • supplyAsync 方法對應的是有返回值的情況
  • 這兩個方法的帶 executor 的變種,表示讓任務在指定的線程池中執行,不指定的話,通常任務是在 ForkJoinPool.commonPool() 線程池中執行的。

好的,現在我們已經有了第一個 CompletableFuture 實例了,我們來看接下來的內容

二、任務執行順序

我們先來看執行兩個任務的情況,首先執行任務 A,然后將任務 A 的結果傳遞給任務 B。

其實這里有很多種情況,任務 A 是否有返回值,任務 B 是否需要任務 A 的返回值,任務 B 是否有返回值,等等。有個明確的就是,肯定是任務 A 執行完后再執行任務 B。

我們用下面的 6 行代碼來說:

CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); 
CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); 
CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB"); CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");

前面 3 行代碼演示的是,任務 A 無返回值,所以對應的,第 2 行和第 3 行代碼中,resultA 其實是 null

第 4 行用的是 thenRun(Runnable runnable),任務 A 執行完執行 B,並且 B 不需要 A 的結果。

第 5 行用的是 thenAccept(Consumer action),任務 A 執行完執行 B,B 需要 A 的結果,但是任務 B 不返回值。

第 6 行用的是 thenApply(Function fn),任務 A 執行完執行 B,B 需要 A 的結果,同時任務 B 有返回值。

這一小節說完了,如果任務 B 后面還有任務 C,往下繼續調用 .thenXxx() 即可。

三、異常處理

說到這里,我們順便來說下 CompletableFuture 的異常處理。這里我們要介紹兩個方法:

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn); public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);

看下面的代碼:

CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD");

上面的代碼中,任務 A、B、C、D 依次執行,如果任務 A 拋出異常(當然上面的代碼不會拋出異常),那么后面的任務都得不到執行。如果任務 C 拋出異常,那么任務 D 得不到執行。

那么我們怎么處理異常呢?看下面的代碼,我們在任務 A 中拋出異常,並對其進行處理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException(); }) .exceptionally(ex -> "errorResultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD"); System.out.println(future.join());

上面的代碼中,任務 A 拋出異常,然后通過 .exceptionally() 方法處理了異常,並返回新的結果,這個新的結果將傳遞給任務 B。所以最終的輸出結果是:

errorResultA resultB resultC resultD

再看下面的代碼,我們來看下另一種處理方式,使用 handle(BiFunction fn) 來處理異常:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") // 任務 C 拋出異常 .thenApply(resultB -> {throw new RuntimeException();}) // 處理任務 C 的返回值或異常 .handle(new BiFunction<Object, Throwable, Object>() { @Override public Object apply(Object re, Throwable throwable) { if (throwable != null) { return "errorResultC"; } return re; } }) .thenApply(resultC -> resultC + " resultD"); System.out.println(future.join());

上面的代碼使用了 handle 方法來處理任務 C 的執行結果,上面的代碼中,re 和 throwable 必然有一個是 null,它們分別代表正常的執行結果和異常的情況。

當然,它們也可以都為 null,因為如果它作用的那個 CompletableFuture 實例沒有返回值的時候,re 就是 null。

四、取消任務結果

上面一節,我們說的是,任務 A 執行完 -> 任務 B 執行完 -> 執行任務 C,它們之間有先后執行關系,因為后面的任務依賴於前面的任務的結果。

這節我們來看怎么讓任務 A 和任務 B 同時執行,然后取它們的結果進行后續操作。這里強調的是任務之間的並行工作,沒有先后執行順序。

如果使用 Future 的話,我們通常是這么寫的:

ExecutorService executorService = Executors.newCachedThreadPool();

Future<String> futureA = executorService.submit(() -> "resultA"); Future<String> futureB = executorService.submit(() -> "resultB"); String resultA = futureA.get(); String resultB = futureB.get();

接下來,我們看看 CompletableFuture 中是怎么寫的,看下面的幾行代碼:

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB"); cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {}); cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B"); cfA.runAfterBoth(cfB, () -> {});

第 3 行代碼和第 4 行代碼演示了怎么使用兩個任務的結果 resultA 和 resultB,它們的區別在於,thenAcceptBoth 表示后續的處理不需要返回值,而 thenCombine 表示需要返回值。

如果你不需要 resultA 和 resultB,那么還可以使用第 5 行描述的 runAfterBoth 方法。

注意,上面的寫法和下面的寫法是沒有區別的:

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA"); cfA.thenAcceptBoth(CompletableFuture.supplyAsync(() -> "resultB"), (resultA, resultB) -> {});

千萬不要以為這種寫法任務 A 執行完了以后再執行任務 B。

五、領取多個任務

接下來,我們將介紹兩個非常簡單的靜態方法:allOf() 和 anyOf() 方法。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...} public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {...}

這兩個方法都非常簡單,簡單介紹一下。

CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123); CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC"); CompletableFuture<Void> future = CompletableFuture.allOf(cfA, cfB, cfC); // 所以這里的 join() 將阻塞,直到所有的任務執行結束 future.join();

由於 allOf 聚合了多個 CompletableFuture 實例,所以它是沒有返回值的。這也是它的一個缺點。

anyOf 也非常容易理解,就是只要有任意一個 CompletableFuture 實例執行完成就可以了,看下面的例子:

CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123); CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC"); CompletableFuture<Object> future = CompletableFuture.anyOf(cfA, cfB, cfC); Object result = future.join();

最后一行的 join() 方法會返回最先完成的任務的結果,所以它的泛型用的是 Object,因為每個任務可能返回的類型不同。

六、either 方法

如果你的 anyOf(...) 只需要處理兩個 CompletableFuture 實例,那么也可以使用 xxxEither() 來處理,

cfA.acceptEither(cfB, result -> {});
cfA.acceptEitherAsync(cfB, result -> {});
cfA.acceptEitherAsync(cfB, result -> {}, executorService);

cfA.applyToEither(cfB, result -> {return result;}); cfA.applyToEitherAsync(cfB, result -> {return result;}); cfA.applyToEitherAsync(cfB, result -> {return result;}, executorService); cfA.runAfterEither(cfA, () -> {}); cfA.runAfterEitherAsync(cfB, () -> {}); cfA.runAfterEitherAsync(cfB, () -> {}, executorService);

上面的各個帶 either 的方法,表達的都是一個意思,指的是兩個任務中的其中一個執行完成,就執行指定的操作。它們幾組的區別也很明顯,分別用於表達是否需要任務 A 和任務 B 的執行結果,是否需要返回值。

大家可能會對這里的幾個變種有盲區,這里順便說幾句。

1、cfA.acceptEither(cfB, result -> {}); 和 cfB.acceptEither(cfA, result -> {}); 是一個意思;

2、第二個變種,加了 Async 后綴的方法,代表將需要執行的任務放到 ForkJoinPool.commonPool() 中執行(非完全嚴謹);第三個變種很好理解,將任務放到指定線程池中執行;

3、難道第一個變種是同步的?不是的,而是說,它由任務 A 或任務 B 所在的執行線程來執行,取決於哪個任務先結束


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM