- 1. CompletableFuture的介紹
- 2. Future與CompletableFuture對比
- 3. CompletableFuture常用方法
- 3.1. CompletableFuture#runAsync
- 3.2. CompletableFuture#supplyAsync
- 3.3. CompletableFuture#thenAccept
- 3.4. CompletableFuture#thenAcceptAsync
- 3.5. CompletableFuture#thenApply
- 3.6. CompletableFuture#thenCompose
- 3.7. CompletableFuture#thenCombine
- 3.8. CompletableFuture#allOf
- 3.9. CompletableFuture#anyOf
- 3.10. CompletableFuture#handle
- 3.11. Completable#exceptionally
- 3.12. CompletableFuture#complete
- 3.13. 注:Junit測試類中的公共方法
- 4. thenApply與thenCompose的區別
- 5. CompletableFuture常用方法總結
1. CompletableFuture的介紹
在Java8時被引入,在包java.util.concurrent下,是Java多線程編程中的一個類,擴展了Future中很多功能,CompletableFuture是一個實現了Future和CompletionStage接口。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
2. Future與CompletableFuture對比
1 Future#get阻塞方法,影響后續代碼執行,CompletableFuture可以設置callback的方式處理:CompletableFuture#thenAcceptAsync
2 CompletableFuture可以組合多個CompletableFuture:CompletableFuture#thenCompose、anyof
3 CompletableFuture優雅處理線程異常:CompletableFuture#handle、exceptionally
4 CompletableFuture可以手動設置為完成,即一個線程處理任務的時間過長,可以手動設置為完成,並設置返回值:CompletableFuture#complete
3. CompletableFuture常用方法
3.1. CompletableFuture#runAsync
/** * @see CompletableFuture#runAsync(Runnable) 接收一個Runnable參數 */ @Test public void runAsyncTest() { CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { System.out.println(getCurrentThreadName() + "------runSync method"); }); // ForkJoinPool.commonPool-worker-9------runSync method }
3.2. CompletableFuture#supplyAsync
/** * @throws ExecutionException * @throws InterruptedException * @see CompletableFuture#supplyAsync(Supplier) 接受一個Supplier參數 */ @Test public void supplyAsyncTest() throws ExecutionException, InterruptedException { CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "return a string of supplyAsync method"); System.out.println(stringCompletableFuture.get()); // return a string of supplyAsync method }
3.3. CompletableFuture#thenAccept
/** * @see CompletableFuture#thenAccept(Consumer) runAync或者supplyAsync執行完后進行的操作(callback) */ @Test public void thenAcceptTest() { CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { System.out.println(getCurrentThreadName() + "------runSync method"); }).thenAccept((consumer) -> { System.out.println(getCurrentThreadName() + "------thenAccept method"); }); System.out.println(getCurrentThreadName() + " End"); // ForkJoinPool.commonPool-worker-9------runSync method // main------thenAccept method // main End }
3.4. CompletableFuture#thenAcceptAsync
/** * @see CompletableFuture#thenAcceptAsync(Consumer) 異步callback */ @Test public void thenAcceptAsyncTest() throws InterruptedException { AtomicReference<Thread> threadAtomicReference = new AtomicReference<>(); CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> { threadAtomicReference.set(Thread.currentThread()); System.out.println(getCurrentThreadName() + "------runSync method"); }).thenAcceptAsync((consumer) -> { System.out.println(getCurrentThreadName() + "------thenAccept method"); getCurrentThread().notifyAll(); }); TimeUnit.SECONDS.sleep(1); System.out.println(getCurrentThreadName() + " End"); // ForkJoinPool.commonPool-worker-9------runSync method // ForkJoinPool.commonPool-worker-9------thenAccept method // main End }
3.5. CompletableFuture#thenApply
/** * @see CompletableFuture#thenApply(Function) */ @Test public void thenApply() { CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> "supplyAsync method") .thenApply((s) -> s + " -> theApply method") .thenAccept(System.out::println); // supplyAsync method -> theApply method }
3.6. CompletableFuture#thenCompose
/** * @see CompletableFuture#thenCompose(Function) 作為兩個CompletableFuture組合使用 這里的Function<? super T, ? * extends CompletionStage<U>> T要轉換成CompletionStage對應的子類? extends * CompletionStage<U>,比如另一個CompletableFuture#supplyAsync返回值就是. */ @Test public void thenComposeTest() { CompletableFuture.supplyAsync(() -> getCurrentThreadName() + ": supplyAsync -> ") .thenCompose( (s) -> CompletableFuture.supplyAsync( () -> s + getCurrentThreadName() + "---theApply method -> ")) .thenAccept((f) -> System.out.println(f + getCurrentThreadName() + "---thenAccept")); // ForkJoinPool.commonPool-worker-9: supplyAsync -> ForkJoinPool.commonPool-worker-9---theApply // method -> main---thenAccept }
3.7. CompletableFuture#thenCombine
/** * @see CompletableFuture#thenCombine(CompletionStage, BiFunction) * 等兩個CompletableFuture完成后,對它們的返回值進行處理,也是對多個CompletableFuture進行組合 */ @Test public void thenCombineTest() { CompletableFuture.supplyAsync(() -> getCurrentThreadName() + ": the first return value") .thenCombine( CompletableFuture.supplyAsync( () -> getCurrentThreadName() + ": the second return value"), (p1, p2) -> { if (StringUtils.hasText(p1) && StringUtils.hasText(p2)) { return p1 + "\n" + p2 + "\n"; } else if (StringUtils.hasText(p1)) { return p1; } else if (StringUtils.hasText(p2)) { return p2; } else { return null; } }) .thenAccept((f) -> System.out.println(f + getCurrentThreadName() + "---thenAccept")); // ForkJoinPool.commonPool-worker-9: the first return value // ForkJoinPool.commonPool-worker-9: the second return value // main---thenAccept }
3.8. CompletableFuture#allOf
/** * 多個CompletableFuture,如果都結束了,就可以獲得它們的返回值,進行處理 * * @throws InterruptedException * @see CompletableFuture#allOf(CompletableFuture[]) */ @Test public void allOfTest() throws InterruptedException { CompletableFuture<String> diligent = CompletableFuture.supplyAsync( () -> { try { TimeUnit.MICROSECONDS.sleep(100); return getCurrentThreadName() + ": be a diligent man."; } catch (InterruptedException e) { e.printStackTrace(); return getCurrentThreadName() + ": missing diligent"; } }); CompletableFuture<String> studious = CompletableFuture.supplyAsync( () -> { try { TimeUnit.MICROSECONDS.sleep(100); return getCurrentThreadName() + ": be a studious man."; } catch (InterruptedException e) { e.printStackTrace(); return getCurrentThreadName() + ": missing studious"; } }); CompletableFuture<String> savvy = CompletableFuture.supplyAsync( () -> { try { TimeUnit.MICROSECONDS.sleep(100); return getCurrentThreadName() + ": be a savvy man."; } catch (InterruptedException e) { e.printStackTrace(); return getCurrentThreadName() + ": missing savvy"; } }); CompletableFuture<List<String>> allOf = CompletableFuture.allOf(savvy, diligent, studious) .thenApply( (n) -> { return Stream.of(savvy, diligent, studious) .map( (completableFuture) -> { try { // get every return string return completableFuture.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); return e.toString(); } }) .collect(Collectors.toList()); }); TimeUnit.SECONDS.sleep(1); // if you want to output every string (and above code of block of "thenApply" also can // forEach(System.out::println)) allOf.thenAccept((stringList) -> stringList.forEach(System.out::println)); // ForkJoinPool.commonPool-worker-11: be a savvy man. // ForkJoinPool.commonPool-worker-9: be a diligent man. // ForkJoinPool.commonPool-worker-2: be a studious man. }
3.9. CompletableFuture#anyOf
/** * anyOf方法,組合多個future,只要有一個結束就完成 * * @see CompletableFuture#anyOf(CompletableFuture[]) */ @Test public void anyOfTest() { CompletableFuture<People> peopleCompletableFuture = CompletableFuture.supplyAsync(People::new); CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "a string"); CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> 1); CompletableFuture.anyOf( stringCompletableFuture, peopleCompletableFuture, integerCompletableFuture) .thenAccept(System.out::println); }
3.10. CompletableFuture#handle
/** * handle方法無論是否發生異常,都會調用,可以在這里處理異常,另一個處理異常的方式:{@link #exceptionallyTest()} * * @see CompletableFuture#handle(BiFunction) * @throws InterruptedException */ @Test public void handleTest() throws InterruptedException { final String string = ""; CompletableFuture.supplyAsync( () -> { if (!StringUtils.hasText(string)) { throw new NullPointerException("string is null"); } return string; }) .handle( (s, t) -> { if (t != null) { // log.error("handle method", t); log.error("handle method"); } return s; }); // For junit main thread stop after ForkJoinPool thread TimeUnit.SECONDS.sleep(1); System.out.println(getCurrentThreadName() + " stop"); // 18:30:47.837 [ForkJoinPool.commonPool-worker-9] ERROR // com.xy.java.basic.demos.completablefuture.CompletableFutureTest - handle method // main stop }
3.11. Completable#exceptionally
/** * 當發生異常時,進入exceptionally方法,另一個處理異常的方式:{@link #handleTest()} * * @see CompletableFuture#exceptionally(Function) * @throws InterruptedException * @throws ExecutionException */ @Test public void exceptionallyTest() throws InterruptedException, ExecutionException { final String string = ""; CompletableFuture.supplyAsync( () -> { if (!StringUtils.hasText(string)) { throw new NullPointerException("string is null"); } return Optional.ofNullable(string); }) .exceptionally( (t) -> { // log.error("exceptionally method", t); log.error("exceptionally method"); return Optional.empty(); }); // For junit main thread stop after ForkJoinPool thread TimeUnit.SECONDS.sleep(1); System.out.println(getCurrentThreadName() + " stop"); // 18:27:29.451 [ForkJoinPool.commonPool-worker-9] ERROR // com.xy.java.basic.demos.completablefuture.CompletableFutureTest - exceptionally method // main stop }
3.12. CompletableFuture#complete
/** * 手動完成一個耗時很長的Future,並且設置默認值 * * @throws InterruptedException * @throws ExecutionException */ @Test public void completeTest() throws InterruptedException, ExecutionException { CompletableFuture<Boolean> runAsync = CompletableFuture.supplyAsync( () -> { try { TimeUnit.SECONDS.sleep(3); return true; } catch (InterruptedException e) { e.printStackTrace(); return null; } }); TimeUnit.SECONDS.sleep(1); // 手動設置Future為完成狀態並設置默認值 runAsync.complete(false); System.out.println(runAsync.get()); // false TimeUnit.SECONDS.sleep(5); }
3.13. 注:Junit測試類中的公共方法
private Thread getCurrentThread() { return Thread.currentThread(); } private String getCurrentThreadName() { return getCurrentThread().getName(); }
4. thenApply與thenCompose的區別
4.1. 用thenApply方法來組合兩個CompletableFuture
/** * @see CompletableFuture#thenApply(Function) * 如果想用thenApply方法來組合兩個CompletableFuture,看起來會非常不優雅,所以組合多個CompletableFuture推薦使用<b>CompletableFuture#thenCompose</b> */ @Test public void thenApplyNeedReturnCompletionStageTest() { CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> getCurrentThreadName() + "---supplyAsync method -> ") .thenApply( (s) -> CompletableFuture.supplyAsync( () -> s + getCurrentThreadName() + "---theApply method -> ")) .thenAccept( (c) -> c.thenAccept( (f) -> System.out.println(f + getCurrentThreadName() + "---thenAccept"))); // ForkJoinPool.commonPool-worker-9---supplyAsync method -> // ForkJoinPool.commonPool-worker-9---theApply method -> main---thenAccept }
4.2. 用thenCompose方法來組合兩個CompletableFuture
/** * @see CompletableFuture#thenCompose(Function) 作為兩個CompletableFuture組合使用 這里的Function<? super T, ? * extends CompletionStage<U>> T要轉換成CompletionStage對應的子類? extends * CompletionStage<U>,比如另一個CompletableFuture#supplyAsync返回值就是. */ @Test public void thenComposeTest() { CompletableFuture.supplyAsync(() -> getCurrentThreadName() + ": supplyAsync -> ") .thenCompose( (s) -> CompletableFuture.supplyAsync( () -> s + getCurrentThreadName() + "---theApply method -> ")) .thenAccept((f) -> System.out.println(f + getCurrentThreadName() + "---thenAccept")); // ForkJoinPool.commonPool-worker-9: supplyAsync -> ForkJoinPool.commonPool-worker-9---theApply // method -> main---thenAccept }
5. CompletableFuture常用方法總結
1 runAsync接收的Runnable參數,supplyAsync接收的Supplier參數
2 thenAccept與thenAcceptAsync的區別在於:該callback方法是否在當前線程中執行(更具體的例子見前面的代碼中的運行結果)
3 thenApply與thenCompose主要區別在於組合多個CompletableFuture
4 其他的方法如上面的代碼例子所示