Java編程的邏輯 (94) - 組合式異步編程


本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》,由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買,京東自營鏈接http://item.jd.com/12299018.html


前面兩節討論了Java 8中的函數式數據處理,那是對38節55節介紹的容器類的增強,它可以將對集合數據的多個操作以流水線的方式組合在一起。本節繼續討論Java 8的新功能,主要是一個新的類CompletableFuture,它是對65節83節介紹的並發編程的增強,它可以方便地將多個有一定依賴關系的異步任務以流水線的方式組合在一起,大大簡化多異步任務的開發。

之前介紹了那么多並發編程的內容,還有什么問題不能解決?CompletableFuture到底能解決什么問題?與之前介紹的內容有什么關系?具體如何使用?基本原理是什么?本節進行詳細討論,我們先來看它要解決的問題。

異步任務管理

在現代軟件開發中,系統功能越來越復雜,管理復雜度的方法就是分而治之,系統的很多功能可能會被切分為小的服務,對外提供Web API,單獨開發、部署和維護。比如,在一個電商系統中,可能有專門的產品服務、訂單服務、用戶服務、推薦服務、優惠服務、搜索服務等,在對外具體展示一個頁面時,可能要調用多個服務,而多個調用之間可能還有一定的依賴,比如,顯示一個產品頁面,需要調用產品服務,也可能需要調用推薦服務獲取與該產品有關的其他推薦,還可能需要調用優惠服務獲取該產品相關的促銷優惠,而為了調用優惠服務,可能需要先調用用戶服務以獲取用戶的會員級別。

另外,現代軟件經常依賴很多第三方的服務,比如地圖服務、短信服務、天氣服務、匯率服務等,在實現一個具體功能時,可能要訪問多個這樣的服務,這些訪問之間可能存在着一定的依賴關系。

為了提高性能,充分利用系統資源,這些對外部服務的調用一般都應該是異步的、盡量並發的。我們在77節介紹過異步任務執行服務,使用ExecutorService可以方便地提交單個獨立的異步任務,可以方便地在需要的時候通過Future接口獲取異步任務的結果,但對於多個尤其是有一定依賴關系的異步任務,這種支持就不夠了。

於是,就有了CompletableFuture,它是一個具體的類,實現了兩個接口,一個是Future,另一個是CompletionStage,Future表示異步任務的結果,而CompletionStage字面意思是完成階段,多個CompletionStage可以以流水線的方式組合起來,對於其中一個CompletionStage,它有一個計算任務,但可能需要等待其他一個或多個階段完成才能開始,它完成后,可能會觸發其他階段開始運行。CompletionStage提供了大量方法,使用它們,可以方便地響應任務事件,構建任務流水線,實現組合式異步編程。

具體怎么使用呢?下面我們會逐步說明,CompletableFuture也是一個Future,我們先來看與Future類似的地方。

與Future/FutureTask對比

基本的任務執行服務

我們先通過示例來簡要回顧下異步任務執行服務和Future,在異步任務執行服務中,用Callable或Runnable表示任務,以Callable為例,一個模擬的外部任務為:

private static Random rnd = new Random();

static int delayRandom(int min, int max) {
    int milli = max > min ? rnd.nextInt(max - min) : 0;
    try {
        Thread.sleep(min + milli);
    } catch (InterruptedException e) {
    }
    return milli;
}

static Callable<Integer> externalTask = () -> {
    int time = delayRandom(20, 2000);
    return time;
};

externalTask表示外部任務,我們使用了Lambda表達式,不熟悉可以參看91節,delayRandom用於模擬延時。

假定有一個異步任務執行服務,其代碼為:

private static ExecutorService executor =
        Executors.newFixedThreadPool(10);

通過任務執行服務調用外部服務,一般返回Future,表示異步結果,示例代碼為:

public static Future<Integer> callExternalService(){
    return executor.submit(externalTask);
}

在主程序中,結合異步任務和本地調用的示例代碼為:

public static void master() {
    // 執行異步任務
    Future<Integer> asyncRet = callExternalService();

    // 執行其他任務 ...

    // 獲取異步任務的結果,處理可能的異常
    try {
        Integer ret = asyncRet.get();
        System.out.println(ret);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

基本的CompletableFuture

使用CompletableFuture可以實現類似功能,不過,它不支持使用Callable表示異步任務,而支持Runnable和Supplier,Supplier替代Callable表示有返回結果的異步任務,與Callale的區別是,它不能拋出受檢異常,如果會發生異常,可以拋出運行時異常。

使用Supplier表示異步任務,代碼與Callable類似,替換變量類型即可,即:

static Supplier<Integer> externalTask = () -> {
    int time = delayRandom(20, 2000);
    return time;
};

使用CompletableFuture調用外部服務的代碼可以為:

public static Future<Integer> callExternalService(){
    return CompletableFuture.supplyAsync(externalTask, executor);
}

supplyAsync是一個靜態方法,其定義為:

public static <U> CompletableFuture<U> supplyAsync(
    Supplier<U> supplier, Executor executor)

它接受兩個參數supplier和executor,內部,它使用executor執行supplier表示的任務,返回一個CompletableFuture,調用后,任務被異步執行,這個方法立即返回。

supplyAsync還有一個不帶executor參數的方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

沒有executor,任務被誰執行呢?與系統環境和配置有關,一般來說,如果可用的CPU核數大於2,會使用Java 7引入的Fork/Join任務執行服務,即ForkJoinPool.commonPool(),該任務執行服務背后的工作線程數一般為CPU核數減1,即Runtime.getRuntime().availableProcessors()-1,否則,會使用ThreadPerTaskExecutor,它會為每個任務創建一個線程。

對於CPU密集型的運算任務,使用Fork/Join任務執行服務是合適的,但對於一般的調用外部服務的異步任務,Fork/Join可能是不合適的,因為它的並行度比較低,可能會讓本可以並發的多任務串行運行,這時,應該提供Executor參數。

后面我們還會看到很多以Async結尾命名的方法,一般都有兩個版本,一個帶Executor參數,另一個不帶,其含義是相同的,就不再重復介紹了。

對於類型為Runnable的任務,構建CompletableFuture的方法為:

public static CompletableFuture<Void> runAsync(
    Runnable runnable)
public static CompletableFuture<Void> runAsync(
    Runnable runnable, Executor executor)

它與supplyAsync是類似的,具體就不贅述了。

CompletableFuture對Future的基本增強

Future有的接口,CompletableFuture都是支持的,不過,CompletableFuture還有一些額外的相關方法,比如:

public T join()
public boolean isCompletedExceptionally()
public T getNow(T valueIfAbsent)

join與get方法類似,也會等待任務結束,但它不會拋出受檢異常,如果任務異常結束了,join會將異常包裝為運行時異常CompletionException拋出。

Future有isDone方法檢查任務是否結束了,但不知道任務是正常結束還是異常結束,isCompletedExceptionally方法可以判斷任務是否是異常結束了。

getNow與join類似,區別是,如果任務還沒有結束,它不會等待,而是會返回傳入的參數valueIfAbsent。

進一步理解Future/CompletableFuture

前面例子都使用了任務執行服務,其實,任務執行服務與異步結果Future不是綁在一起的,可以自己創建線程返回異步結果,為進一步理解,我們看些示例。

使用FutureTask調用外部服務,代碼可以為:

public static Future<Integer> callExternalService() {
    FutureTask<Integer> future = new FutureTask<>(externalTask);
    new Thread() {
        public void run() {
            future.run();
        }
    }.start();
    return future;
}

內部自己創建了一個線程,線程調用FutureTask的run方法,我們在77節分析過FutureTask的代碼,run方法會調用externalTask的call方法,並保存結果或碰到的異常,喚醒等待結果的線程。

使用CompletableFuture,也可以直接創建線程,並返回異步結果,代碼可以為:

public static Future<Integer> callExternalService() {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    new Thread() {
        public void run() {
            try {
                future.complete(externalTask.get());
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        }
    }.start();
    return future;
}

這里使用了CompletableFuture的兩個方法:

public boolean complete(T value)
public boolean completeExceptionally(Throwable ex) 

這兩個方法顯式設置任務的狀態和結果,complete設置任務成功完成,結果為value,completeExceptionally設置任務異常結束,異常為ex。Future接口沒有對應的方法,FutureTask有相關方法但不是public的(是protected)。設置完后,它們都會觸發其他依賴它們的CompletionStage。具體會觸發什么呢?我們接下來再看。

響應結果或異常

使用Future,我們只能通過get獲取結果,而get可能會需要阻塞等待,而通過CompletionStage,可以注冊回調函數,當任務完成或異常結束時自動觸發執行,有兩類注冊方法,whenComplete和handle,我們分別來看下。

whenComplete

whenComplete的聲明為:

public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action)

參數action表示回調函數,不管前一個階段是正常結束還是異常結束,它都會被調用,函數類型是BiConsumer,接受兩個參數,第一個參數是正常結束時的結果值,第二個參數是異常結束時的異常,BiConsumer沒有返回值。whenComplete的返回值還是CompletableFuture,它不會改變原階段的結果,還可以在其上繼續調用其他函數。看個簡單的示例:

CompletableFuture.supplyAsync(externalTask).whenComplete((result, ex) -> {
    if (result != null) {
        System.out.println(result);
    }
    if (ex != null) {
        ex.printStackTrace();
    }
}).join();

result表示前一個階段的結果,ex表示異常,只可能有一個不為null。

whenComplete注冊的函數具體由誰執行呢?一般而言,這要看注冊時任務的狀態,如果注冊時任務還沒有結束,則注冊的函數會由執行任務的線程執行,在該線程執行完任務后執行注冊的函數,如果注冊時任務已經結束了,則由當前線程(即調用注冊函數的線程)執行。

如果不希望當前線程執行,避免可能的同步阻塞,可以使用其他兩個異步注冊方法:

public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action, Executor executor)       

與前面介紹的以Async結尾的方法一樣,對第一個方法,注冊函數action會由默認的任務執行服務(即ForkJoinPool.commonPool()或ThreadPerTaskExecutor執行),對第二個方法,會由參數中指定的executor執行。

handle

whenComplete只是注冊回調函數,不改變結果,它返回了一個CompletableFuture,但這個CompletableFuture的結果與調用它的CompletableFuture是一樣的,還有一個類似的注冊方法handle,其聲明為:

public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn)

回調函數是一個BiFunction,也是接受兩個參數,一個是正常結果,另一個是異常,但BiFunction有返回值,在handle返回的CompletableFuture中,結果會被BiFunction的返回值替代,即使原來有異常,也會被覆蓋,比如:

String ret =
    CompletableFuture.supplyAsync(()->{
        throw new RuntimeException("test");
    }).handle((result, ex)->{
        return "hello";
    }).join();
System.out.println(ret);

輸出為"hello"。異步任務拋出了異常,但通過handle方法,改變了結果。

與whenComplete類似,handle也有對應的異步注冊方法handleAsync,具體我們就不探討了。

exceptionally

whenComplete和handle都是既響應正常完成也響應異常,如果只對異常感興趣,可以使用exceptionally,其聲明為:

public CompletableFuture<T> exceptionally(
    Function<Throwable, ? extends T> fn)

它注冊的回調函數是Function,接受的參數為異常,返回一個值,與handle類似,它也會改變結果,具體就不舉例了。

除了響應結果和異常,使用CompletableFuture,可以方便地構建有多種依賴關系的任務流,我們先來看簡單的依賴單一階段的情況。

構建依賴單一階段的任務流

thenRun

在一個階段正常完成后,執行下一個任務,看個簡單示例:

Runnable taskA = () -> System.out.println("task A");
Runnable taskB = () -> System.out.println("task B");
Runnable taskC = () -> System.out.println("task C");

CompletableFuture.runAsync(taskA)
    .thenRun(taskB)
    .thenRun(taskC)
    .join();

這里,有三個異步任務taskA, taskB和taskC,通過thenRun自然地描述了它們的依賴關系,thenRun是同步版本,有對應的異步版本thenRunAsync:

public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

在thenRun構建的任務流中,只有前一個階段沒有異常結束,下一個階段的任務才會執行,如果前一個階段發生了異常,所有后續階段都不會運行,結果會被設為相同的異常,調用join會拋出運行時異常CompletionException。

thenRun指定的下一個任務類型是Runnable,它不需要前一個階段的結果作為參數,也沒有返回值,所以,在thenRun返回的CompletableFuture中,結果類型為Void,即沒有結果。

thenAccept/thenApply

如果下一個任務需要前一個階段的結果作為參數,可以使用thenAccept或thenApply方法:

public CompletableFuture<Void> thenAccept(
    Consumer<? super T> action)
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn)

thenAccept的任務類型是Consumer,它接受前一個階段的結果作為參數,沒有返回值。thenApply的任務類型是Function,接受前一個階段的結果作為參數,返回一個新的值,這個值會成為thenApply返回的CompletableFuture的結果值。看個簡單示例:

Supplier<String> taskA = () -> "hello";
Function<String, String> taskB = (t) -> t.toUpperCase();
Consumer<String> taskC = (t) -> System.out.println("consume: " + t);

CompletableFuture.supplyAsync(taskA)
    .thenApply(taskB)
    .thenAccept(taskC)
    .join();

taskA的結果是"hello",傳遞給了taskB,taskB轉換結果為"HELLO",再把結果給taskC,taskC進行了輸出,所以輸出為:

consume: HELLO

CompletableFuture中有很多名稱帶有run, accept或apply的方法,它們一般與任務的類型相對應,run與Runnable對應,accept與Consumer對應,apply與Function對應,后續就不贅述了。

thenCompose

與thenApply類似,還有一個方法thenCompose,聲明為:

public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn)

這個任務類型也是Function,也是接受前一個階段的結果,返回一個新的結果,不過,這個轉換函數fn的返回值類型是CompletionStage,也就是說,它的返回值也是一個階段,如果使用thenApply,結果就會變為CompletableFuture<CompletableFuture<U>>,而使用thenCompose,會直接返回fn返回的CompletionStage,thenCompose與thenApply的區別,就如同Stream API中flatMap與map的區別,看個簡單的示例:

Supplier<String> taskA = () -> "hello";
Function<String, CompletableFuture<String>> taskB = (t) ->
    CompletableFuture.supplyAsync(() -> t.toUpperCase());
Consumer<String> taskC = (t) -> System.out.println("consume: " + t);

CompletableFuture.supplyAsync(taskA)
    .thenCompose(taskB)
    .thenAccept(taskC)
    .join();

以上代碼中,taskB是一個轉換函數,但它自己也執行了異步任務,返回類型也是CompletableFuture,所以使用了thenCompose。

構建依賴兩個階段的任務流

依賴兩個都完成

thenRun, thenAccept, thenApply和thenCompose用於在一個階段完成后執行另一個任務,CompletableFuture還有一些方法用於在兩個階段都完成后執行另一個任務,方法是:

public CompletableFuture<Void> runAfterBoth(
    CompletionStage<?> other, Runnable action
public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn)
public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action) 

runAfterBoth對應的任務類型是Runnable,thenCombine對應的任務類型是BiFunction,接受前兩個階段的結果作為參數,返回一個結果,thenAcceptBoth對應的任務類型是BiConsumer,接受前兩個階段的結果作為參數,但不返回結果。它們都有對應的異步和帶Executor參數的版本,用於指定下一個任務由誰執行,具體就不贅述了。當前階段和參數指定的另一個階段other沒有依賴關系,並發執行,當兩個都執行結束后,開始執行指定的另一個任務。

看個簡單的示例,任務A和B執行結束后,執行任務C合並結果,代碼為:

Supplier<String> taskA = () -> "taskA";
CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> "taskB");
BiFunction<String, String, String> taskC = (a, b) -> a + "," + b;

String ret = CompletableFuture.supplyAsync(taskA)
        .thenCombineAsync(taskB, taskC)
        .join();
System.out.println(ret);

輸出為:

taskA,taskB

依賴兩個階段中的一個

前面的方法要求兩個階段都完成后才執行下一個任務,如果只需要其中任意一個階段完成,可以使用下面的方法:

public CompletableFuture<Void> runAfterEither(
    CompletionStage<?> other, Runnable action)

public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn)

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action)          

它們都有對應的異步和帶Executor參數的版本,用於指定下一個任務由誰執行,具體就不贅述了。當前階段和參數指定的另一個階段other沒有依賴關系,並發執行,只要當其中一個執行完了,就會啟動參數指定的另一個任務,具體就不贅述了。

構建依賴多個階段的任務流

如果依賴的階段不止兩個,可以使用如下方法:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

它們是靜態方法,基於多個CompletableFuture構建了一個新的CompletableFuture。

對於allOf,當所有子CompletableFuture都完成時,它才完成,如果有的CompletableFuture異常結束了,則新的CompletableFuture的結果也是異常,不過,它並不會因為有異常就提前結束,而是會等待所有階段結束,如果有多個階段異常結束,新的CompletableFuture中保存的異常是最后一個的。新的CompletableFuture會持有異常結果,但不會保存正常結束的結果,如果需要,可以從每個階段中獲取。看個簡單的示例:

CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
    delayRandom(100, 1000);
    return "helloA";
}, executor);

CompletableFuture<Void> taskB = CompletableFuture.runAsync(() -> {
    delayRandom(2000, 3000);
}, executor);

CompletableFuture<Void> taskC = CompletableFuture.runAsync(() -> {
    delayRandom(30, 100);
    throw new RuntimeException("task C exception");
}, executor);

CompletableFuture.allOf(taskA, taskB, taskC).whenComplete((result, ex) -> {
    if (ex != null) {
        System.out.println(ex.getMessage());
    }
    if (!taskA.isCompletedExceptionally()) {
        System.out.println("task A " + taskA.join());
    }
});

taskC會首先異常結束,但新構建的CompletableFuture會等待其他兩個結束,都結束后,可以通過子階段(如taskA)的方法檢查子階段的狀態和結果。

對於anyOf返回的CompletableFuture,當第一個子CompletableFuture完成或異常結束時,它相應地完成或異常結束,結果與第一個結束的子CompletableFuture一樣,具體就不舉例了。

小結

本節介紹了Java 8中的組合式異步編程CompletableFuture:

  • 它是對Future的增強,但可以響應結果或異常事件,有很多方法構建異步任務流
  • 根據任務由誰執行,一般有三類對應方法,名稱不帶Async的方法由當前線程或前一個階段的線程執行,帶Async但沒有指定Executor的方法由默認Excecutor執行(ForkJoinPool.commonPool()或ThreadPerTaskExecutor),帶Async且指定Executor參數的方法由指定的Executor執行
  • 根據任務類型,一般也有三類對應方法,名稱帶run的對應Runnable,帶accept的對應Consumer,帶apply的對應Function

使用CompletableFuture,可以簡潔自然地表達多個異步任務之間的依賴關系和執行流程,大大簡化代碼,提高可讀性。

下一節,我們探討Java 8對日期和時間API的增強。

(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic,位於包shuo.laoma.java8.c94下)

----------------

未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM