CompletableFuture簡介
JDK 1.8 提供了CompletableFuture來支持異步編程,我們可以用CompletableFuture來很快的實現異步編程,CompletableFuture提供了串行,並行,匯聚3種模式提供給我們使用
使用方法
創建
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
我們可以通過上面4個API來創建CompletableFuture對象,API分為兩大類,一類是無返回值的runAsync,一類是有返回值的supplyAsync,每個大類下面有分成了兩個小類,一種是使用默認的Fork/Join線程池,一種是使用自己定義的線程池
串行調用
CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
CompletableFuture<Void> thenAccept(Consumer<? super T> action);
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
CompletableFuture<Void> thenRun(Runnable action);
CompletableFuture<Void> thenRunAsync(Runnable action);
CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
CompletableFuture<R> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
CompletableFuture<R> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)
串行調用提供了上述的8個API,分為4大類,逐一介紹一下:
-
thenApply系列需要傳入一個Function<? super T,? extends U>參數,T代表入參,U代表出參,所以thenApply系列可以傳入參數也可以返回結果
-
thenAccept系列會傳入一個Consumer<? super T>,T是入參,所以thenAccept可以傳入參數,但是不會返回結果
-
thenRun系列需要傳入一個Runnale,所以這個系列既不能有入參也不會有結果
-
thenCompose系列和thenApply系列結果相同,但是需要開啟一個子任務去執行,從傳入的參數也可以看出,參數一接受一個CompletionStage的Function,CompletionStage就是CompletableFuture實現的接口,具體到實現類就是在接收一個CompletableFuture對象
每個大類都有* 和 *Async兩種API,區別就在於帶Async的任務會在丟給Fork/Join線程池執行,不帶Async就直接由前面任務的線程來執行,帶Async還可以自己指定線程池
並行
並行比較好理解,就是同時創建多個CompletableFuture,讓任務去並行執行
匯聚
匯聚又分成兩種,一種的AND匯聚,一個是OR匯聚,簡單的說就是AND匯聚需要匯聚的任務都完成才可以執行匯聚之后的方法,而OR匯聚只要其中一個任務完成就可以往下執行了,匯聚API可以將並行執行的CompletableFuture匯聚成一個CompletableFuture
AND匯聚
CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action));
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor)
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
AND匯聚提供了3類API,API和串行的API功能類似,thenCombine提供了有入參和出參的能力,thenAcceptBoth只提供了入參的能力,沒有返回值,runAfterBoth既沒有入參也沒有出參
OR匯聚
CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)
CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)
CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor)
OR匯聚也和AND匯聚類似,提供了3類API,功能方法也可以參考AND匯聚執行的方法
異常處理
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action));
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)
CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
異常處理提供了3類API:
- exceptionally類似try-catch,如果有異常我們可以通過入參獲取到異常
- whenComplete可以獲取CompletableFuture的結果,並且可以通過第二個參數異常(如果有的話),並且這個異常在主線程也可以捕獲
- handle和whenComplete類似,但是他還可以返回一個結果,和whenComplete不同的是,里面的異常在主線程不能捕獲
例子
package com.demo;
import java.util.concurrent.CompletableFuture;
public class Test {
public static void main(String[] args){
CompletableFuture<String> f1 = CompletableFuture.runAsync(()->{
System.out.println("T1:start");
sleep(1000);
System.out.println("T1: doing sth");
sleep(5000);
}).thenRunAsync(()-> System.out.println("T1 : next task")).thenApply((__)-> {
System.out.println("T1 task end");
return " T1 result";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{
System.out.println("T2: start");
sleep(1000);
System.out.println("T2: doing sth");
sleep(2000);
return " T2:result";
}).thenApply(s-> s+ "!!!").thenCompose(s -> CompletableFuture.supplyAsync(s::toUpperCase));
CompletableFuture<String> f3 = f1.thenCombine(f2,(r1,r2)->{
System.out.println("T1 result :" + r1);
System.out.println("T2 result:" + r2);
return "t1 t2 end";
});
System.out.println(f3.join());
System.out.println("--------------");
/**
* exceptionally處理異常情況
* result:
* java.lang.ArithmeticException: / by zero
* 0
*/
CompletableFuture<Integer> f4 = CompletableFuture.supplyAsync(()->1/0)
.thenApply(i->i*i)
.exceptionally((throwable -> {
System.out.println(throwable.getMessage());
return 0;
}));
System.out.println(f4.join());
System.out.println("--------------");
/**
* whenComplete處理異常情況
* result : null, error : java.lang.ArithmeticException: / by zero
* enter exception block
*
* Process finished with exit code 0
*
*/
try {
CompletableFuture<Integer> f5 = CompletableFuture.supplyAsync(()->1/0)
.thenApply(i->i*i)
.whenComplete((i,t)-> {
System.out.println("result : " +i+ ", error : " + t.getMessage());
});
System.out.println(f5.join());
}catch (Exception e){
System.out.println("enter exception block");
}
System.out.println("--------------");
/**
* handle處理異常情況
* result : null, error : java.lang.ArithmeticException: / by zero
* 0
*
* Process finished with exit code 0
*
*/
try {
CompletableFuture<Integer> f6 = CompletableFuture.supplyAsync(()->1/0)
.thenApply(i->i*i)
.handle((i,t)-> {
System.out.println("result : " +i+ ", error : " + t.getMessage());
return 0;
});
System.out.println(f6.join());
}catch (Exception e){
System.out.println("enter exception block");
}
}
private static void sleep(long time){
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
一次結果
T1:start
T2: start
T1: doing sth
T2: doing sth
T1 : next task
T1 task end
T1 result : T1 result
T2 result: T2:RESULT!!!
t1 t2 end
--------------
java.lang.ArithmeticException: / by zero
0
--------------
result : null, error : java.lang.ArithmeticException: / by zero
enter exception block
--------------
result : null, error : java.lang.ArithmeticException: / by zero
0
Process finished with exit code 0
