CompletableFuture詳解


在JDK1.5已經提供了Future和Callable的實現,可以用於阻塞式獲取結果,如果想要異步獲取結果,通常都會以輪詢的方式去獲取結果,如下:

 1 //定義一個異步任務
 2 Future<String> future = executor.submit(()->{
 3        Thread.sleep(2000);
 4        return "hello world";
 5 });
 6 //輪詢獲取結果
 7 while (true){
 8     if(future.isDone()) {
 9          System.out.println(future.get());
10          break;
11      }
12  }

從上面的形式看來輪詢的方式會耗費無謂的CPU資源,而且也不能及時地得到計算結果.所以要實現真正的異步,上述這樣是完全不夠的,在Netty中,我們隨處可見異步編程

1 ChannelFuture f = serverBootstrap.bind(port).sync();
2 f.addListener(new GenericFutureListener<Future<? super Void>>() {
3                 @Override
4                 public void operationComplete(Future<? super Void> future) throws Exception {
5                     System.out.println("complete");
6                 }
7             });

    而JDK1.8中的CompletableFuture就為我們提供了異步函數式編程,CompletableFuture提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復雜性,提供了函數式編程的能力,可以通過回調的方式處理計算結果,並且提供了轉換和組合CompletableFuture的方法。

1. 創建CompletableFuture對象

    CompletableFuture提供了四個靜態方法用來創建CompletableFuture對象:

1 public static CompletableFuture<Void>   runAsync(Runnable runnable)
2 public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
3 public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
4 public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

Asynsc表示異步,而supplyAsyncrunAsync不同在與前者異步返回一個結果,后者是void.第二個函數第二個參數表示是用我們自己創建的線程池,否則采用默認的ForkJoinPool.commonPool()作為它的線程池.其中Supplier是一個函數式接口,代表是一個生成者的意思,傳入0個參數,返回一個結果.(更詳細的可以看我另一篇文章)

1 CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
2             return "hello world";
3   });
4 System.out.println(future.get());  //阻塞的獲取結果  ''helllo world"

2. 主動計算

    以下4個方法用於獲取結果

1 //同步獲取結果
2 public T    get()
3 public T    get(long timeout, TimeUnit unit)
4 public T    getNow(T valueIfAbsent)
5 public T    join()

getNow有點特殊,如果結果已經計算完則返回結果或者拋出異常,否則返回給定的valueIfAbsent值。join()get()區別在於join()返回計算的結果或者拋出一個unchecked異常(CompletionException),而get()返回一個具體的異常.

  • 主動觸發計算.
1 public boolean complete(T  value)
2 public boolean completeExceptionally(Throwable ex)

上面方法表示當調用CompletableFuture.get()被阻塞的時候,那么這個方法就是結束阻塞,並且get()獲取設置的value.

 
 1 public static CompletableFuture<Integer> compute() {
 2         final CompletableFuture<Integer> future = new CompletableFuture<>();
 3         return future;
 4     }
 5     public static void main(String[] args) throws Exception {
 6         final CompletableFuture<Integer> f = compute();
 7         class Client extends Thread {
 8             CompletableFuture<Integer> f;
 9             Client(String threadName, CompletableFuture<Integer> f) {
10                 super(threadName);
11                 this.f = f;
12             }
13             @Override
14             public void run() {
15                 try {
16                     System.out.println(this.getName() + ": " + f.get());
17                 } catch (InterruptedException e) {
18                     e.printStackTrace();
19                 } catch (ExecutionException e) {
20                     e.printStackTrace();
21                 }
22             }
23         }
24         new Client("Client1", f).start();
25         new Client("Client2", f).start();
26         System.out.println("waiting");
27         //設置Future.get()獲取到的值
28         f.complete(100);
29         //以異常的形式觸發計算
30         //f.completeExceptionally(new Exception());
31         Thread.sleep(1000);
32     }

3. 計算結果完成時的處理

1 public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
2 public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
3 public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
4 public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

上面4個方法是當計算階段結束的時候觸發,BiConsumer有兩個入參,分別代表計算返回值,另外一個是異常.無返回值.方法不以Async結尾,意味着Action使用相同的線程執行,而Async可能會使用其它的線程去執行(如果使用相同的線程池,也可能會被同一個線程選中執行)。

 
1 future.whenCompleteAsync((v,e)->{
2        System.out.println("return value:"+v+"  exception:"+e);
3  });
  • handle()
1 public <U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn)
2 public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
3 public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

whenComplete()不同的是這個函數返回CompletableFuture並不是原始的CompletableFuture返回的值,而是BiFunction返回的值.

4. CompletableFuture的組合

  • thenApply
    當計算結算完成之后,后面可以接繼續一系列的thenApply,來完成值的轉化.
1 public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
2 public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
3 public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

它們與handle方法的區別在於handle方法會處理正常計算值和異常,因此它可以屏蔽異常,避免異常繼續拋出。而thenApply方法只是用來處理正常值,因此一旦有異常就會拋出。

 1 CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
 2             
 3             return "hello world";
 4         });
 5 
 6 CompletableFuture<String> future3 = future.thenApply((element)->{
 7             return element+"  addPart";
 8         }).thenApply((element)->{
 9             return element+"  addTwoPart";
10         });
11         System.out.println(future3.get());//hello world  addPart  addTwoPart

5. CompletableFuture的Consumer

只對CompletableFuture的結果進行消費,無返回值,也就是最后的CompletableFuture是void.

1 public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
2 public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
3 public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)
 
1 //入參為原始的CompletableFuture的結果.
2 CompletableFuture future4 = future.thenAccept((e)->{
3             System.out.println("without return value");
4         });
5 future4.get();
  • thenAcceptBoth

這個方法用來組合兩個CompletableFuture,其中一個CompletableFuture等待另一個CompletableFuture的結果.

1 CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
2             return "hello world";
3         });
4 CompletableFuture future5 =  future.thenAcceptBoth(CompletableFuture.completedFuture("compose"),
5                 (x, y) -> System.out.println(x+y));//hello world compose

6. Either和ALL

    thenAcceptBoth是當兩個CompletableFuture都計算完成,而我們下面要了解的方法applyToEither是當任意一個CompletableFuture計算完成的時候就會執行。

 1 Random rand = new Random();
 2         CompletableFuture<Integer> future9 = CompletableFuture.supplyAsync(() -> {
 3             try {
 4                 Thread.sleep(1000 + rand.nextInt(1000));
 5             } catch (InterruptedException e) {
 6                 e.printStackTrace();
 7             }
 8             return 100;
 9         });
10         CompletableFuture<Integer> future10 = CompletableFuture.supplyAsync(() -> {
11             try {
12                 Thread.sleep(1000 + rand.nextInt(1000));
13             } catch (InterruptedException e) {
14                 e.printStackTrace();
15             }
16             return 200;
17         });
18         //兩個中任意一個計算完成,那么觸發Runnable的執行
19         CompletableFuture<String> f =  future10.applyToEither(future9,i -> i.toString());
20         //兩個都計算完成,那么觸發Runnable的執行
21         CompletableFuture f1 = future10.acceptEither(future9,(e)->{
22             System.out.println(e);
23         });
24         System.out.println(f.get());

    如果想組合超過2個以上的CompletableFuture,allOfanyOf可能會滿足你的要求.allOf方法是當所有的CompletableFuture都執行完后執行計算。anyOf方法是當任意一個CompletableFuture執行完后就會執行計算,計算的結果相同。

總結

有了CompletableFuture之后,我們自己實現異步編程變得輕松很多,這個類也提供了許多方法來組合CompletableFuture.結合Lambada表達式來用,變得很輕松. 

f的whenComplete的內容由哪個線程來執行,取決於哪個線程X執行了f.complete()。但是當X線程執行了f.complete()的時候,whenComplete還沒有被執行到的時候(就是事件還沒有注冊的時候),那么X線程就不會去同步執行whenComplete的回調了。這個時候哪個線程執行到了whenComplete的事件注冊的時候,就由哪個線程自己來同步執行whenComplete的事件內容。

而whenCompleteAsync的場合,就簡單很多。一句話就是線程池里面拿一個空的線程或者新啟一個線程來執行回調。和執行f.complete的線程以及執行whenCompleteAsync的線程無關。

 

轉自:https://www.jianshu.com/p/547d2d7761db

參考:https://blog.csdn.net/leon_wzm/article/details/80560081


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM