CompletableFuture
Java5引入了Future和 FutureTask,用於異步處理。Future可以通過get()方法獲取異步的返回值。
在Java8引入了CompletableFuture,CompletableFuture不僅實現了Future接口, 還實現了CompletionStage接口。
CompletableFuture實現了CompletionStage接口,重寫thenApply()、thenCombine()等方法。
CompletableFuture類能夠處理多個異步任務,還能處理異步回調。還能完成以下操作:
- 將兩個異步計算合並為一個——這兩個異步計算之間相互獨立,同時第二個又依賴於第一個的結果。
- 等待 Future 集合中的所有任務都完成。
- 僅等待 Future集合中最快結束的任務完成(有可能因為它們試圖通過不同的方式計算同一個值),並返回它的結果。
- 通過編程方式完成一個Future任務的執行(即以手工設定異步操作結果的方式)。
- 應對 Future 的完成事件(即當 Future 的完成事件發生時會收到通知,並能使用 Future 計算的結果進行下一步的操作,不只是簡單地阻塞等待操作的結果)
構建CompletableFuture
構建CompletableFuture主要有兩種方式,runAsync和supplyAsync。
- runAsync
runAsync異步處理任務,使用Runnable,構建CompletableFuture。
runAsync方法的參數有兩種形式,一種是使用默認的ForkJoinPool,另一種是使用自定義的線程池。
第一種源碼如下:
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
注意,默認的ForkJoinPool的線程是守護線程,當主線程結束時,ForkJoinPool的線程也會隨之結束,會影響異步任務的執行。
因此,建議使用自定義的線程池。
第二種源碼如下:
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
此處的Executor executor就是指線程池。
示例如下:
public static void runAsyncTest2() {
//該線程池僅用於示例,實際建議使用自定義的線程池
ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println("completableFuture runAsync2.");
}, executorService);
// executorService.shutdown();
}
- supplyAsync
supplyAsync同樣可以構建CompletableFuture。
supplyAsync跟runAsync的區別,主要是supplyAsync有返回值。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
此處的參數supplier其實是一個匿名對象,實現了Supplier
@FunctionalInterface
public interface Supplier<T> {
T get();
}
Supplier是函數式接口,關於函數式接口的理解,詳情見參考資料: java8 函數式接口
示例如下:
/**
* supplyAsync使用線程池,構建CompletableFuture
*
*/
public static void supplyAsyncTest2() {
ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsyncTest2");
return "completableFuture結果";
}, executorService);
// executorService.shutdown();
}
獲取CompletableFuture任務的結果
T get() throws InterruptedException, ExecutionException: 獲取返回值,會阻塞,還需要處理受檢的異常
V get(long timeout,Timeout unit):可以設置阻塞時間,unit為時間的單位(秒/分/時之類)
T getNow(T defaultValue):表示當有了返回結果時會返回結果,如果異步線程拋了異常會返回設置的默認值.
T join():獲取返回值,會阻塞
在實際編程中,不建議使用CompletableFuture的 get() 方法,
最好用get(long timeout,Timeout unit) 設置超時時間,這樣不會一直阻塞。
public static void supplyAsyncGet() {
//該線程池僅用於示例,實際建議使用自定義的線程池
ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()
-> runTask(), executorService);
String result = null;
try {
//獲取返回值,2秒超時
result = completableFuture.get(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("InterruptedException error.", e);
} catch (ExecutionException e) {
logger.error("ExecutionException error.", e);
} catch (TimeoutException e) {
logger.error("TimeoutException error.", e);
}
logger.info("result:"+result);
}
private static String runTask() {
try {
//任務耗時。可以分別設置1000和3000,看未超時和超時的不同結果。
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error("supplyAsyncGet error.");
}
return "supplyAsyncGet";
}
CompletableFuture任務執行中
thenApply(): 接收一個任務的前一階段的輸出作為本階段的輸入,該方法有一個參數,也有返回值。
通過thenApply(),可以在supplyAsync()異步完成后,馬上就使用supplyAsync()的返回值,不會阻塞。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
thenAccept(): 接收一個任務的前一階段的輸出作為本階段的輸入,該方法返回值類型為Void,相當於沒有返回值。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
thenRun(): 根本不關心一個任務的前一階段的輸出,它只負責運行新的Runnable任務,該方法返回值類型為Void,相當於沒有返回值。
public CompletableFuture<Void> thenRun(Runnable action)
示例如下:
public static void thenApplyTest() {
CompletableFuture.supplyAsync(() -> 1)
.thenApply(i -> i + 1).thenApply(i -> {
System.out.println("thenApplyTest結果為:"+i*i);
return i * i;
});
}
CompletableFuture任務完成后
whenComplete():任務完成后觸發,該方法有返回值。還有兩個參數,第一個參數是任務的返回值,第二個參數是異常。
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
exceptionally():當運行出現異常時,調用該方法可進行一些補償操作,設置默認值.如果沒有異常,則不會觸發。
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
示例如下:
public static void exceptionallyTest() {
// num作為分母,如果改成1,.exceptionally()方法就不會執行
Integer num = 0;
CompletableFuture<Integer> exceptionCf = CompletableFuture.supplyAsync(() -> 1)
.thenApply(i -> i / num)
.whenComplete((i, e) -> {
System.out.println(i);
})
.exceptionally(e -> {
System.out.println("exceptionallyTest error. "+ e);
System.out.println("異常處理");
return 0;
});
Integer result = exceptionCf.join();
System.out.println("exceptionCf結果為:"+result);
}
多個CompletableFuture任務的組合
thenCombine(): 會將兩個任務(CompletableFuture)的執行結果作為方法入參傳遞到指定方法中,且該方法有返回值;
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
thenAcceptBoth(): 同樣將兩個任務(CompletableFuture)的執行結果作為方法入參,但是是個Void的返回值,相當於沒有返回值;
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
runAfterBoth(): 在兩個任務(CompletableFuture)之后執行,但沒有入參,而且是個Void的返回值,相當於沒有返回值。
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
示例如下:
public static void thenCombineTest() {
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(()-> {
System.out.println("任務1結束.");
return "Hello";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(()-> {
System.out.println("任務2結束.");
return "World";
});
cf1.thenCombine(cf2, (result1, result2) -> result1 + result2)
.whenComplete((r,e)-> System.out.println("任務1的返回值加上任務2的返回值,結果為:"+r));
}
CompletableFuture任務執行后
allOf就是所有任務都完成時觸發。但是是個Void的返回值,相當於沒有返回值。
//...表示不定參數,可以有多個CompletableFuture參數
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
anyOf是當入參的completableFuture組中有一個任務執行完畢就返回。返回結果是第一個完成的任務的結果。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
示例如下:
public static void allOfTest() {
ExecutorService executorService = Executors.newCachedThreadPool();
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務1結束.");
return "futureOneResult";
}, executorService);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務2結束.");
return "futureTwoResult";
},executorService);
CompletableFuture.allOf(cf1, cf2).thenRun(()->{
System.out.println("任務1和任務2都完成了");
});
// CompletableFuture completableFuture = CompletableFuture.anyOf(futureOne, futureTwo);
// //返回結果是第一個完成的任務的結果
// System.out.println(completableFuture.get());
}
參考資料
《java8實戰》
https://www.jianshu.com/p/547d2d7761db
https://www.cnblogs.com/fingerboy/p/9948736.html
https://www.jianshu.com/p/6bac52527ca4