Future
從JDK1.5開始,提供了Future來表示異步計算的結果,一般它需要結合ExecutorService(執行者)和Callable(任務)來使用。
示例
import java.util.*; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); Future<Integer> future = executor.submit(() -> { // 故意耗時 Thread.sleep(5000); return new Random().nextInt(100); }); System.out.println(future.get()); System.out.println("如果get是阻塞的,則此消息在數據之后輸出"); executor.shutdown(); } }
輸出
即使異步任務等待了5秒,也依然先於消息輸出,由此證明get方法是阻塞的。
Future只是個接口,實際上返回的類是FutureTask:
/** * 表示此任務的運行狀態。最初是NEW == 0。運行狀態僅在set、setException和cancel方法中轉換為終端狀態。 * * 可能的狀態轉換: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; public V get() throws InterruptedException, ExecutionException { int s = state; // 如果當前狀態是COMPLETING及其之下的狀態,則需要進入awaitDone方法阻塞等待任務完成。 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
CompletableFuture
JDk1.8引入了CompletableFuture,它實際上也是Future的實現類。這里可以得出:
1. 面試問Future和CompletableFuture的區別實際上是不嚴謹的,因為一個是接口一個是其實現類。
2. 問區別實際上是問FutureTask和CompletableFuture的區別,或者說CompletableFuture有哪些新特性,能完成Future不能完成的工作。
首先看類定義,可以看到,實現了CompletionStage接口,這個接口是所有的新特性了
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
對於CompletableFuture有四個執行異步任務的方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
1. 如果我們指定線程池,則會使用我么指定的線程池;如果沒有指定線程池,默認使用ForkJoinPool.commonPool()作為線程池。
2. supply開頭的帶有返回值,run開頭的無返回值。
1. 執行異步任務(supplyAsync / runAsync)
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { return new Random().nextInt(100); }, executor); System.out.println(future.get()); executor.shutdown(); } }
以上僅僅返回個隨機數,如果我們要利用計算結果進一步處理呢?
2. 結果轉換(thenApply / thenApplyAsync)
// 同步轉換 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) // 異步轉換,使用默認線程池 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) // 異步轉換,使用指定線程池 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<Integer> future = CompletableFuture // 執行異步任務 .supplyAsync(() -> { return new Random().nextInt(100); }, executor) // 對上一步的結果進行處理 .thenApply(n -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } int res = new Random().nextInt(100); System.out.println(String.format("如果是同步的,這條消息應該先輸出。上一步結果:%s,新加:%s", n, res)); return n + res; }); System.out.println("我等了你2秒"); System.out.println(future.get()); executor.shutdown(); } }
輸出:
如果把thenApply換成thenApplyAsync,則會輸出:
處理完任務以及結果,該去消費了
3. 消費而不影響最終結果(thenAccept / thenRun / thenAcceptBoth)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) public CompletableFuture<Void> thenRun(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
這三種的區別是:
thenAccept:能夠拿到並利用執行結果
thenRun:不能夠拿到並利用執行結果,只是單純的執行其它任務
thenAcceptBoth:能傳入另一個stage,然后把另一個stage的結果和當前stage的結果作為參數去消費。
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<Integer> future = CompletableFuture // 執行異步任務 .supplyAsync(() -> { return new Random().nextInt(100); }, executor) // 對上一步的結果進行處理 .thenApplyAsync(n -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } int res = new Random().nextInt(100); System.out.println(String.format("如果是同步的,這條消息應該先輸出。上一步結果:%s,新加:%s", n, res)); return n + res; }); // 單純的消費執行結果,注意這個方法是不會返回計算結果的——CompletableFuture<Void> CompletableFuture<Void> voidCompletableFuture = future.thenAcceptAsync(n -> { System.out.println("單純消費任務執行結果:" + n); }); // 這個無法消費執行結果,沒有傳入的入口,只是在當前任務執行完畢后執行其它不相干的任務 future.thenRunAsync(() -> { System.out.println("我只能執行其它工作,我得不到任務執行結果"); }, executor); // 這個方法會接受其它CompletableFuture返回值和當前返回值 future.thenAcceptBothAsync(CompletableFuture.supplyAsync(() -> { return "I'm Other Result"; }), (current, other) -> { System.out.println(String.format("Current:%s,Other:%s", current, other)); }); System.out.println("我等了你2秒"); System.out.println(future.get()); executor.shutdown(); } }
結果:
如果我要組合兩個任務呢?
4. 組合任務(thenCombine / thenCompose)
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
這兩種區別:主要是返回類型不一樣。
thenCombine:至少兩個方法參數,一個為其它stage,一個為用戶自定義的處理函數,函數返回值為結果類型。
thenCompose:至少一個方法參數即處理函數,函數返回值為stage類型。
先看thenCombine
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<Integer> otherFuture = CompletableFuture // 執行異步任務 .supplyAsync(() -> { int result = new Random().nextInt(100); System.out.println("任務A:" + result); return result; }, executor); CompletableFuture<Integer> future = CompletableFuture // 執行異步任務 .supplyAsync(() -> { int result = new Random().nextInt(100); System.out.println("任務B:" + result); return result; }, executor) .thenCombineAsync(otherFuture, (current, other) -> { int result = other + current; System.out.println("組合兩個任務的結果:" + result); return result; }); System.out.println(future.get()); executor.shutdown(); } }
執行結果:
再來看thenCompose
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<Integer> future = CompletableFuture // 執行異步任務 .supplyAsync(() -> { int result = new Random().nextInt(100); System.out.println("任務A:" + result); return result; }, executor) .thenComposeAsync((current) -> { return CompletableFuture.supplyAsync(() -> { int b = new Random().nextInt(100); System.out.println("任務B:" + b); int result = b + current; System.out.println("組合兩個任務的結果:" + result); return result; }, executor); }); System.out.println(future.get()); executor.shutdown(); } }
輸出:
注意這兩個輸出雖然一樣,但是用法不一樣。
5. 快者優先(applyToEither / acceptEither)
有個場景,如果我們有多條渠道去完成同一種任務,那么我們肯定選擇最快的那個。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
這兩種區別:僅僅是一個有返回值,一個沒有(Void)
先看applyToEither
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<String> otherFuture = CompletableFuture .supplyAsync(() -> { int result = new Random().nextInt(100); System.out.println("執行者A:" + result); try { // 故意A慢了一些 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "執行者A【" + result + "】"; }, executor); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { int result = new Random().nextInt(100); System.out.println("執行者B:" + result); return "執行者B【" + result + "】"; }, executor) .applyToEither(otherFuture, (faster) -> { System.out.println("誰最快:" + faster); return faster; }); System.out.println(future.get()); executor.shutdown(); } }
輸出:
再看acceptEither
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<String> otherFuture = CompletableFuture .supplyAsync(() -> { int result = new Random().nextInt(100); System.out.println("執行者A:" + result); try { // 故意A慢了一些 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "執行者A【" + result + "】"; }, executor); CompletableFuture<Void> future = CompletableFuture .supplyAsync(() -> { int result = new Random().nextInt(100); System.out.println("執行者B:" + result); return "執行者B【" + result + "】"; }, executor) .acceptEither(otherFuture, (faster) -> { System.out.println("誰最快:" + faster); }); System.out.println(future.get()); executor.shutdown(); } }
輸出:
6. 異常處理(exceptionally / whenComplete / handle)
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn); public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
exceptionally
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (true){ throw new RuntimeException("Error!!!"); } return "Hello"; }, executor) // 處理上一步發生的異常 .exceptionally(e -> { System.out.println("處理異常:" + e.getMessage()); return "處理完畢!"; }); System.out.println(future.get()); executor.shutdown(); } }
輸出:
whenComplete
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (true){ throw new RuntimeException("Error!!!"); } return "Hello"; }, executor) // 處理上一步發生的異常 .whenComplete((result,ex) -> { // 這里等待為了上一步的異常輸出完畢 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("上一步結果:" + result); System.out.println("處理異常:" + ex.getMessage()); }); System.out.println(future.get()); executor.shutdown(); } }
輸出結果:
可以看見,用whenComplete對異常情況不是特別友好。
handle
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (true){ throw new RuntimeException("Error!!!"); } return "Hello"; }, executor) // 處理上一步發生的異常 .handle((result,ex) -> { System.out.println("上一步結果:" + result); System.out.println("處理異常:" + ex.getMessage()); return "Value When Exception Occurs"; }); System.out.println(future.get()); executor.shutdown(); } }
輸出:
綜上,如果單純要處理異常,那就用exceptionally;如果還想處理結果(沒有異常的情況),那就用handle,比whenComplete友好一些,handle不僅能處理異常還能返回一個異常情況的默認值。
對比
Future:我們的目的都是獲取異步任務的結果,但是對於Future來說,只能通過get方法或者死循環判斷isDone來獲取。異常情況就更是難辦。
CompletableFuture:只要我們設置好回調函數即可實現:
1. 只要任務完成,即執行我們設置的函數(不用再去考慮什么時候任務完成)
2. 如果發生異常,同樣會執行我們處理異常的函數,甚至連默認返回值都有(異常情況處理更加省力)
3. 如果有復雜任務,比如依賴問題,組合問題等,同樣可以寫好處理函數來處理(能應付復雜任務的處理)