第十三章:(1)CompletableFuture異步回調


一、CompletableFuture 簡介

   

 

 CompletableFuture 在 Java 里面被用於異步編程,異步通常意味着非阻塞,可以使得我們的任務單獨運行在與主線程分離的其他線程中,並且通過回調可
以在主線程中得到異步任務的執行狀態,是否完成,和是否異常等信息。

CompletableFuture 實現了 Future, CompletionStage 接口,實現了 Future 接口就可以兼容現在有線程池框架,而 CompletionStage 接口才是異步編程
的接口抽象,里面定義多種異步方法,通過這兩者集合,從而打造出了強大的CompletableFuture 類。

 

二、Future 與 CompletableFuture

  Futrue 在 Java 里面,通常用來表示一個異步任務的引用,比如我們將任務提交到線程池里面,然后我們會得到一個 Futrue,在 Future 里面有 isDone 方
法來 判斷任務是否處理結束,還有 get 方法可以一直阻塞直到任務結束然后獲取結果,但整體來說這種方式,還是同步的,因為需要客戶端不斷阻塞等待或
者不斷輪詢才能知道任務是否完成。

  Future 的主要缺點如下:

  (1)不支持手動完成

    我提交了一個任務,但是執行太慢了,我通過其他路徑已經獲取到了任務結果,現在沒法把這個任務結果通知到正在執行的線程,所以必須主動取消或者一直
等待它執行完成。

 

  (2)不支持進一步的非阻塞調用

    通過 Future 的 get 方法會一直阻塞到任務完成,但是想在獲取任務之后執行額外的任務,因為 Future 不支持回調函數,所以無法實現這個功能

  (3)不支持鏈式調用

    對於 Future 的執行結果,我們想繼續傳到下一個 Future 處理使用,從而形成一個鏈式的 pipline 調用,這在 Future 中是沒法實現的

 

  (4)不支持多個 Future 合並

    比如我們有 10 個 Future 並行執行,我們想在所有的 Future 運行完畢之后,執行某些函數,是沒法通過 Future 實現的

  (5)不支持異常處理

    Future 的 API 沒有任何的異常處理的 api,所以在異步運行時,如果出了問題是不好定位的

 

三、CompletableFuture 入門

1、使用 CompletableFuture

  場景:主線程里面創建一個 CompletableFuture,然后主線程調用 get 方法會阻塞,最后我們在一個子線程中使其終止。

    /** * 主線程里面創建一個 CompletableFuture,然后主線程調用 get 方法會阻塞,最后我們 在一個子線程中使其終止 * @param args */
    public static void main(String[] args) throws Exception{ CompletableFuture<String> future = new CompletableFuture<>(); new Thread(() -> { try{ System.out.println(Thread.currentThread().getName() + "子線程開始干活"); //子線程睡 5 秒
                Thread.sleep(5000); //在子線程中完成主線程
                future.complete("success"); }catch (Exception e){ e.printStackTrace(); } }, "A").start(); //主線程調用 get 方法阻塞
        System.out.println("主線程調用 get 方法獲取結果為: " + future.get()); System.out.println("主線程完成,阻塞結束!!!!!!"); }

 

2、沒有返回值的異步任務

public class CompletableFutureDemo { public static void main(String[] args) throws Exception { System.out.println("主線程開始"); //運行一個沒有返回值的異步任務
        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{ try { System.out.println("子線程啟動干活"); System.out.println(Thread.currentThread().getName()+" : CompletableFuture1"); Thread.sleep(5000); System.out.println("子線程完成"); } catch (Exception e) { e.printStackTrace(); } }); //主線程阻塞
 completableFuture1.get(); System.out.println("主線程結束"); } }

 

3、有返回值的異步任務

public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主線程開始"); //運行一個有返回值的異步任務
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{ try { System.out.println("子線程開始任務"); System.out.println(Thread.currentThread().getName()+" : CompletableFuture2"); Thread.sleep(5000); //模擬異常
                int i = 10/0; } catch (Exception e) { e.printStackTrace(); } return 1024; }); completableFuture2.whenComplete((t,u)->{ System.out.println("------t="+t);  //返回值信息
            System.out.println("------u="+u);  //方法中的異常信息
 }).get(); //主線程阻塞
        Integer s = completableFuture2.get(); System.out.println("主線程結束, 子線程的結果為:" + s); } }

 

4、線程依賴

  當一個線程依賴另一個線程時,可以使用 thenApply 方法來把這兩個線程串行化。

    private static Integer num = 10; /** * 先對一個數加 10,然后取平方 * @param args */
    public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { System.out.println("加 10 任務開始"); num += 10; } catch (Exception e) { e.printStackTrace(); } return num; }).thenApply(integer -> { return num * num; }); Integer integer = future.get(); System.out.println("主線程結束, 子線程的結果為:" + integer); }

 

5、消費處理結果

  thenAccept 消費處理結果, 接收任務的處理結果,並消費處理,無返回結果。

    private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture.supplyAsync(() -> { try { System.out.println("加 10 任務開始"); num += 10; } catch (Exception e) { e.printStackTrace(); } return num; }).thenApply(integer -> { return num * num; }).thenAccept(new Consumer<Integer>() { @Override public void accept(Integer integer) { System.out.println("子線程全部處理完成,最后調用了 accept,結果為:" + integer); } }); }

 

6、異常處理

  exceptionally 異常處理,出現異常時觸發

    private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int i= 1/0; System.out.println("加 10 任務開始"); num += 10; return num; }).exceptionally(ex -> { System.out.println(ex.getMessage()); return -1; }); System.out.println(future.get()); }

 

  handle 類似於 thenAccept/thenRun 方法,是最后一步的處理調用,但是同時可以處理異常

    private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任務開始"); num += 10; return num; }).handle((i,ex) ->{ System.out.println("進入 handle 方法"); if(ex != null){ System.out.println("發生了異常,內容為:" + ex.getMessage()); return -1; }else{ System.out.println("正常完成,內容為: " + i); return i; }}); System.out.println(future.get()); }

 

7、結果合並

  thenCompose 合並兩個有依賴關系的 CompletableFutures 的執行結果

    private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); //第一步加 10
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任務開始"); num += 10; return num; }); //合並
        CompletableFuture<Integer> future1 = future.thenCompose(i ->
                //再來一個 CompletableFuture
                CompletableFuture.supplyAsync(() -> { return i + 1; })); System.out.println(future.get()); System.out.println(future1.get()); }

 

  thenCombine 合並兩個沒有依賴關系的 CompletableFutures 任務

    private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任務開始"); num += 10; return num; }); CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {System.out.println("乘以 10 任務開始"); num = num * 10; return num; }); //合並兩個結果
        CompletableFuture<Object> future = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() { @Override public List<Integer> apply(Integer a, Integer b) { List<Integer> list = new ArrayList<>(); list.add(a); list.add(b); return list; } }); System.out.println("合並結果為:" + future.get()); }

 

  合並多個任務的結果 allOf 與 anyOf
  allOf: 一系列獨立的 future 任務,等其所有的任務執行完后做一些事情

    private static Integer num = 10; /** * 先對一個數加 10,然后取平方 * @param args */
    public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); List<CompletableFuture> list = new ArrayList<>(); CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任務開始"); num += 10; return num; }); list.add(job1); CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { System.out.println("乘以 10 任務開始");num = num * 10; return num; }); list.add(job2); CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { System.out.println("減以 10 任務開始"); num = num * 10; return num; }); list.add(job3); CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { System.out.println("除以 10 任務開始"); num = num * 10; return num; }); list.add(job4); //多任務合並
        List<Integer> collect = list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList()); System.out.println(collect); }

 

  anyOf: 只要在多個 future 里面有一個返回,整個任務就可以結束,而不需要等到每一個future 結束

    private static Integer num = 10; /** * 先對一個數加 10,然后取平方 * @param args */
    public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture<Integer>[] futures = new CompletableFuture[4]; CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(5000); System.out.println("加 10 任務開始");num += 10; return num; }catch (Exception e){ return 0; } }); futures[0] = job1; CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(2000); System.out.println("乘以 10 任務開始"); num = num * 10; return num; }catch (Exception e){ return 1; } }); futures[1] = job2; CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(3000); System.out.println("減以 10 任務開始"); num = num * 10; return num; }catch (Exception e){ return 2; } }); futures[2] = job3; CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(4000); System.out.println("除以 10 任務開始");num = num * 10; return num; }catch (Exception e){ return 3; } }); futures[3] = job4; CompletableFuture<Object> future = CompletableFuture.anyOf(futures); System.out.println(future.get()); }

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM