前言
CompletionStage是Java8新增得一個接口,用於異步執行中的階段處理,其大量用在Lambda表達式計算過程中,目前只有CompletableFuture一個實現類,但我先從這個接口的方法開始介紹,為了舉例說明這些接口方法的使用,會用到部分CompletableFuture的方法,下一步再詳細的介紹CompletableFuture。
CompletionStage定義了一組接口用於在一個階段執行結束之后,要么繼續執行下一個階段,要么對結果進行轉換產生新的結果等等,一般來說要執行下一個階段都需要上一個階段正常完成,當然這個類也提供了對異常結果的處理接口。CompletionStage只定義了一組基本的接口,其實現類還可據此擴展出更豐富的方法。
方法概述
CompletionStage的接口方法可以從多種角度進行分類,從最宏觀的橫向划分,CompletionStage的接口主要分三類:
一、產出型或者函數型:就是用上一個階段的結果作為指定函數的參數執行函數產生新的結果。這一類接口方法名中基本都有apply字樣,接口的參數是(Bi)Function類型。
二、消耗型或者消費型:就是用上一個階段的結果作為指定操作的參數執行指定的操作,但不對階段結果產生影響。這一類接口方法名中基本都有accept字樣,接口的參數是(Bi)Consumer類型。
三、不消費也不產出型:就是不依據上一個階段的執行結果,只要上一個階段完成(但一般要求正常完成),就執行指定的操作,且不對階段的結果產生影響。這一類接口方法名中基本都有run字樣,接口的參數是Runnable類型。
還有一組特別的方法帶有compose字樣,它以依賴階段本身作為參數而不是階段產生的結果進行產出型(或函數型)操作。
在以上三類橫向划分方法的基礎上,又可以按照以下的規則對這些接口方法進行縱向的划分:
一、多階段的依賴:一個階段的執行可以由一個階段的完成觸發,或者兩個階段的同時完成,或者兩個階段中的任何一個完成。
- 方法前綴為then的方法安排了對單個階段的依賴。
- 那些由完成兩個階段而觸發的,可以結合他們的結果或產生的影響,這一類方法帶有combine或者both字樣。
- 那些由兩個階段中任意一個完成觸發的,不能保證哪個的結果或效果用於相關階段的計算,這類方法帶有either字樣。
二、按執行的方式:階段之間的依賴關系控制計算的觸發,但不保證任何特定的順序。因為一個階段的執行可以采用以下三種方式之一安排:
- 默認的執行方式。所有方法名沒有以async后綴的方法都按這種默認執行方式執行。
- 默認的異步執行。所有方法名以async為后綴,但沒有Executor參數的方法都屬於此類。
- 自定義執行方式。所有方法名以async為后綴,並且具有Executor參數的方法都屬於此類。
默認的執行方式(包括默認的異步執行)的執行屬性由CompletionStage的實現類指定例如CompletableFuture,而自定義的執行方式的執行屬性由傳入的Executor指定,這可能具有任意的執行屬性,甚至可能不支持並發執行,但還是被安排異步執行。
三、按上一個階段的完成狀態:無論觸發階段是正常完成還是異常完成,都有兩種形式的方法支持處理。
- 不論上一個階段是正常還是異常完成:
- whenComplete方法可以在上一個階段不論以何種方式完成的處理,但它是一個消費型接口,即不對整個階段的結果產生影響。
- handle前綴的方法也可以在上一個階段不論以何種方式完成的處理,它是一個產出型(或函數型)接口,既可以由上一個階段的異常產出新結果,也可以其正常結果產出新結果,使該結果可以由其他相關階段繼續進一步處理。
- 上一個階段是異常完成的時候執行:exceptionally方法可以在上一個階段以異常完成時進行處理,它可以根據上一個階段的異常產出新的結果,使該結果可以由其他相關階段繼續進一步處理。
CompletionStage的異常規則
除了whenComplete不要求其依賴的階段是正常完成還是異常完成,以及handle前綴的方法只要求其依賴的階段異常完成之外,其余所有接口方法都要求其依賴的階段正常完成。
- 如果一個階段的執行由於一個(未捕獲的)異常或錯誤而突然終止,那么所有要求其完成的相關階段也將異常地完成,並通過CompletionException包裝其具體異常堆棧。
- 如果一個階段同時依賴於兩個階段,並且兩個階段都異常地完成,那么CompletionException可以對應於這兩個異常中的任何一個。
- 如果一個階段依賴於另外兩個階段中的任何一個,並且其中只有一個異常完成,則不能保證依賴階段是正常完成還是異常完成。
- 在使用方法whenComplete的情況下,當提供的操作本身遇到異常時,如果前面的階段沒有異常完成,則階段將以其異常作為原因異常完成。
所有方法都遵循上述觸發、執行和異常完成規范,此外,雖然用於傳遞一個表示完成結果的參數(也就是說,對於T類型的參數)可以為null,但是如果為其它任何參數傳遞null都將導致NullPointerException。此接口不定義用於初始創建、強制正常或異常完成、探測完成狀態或結果或等待階段完成的方法。CompletionStage的實現類可以提供適當的方法來實現這些效果。方法 toCompletableFuture 通過提供一個公共轉換類型,支持該接口的不同實現之間的互操作性。
方法示例
通過上面的方法概述,已經將CompletionStage的所有接口方法進行了概要的說明,下面通過示例將這些方法進行介紹。
一、根據階段正常完成結果的產出型(或者叫函數型):
1 //依賴單個階段 2 public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); // 默認執行方式 3 public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);// 默認的異步執行方式 4 public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor); //自定義的執行方式 5 6 //依賴兩個階段都完成 7 public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); 8 public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); 9 public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor); 10 11 //依賴兩個階段中的任何一個完成 12 public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); 13 public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); 14 public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
這一類方法都由上一階段(或者兩個階段,或者兩個階段中的任意一個)的正常完成結果觸發,然后以該結果執行給定的函數,產出新的結果。這里把異步執行的兩者形式也列舉出來了。
以下是使用示例,運用了CompletionStage實現類CompletableFuture,這里忽略Async版本的異步方法:

1 @Test 2 public void thenApply() { 3 CompletableFuture<String> stage = CompletableFuture.supplyAsync(() -> "hello"); 4 5 String result = stage.thenApply(s -> s + " world").join(); 6 System.out.println(result); 7 } 8 9 @Test 10 public void thenCombine() { 11 String result = CompletableFuture.supplyAsync(() -> { 12 try { 13 Thread.sleep(3000); 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 } 17 return "hello"; 18 }).thenCombine(CompletableFuture.supplyAsync(() -> { 19 try { 20 Thread.sleep(2000); 21 } catch (InterruptedException e) { 22 e.printStackTrace(); 23 } 24 return "world"; 25 }), (s1, s2) -> s1 + " " + s2).join(); 26 System.out.println(result); 27 } 28 29 @Test 30 public void applyToEither() { 31 String result = CompletableFuture.supplyAsync(() -> { 32 try { 33 Thread.sleep(3000); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 return "Tom"; 38 }).applyToEither(CompletableFuture.supplyAsync(() -> { 39 try { 40 Thread.sleep(3000); 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 } 44 return "John"; 45 }), s -> "hello " + s).join(); 46 System.out.println(result); 47 }
這些示例展示了根據一個階段的結果、兩個階段的結果以及兩個階段中最先完成的結果進行轉換,並返回新的結果。第一個和第一個示例結果都是"hello world",其中第二個示例不論兩個階段誰先完成,參數s1都是"hello",參數s2都是"world'。第三個示例,applyToEither依賴兩個階段誰最先完成,其結果有時候是"hello Tom",有時候是"hello John"
二、根據階段正常完成結果的消費型:
1 //依賴單個階段 2 public CompletionStage<Void> thenAccept(Consumer<? super T> action); 3 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); 4 public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor); 5 6 //依賴兩個階段都完成 7 public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); 8 public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); 9 public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor); 10 11 //依賴兩個階段中的任何一個完成 12 public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action); 13 public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action); 14 public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);
這一類方法都由上一階段(或者兩個階段,或者兩個階段中的任意一個)正常完成的結果觸發,然后以該結果執行給定的操作action,但不會對階段的結果進行影響。這里把異步執行的兩者形式也列舉出來了。
以下是使用示例,運用了CompletionStage實現類CompletableFuture,這里忽略Async版本的異步方法:

1 @Test 2 public void thenAccept(){ 3 CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world")); 4 } 5 6 @Test 7 public void thenAcceptBoth() { 8 CompletableFuture.supplyAsync(() -> { 9 try { 10 Thread.sleep(3000); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 return "hello"; 15 }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> { 16 try { 17 Thread.sleep(2000); 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } 21 return "world"; 22 }), (s1, s2) -> System.out.println(s1 + " " + s2)); 23 24 while (true){} //等待打印出結果 25 } 26 27 @Test 28 public void acceptEither() { 29 CompletableFuture.supplyAsync(() -> { 30 try { 31 Thread.sleep(3000); 32 } catch (InterruptedException e) { 33 e.printStackTrace(); 34 } 35 return "hello john"; 36 }).acceptEither(CompletableFuture.supplyAsync(() -> { 37 try { 38 Thread.sleep(3000); 39 } catch (InterruptedException e) { 40 e.printStackTrace(); 41 } 42 return "hello tom"; 43 }), System.out::println); 44 45 while (true){} //等待打印出結果 46 }
示例展示了根據一個階段的結果、兩個階段的結果以及兩個階段中最先完成的結果進行消耗,並沒有返回值。acceptEither的示例中,依賴兩個階段誰最先完成,打印結果有時候是"hello tom",有時候是"hello john"。
三、只要求依賴的階段正常完成的不消耗也不產出型:
1 //依賴單個階段 2 public CompletionStage<Void> thenRun(Runnable action); 3 public CompletionStage<Void> thenRunAsync(Runnable action); 4 public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor); 5 6 7 //依賴兩個階段都完成 8 public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action); 9 public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action); 10 public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor); 11 12 13 //依賴兩個階段中的任何一個完成 14 public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action); 15 public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action); 16 public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);
這一類方法只要求上一階段(或者兩個階段,或者兩個階段中的任意一個)正常完成,並不關心其具體結果,從而執行指定的操作cation,但不會對階段的結果進行影響。這里把異步執行的兩者形式也列舉出來了。
以下是使用示例,運用了CompletionStage實現類CompletableFuture,這里忽略Async版本的異步方法:

1 @Test 2 public void thenRun(){ 3 CompletableFuture.supplyAsync(() -> { 4 try { 5 Thread.sleep(2000); 6 } catch (InterruptedException e) { 7 e.printStackTrace(); 8 } 9 return "hello"; 10 }).thenRun(() -> System.out.println("hello world")); 11 while (true){} 12 } 13 14 @Test 15 public void runAfterBoth(){ 16 //不關心這兩個CompletionStage的結果,只關心這兩個CompletionStage正常執行完畢,之后在進行操作(Runnable)。 17 CompletableFuture.supplyAsync(() -> { 18 try { 19 Thread.sleep(2000); 20 } catch (InterruptedException e) { 21 e.printStackTrace(); 22 } 23 return "s1"; 24 }).runAfterBoth(CompletableFuture.supplyAsync(() -> { 25 try { 26 Thread.sleep(3000); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 return "s2"; 31 }), () -> System.out.println("hello world")); 32 while (true){} 33 } 34 35 @Test 36 public void runAfterEither() { 37 //兩個CompletionStage,任何一個正常完成了都會執行下一步的操作(Runnable)。 38 CompletableFuture.supplyAsync(() -> { 39 try { 40 Thread.sleep(3000); 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 } 44 return "s1"; 45 }).runAfterEither(CompletableFuture.supplyAsync(() -> { 46 try { 47 Thread.sleep(2000); 48 } catch (InterruptedException e) { 49 e.printStackTrace(); 50 } 51 return "s2"; 52 }), () -> System.out.println("hello world")); 53 while (true) { 54 } 55 }
示例展示了只要依賴的上一個階段(或者兩個階段,或者兩個階段中的任意一個)正常完成,就會執行指定的操作,並且不會依賴上一個階段(或者兩個階段,或者兩個階段中的任意一個最先完成的階段)的結果。三個示例都回打印出"hello world"
四、根據正常完成的階段本身而不是其結果的產出型:
以上產出型的方法都是應用依賴階段的正常執行結果,CompletionStage提供了一組以階段本身為依據的產出型接口方法:
1 public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); 2 public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn); 3 public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);
以下是使用示例:

1 @Test 2 public void thenCompose(){ 3 String result = CompletableFuture.supplyAsync(() -> { 4 try { 5 Thread.sleep(3000); 6 } catch (InterruptedException e) { 7 e.printStackTrace(); 8 } 9 return "hello"; 10 }).thenCompose(s -> CompletableFuture.supplyAsync(() -> { 11 try { 12 Thread.sleep(3000); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 return s + " world"; 17 })).join(); 18 19 System.out.println(result); 20 }
其階段最終結果打印出"hello world",thenCompose和thenCombine很相似,但看起來thenCompose比thenCombine更簡潔。
五、不論階段正常還是異常完成的消耗型:
2 public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); 3 public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); 4 public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
上面的一、二、三、四種類型的方法都需要依賴的階段正常完成,如果異常完成將導致上面介紹的四種類型的方法最終也異常完成,不會得出我們希望的結果。而whenComplete則不論依賴的上一個階段是正常完成還是異常完成都不會影響它的執行,但它是一個消耗型接口,即不會對階段的原來結果產生影響,結合thenCombine綜合whenComplete的示例如下:
1 @Test 2 public void thenCombine(){ 3 String result = CompletableFuture.supplyAsync(() -> { 4 try { 5 Thread.sleep(4000); 6 } catch (InterruptedException e) { 7 e.printStackTrace(); 8 } 9 10 if (1 == 1) { 11 throw new RuntimeException("測試一下異常情況"); 12 } 13 14 return "hello "; 15 }).thenCombine(CompletableFuture.supplyAsync(() -> { 16 try { 17 Thread.sleep(3000); 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } 21 System.out.println("return world..."); //會執行 22 return "world"; 23 }), (s1, s2) -> { 24 String s = s1 + " " + s2; //並不會執行 25 System.out.println("combine result :"+s); //並不會執行 26 return s; 27 }).whenComplete((s, t) -> { 28 System.out.println("current result is :" +s); 29 if(t != null){ 30 System.out.println("階段執行過程中存在異常:"); 31 t.printStackTrace(); 32 } 33 }).join(); 34 35 System.out.println("final result:"+result); //並不會執行 36 }
上例中,whenComplete的參數s表示通過thenCombine正常完成的結果,如果沒有異常的話,該參數的值就是"hello world",t參數是Throwable類型的異常,因為thenCombine同時依賴兩個階段的正常完成,此時第一個階段中拋出了異常,所以不會執行thenCombine指定的函數,即不會打印"combine result",whenComplete不論是否前面的階段是否出現異常都會執行,最后打印出這樣的信息:

1 return world... 2 current result is :null 3 階段執行過程中存在異常: 4 java.util.concurrent.CompletionException: java.lang.RuntimeException: 測試一下異常情況 5 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) 6 at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) 7 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) 8 at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) 9 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 10 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 11 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 12 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 13 Caused by: java.lang.RuntimeException: 測試一下異常情況 14 at com.Queue.ArrayListTest.lambda$thenCombine$2(ArrayListTest.java:37) 15 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) 16 ... 5 more 17 18 java.util.concurrent.CompletionException: java.lang.RuntimeException: 測試一下異常情況 19 20 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) 21 at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) 22 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) 23 at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) 24 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 25 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 26 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 27 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 28 Caused by: java.lang.RuntimeException: 測試一下異常情況 29 at com.Queue.ArrayListTest.lambda$thenCombine$2(ArrayListTest.java:37) 30 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) 31 ... 5 more
如果將上例中的thenCombine換成applyToEither,那么如果兩個階段中最先完成的階段是異常完成,那么其結果與上面不變,還是異常結束;如果最先完成的階段是正常完成(把拋異常之前那個hread.sleep(3000) 改成 hread.sleep(2000) )的話,那么整個階段將不會出現異常,whenComplete的參數s就是"hello world",t為null。
六、不論階段正常還是異常完成的產出型:
whenComplete是對不論依賴階段正常完成還是異常完成時的消耗或者消費,即不會改變階段的現狀,而handle前綴的方法則是對應的產出型方法,即可以對正常完成的結果進行轉換,也可以對異常完成的進行補償一個結果,即可以改變階段的現狀。
1 //不論正常還是異常的產出型: 2 public <U> CompletionStage<U> handle (BiFunction<? super T, Throwable, ? extends U> fn); 3 public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); 4 public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
舉一個簡單的示例:
1 @Test 2 public void handle() { 3 String result = CompletableFuture.supplyAsync(() -> { 4 try { 5 Thread.sleep(3000); 6 } catch (InterruptedException e) { 7 e.printStackTrace(); 8 } 9 //出現異常 10 if (1 == 3) { 11 throw new RuntimeException("測試一下異常情況"); 12 } 13 return "Tom"; 14 }).handle((s, t) -> { 15 if (t != null) { //出現異常了 16 return "John"; 17 } 18 return s; //這里也可以對正常結果進行轉換 19 }).join(); 20 System.out.println(result); 21 }
handle的第一個參數s是上一個階段的結果,t參數是Throwable類型的異常,這里上一個階段如果沒有拋出異常,那么最終打印的結果是"Tom",現在通過handle對出現異常的情況進行了補償返回John,所以上例最終其實打印的是"John"。
七、異常完成的產出型:
第五、六兩種類型的方法是對於不論依賴的階段是正常完成還是異常完成的處理,CompletionStage還提供了一個僅當上一個階段異常完成時的處理,並且可以修改階段的結果:
1 public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
如下示例:
1 @Test 2 public void exceptionally() { 3 String result = CompletableFuture.supplyAsync(() -> { 4 try { 5 Thread.sleep(3000); 6 } catch (InterruptedException e) { 7 e.printStackTrace(); 8 } 9 if (1 == 1) { 10 throw new RuntimeException("測試一下異常情況"); 11 } 12 return "s1"; 13 }).exceptionally(e -> { 14 e.printStackTrace(); //e肯定不會null 15 return "hello world"; //補償返回 16 }).join(); 17 System.out.println(result); //打印hello world 18 }
可見exceptionally只有一個參數e,表示上一個節點的異常,只有上一個階段異常完成才會被執行,以上示例在異常時返回了新的值"hello world"對出現異常的階段進行了補償,所以最終整個階段不會出現異常,並打印出"hello world"。
八、實現該接口不同實現之間互操作的類型轉換方法:
1 public CompletableFuture<T> toCompletableFuture();
返回一個與此階段保持相同完成屬性的CompletableFuture實例。如果此階段已經是一個CompletableFuture,那么直接返回該階段本身,否則此方法的調用可能等效於thenApply(x -> x),但返回一個類型為CompletableFuture的實例。不選擇實現該互操作性的CompletionStage實現,可能會拋出UnsupportedOperationException異常。