編程老司機帶你玩轉 CompletableFuture 異步編程


本文從實例出發,介紹 CompletableFuture 基本用法。不過講的再多,不如親自上手練習一下。所以建議各位小伙伴看完,上機練習一把,快速掌握 CompletableFuture

個人博文地址:https://sourl.cn/s5MbCm

全文摘要:

  • Future VS CompletableFuture
  • CompletableFuture 基本用法

0x00. 前言

一些業務場景我們需要使用多線程異步執行任務,加快任務執行速度。 Java 提供 Runnable Future<V> 兩個接口用來實現異步任務邏輯。

雖然 Future<V> 可以獲取任務執行結果,但是獲取方式十方不變。我們不得不使用Future#get 阻塞調用線程,或者使用輪詢方式判斷 Future#isDone 任務是否結束,再獲取結果。

這兩種處理方式都不是很優雅,JDK8 之前並發類庫沒有提供相關的異步回調實現方式。沒辦法,我們只好借助第三方類庫,如 Guava,擴展 Future,增加支持回調功能。相關代碼如下:

雖然這種方式增強了 Java 異步編程能力,但是還是無法解決多個異步任務需要相互依賴的場景。

舉一個生活上的例子,假如我們需要出去旅游,需要完成三個任務:

  • 任務一:訂購航班
  • 任務二:訂購酒店
  • 任務三:訂購租車服務

很顯然任務一和任務二沒有相關性,可以單獨執行。但是任務三必須等待任務一與任務二結束之后,才能訂購租車服務。

為了使任務三時執行時能獲取到任務一與任務二執行結果,我們還需要借助 CountDownLatch

0x01. CompletableFuture

JDK8 之后,Java 新增一個功能十分強大的類:CompletableFuture。單獨使用這個類就可以輕松的完成上面的需求:

大家可以先不用管 CompletableFuture 相關 API,下面將會具體講解。

對比 Future<V>CompletableFuture 優點在於:

  • 不需要手工分配線程,JDK 自動分配
  • 代碼語義清晰,異步任務鏈式調用
  • 支持編排異步任務

怎么樣,是不是功能很強大?接下來抓穩了,小黑哥要發車了。

1.1 方法一覽

首先來通過 IDE 查看下這個類提供的方法:

稍微數一下,這個類總共有 50 多個方法,我的天。。。

不過也不要怕,小黑哥幫你們歸納好了,跟着小黑哥的節奏,帶你們掌握 CompletableFuture

若圖片不清晰,可以關注『程序通事』,回復:『233』,獲取該思維導圖

1.2 創建 CompletableFuture 實例

創建 CompletableFuture 對象實例我們可以使用如下幾個方法:

第一個方法創建一個具有默認結果的 CompletableFuture,這個沒啥好講。我們重點講述下下面四個異步方法。

前兩個方法 runAsync 不支持返回值,而 supplyAsync可以支持返回結果。

這個兩個方法默認將會使用公共的 ForkJoinPool 線程池執行,這個線程池默認線程數是 CPU 的核數。

可以設置 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數

使用共享線程池將會有個弊端,一旦有任務被阻塞,將會造成其他任務沒機會執行。所以強烈建議使用后兩個方法,根據任務類型不同,主動創建線程池,進行資源隔離,避免互相干擾。

1.3 設置任務結果

CompletableFuture 提供以下方法,可以主動設置任務結果。

 boolean complete(T value)
 boolean completeExceptionally(Throwable ex)

第一個方法,主動設置 CompletableFuture 任務執行結果,若返回 true,表示設置成功。如果返回 false,設置失敗,這是因為任務已經執行結束,已經有了執行結果。

示例代碼如下:

// 執行異步任務
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
  System.out.println("cf 任務執行開始");
  sleep(10, TimeUnit.SECONDS);
  System.out.println("cf 任務執行結束");
  return "樓下小黑哥";
});
//
Executors.newSingleThreadScheduledExecutor().execute(() -> {
  sleep(5, TimeUnit.SECONDS);
  System.out.println("主動設置 cf 任務結果");
  // 設置任務結果,由於 cf 任務未執行結束,結果返回 true
  cf.complete("程序通事");
});
// 由於 cf 未執行結束,將會被阻塞。5 秒后,另外一個線程主動設置任務結果
System.out.println("get:" + cf.get());
// 等待 cf 任務執行結束
sleep(10, TimeUnit.SECONDS);
// 由於已經設置任務結果,cf 執行結束任務結果將會被拋棄
System.out.println("get:" + cf.get());
/***
   * cf 任務執行開始
   * 主動設置 cf 任務結果
   * get:程序通事
   * cf 任務執行結束
   * get:程序通事
*/

這里需要注意一點,一旦 complete 設置成功,CompletableFuture 返回結果就不會被更改,即使后續 CompletableFuture 任務執行結束。

第二個方法,給 CompletableFuture 設置異常對象。若設置成功,如果調用 get 等方法獲取結果,將會拋錯。

示例代碼如下:

// 執行異步任務
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
    System.out.println("cf 任務執行開始");
    sleep(10, TimeUnit.SECONDS);
    System.out.println("cf 任務執行結束");
    return "樓下小黑哥";
});
//
Executors.newSingleThreadScheduledExecutor().execute(() -> {
    sleep(5, TimeUnit.SECONDS);
    System.out.println("主動設置 cf 異常");
    // 設置任務結果,由於 cf 任務未執行結束,結果返回 true
    cf.completeExceptionally(new RuntimeException("啊,掛了"));
});
// 由於 cf 未執行結束,前 5 秒將會被阻塞。后續程序拋出異常,結束
System.out.println("get:" + cf.get());
/***
 * cf 任務執行開始
 * 主動設置 cf 異常
 * java.util.concurrent.ExecutionException: java.lang.RuntimeException: 啊,掛了
 * ......
 */

1.4 CompletionStage

CompletableFuture 分別實現兩個接口 FutureCompletionStage

Future 接口大家都比較熟悉,這里主要講講 CompletionStage

CompletableFuture 大部分方法來自CompletionStage 接口,正是因為這個接口,CompletableFuture才有如從強大功能。

想要理解 CompletionStage 接口,我們需要先了解任務的時序關系的。我們可以將任務時序關系分為以下幾種:

  • 串行執行關系
  • 並行執行關系
  • AND 匯聚關系
  • OR 匯聚關系

1.5 串行執行關系

任務串行執行,下一個任務必須等待上一個任務完成才可以繼續執行。

CompletionStage 有四組接口可以描述串行這種關系,分別為:

thenApply 方法需要傳入核心參數為 Function<T,R>類型。這個類核心方法為:

 R apply(T t)

所以這個接口將會把上一個任務返回結果當做入參,執行結束將會返回結果。

thenAccept 方法需要傳入參數對象為 Consumer<T>類型,這個類核心方法為:

void accept(T t)

返回值 void 可以看出,這個方法不支持返回結果,但是需要將上一個任務執行結果當做參數傳入。

thenRun 方法需要傳入參數對象為 Runnable 類型,這個類大家應該都比較熟悉,核心方法既不支持傳入參數,也不會返回執行結果。

thenCompose 方法作用與 thenApply 一樣,只不過 thenCompose 需要返回新的 CompletionStage。這么理解比較抽象,可以集合代碼一起理解。

方法中帶有 Async ,代表可以異步執行,這個系列還有重載方法,可以傳入自定義的線程池,上圖未展示,讀者只可以自行查看 API。

最后我們通過代碼展示 thenApply 使用方式:

CompletableFuture<String> cf
        = CompletableFuture.supplyAsync(() -> "hello,樓下小黑哥")// 1
        .thenApply(s -> s + "@程序通事") // 2
        .thenApply(String::toUpperCase); // 3
System.out.println(cf.join());
// 輸出結果 HELLO,樓下小黑哥@程序通事

這段代碼比較簡單,首先我們開啟一個異步任務,接着串行執行后續兩個任務。任務 2 需要等待任務1 執行完成,任務 3 需要等待任務 2。

上面方法,大家需要記住了 Function<T,R>Consumer<T>Runnable 三者區別,根據場景選擇使用。

1.6 AND 匯聚關系

AND 匯聚關系代表所有任務完成之后,才能進行下一個任務。

如上所示,只有任務 A 與任務 B 都完成之后,任務 C 才會開始執行。

CompletionStage 有以下接口描述這種關系。

thenCombine 方法核心參數 BiFunction ,作用與 Function一樣,只不過 BiFunction 可以接受兩個參數,而 Function 只能接受一個參數。

thenAcceptBoth 方法核心參數BiConsumer 作用也與 Consumer一樣,不過其需要接受兩個參數。

runAfterBoth 方法核心參數最簡單,上面已經介紹過,不再介紹。

這三組方法只能完成兩個任務 AND 匯聚關系,如果需要完成多個任務匯聚關系,需要使用 CompletableFuture#allOf,不過這里需要注意,這個方法是不支持返回任務結果。

AND 匯聚關系相關示例代碼,開頭已經使用過了,這里再粘貼一下,方便大家理解:

1.7 OR 匯聚關系

有 AND 匯聚關系,當然也存在 OR 匯聚關系。OR 匯聚關系代表只要多個任務中任一任務完成,就可以接着接着執行下一任務。

CompletionStage 有以下接口描述這種關系:

前面三組接口方法傳參與 AND 匯聚關系一致,這里也不再詳細解釋了。

當然 OR 匯聚關系可以使用 CompletableFuture#anyOf 執行多個任務。

下面示例代碼展示如何使用 applyToEither 完成 OR 關系。

CompletableFuture<String> cf
        = CompletableFuture.supplyAsync(() -> {
    sleep(5, TimeUnit.SECONDS);
    return "hello,樓下小黑哥";
});// 1

CompletableFuture<String> cf2 = cf.supplyAsync(() -> {
    sleep(3, TimeUnit.SECONDS);
    return "hello,程序通事";
});
// 執行 OR 關系
CompletableFuture<String> cf3 = cf2.applyToEither(cf, s -> s);

// 輸出結果,由於 cf2 只休眠 3 秒,優先執行完畢
System.out.println(cf2.join());
// 結果:hello,程序通事

1.8 異常處理

CompletableFuture 方法執行過程若產生異常,當調用 getjoin 獲取任務結果才會拋出異常。

上面代碼我們顯示使用 try..catch 處理上面的異常。不過這種方式不太優雅,CompletionStage 提供幾個方法,可以優雅處理異常。

exceptionally 使用方式類似於 try..catchcatch代碼塊中異常處理。

whenCompletehandle 方法就類似於 try..catch..finanllyfinally 代碼塊。無論是否發生異常,都將會執行的。這兩個方法區別在於 handle 支持返回結果。

下面示例代碼展示 handle 用法:

CompletableFuture<Integer>
        f0 = CompletableFuture.supplyAsync(() -> (7 / 0))
        .thenApply(r -> r * 10)
        .handle((integer, throwable) -> {
            // 如果異常存在,打印異常,並且返回默認值
            if (throwable != null) {
                throwable.printStackTrace();
                return 0;
            } else {
                // 如果
                return integer;
            }
        });


System.out.println(f0.join());
/**
 *java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
 * .....
 * 
 * 0
 */

0x02. 總結

JDK8 提供 CompletableFuture 功能非常強大,可以編排異步任務,完成串行執行,並行執行,AND 匯聚關系,OR 匯聚關系。

不過這個類方法實在太多,且方法還需要傳入各種函數式接口,新手剛開始使用會直接會被弄懵逼。這里幫大家在總結一下三類核心參數的作用

  • Function 這類函數接口既支持接收參數,也支持返回值
  • Consumer 這類接口函數只支持接受參數,不支持返回值
  • Runnable 這類接口不支持接受參數,也不支持返回值

搞清楚函數參數作用以后,然后根據串行,AND 匯聚關系,OR 匯聚關系歸納一下相關方法,這樣就比較好理解了

最后再貼一下,文章開頭的思維導圖,希望對你有幫助。

0x03. 幫助文檔

  1. 極客時間-並發編程專欄
  2. https://colobu.com/2016/02/29/Java-CompletableFuture
  3. https://www.ibm.com/developerworks/cn/java/j-cf-of-jdk8/index.html

最后說一句(求關注)

CompletableFuture 很早之前就有關注,本以為跟 Future一樣,使用挺簡單,誰知道學的時候才發現好難。各種 API 方法看的頭有點大。

后來看到極客時間-『並發編程』專欄使用歸納方式分類 CompletableFuture 各種方法,一下子就看懂了。所這篇文章也參考這種歸納方式。

這篇文章找資料,整理一個星期,幸好今天順利產出。

看在小黑哥寫的這么辛苦的份上,點個關注吧,賞個贊唄。別下次一定啊,大哥!寫文章很辛苦的,需要來點正反饋。

才疏學淺,難免會有紕漏,如果你發現了錯誤的地方,還請你留言給我指出來,我對其加以修改。

感謝您的閱讀,我堅持原創,十分歡迎並感謝您的關注~

歡迎關注我的公眾號:程序通事,獲得日常干貨推送。如果您對我的專題內容感興趣,也可以關注我的博客:studyidea.cn


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM