Java8 CompletableFuture處理多個異步任務


CompletableFuture

Java5引入了Future和 FutureTask,用於異步處理。Future可以通過get()方法獲取異步的返回值。
在Java8引入了CompletableFuture,CompletableFuture不僅實現了Future接口, 還實現了CompletionStage接口。
CompletableFuture實現了CompletionStage接口,重寫thenApply()、thenCombine()等方法。

CompletableFuture類能夠處理多個異步任務,還能處理異步回調。還能完成以下操作:

  • 將兩個異步計算合並為一個——這兩個異步計算之間相互獨立,同時第二個又依賴於第一個的結果。
  • 等待 Future 集合中的所有任務都完成。
  • 僅等待 Future集合中最快結束的任務完成(有可能因為它們試圖通過不同的方式計算同一個值),並返回它的結果。
  • 通過編程方式完成一個Future任務的執行(即以手工設定異步操作結果的方式)。
  • 應對 Future 的完成事件(即當 Future 的完成事件發生時會收到通知,並能使用 Future 計算的結果進行下一步的操作,不只是簡單地阻塞等待操作的結果)

構建CompletableFuture

構建CompletableFuture主要有兩種方式,runAsync和supplyAsync。

  • runAsync

runAsync異步處理任務,使用Runnable,構建CompletableFuture。

runAsync方法的參數有兩種形式,一種是使用默認的ForkJoinPool,另一種是使用自定義的線程池。
第一種源碼如下:

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

注意,默認的ForkJoinPool的線程是守護線程,當主線程結束時,ForkJoinPool的線程也會隨之結束,會影響異步任務的執行。
因此,建議使用自定義的線程池。

第二種源碼如下:

    public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

此處的Executor executor就是指線程池。

示例如下:

    public static void runAsyncTest2()  {
        //該線程池僅用於示例,實際建議使用自定義的線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("completableFuture runAsync2.");
        }, executorService);
        //        executorService.shutdown();

    }
  • supplyAsync

supplyAsync同樣可以構建CompletableFuture。
supplyAsync跟runAsync的區別,主要是supplyAsync有返回值。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

此處的參數supplier其實是一個匿名對象,實現了Supplier 接口,重寫get()方法並返回值。

@FunctionalInterface
public interface Supplier<T> {
    T get();
}

Supplier是函數式接口,關於函數式接口的理解,詳情見參考資料: java8 函數式接口

示例如下:

    /**
     *   supplyAsync使用線程池,構建CompletableFuture
     *
     */
    public static void supplyAsyncTest2() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("supplyAsyncTest2");
            return "completableFuture結果";
        }, executorService);
//        executorService.shutdown();
    }

獲取CompletableFuture任務的結果

T get() throws InterruptedException, ExecutionException: 獲取返回值,會阻塞,還需要處理受檢的異常

V get(long timeout,Timeout unit):可以設置阻塞時間,unit為時間的單位(秒/分/時之類)

T getNow(T defaultValue):表示當有了返回結果時會返回結果,如果異步線程拋了異常會返回設置的默認值.

T join():獲取返回值,會阻塞

在實際編程中,不建議使用CompletableFuture的 get() 方法,
最好用get(long timeout,Timeout unit) 設置超時時間,這樣不會一直阻塞。

    public static void supplyAsyncGet()  {
        //該線程池僅用於示例,實際建議使用自定義的線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()
                -> runTask(), executorService);

        String result = null;
        try {
            //獲取返回值,2秒超時
            result = completableFuture.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error("InterruptedException error.", e);
        } catch (ExecutionException e) {
            logger.error("ExecutionException error.", e);
        } catch (TimeoutException e) {
            logger.error("TimeoutException error.", e);
        }
        logger.info("result:"+result);
    }

    private static String runTask() {
        try {
            //任務耗時。可以分別設置1000和3000,看未超時和超時的不同結果。
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            logger.error("supplyAsyncGet error.");
        }
        return "supplyAsyncGet";
    }

CompletableFuture任務執行中

thenApply(): 接收一個任務的前一階段的輸出作為本階段的輸入,該方法有一個參數,也有返回值。
通過thenApply(),可以在supplyAsync()異步完成后,馬上就使用supplyAsync()的返回值,不會阻塞。

    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

thenAccept(): 接收一個任務的前一階段的輸出作為本階段的輸入,該方法返回值類型為Void,相當於沒有返回值。

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

thenRun(): 根本不關心一個任務的前一階段的輸出,它只負責運行新的Runnable任務,該方法返回值類型為Void,相當於沒有返回值。

    public CompletableFuture<Void> thenRun(Runnable action) 

示例如下:

    public static void thenApplyTest()  {
        CompletableFuture.supplyAsync(() -> 1)
                .thenApply(i -> i + 1).thenApply(i -> {
                    System.out.println("thenApplyTest結果為:"+i*i);
                    return i * i;
                });
    }

CompletableFuture任務完成后

whenComplete():任務完成后觸發,該方法有返回值。還有兩個參數,第一個參數是任務的返回值,第二個參數是異常。

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)

exceptionally():當運行出現異常時,調用該方法可進行一些補償操作,設置默認值.如果沒有異常,則不會觸發。

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

示例如下:

    public static void exceptionallyTest() {
        // num作為分母,如果改成1,.exceptionally()方法就不會執行
        Integer num = 0;
        CompletableFuture<Integer> exceptionCf = CompletableFuture.supplyAsync(() -> 1)
                .thenApply(i -> i / num)
                .whenComplete((i, e) -> {
                    System.out.println(i);
                })
                .exceptionally(e -> {
                    System.out.println("exceptionallyTest error. "+ e);
                    System.out.println("異常處理");
                    return 0;
                });
        Integer result = exceptionCf.join();
        System.out.println("exceptionCf結果為:"+result);
    }

多個CompletableFuture任務的組合

thenCombine(): 會將兩個任務(CompletableFuture)的執行結果作為方法入參傳遞到指定方法中,且該方法有返回值;

public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn)

thenAcceptBoth(): 同樣將兩個任務(CompletableFuture)的執行結果作為方法入參,但是是個Void的返回值,相當於沒有返回值;

 public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action)

runAfterBoth(): 在兩個任務(CompletableFuture)之后執行,但沒有入參,而且是個Void的返回值,相當於沒有返回值。

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

示例如下:

    public static void thenCombineTest() {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(()-> {
            System.out.println("任務1結束.");
            return "Hello";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(()-> {
            System.out.println("任務2結束.");
            return "World";
        });

        cf1.thenCombine(cf2, (result1, result2) -> result1 + result2)
            .whenComplete((r,e)-> System.out.println("任務1的返回值加上任務2的返回值,結果為:"+r));

    }

CompletableFuture任務執行后

allOf就是所有任務都完成時觸發。但是是個Void的返回值,相當於沒有返回值。

  //...表示不定參數,可以有多個CompletableFuture參數
  public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

anyOf是當入參的completableFuture組中有一個任務執行完畢就返回。返回結果是第一個完成的任務的結果。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

示例如下:

    public static void allOfTest() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務1結束.");
            return "futureOneResult";
        }, executorService);
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務2結束.");
            return "futureTwoResult";
        },executorService);
        CompletableFuture.allOf(cf1, cf2).thenRun(()->{
            System.out.println("任務1和任務2都完成了");
        });

//        CompletableFuture completableFuture = CompletableFuture.anyOf(futureOne, futureTwo);
//        //返回結果是第一個完成的任務的結果
//        System.out.println(completableFuture.get());

    }

參考資料

《java8實戰》
https://www.jianshu.com/p/547d2d7761db
https://www.cnblogs.com/fingerboy/p/9948736.html
https://www.jianshu.com/p/6bac52527ca4


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM