[轉] CompletableFuture教程


 

原文地址:https://mp.weixin.qq.com/s/dsANLERHskYhPWlnabnjhg  作者:指北君

 

1 CompletableFuture的靜態方法使用

  CompleteableFuture的靜態方法有如下

 

 

   之前的文章里面已經講過suuplyAsync,以及runAsync。我們就直接看其他方法

 delayedExcutor

  delayedExcutor其作用是構建一個延遲執行任務的Excutor,默認使用ForkJoinPool. 也可以使用自定義的Excutor。

一個延遲5秒執行任務的Excutor,默認使用使用ForkJoinPool.commonPool()。
Executor executor = CompletableFuture.delayedExecutor(5l, TimeUnit.SECONDS);

allof和anyof

  allof和anyof 為等待多個CompletableFuture完成之后返回一個CompletableFuture。

  • allof返回無result,

  • anyof返回為最先完成的CompletableFuture。

  可以看如下示例。

CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(()-> {
    try {Thread.sleep(4 * 1000);} catch (InterruptedException e) {e.printStackTrace();}
    return "supplyAsync1";
});
CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> {
    try {Thread.sleep(2 * 1000);} catch (InterruptedException e) {e.printStackTrace();}
    return "supplyAsync2";
});
CompletableFuture.anyOf(supplyAsync1,supplyAsync2).thenAccept((str)-> {
    System.out.println(LocalDateTime.now() + " anyOf complete : " + str);
});
CompletableFuture.allOf(supplyAsync1,supplyAsync2).thenAccept((str)-> {
    System.out.println(LocalDateTime.now() + " allOf complete "+ str );
});

  執行結果如下:

start second: 2021-10-24T12:39:40.562001600
2021-10-24T12:39:42.611118800 anyOf complete : supplyAsync2
2021-10-24T12:39:44.611233200 allOf complete null

  failedStage和failedFuture是返回一個已知異常的CompletableFuture。這個下面和其他異常一起舉例。

 

2 CompletableFuture的其余方法使用

  CompletableFuture中方法可以大致分為run,apply,accept幾個類別。其對應的參數分別為Runnable,Function,Consummer等幾個函數式表達式。

  1. run代表當前CompletableFuture完成后執行的一些列操作,無輸入參數,無返回結果,所以只是Runnable為參數。()-> { option }

  2. apply代表以當前CompletableFuture完成后的結果為參數進行的操作,並且會返回一個新的CompletableFuture,所以以Function為參數。(s)-> {return s;}

  3. accept代表以當前CompletableFuture完成后的結果為參數,執行的操作,無返回結果,直接消費。以Consumer為參數,(s)-> { option }。

2.1 Run方法

  Run方法相關參數為Runnable,為直接執行的操作。

  thenRun 完成之后直接執行。

  thenRunAsync 使用線程池異步執行,線程池默認為ForkJoinPool.commonPool

  runAfterBoth/ runAfterEither 兩個CompletableFuture同時完成或者某一個完成就執行的操作。

  runAfterBothAsync/runAfterEitherAsync 同理為使用線程池異步執行的操作。

public class CompletableFutureThenRun {
    public static void main(String[] args) {
        System.out.println(" CompletableFutureThenRun main start : " + LocalDateTime.now());
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(" CompletableFutureThenRun  cf1: " + LocalDateTime.now());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "supplyAsync";
        });

        CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
            System.out.println(" CompletableFutureThenRun  cf2: " + LocalDateTime.now());
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        cf1.runAfterBoth(cf2,()-> {
            System.out.println(Thread.currentThread().getName()+" CompletableFutureThenRun runAfterBoth: " + LocalDateTime.now());
        });
        cf1.runAfterBothAsync(cf2,()-> {
            System.out.println( Thread.currentThread().getName()+" CompletableFutureThenRun runAfterBothAsync: " + LocalDateTime.now());
        });
        cf1.runAfterEither(cf2,()-> {
            System.out.println( Thread.currentThread().getName()+" CompletableFutureThenRun runAfterEither: " + LocalDateTime.now());
        });
        cf1.runAfterEitherAsync(cf2,()-> {
            System.out.println( Thread.currentThread().getName()+" CompletableFutureThenRun runAfterEitherAsync: " + LocalDateTime.now());
        });
        cf1.thenRunAsync(()-> {
            System.out.println(Thread.currentThread().getName()+" CompletableFutureThenRun thenRunAsync: " + LocalDateTime.now());
        });
        cf1.thenRun(()-> {
            System.out.println(Thread.currentThread().getName()+" CompletableFutureThenRun thenRun: " + LocalDateTime.now());
        });
        System.out.println(Thread.currentThread().getName() + " CompletableFutureThenRun  last: " + LocalDateTime.now());

        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  上述執行結果:

CompletableFutureThenRun main start : 2021-10-25T01:48:52.416000900
CompletableFutureThenRun  cf1: 2021-10-25T01:48:52.492008500
CompletableFutureThenRun  cf2: 2021-10-25T01:48:52.493008600
main CompletableFutureThenRun  last: 2021-10-25T01:48:52.495008800
ForkJoinPool.commonPool-worker-7 CompletableFutureThenRun runAfterEitherAsync: 2021-10-25T01:48:54.495208800
ForkJoinPool.commonPool-worker-3 CompletableFutureThenRun runAfterEither: 2021-10-25T01:48:54.495208800
ForkJoinPool.commonPool-worker-5 CompletableFutureThenRun thenRun: 2021-10-25T01:48:57.493508600
ForkJoinPool.commonPool-worker-3 CompletableFutureThenRun thenRunAsync: 2021-10-25T01:48:57.494508700
ForkJoinPool.commonPool-worker-3 CompletableFutureThenRun runAfterBoth: 2021-10-25T01:48:57.494508700
ForkJoinPool.commonPool-worker-3 CompletableFutureThenRun runAfterBothAsync: 2021-10-25T01:48:57.495508800

  apply 與accept相關的方法類似,此處不一一舉例了。

  下面我們根據一些情景舉例來說明方法如何使用:

2.2 多個 CompletableFuture組合在一起執行

  情景一:先去取快遞,然后再去買菜,然后回家做飯。

CompletableFuture<String> cf = CompletableFuture.supplyAsync(()-> {
    System.out.println(LocalDateTime.now() + " 正在取快遞! ");
    try {Thread.sleep(2 * 1000);} catch (InterruptedException e) {e.printStackTrace();}
    return "快遞1";
}).thenApply((str) -> {
    System.out.println(LocalDateTime.now() + " 拿到了: "+str);
    System.out.println(LocalDateTime.now() + " 買菜中。。。 ");
    try {Thread.sleep(2 * 1000);} catch (InterruptedException e) {e.printStackTrace();}
    return str + " 和 蔬菜";
}).thenApply((str2)-> {
    System.out.println(LocalDateTime.now() + " 現在有了: ["+str2+"]");
    try {Thread.sleep(2 * 1000);} catch (InterruptedException e) {e.printStackTrace();}
    return "帶着 [" + str2 + " ]回家做飯" ;
});
System.out.println( LocalDateTime.now() + " 美好的一天: "+ cf.join());

  下面看一下上面的執行結果,

2021-10-25T01:10:16.831465600 正在取快遞! 
2021-10-25T01:10:18.861668600 拿到了: 快遞1
2021-10-25T01:10:18.911673600 買菜中。。。 
2021-10-25T01:10:20.911873600 現在有了: [快遞1 和 蔬菜]
2021-10-25T01:10:16.831465600 美好的一天: 帶着 [快遞1 和 蔬菜 ]回家做飯

  可以看到最后一行輸出的時間比較早,這是因為join會阻塞線程,直到此CompletableFuture執行完並獲取到值。

 

  情景二:和女朋友一起出門,我去取快遞,女朋友去買菜,然后一起回家做飯。

CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(()->{
    System.out.println(LocalDateTime.now() + " 正在取快遞! ");
    try {Thread.sleep(2 * 1000);} catch (InterruptedException e) {e.printStackTrace();}
    return "快遞";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(()->{
    System.out.println(LocalDateTime.now() + " 女朋友正在買菜! ");
    try {Thread.sleep(4 * 1000);} catch (InterruptedException e) {e.printStackTrace();}
    return "蔬菜";
});

cf1.thenAcceptBoth(cf2,(str1 ,str2 )->{
    System.out.println(LocalDateTime.now() +  " ["+ str1 + "]["+str2+"] 帶回來了,開始做飯 ");
}).join();

  此處使用 thenAcceptBoth 需要在兩個CompletableFuture都完成的情況下,才能執行,所以最后使用join()使其阻塞到可以執行當前的操作。

 

  情景三:和女朋友一起出門,我去取快遞,女朋友去買菜,誰先弄完誰就先回去。

cf1.acceptEither(cf2,(str1 )-> {
    System.out.println(LocalDateTime.now() +  " ["+ str1 +"] 帶回來了,先回家吧! ");
}).join();

  我先拿到了快遞,就快快的回家了,然后就挨了一頓毒打。

2.3 在兩個CompletableFuture運行后再次計算

  晚飯過后和女朋友討論做什么事情,然而發生了分歧:

CompletableFuture<List<String>> cf1 = CompletableFuture.supplyAsync(()->{
    List<String> strings = Arrays.asList("看電影", "打撲克");
    System.out.println(LocalDateTime.now() + " 晚飯后女朋友說,想要: " + strings);
    try {Thread.sleep(2 * 1000);} catch (InterruptedException e) {e.printStackTrace();}
    return strings;
});
CompletableFuture<List<String>> cf2 = CompletableFuture.supplyAsync(()->{
    List<String> strings = Arrays.asList("看電影", "打游戲");
    System.out.println(LocalDateTime.now() + " 晚飯后,我想: " + strings);
    try {Thread.sleep(4 * 1000);} catch (InterruptedException e) {e.printStackTrace();}
    return strings;
});

cf1.thenCombine(cf2,(list1,list2) -> {
        System.out.println("遭受了一頓毒打之后。。。!!!");
    List<String> collect = list1.stream().filter(str -> list2.contains(str)).collect(Collectors.toList());
    System.out.println(LocalDateTime.now() + " 綜合兩個人的想法,最終決定: " + collect);
    return collect;
}).join();

  女朋友想看電影,或者打撲克,但是我想打游戲。最后遭受一頓毒打之后,還是說出了或者看電影。最終選擇了看電影。

 
        
 
        

4 CompletableFuture的異常處理

  CompletableFuture和異常相關的方法有如下

4.1 whenComplete/whenCompleteAsync

CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)

  whenCompletable使用有BiConsumer里面會有兩個參數,下邊是一個示例。參數需要兩個分別為str,exception, 如果有異常exception有值,str為null。如果stringCompletableFuture正常完成,則exception為null。但是不管是否有異常,表達式里面的方法均會執行。

  有點類似try finally{},有沒有異常均可執行。

CompletableFuture<String> whenCompleteCF = stringCompletableFuture.whenComplete((str, exception) -> {
     if(exception != null){
         System.out.println("whenComplete : " + exception);
         exception.printStackTrace();
     }
     System.out.println("whenComplete execute whither error throw ");
});

4.2 exceptionally

  exceptionally方法中為一個Function參數,需要一個輸入值,為當前CompletableFuture拋出的異常。

  其返回值有兩個結果:

  1. 如果當前CompletableFuture無異常完成,則返回與原CompletableFuture的result相同的CompletableFuture,注意知識result相同,並不是同一個類。

  2. 如果當前CompletableFuture有異常拋出,那么返回新的CompletableFuture以及新處理后的result。

        CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException(" CompletableFuture throw one exception");
//            return "cc";
        });

        CompletableFuture<String> exceptionally = stringCompletableFuture.exceptionally((exception) -> {
            System.out.println("exceptionally only execute  when error throw ");
            return "exception";
        });
         System.out.println("exceptionally  : " + exceptionally.join());

  上述示例無異常拋出時結果如下:

exceptionally  :cc

  有異常拋出時結果如下:

exceptionally only execute  when error throw 
exceptionally  :exception

4.3 handle/handleAsync

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

  handle 和 whenComplete 比較類似,無論有沒有異常,里面的方法均會執行到。但是有有一些區別,handle參數為BiFunction,有返回值,whenComplet的參數為BiComsumer 無返回值。

  下面的實例中,如果有異常則參數中的str為null,如果沒有異常exception為null。

CompletableFuture<String> handle = stringCompletableFuture.handle((str, exception) -> {
    System.out.println("handle : " + str);
    if(exception != null ){
        System.out.println("stringCompletableFuture1 have exception :" );
        exception.printStackTrace();
    }
    return "handle complete ";
});

  有異常的執行結果:

stringCompletableFuture1 have exception :
handle.join(); :handle complete

  無異常的執行結果

handle :cc
handle.join(); :handle complete

4.4 failedStage/failedFuture

  failedStage和failedFuture均為靜態方法,會返回一個已完成的給定異常的CompletableFuture。

  failedStage返回的是CompletionStage,failedFuture返回為CompletableFuture對象

CompletionStage<Object> test_exception = CompletableFuture.failedStage(new RuntimeException("test exception"));
CompletableFuture<Object> test_exception1 = CompletableFuture.failedFuture(new RuntimeException("test exception"));

  最后給一個可以直接食用的示例,可以根據不同的需求進行改良哦!

 

public static Map<String, List<Integer>> testMap = new ConcurrentHashMap<>();

static {
    testMap.put("A", Arrays.asList(1,2,3,4,5));
    testMap.put("B", Arrays.asList(6,7,8,9,10));
    testMap.put("C", Arrays.asList(11,12,13,14,15));
    testMap.put("D", Arrays.asList(21,22,23,24,25));
    testMap.put("E", Arrays.asList(31,32,33,34,35));
}

public static void main(String[] args) {
    System.out.println(" CompletableFutureDemo5 main start : " + LocalDateTime.now());
    List<String> strings = Arrays.asList("A", "B", "C", "D", "E");

    ExecutorService testPool = new ForkJoinPool(4);
    List<CompletableFuture<List<Integer>>> collect = strings.stream().map(
            key -> CompletableFuture.supplyAsync(() -> {
                return obtainTheList(key);
            },testPool).exceptionally((exc)->{
                System.out.println(" hit  the exception " );
                throw new RuntimeException(exc);
            })
    ).collect(Collectors.toList());

    System.out.println(" CompletableFutureDemo5 supplyAsync end : " + LocalDateTime.now());
    try {
        List<List<Integer>> integerCollect = collect.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }catch (Exception e){
        System.out.println(" catch  the exception " + e.getMessage());
        e.printStackTrace();
    }
    System.out.println(" CompletableFutureDemo5 main end : " + LocalDateTime.now());
    try {
        Thread.sleep(5*1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

private static List<Integer> obtainTheList(String key) {
    List<Integer> integers = testMap.get(key);
    if( key.equals("C") ){
        throw new RuntimeException("exception test !");
    }
    try {
        Thread.sleep(2*1000);
        System.out.println(" obtainTheList thread name : " + Thread.currentThread().getName() +" : "+ LocalDateTime.now());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return integers==null? new ArrayList() :integers;
}

 

總結

  本片用了一些示例來講解CompletableFuture,我們可以在開發中的一些場景中使用起來了。特別是異步多線程去拿一些數據的時候,非常好用哦。

  詳細的Demo,請看傳送門:https://github.com/javatechnorth/java-study-note/tree/master/multiThread/src/main/java/org/javanorth/currency/completableFuture

  作者:指北君,操千曲而后曉聲,觀千劍而后識器。

 

 
        

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM