使用CompletableFuture優化你的代碼執行效率


  這篇文章詳細講解java8中CompletableFuture的特性,方法以及實例.

  在java8以前,我們使用java的多線程編程,一般是通過Runnable中的run方法來完成,這種方式,有個很明顯的缺點,就是,沒有返回值,這時候,大家可能會去嘗試使用Callable中的call方法,然后用Future返回結果,如下:

public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> stringFuture = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "async thread";
            }
        });
        Thread.sleep(1000);
        System.out.println("main thread");
        System.out.println(stringFuture.get());

    }

  通過觀察控制台,我們發現先打印 main thread ,一秒后打印 async thread,似乎能滿足我們的需求,但仔細想我們發現一個問題,當調用future的get()方法時,當前主線程是堵塞的,這好像並不是我們想看到的,另一種獲取返回結果的方式是先輪詢,可以調用isDone,等完成再獲取,但這也不能讓我們滿意.

  不管怎么看,這種用法看起來並不優雅,起碼從視覺上就有些丑陋,而且某些場景無法使用,比如說,

  1.很多個異步線程執行時間可能不一致,我的主線程業務不能一直等着,這時候我可能會想要只等最快的線程執行完或者最重要的那個任務執行完,亦或者我只等1秒鍾,至於沒返回結果的線程我就用默認值代替.

  2.我兩個異步任務之間執行獨立,但是第二個依賴第一個的執行結果.

  java8的CompletableFuture,就在這混亂且不完美的多線程江湖中閃亮登場了.CompletableFuture讓Future的功能和使用場景得到極大的完善和擴展,提供了函數式編程能力,使代碼更加美觀優雅,而且可以通過回調的方式計算處理結果,對異常處理也有了更好的處理手段.

  CompletableFuture源碼中有四個靜態方法用來執行異步任務:

創建任務

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}

public static CompletableFuture<Void> runAsync(Runnable runnable){..}

public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){..} 

  如果有多線程的基礎知識,我們很容易看出,run開頭的兩個方法,用於執行沒有返回值的任務,因為它的入參是Runnable對象,而supply開頭的方法顯然是執行有返回值的任務了,至於方法的入參,如果沒有傳入Executor對象將會使用ForkJoinPool.commonPool() 作為它的線程池執行異步代碼.在實際使用中,一般我們使用自己創建的線程池對象來作為參數傳入使用,這樣速度會快些.

  執行異步任務的方式也很簡單,只需要使用上述方法就可以了:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    //....執行任務
    return "hello";}, executor)

  接下來看一下獲取執行結果的幾個方法.

V get();
V get(long timeout,Timeout unit);
T getNow(T defaultValue);
T join();

  上面兩個方法是Future中的實現方式,get()會堵塞當前的線程,這就造成了一個問題,如果執行線程遲遲沒有返回數據,get()會一直等待下去,因此,第二個get()方法可以設置等待的時間.

   getNow()方法比較有意思,表示當有了返回結果時會返回結果,如果異步線程拋了異常會返回自己設置的默認值.

接下來以一些場景的實例來介紹一下CompletableFuture中其他一些常用的方法.

thenAccept()

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

  功能:當前任務正常完成以后執行,當前任務的執行結果可以作為下一任務的輸入參數,無返回值.

  場景:執行任務A,同時異步執行任務B,待任務B正常返回之后,用B的返回值執行任務C,任務C無返回值

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任務A");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "任務B");
CompletableFuture<String> futureC = futureB.thenApply(b -> {
      System.out.println("執行任務C.");
      System.out.println("參數:" + b);//參數:任務B
      return "a";
});

thenRun(..)

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

  功能:對不關心上一步的計算結果,執行下一個操作

  場景:執行任務A,任務A執行完以后,執行任務B,任務B不接受任務A的返回值(不管A有沒有返回值),也無返回值

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任務A");
futureA.thenRun(() -> System.out.println("執行任務B"));

thenApply(..)

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

  功能:當前任務正常完成以后執行,當前任務的執行的結果會作為下一任務的輸入參數,有返回值

  場景:多個任務串聯執行,下一個任務的執行依賴上一個任務的結果,每個任務都有輸入和輸出

  實例1:異步執行任務A,當任務A完成時使用A的返回結果resultA作為入參進行任務B的處理,可實現任意多個任務的串聯執行

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");

CompletableFuture<String> futureB = futureA.thenApply(s->s + " world");

CompletableFuture<String> future3 = futureB.thenApply(String::toUpperCase);

System.out.println(future3.join());

  上面的代碼,我們當然可以先調用future.join()先得到任務A的返回值,然后再拿返回值做入參去執行任務B,而thenApply的存在就在於幫我簡化了這一步,我們不必因為等待一個計算完成而一直阻塞着調用線程,而是告訴CompletableFuture你啥時候執行完就啥時候進行下一步. 就把多個任務串聯起來了.

thenCombine(..)  thenAcceptBoth(..)  runAfterBoth(..)

public <U,V> CompletableFuture<V>     thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>     thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>     thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

  功能:結合兩個CompletionStage的結果,進行轉化后返回

  場景:需要根據商品id查詢商品的當前價格,分兩步,查詢商品的原始價格和折扣,這兩個查詢相互獨立,當都查出來的時候用原始價格乘折扣,算出當前價格. 使用方法:thenCombine(..)

 CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> 100d);
 CompletableFuture<Double> futureDiscount = CompletableFuture.supplyAsync(() -> 0.8);
 CompletableFuture<Double> futureResult = futurePrice.thenCombine(futureDiscount, (price, discount) -> price * discount);
 System.out.println("最終價格為:" + futureResult.join()); //最終價格為:80.0

  thenCombine(..)是結合兩個任務的返回值進行轉化后再返回,那如果不需要返回呢,那就需要thenAcceptBoth(..),同理,如果連兩個任務的返回值也不關心呢,那就需要runAfterBoth了,如果理解了上面三個方法,thenApply,thenAccept,thenRun,這里就不需要單獨再提這兩個方法了,只在這里提一下.

thenCompose(..)

public <U> CompletableFuture<U>     thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>     thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

  功能:這個方法接收的輸入是當前的CompletableFuture的計算值,返回結果將是一個新的CompletableFuture

  這個方法和thenApply非常像,都是接受上一個任務的結果作為入參,執行自己的操作,然后返回.那具體有什么區別呢?

  thenApply():它的功能相當於將CompletableFuture<T>轉換成CompletableFuture<U>,改變的是同一個CompletableFuture中的泛型類型

  thenCompose():用來連接兩個CompletableFuture,返回值是一個新的CompletableFuture

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");

CompletableFuture<String> futureB = futureA.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world"));

CompletableFuture<String> future3 = futureB.thenCompose(s -> CompletableFuture.supplyAsync(s::toUpperCase));

System.out.println(future3.join());

  這段代碼實現的和上面thenApply一樣的效果,在實際使用中,我並沒有很清楚兩個在使用上的區別,如果有大佬,跪求告知.

applyToEither(..)  acceptEither(..)  runAfterEither(..)

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

 

  功能:執行兩個CompletionStage的結果,那個先執行完了,就是用哪個的返回值進行下一步操作
  場景:假設查詢商品a,有兩種方式,A和B,但是A和B的執行速度不一樣,我們希望哪個先返回就用那個的返回值.

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "通過方式A獲取商品a";
        });
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "通過方式B獲取商品a";
        });
CompletableFuture<String> futureC = futureA.applyToEither(futureB, product -> "結果:" + product);
System.out.println(futureC.join()); //結果:通過方式A獲取商品a

  同樣的道理,applyToEither的兄弟方法還有acceptEither(),runAfterEither(),我想不需要我解釋你也知道該怎么用了.


 exceptionally(..)

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

  功能:當運行出現異常時,調用該方法可進行一些補償操作,如設置默認值.

  場景:異步執行任務A獲取結果,如果任務A執行過程中拋出異常,則使用默認值100返回.

 CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "執行結果:" + (100 / 0))
                .thenApply(s -> "futureA result:" + s)
                .exceptionally(e -> {
                    System.out.println(e.getMessage()); //java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                });
CompletableFuture<String> futureB = CompletableFuture.
                supplyAsync(() -> "執行結果:" + 50)
                .thenApply(s -> "futureB result:" + s)
                .exceptionally(e -> "futureB result: 100");
System.out.println(futureA.join());//futureA result: 100
System.out.println(futureB.join());//futureB result:執行結果:50

  上面代碼展示了正常流程和出現異常的情況,可以理解成catch,根據返回值可以體會下.


 whenComplete(..)

public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

  功能:當CompletableFuture的計算結果完成,或者拋出異常的時候,都可以進入whenComplete方法執行,舉個栗子

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "執行結果:" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .whenComplete((s, e) -> {
                    if (s != null) {
                        System.out.println(s);//未執行
                    }
                    if (e == null) {
                        System.out.println(s);//未執行
                    } else {
                        System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero
                    }
                })
                .exceptionally(e -> {
                    System.out.println("ex"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zero
             return "futureA result: 100"; }); 
System.out.println(futureA.join());
//futureA result: 100

  根據控制台,我們可以看出執行流程是這樣,supplyAsync->whenComplete->exceptionally,可以看出並沒有進入thenApply執行,原因也顯而易見,在supplyAsync中出現了異常,thenApply只有當正常返回時才會去執行.而whenComplete不管是否正常執行,還要注意一點,whenComplete是沒有返回值的.

  上面代碼我們使用了函數式的編程風格並且先調用whenComplete再調用exceptionally,如果我們先調用exceptionally,再調用whenComplete會發生什么呢,我們看一下:

CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "執行結果:" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .exceptionally(e -> {
                    System.out.println("ex:"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                })
                .whenComplete((s, e) -> {
                    if (e == null) {
                        System.out.println(s);//futureA result: 100
                    } else {
                        System.out.println(e.getMessage());//未執行
                    }
                })
                ;
System.out.println(futureA.join());//futureA result: 100

  代碼先執行了exceptionally后執行whenComplete,可以發現,由於在exceptionally中對異常進行了處理,並返回了默認值,whenComplete中接收到的結果是一個正常的結果,被exceptionally美化過的結果,這一點需要留意一下.

handle(..)

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

  功能:當CompletableFuture的計算結果完成,或者拋出異常的時候,可以通過handle方法對結果進行處理

 CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "執行結果:" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); //java.lang.ArithmeticException: / by zero
                    return "futureA result: 100";
                })
                .handle((s, e) -> {
                    if (e == null) {
                        System.out.println(s);//futureA result: 100
                    } else {
                        System.out.println(e.getMessage());//未執行
                    }
                    return "handle result:" + (s == null ? "500" : s);
                });
System.out.println(futureA.join());//handle result:futureA result: 100

  通過控制台,我們可以看出,最后打印的是handle result:futureA result: 100,執行exceptionally后對異常進行了"美化",返回了默認值,那么handle得到的就是一個正常的返回,我們再試下,先調用handle再調用exceptionally的情況.

 CompletableFuture<String> futureA = CompletableFuture.
                supplyAsync(() -> "執行結果:" + (100 / 0))
                .thenApply(s -> "apply result:" + s)
                .handle((s, e) -> {
                    if (e == null) {
                        System.out.println(s);//未執行
                    } else {
                        System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero
                    }
                    return "handle result:" + (s == null ? "500" : s);
                })
                .exceptionally(e -> {
                    System.out.println("ex:" + e.getMessage()); //未執行
                    return "futureA result: 100";
                });
System.out.println(futureA.join());//handle result:500

  根據控制台輸出,可以看到先執行handle,打印了異常信息,並對接過設置了默認值500,exceptionally並沒有執行,因為它得到的是handle返回給它的值,由此我們大概推測handle和whenComplete的區別

   1.都是對結果進行處理,handle有返回值,whenComplete沒有返回值

   2.由於1的存在,使得handle多了一個特性,可在handle里實現exceptionally的功能

allOf(..)  anyOf(..)

public static CompletableFuture<Void>  allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object>  anyOf(CompletableFuture<?>... cfs)

  allOf:當所有的CompletableFuture都執行完后執行計算

  anyOf:最快的那個CompletableFuture執行完之后執行計算

  場景二:查詢一個商品詳情,需要分別去查商品信息,賣家信息,庫存信息,訂單信息等,這些查詢相互獨立,在不同的服務上,假設每個查詢都需要一到兩秒鍾,要求總體查詢時間小於2秒.

public static void main(String[] args) throws Exception {

        ExecutorService executorService = Executors.newFixedThreadPool(4);

        long start = System.currentTimeMillis();
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品詳情";
        },executorService);

        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "賣家信息";
        },executorService);

        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "庫存信息";
        },executorService);

        CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000 + RandomUtils.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "訂單信息";
        },executorService);

        CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureA, futureB, futureC, futureD);
        allFuture.join();

        System.out.println(futureA.join() + futureB.join() + futureC.join() + futureD.join());
        System.out.println("總耗時:" + (System.currentTimeMillis() - start));
    }

參考資料:

  https://colobu.com/2016/02/29/Java-CompletableFuture/#Either

  https://blog.csdn.net/qq_36597450/article/details/81232051


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM