關於CompletableFuture的一切,看這篇文章就夠了



java中CompletableFuture的使用

之前的文章中,我們講解了Future, 本文我們將會繼續講解java 8中引入的CompletableFuture的用法。

CompletableFuture首先是一個Future,它擁有Future所有的功能,包括獲取異步執行結果,取消正在執行的任務等。

除此之外,CompletableFuture還是一個CompletionStage。

我們看下CompletableFuture的定義:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 

什么是CompletionStage呢?

在異步程序中,如果將每次的異步執行都看成是一個stage的話,我們通常很難控制異步程序的執行順序,在javascript中,我們需要在回調中執行回調。這就會形成傳說中的回調地獄。

好在在ES6中引入了promise的概念,可以將回調中的回調轉寫為鏈式調用,從而大大的提升了程序的可讀性和可寫性。

同樣的在java中,我們使用CompletionStage來實現異步調用的鏈式操作。

CompletionStage定義了一系列的then*** 操作來實現這一功能。

CompletableFuture作為Future使用

調用CompletableFuture.complete方法可以立馬返回結果,我們看下怎么使用這個方法來構建一個基本的Future:

    public Future<String> calculateAsync() throws InterruptedException {
        CompletableFuture<String> completableFuture
                = new CompletableFuture<>();

        Executors.newCachedThreadPool().submit(() -> {
            Thread.sleep(500);
            completableFuture.complete("Hello");
            return null;
        });

        return completableFuture;
    }

上面我們通過調動ExecutorService來提交一個任務從而得到一個Future。如果你知道執行的結果,那么可以使用CompletableFuture的completedFuture方法來直接返回一個Future。

    public Future<String> useCompletableFuture(){
        Future<String> completableFuture =
                CompletableFuture.completedFuture("Hello");
        return completableFuture;
    }

CompletableFuture還提供了一個cancel方法來立馬取消任務的執行:

    public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(500);
        completableFuture.cancel(false);
        return null;
    });
    return completableFuture;
    }

如果這個時候調用Future的get方法,將會報CancellationException異常。

Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException

異步執行code

CompletableFuture提供了runAsync和supplyAsync的方法,可以以異步的方式執行代碼。

我們看一個runAsync的基本應用,接收一個Runnable參數:

    public  void runAsync(){
        CompletableFuture<Void> runAsync= CompletableFuture.runAsync(()->{
            log.info("runAsync");
        });
    }

而supplyAsync接受一個Supplier:

    public void supplyAsync(){
        CompletableFuture<String> supplyAsync=CompletableFuture.supplyAsync(()->{
            return "supplyAsync";
        });
    }

他們兩個的區別是一個沒有返回值,一個有返回值。

組合Futures

上面講到CompletableFuture的一個重大作用就是將回調改為鏈式調用,從而將Futures組合起來。

而鏈式調用的返回值還是CompletableFuture,我們看一個thenCompose的例子:

CompletableFuture<String> completableFuture 
  = CompletableFuture.supplyAsync(() -> "Hello")
    .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

thenCompose將前一個Future的返回結果作為后一個操作的輸入。

如果我們想合並兩個CompletableFuture的結果,則可以使用thenCombine:

    public void thenCombine(){
        CompletableFuture<String> completableFuture
                = CompletableFuture.supplyAsync(() -> "Hello")
                .thenCombine(CompletableFuture.supplyAsync(
                        () -> " World"), (s1, s2) -> s1 + s2));
    }

如果你不想返回結果,則可以使用thenAcceptBoth:

    public void thenAcceptBoth(){
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
                .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
                        (s1, s2) -> System.out.println(s1 + s2));
    }

thenApply() 和 thenCompose()的區別

thenApply()和thenCompose()兩個方法都可以將CompletableFuture連接起來,但是兩個有點不一樣。

thenApply()接收的是前一個調用返回的結果,然后對該結果進行處理。

thenCompose()接收的是前一個調用的stage,返回flat之后的的CompletableFuture。

簡單點比較,兩者就像是map和flatMap的區別。

並行執行任務

當我們需要並行執行任務時,通常我們需要等待所有的任務都執行完畢再去處理其他的任務,那么我們可以用到CompletableFuture.allOf方法:

    public void allOf(){
        CompletableFuture<String> future1
                = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2
                = CompletableFuture.supplyAsync(() -> "Beautiful");
        CompletableFuture<String> future3
                = CompletableFuture.supplyAsync(() -> "World");

        CompletableFuture<Void> combinedFuture
                = CompletableFuture.allOf(future1, future2, future3);
    }

allOf只保證task全都執行,而並沒有返回值,如果希望帶有返回值,我們可以使用join:

    public void join(){
        CompletableFuture<String> future1
                = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2
                = CompletableFuture.supplyAsync(() -> "Beautiful");
        CompletableFuture<String> future3
                = CompletableFuture.supplyAsync(() -> "World");

        String combined = Stream.of(future1, future2, future3)
                .map(CompletableFuture::join)
                .collect(Collectors.joining(" "));
    }

上面的程序將會返回:“Hello Beautiful World”。

異常處理

如果在鏈式調用的時候拋出異常,則可以在最后使用handle來接收:

    public void handleError(){
        String name = null;

        CompletableFuture<String> completableFuture
                =  CompletableFuture.supplyAsync(() -> {
            if (name == null) {
                throw new RuntimeException("Computation error!");
            }
            return "Hello, " + name;
        }).handle((s, t) -> s != null ? s : "Hello, Stranger!");
    }

這和Promise中的catch方法使用類似。

本文的例子可以參考https://github.com/ddean2009/learn-java-concurrency/tree/master/CompletableFuture

更多教程請參考 flydean的博客


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM