一、CompletableFuture 簡介
CompletableFuture 在 Java 里面被用於異步編程,異步通常意味着非阻塞,可以使得我們的任務單獨運行在與主線程分離的其他線程中,並且通過回調可
以在主線程中得到異步任務的執行狀態,是否完成,和是否異常等信息。
CompletableFuture 實現了 Future, CompletionStage 接口,實現了 Future 接口就可以兼容現在有線程池框架,而 CompletionStage 接口才是異步編程
的接口抽象,里面定義多種異步方法,通過這兩者集合,從而打造出了強大的CompletableFuture 類。
二、Future 與 CompletableFuture
Futrue 在 Java 里面,通常用來表示一個異步任務的引用,比如我們將任務提交到線程池里面,然后我們會得到一個 Futrue,在 Future 里面有 isDone 方
法來 判斷任務是否處理結束,還有 get 方法可以一直阻塞直到任務結束然后獲取結果,但整體來說這種方式,還是同步的,因為需要客戶端不斷阻塞等待或
者不斷輪詢才能知道任務是否完成。
Future 的主要缺點如下:
(1)不支持手動完成
我提交了一個任務,但是執行太慢了,我通過其他路徑已經獲取到了任務結果,現在沒法把這個任務結果通知到正在執行的線程,所以必須主動取消或者一直
等待它執行完成。
(2)不支持進一步的非阻塞調用
通過 Future 的 get 方法會一直阻塞到任務完成,但是想在獲取任務之后執行額外的任務,因為 Future 不支持回調函數,所以無法實現這個功能
(3)不支持鏈式調用
對於 Future 的執行結果,我們想繼續傳到下一個 Future 處理使用,從而形成一個鏈式的 pipline 調用,這在 Future 中是沒法實現的
(4)不支持多個 Future 合並
比如我們有 10 個 Future 並行執行,我們想在所有的 Future 運行完畢之后,執行某些函數,是沒法通過 Future 實現的
(5)不支持異常處理
Future 的 API 沒有任何的異常處理的 api,所以在異步運行時,如果出了問題是不好定位的
三、CompletableFuture 入門
1、使用 CompletableFuture
場景:主線程里面創建一個 CompletableFuture,然后主線程調用 get 方法會阻塞,最后我們在一個子線程中使其終止。
/** * 主線程里面創建一個 CompletableFuture,然后主線程調用 get 方法會阻塞,最后我們 在一個子線程中使其終止 * @param args */
public static void main(String[] args) throws Exception{ CompletableFuture<String> future = new CompletableFuture<>(); new Thread(() -> { try{ System.out.println(Thread.currentThread().getName() + "子線程開始干活"); //子線程睡 5 秒
Thread.sleep(5000); //在子線程中完成主線程
future.complete("success"); }catch (Exception e){ e.printStackTrace(); } }, "A").start(); //主線程調用 get 方法阻塞
System.out.println("主線程調用 get 方法獲取結果為: " + future.get()); System.out.println("主線程完成,阻塞結束!!!!!!"); }
2、沒有返回值的異步任務
public class CompletableFutureDemo { public static void main(String[] args) throws Exception { System.out.println("主線程開始"); //運行一個沒有返回值的異步任務
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{ try { System.out.println("子線程啟動干活"); System.out.println(Thread.currentThread().getName()+" : CompletableFuture1"); Thread.sleep(5000); System.out.println("子線程完成"); } catch (Exception e) { e.printStackTrace(); } }); //主線程阻塞
completableFuture1.get(); System.out.println("主線程結束"); } }
3、有返回值的異步任務
public class CompletableFutureDemo2 { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主線程開始"); //運行一個有返回值的異步任務
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{ try { System.out.println("子線程開始任務"); System.out.println(Thread.currentThread().getName()+" : CompletableFuture2"); Thread.sleep(5000); //模擬異常
int i = 10/0; } catch (Exception e) { e.printStackTrace(); } return 1024; }); completableFuture2.whenComplete((t,u)->{ System.out.println("------t="+t); //返回值信息
System.out.println("------u="+u); //方法中的異常信息
}).get(); //主線程阻塞
Integer s = completableFuture2.get(); System.out.println("主線程結束, 子線程的結果為:" + s); } }
4、線程依賴
當一個線程依賴另一個線程時,可以使用 thenApply 方法來把這兩個線程串行化。
private static Integer num = 10; /** * 先對一個數加 10,然后取平方 * @param args */
public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { System.out.println("加 10 任務開始"); num += 10; } catch (Exception e) { e.printStackTrace(); } return num; }).thenApply(integer -> { return num * num; }); Integer integer = future.get(); System.out.println("主線程結束, 子線程的結果為:" + integer); }
5、消費處理結果
thenAccept 消費處理結果, 接收任務的處理結果,並消費處理,無返回結果。
private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture.supplyAsync(() -> { try { System.out.println("加 10 任務開始"); num += 10; } catch (Exception e) { e.printStackTrace(); } return num; }).thenApply(integer -> { return num * num; }).thenAccept(new Consumer<Integer>() { @Override public void accept(Integer integer) { System.out.println("子線程全部處理完成,最后調用了 accept,結果為:" + integer); } }); }
6、異常處理
exceptionally 異常處理,出現異常時觸發
private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int i= 1/0; System.out.println("加 10 任務開始"); num += 10; return num; }).exceptionally(ex -> { System.out.println(ex.getMessage()); return -1; }); System.out.println(future.get()); }
handle 類似於 thenAccept/thenRun 方法,是最后一步的處理調用,但是同時可以處理異常
private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任務開始"); num += 10; return num; }).handle((i,ex) ->{ System.out.println("進入 handle 方法"); if(ex != null){ System.out.println("發生了異常,內容為:" + ex.getMessage()); return -1; }else{ System.out.println("正常完成,內容為: " + i); return i; }}); System.out.println(future.get()); }
7、結果合並
thenCompose 合並兩個有依賴關系的 CompletableFutures 的執行結果
private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); //第一步加 10
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任務開始"); num += 10; return num; }); //合並
CompletableFuture<Integer> future1 = future.thenCompose(i ->
//再來一個 CompletableFuture
CompletableFuture.supplyAsync(() -> { return i + 1; })); System.out.println(future.get()); System.out.println(future1.get()); }
thenCombine 合並兩個沒有依賴關系的 CompletableFutures 任務
private static Integer num = 10; public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任務開始"); num += 10; return num; }); CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {System.out.println("乘以 10 任務開始"); num = num * 10; return num; }); //合並兩個結果
CompletableFuture<Object> future = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() { @Override public List<Integer> apply(Integer a, Integer b) { List<Integer> list = new ArrayList<>(); list.add(a); list.add(b); return list; } }); System.out.println("合並結果為:" + future.get()); }
合並多個任務的結果 allOf 與 anyOf
allOf: 一系列獨立的 future 任務,等其所有的任務執行完后做一些事情
private static Integer num = 10; /** * 先對一個數加 10,然后取平方 * @param args */
public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); List<CompletableFuture> list = new ArrayList<>(); CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { System.out.println("加 10 任務開始"); num += 10; return num; }); list.add(job1); CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { System.out.println("乘以 10 任務開始");num = num * 10; return num; }); list.add(job2); CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { System.out.println("減以 10 任務開始"); num = num * 10; return num; }); list.add(job3); CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { System.out.println("除以 10 任務開始"); num = num * 10; return num; }); list.add(job4); //多任務合並
List<Integer> collect = list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList()); System.out.println(collect); }
anyOf: 只要在多個 future 里面有一個返回,整個任務就可以結束,而不需要等到每一個future 結束
private static Integer num = 10; /** * 先對一個數加 10,然后取平方 * @param args */
public static void main(String[] args) throws Exception{ System.out.println("主線程開始"); CompletableFuture<Integer>[] futures = new CompletableFuture[4]; CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(5000); System.out.println("加 10 任務開始");num += 10; return num; }catch (Exception e){ return 0; } }); futures[0] = job1; CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(2000); System.out.println("乘以 10 任務開始"); num = num * 10; return num; }catch (Exception e){ return 1; } }); futures[1] = job2; CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(3000); System.out.println("減以 10 任務開始"); num = num * 10; return num; }catch (Exception e){ return 2; } }); futures[2] = job3; CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> { try{ Thread.sleep(4000); System.out.println("除以 10 任務開始");num = num * 10; return num; }catch (Exception e){ return 3; } }); futures[3] = job4; CompletableFuture<Object> future = CompletableFuture.anyOf(futures); System.out.println(future.get()); }