簡介
先說Future, 它用來描述一個異步計算的結果。isDone方法可以用來檢查計算是否完成,get方法可以用來獲取結果,直到完成前一直阻塞當前線程,cancel方法可以取消任務。而對於結果的獲取,只能通過阻塞(get())或者輪詢的方式[while(!isDone)]. 阻塞的方式違背了異步編程的理念,輪詢的方式耗費無謂的CPU資源(CPU空轉)。於是,CompletableFuture應運而生。
樣例
后面介紹的源碼都會以下面的用例為切入點,循着調用軌跡理解源碼。如果任務很耗時,記得傳Executor, 或者方法末尾加上future.get(); 因為CompletableFuture默認使用ForkJoinPool, 而ForkJoinPool里面的線程都是daemon線程,主線程跑完了,虛擬機也就over了。
1 public void whenComplete() { 2 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); 3 future.whenComplete((l, r) -> System.out.println(l)); 4 } 5 6 public void thenApply() { 7 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); 8 future.thenApply(i -> -i); 9 } 10 11 public void thenAccept() { 12 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); 13 future.thenAccept(System.out::println); 14 } 15 16 public void thenRun() { 17 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); 18 future.thenRun(() -> System.out.println("Done")); 19 } 20 21 public void thenAcceptBoth() { 22 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); 23 CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200); 24 future.thenAcceptBoth(other, (x, y) -> System.out.println(x + y)); 25 } 26 27 public void acceptEither() { 28 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); 29 CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200); 30 future.acceptEither(other, System.out::println); 31 32 } 33 34 public void allOf() { 35 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); 36 CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200); 37 CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300); 38 CompletableFuture.allOf(future, second, third); 39 40 } 41 42 public void anyOf() throws InterruptedException, ExecutionException { 43 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); 44 CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200); 45 CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300); 46 CompletableFuture.anyOf(future, second, third); 47 }
源碼分析
supplyAsync
supplyAsync(Supplier<U> supplier)
1 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 2 return asyncSupplyStage(asyncPool, supplier); // asyncPool, ForkJoinPool.commonPool()或者ThreadPerTaskExecutor(實現了Executor接口,里面的內容是{new Thread(r).start();}) 3 }
asyncSupplyStage(Executor e, Supplier<U> f)
1 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { 2 if (f == null) 3 throw new NullPointerException(); 4 CompletableFuture<U> d = new CompletableFuture<U>(); // 構建一個新的CompletableFuture, 以此構建AsyncSupply作為Executor的執行參數 5 e.execute(new AsyncSupply<U>(d, f)); // AsyncSupply繼承了ForkJoinTask, 實現了Runnable, AsynchronousCompletionTask接口 6 return d; // 返回d,立返 7 }
AsyncSupply
1 // CompletableFuture的靜態內部類,作為一個ForkJoinTask 2 static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { 3 CompletableFuture<T> dep; // AsyncSupply作為一個依賴Task,dep作為這個Task的Future 4 Supplier<T> fn; // fn作為這個Task的具體執行邏輯,函數式編程 5 6 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { 7 this.dep = dep; 8 this.fn = fn; 9 } 10 11 public final Void getRawResult() { 12 return null; 13 } 14 15 public final void setRawResult(Void v) { 16 } 17 18 public final boolean exec() { 19 run(); 20 return true; 21 } 22 23 public void run() { 24 CompletableFuture<T> d; 25 Supplier<T> f; 26 if ((d = dep) != null && (f = fn) != null) { // 非空判斷 27 dep = null; 28 fn = null; 29 if (d.result == null) { // 查看任務是否結束,如果已經結束(result != null),直接調用postComplete()方法 30 try { 31 d.completeValue(f.get()); // 等待任務結束,並設置結果 32 } catch (Throwable ex) { 33 d.completeThrowable(ex); // 異常 34 } 35 } 36 d.postComplete(); // 任務結束后,會執行所有依賴此任務的其他任務,這些任務以一個無鎖並發棧的形式存在 37 } 38 } 39 }
postComplete()
1 final void postComplete() { 2 CompletableFuture<?> f = this; // 當前CompletableFuture 3 Completion h; // 無鎖並發棧,(Completion next), 保存的是依靠當前的CompletableFuture一串任務,完成即觸發(回調) 4 while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { // 當f的stack為空時,使f重新指向當前的CompletableFuture,繼續后面的結點 5 CompletableFuture<?> d; 6 Completion t; 7 if (f.casStack(h, t = h.next)) { // 從頭遍歷stack,並更新頭元素 8 if (t != null) { 9 if (f != this) { // 如果f不是當前CompletableFuture,則將它的頭結點壓入到當前CompletableFuture的stack中,使樹形結構變成鏈表結構,避免遞歸層次過深 10 pushStack(h); 11 continue; // 繼續下一個結點,批量壓入到當前棧中 12 } 13 h.next = null; // 如果是當前CompletableFuture, 解除頭節點與棧的聯系 14 } 15 f = (d = h.tryFire(NESTED)) == null ? this : d; // 調用頭節點的tryFire()方法,該方法可看作Completion的鈎子方法,執行完邏輯后,會向后傳播的 16 } 17 } 18 }
示意圖
每個CompletableFuture持有一個Completion棧stack, 每個Completion持有一個CompletableFuture -> dep, 如此遞歸循環下去,是層次很深的樹形結構,所以想辦法將其變成鏈表結構。
首先取出頭結點,下圖中灰色Completion結點,它會返回一個CompletableFuture, 同樣也擁有一個stack,策略是遍歷這個CompletableFuture的stack的每個結點,依次壓入到當前CompletableFuture的stack中,關系如下箭頭所示,灰色結點指的是處理過的結點。
第一個Completion結點返回的CompletableFuture, 將擁有的stack里面的所有結點都壓入了當前CompletableFuture的stack里面
后續的Completion結點返回的CompletableFuture, 將擁有的stack里面的所有結點都壓入了當前CompletableFuture的stack里面,重新構成了一個鏈表結構,后續也按照前面的邏輯操作,如此反復,便會遍歷完所有的CompletableFuture, 這些CompletableFuture(葉子結點)的stack為空,也是結束條件。
postComplete()最后調用的是Completion#tryFire()方法,先看下Completion的數據結構
Completion
1 abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { 2 volatile Completion next; // 無鎖並發棧 3 4 /** 5 * 鈎子方法,有三種模式,postComplete()方法里面使用的是NESTED模式,避免過深的遞歸調用 SYNC, ASYNC, or NESTED 6 */ 7 abstract CompletableFuture<?> tryFire(int mode); // run()和exec()都調用了這個鈎子方法 8 9 /** cleanStack()方法里有用到 */ 10 abstract boolean isLive(); 11 12 public final void run() { 13 tryFire(ASYNC); 14 } 15 16 public final boolean exec() { 17 tryFire(ASYNC); 18 return true; 19 } 20 21 public final Void getRawResult() { 22 return null; 23 } 24 25 public final void setRawResult(Void v) { 26 } 27 }
static final int SYNC = 0; 同步
static final int ASYNC = 1; 異步
static final int NESTED = -1; 嵌套
繼承了ForkJoinTask, 實現了Runnable, AsynchronousCompletionTask接口,它有諸多子類,如下圖
后面的方法都對應着不同的子類。
先看一個子類UniCompletion
1 abstract static class UniCompletion<T,V> extends Completion { 2 Executor executor; // 執行器 3 CompletableFuture<V> dep; // 依賴的任務 4 CompletableFuture<T> src; // 被依賴的任務 5 6 UniCompletion(Executor executor, CompletableFuture<V> dep, 7 CompletableFuture<T> src) { 8 this.executor = executor; this.dep = dep; this.src = src; 9 } 10 11 final boolean claim() { // 如果當前任務可以被執行,返回true,否則,返回false; 保證任務只被執行一次 12 Executor e = executor; 13 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { 14 if (e == null) 15 return true; 16 executor = null; // 設置為不可用 17 e.execute(this); 18 } 19 return false; 20 } 21 22 final boolean isLive() { return dep != null; } 23 }
claim()方法保證任務只被執行一次。
whenComplete
whenComplete()/whenCompleteAsync()
1 public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) { 2 return uniWhenCompleteStage(null, action); 3 } 4 5 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) { 6 return uniWhenCompleteStage(asyncPool, action); 7 }
xxx和xxxAsync方法的區別是,有沒有asyncPool作為入參,有的話,任務直接入參,不檢查任務是否完成。uniWhenCompleteStage方法有說明。
uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f)
1 private CompletableFuture<T> uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f) { 2 if (f == null) 3 throw new NullPointerException(); 4 CompletableFuture<T> d = new CompletableFuture<T>(); // 構建future 5 if (e != null || !d.uniWhenComplete(this, f, null)) { // 如果線程池不為空,直接構建任務入棧,並調用tryFire()方法;否則,調用uniWhenComplete()方法,檢查依賴的那個任務是否完成,沒有完成返回false, 6 // 完成了返回true, 以及后續一些操作。 7 UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); // UniWhenComplete繼承了UniCompletion 8 push(c); 9 c.tryFire(SYNC); // 先調一下鈎子方法,檢查一下任務是否結束 10 } 11 return d; 12 }
uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c)
1 final boolean uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c) { 2 Object r; 3 T t; 4 Throwable x = null; 5 if (a == null || (r = a.result) == null || f == null) // 被依賴的任務還未完成 6 return false; 7 if (result == null) { // 被依賴的任務完成了 8 try { 9 if (c != null && !c.claim()) // 判斷任務是否能被執行 10 return false; 11 if (r instanceof AltResult) { // 判斷異常,AltResult類型很簡單,里面只有一個屬性Throwable ex; 12 x = ((AltResult) r).ex; 13 t = null; 14 } else { 15 @SuppressWarnings("unchecked") 16 T tr = (T) r; // 正常的結果 17 t = tr; 18 } 19 f.accept(t, x); // 執行任務 20 if (x == null) { 21 internalComplete(r); // 任務的結果設置為被依賴任務的結果 22 return true; 23 } 24 } catch (Throwable ex) { 25 if (x == null) 26 x = ex; // 記錄異常 27 } 28 completeThrowable(x, r); // 設置異常和結果 29 } 30 return true; 31 }
push()
1 final void push(UniCompletion<?, ?> c) { 2 if (c != null) { 3 while (result == null && !tryPushStack(c)) 4 lazySetNext(c, null); // 失敗重置c的next域 5 } 6 } 7 8 final boolean tryPushStack(Completion c) { 9 Completion h = stack; 10 lazySetNext(c, h); 11 return UNSAFE.compareAndSwapObject(this, STACK, h, c); 12 } 13 14 static void lazySetNext(Completion c, Completion next) { 15 UNSAFE.putOrderedObject(c, NEXT, next); 16 }
UniWhenComplete
1 static final class UniWhenComplete<T> extends UniCompletion<T, T> { 2 BiConsumer<? super T, ? super Throwable> fn; 3 4 UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, 5 BiConsumer<? super T, ? super Throwable> fn) { 6 super(executor, dep, src); 7 this.fn = fn; 8 } 9 10 final CompletableFuture<T> tryFire(int mode) { // 鈎子方法 11 CompletableFuture<T> d; // 依賴的任務 12 CompletableFuture<T> a; // 被依賴的任務 13 if ((d = dep) == null || !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) // 如果是異步模式(mode = 1),就不判斷任務是否結束 14 return null; // dep為空,說明已經調用過了 15 dep = null; 16 src = null; 17 fn = null; 18 return d.postFire(a, mode); // 鈎子方法之后的處理 19 } 20 }
postFire(CompletableFuture<?> a, int mode)
1 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { 2 if (a != null && a.stack != null) { // 被依賴的任務存在,且stack不為空,先處理它 3 if (mode < 0 || a.result == null) // 如果是嵌套模式(mode = -1), 或者任務的結果為空,直接清空棧 4 a.cleanStack(); 5 else 6 a.postComplete(); // 否則,調用postComplete()方法 7 } 8 if (result != null && stack != null) { // 再處理當前任務 9 if (mode < 0) // 嵌套模式,直接返回自身(樹 -> 鏈表,避免過深的遞歸調用) 10 return this; 11 else 12 postComplete(); // 調用postComplete()方法 13 } 14 return null; 15 }
cleanStack()
1 final void cleanStack() { // 過濾掉已經死掉的結點(Not isLive) 2 for (Completion p = null, q = stack; q != null;) { // q指針從頭節點開始,向右移動,s一直執行q的下一個結點,p要么為空,要么指向遍歷過的最后一個活着的結點,一旦發現q死掉了,就斷開q, 連接p, s 3 Completion s = q.next; 4 if (q.isLive()) { // 還活着,p指向遍歷過的最后一個結點,q向右移動 5 p = q; 6 q = s; 7 } else if (p == null) { // 說明第一個結點就是死掉的,cas stack, q指向stack 8 casStack(q, s); 9 q = stack; 10 } else { // 否則的話,連接p, s 11 p.next = s; 12 if (p.isLive()) // 再次判斷p結點是否還或者(在這期間是否有別的線程改動了) 13 q = s; // 還活着,q繼續向右移動 14 else { 15 p = null; // 過期的值,從新開始 16 q = stack; 17 } 18 } 19 } 20 }
如下圖
1. 第1個結點是無效結點,更新stack,更新指針
2. 第2個結點是有效結點,更新指針
3. 第3個結點是無效結點,更新指針
4. 第4個結點是有效結點,更新指針
thenApply
1 public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) { 2 return uniApplyStage(null, fn); 3 } 4 5 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) { 6 return uniApplyStage(asyncPool, fn); 7 } 8 9 private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T, ? extends V> f) { 10 if (f == null) 11 throw new NullPointerException(); 12 CompletableFuture<V> d = new CompletableFuture<V>(); 13 if (e != null || !d.uniApply(this, f, null)) { 14 UniApply<T, V> c = new UniApply<T, V>(e, d, this, f); 15 push(c); 16 c.tryFire(SYNC); 17 } 18 return d; 19 } 20 21 final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S, ? extends T> f, UniApply<S, T> c) { 22 Object r; 23 Throwable x; 24 if (a == null || (r = a.result) == null || f == null) 25 return false; 26 tryComplete: if (result == null) { 27 if (r instanceof AltResult) { 28 if ((x = ((AltResult) r).ex) != null) { 29 completeThrowable(x, r); // 有異常,直接跳出 30 break tryComplete; 31 } 32 r = null; 33 } 34 try { 35 if (c != null && !c.claim()) 36 return false; 37 @SuppressWarnings("unchecked") 38 S s = (S) r; 39 completeValue(f.apply(s)); 40 } catch (Throwable ex) { 41 completeThrowable(ex); 42 } 43 } 44 return true; 45 } 46 47 static final class UniApply<T, V> extends UniCompletion<T, V> { 48 Function<? super T, ? extends V> fn; 49 50 UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, 51 Function<? super T, ? extends V> fn) { 52 super(executor, dep, src); 53 this.fn = fn; 54 } 55 56 final CompletableFuture<V> tryFire(int mode) { 57 CompletableFuture<V> d; 58 CompletableFuture<T> a; 59 if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this)) 60 return null; 61 dep = null; 62 src = null; 63 fn = null; 64 return d.postFire(a, mode); 65 } 66 }
一樣的套路,thenApply/thenApplyAsync -> uniApplyStage -> uniApply -> tryFire -> postFire
thenAccept
1 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { 2 return uniAcceptStage(null, action); 3 } 4 5 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { 6 return uniAcceptStage(asyncPool, action); 7 } 8 9 private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) { 10 if (f == null) 11 throw new NullPointerException(); 12 CompletableFuture<Void> d = new CompletableFuture<Void>(); 13 if (e != null || !d.uniAccept(this, f, null)) { 14 UniAccept<T> c = new UniAccept<T>(e, d, this, f); 15 push(c); 16 c.tryFire(SYNC); 17 } 18 return d; 19 } 20 21 final <S> boolean uniAccept(CompletableFuture<S> a, Consumer<? super S> f, UniAccept<S> c) { 22 Object r; 23 Throwable x; 24 if (a == null || (r = a.result) == null || f == null) 25 return false; 26 tryComplete: if (result == null) { 27 if (r instanceof AltResult) { 28 if ((x = ((AltResult) r).ex) != null) { 29 completeThrowable(x, r); // 有異常直接跳出 30 break tryComplete; 31 } 32 r = null; 33 } 34 try { 35 if (c != null && !c.claim()) 36 return false; 37 @SuppressWarnings("unchecked") 38 S s = (S) r; 39 f.accept(s); 40 completeNull(); 41 } catch (Throwable ex) { 42 completeThrowable(ex); 43 } 44 } 45 return true; 46 } 47 48 static final class UniAccept<T> extends UniCompletion<T, Void> { 49 Consumer<? super T> fn; 50 51 UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn) { 52 super(executor, dep, src); 53 this.fn = fn; 54 } 55 56 final CompletableFuture<Void> tryFire(int mode) { 57 CompletableFuture<Void> d; 58 CompletableFuture<T> a; 59 if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this)) 60 return null; 61 dep = null; 62 src = null; 63 fn = null; 64 return d.postFire(a, mode); 65 } 66 }
thenAccept/thenAcceptAsync -> uniAcceptStage -> uniAccept -> tryFire -> postFire
thenRun
1 public CompletableFuture<Void> thenRun(Runnable action) { 2 return uniRunStage(null, action); 3 } 4 5 public CompletableFuture<Void> thenRunAsync(Runnable action) { 6 return uniRunStage(asyncPool, action); 7 } 8 9 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { 10 if (f == null) 11 throw new NullPointerException(); 12 CompletableFuture<Void> d = new CompletableFuture<Void>(); 13 if (e != null || !d.uniRun(this, f, null)) { 14 UniRun<T> c = new UniRun<T>(e, d, this, f); 15 push(c); 16 c.tryFire(SYNC); 17 } 18 return d; 19 } 20 21 final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) { 22 Object r; 23 Throwable x; 24 if (a == null || (r = a.result) == null || f == null) 25 return false; 26 if (result == null) { 27 if (r instanceof AltResult && (x = ((AltResult) r).ex) != null) 28 completeThrowable(x, r); 29 else 30 try { 31 if (c != null && !c.claim()) 32 return false; 33 f.run(); 34 completeNull(); 35 } catch (Throwable ex) { 36 completeThrowable(ex); 37 } 38 } 39 return true; 40 } 41 42 static final class UniRun<T> extends UniCompletion<T, Void> { 43 Runnable fn; 44 45 UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn) { 46 super(executor, dep, src); 47 this.fn = fn; 48 } 49 50 final CompletableFuture<Void> tryFire(int mode) { 51 CompletableFuture<Void> d; 52 CompletableFuture<T> a; 53 if ((d = dep) == null || !d.uniRun(a = src, fn, mode > 0 ? null : this)) 54 return null; 55 dep = null; 56 src = null; 57 fn = null; 58 return d.postFire(a, mode); 59 } 60 }
thenRun/thenRunAsync -> uniRunStage -> uniRun -> tryFire -> postFire
thenAcceptBoth
thenAcceptBoth
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) { return biAcceptStage(null, other, action); } public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) { return biAcceptStage(asyncPool, other, action); }
biAcceptStage
private <U> CompletableFuture<Void> biAcceptStage(Executor e, CompletionStage<U> o, BiConsumer<? super T, ? super U> f) { CompletableFuture<U> b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); CompletableFuture<Void> d = new CompletableFuture<Void>(); if (e != null || !d.biAccept(this, b, f, null)) { BiAccept<T, U> c = new BiAccept<T, U>(e, d, this, b, f); bipush(b, c); c.tryFire(SYNC); } return d; }
bipush
1 final void bipush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) { 2 if (c != null) { 3 Object r; 4 while ((r = result) == null && !tryPushStack(c)) // a的result還沒准備好,c壓入棧 5 lazySetNext(c, null); // 失敗重置c的next域 6 if (b != null && b != this && b.result == null) { // b的result也還沒准備好 7 Completion q = (r != null) ? c : new CoCompletion(c); // 根據a的result決定是否構建CoCompletion, 如果a未結束,則構建一個CoCompletion, CoCompletion最后調用的也是BiCompletion的tryFire 8 while (b.result == null && !b.tryPushStack(q)) // 將q壓入棧 9 lazySetNext(q, null); // 失敗重置q的next域 10 } 11 } 12 }
CoCompletion
1 static final class CoCompletion extends Completion { 2 BiCompletion<?, ?, ?> base; 3 4 CoCompletion(BiCompletion<?, ?, ?> base) { 5 this.base = base; 6 } 7 8 final CompletableFuture<?> tryFire(int mode) { 9 BiCompletion<?, ?, ?> c; 10 CompletableFuture<?> d; 11 if ((c = base) == null || (d = c.tryFire(mode)) == null) // 調用的還是BiCompletion的tryFire方法 12 return null; 13 base = null; 14 return d; 15 } 16 17 final boolean isLive() { 18 BiCompletion<?, ?, ?> c; 19 return (c = base) != null && c.dep != null; 20 } 21 }
biAccept
1 final <R, S> boolean biAccept(CompletableFuture<R> a, CompletableFuture<S> b, BiConsumer<? super R, ? super S> f, 2 BiAccept<R, S> c) { 3 Object r, s; 4 Throwable x; 5 if (a == null || (r = a.result) == null || b == null || (s = b.result) == null || f == null) 6 return false; // a和b都完成了,才會往下走 7 tryComplete: if (result == null) { 8 if (r instanceof AltResult) { 9 if ((x = ((AltResult) r).ex) != null) { // a的異常檢查 10 completeThrowable(x, r); 11 break tryComplete; 12 } 13 r = null; 14 } 15 if (s instanceof AltResult) { 16 if ((x = ((AltResult) s).ex) != null) { // b的異常檢查 17 completeThrowable(x, s); 18 break tryComplete; 19 } 20 s = null; 21 } 22 try { 23 if (c != null && !c.claim()) 24 return false; 25 @SuppressWarnings("unchecked") 26 R rr = (R) r; 27 @SuppressWarnings("unchecked") 28 S ss = (S) s; 29 f.accept(rr, ss); // 執行任務 30 completeNull(); 31 } catch (Throwable ex) { 32 completeThrowable(ex); 33 } 34 } 35 return true; 36 }
BiAccept
1 static final class BiAccept<T, U> extends BiCompletion<T, U, Void> { 2 BiConsumer<? super T, ? super U> fn; 3 4 BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, 5 BiConsumer<? super T, ? super U> fn) { 6 super(executor, dep, src, snd); 7 this.fn = fn; 8 } 9 10 final CompletableFuture<Void> tryFire(int mode) { 11 CompletableFuture<Void> d; 12 CompletableFuture<T> a; 13 CompletableFuture<U> b; 14 if ((d = dep) == null || !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this)) 15 return null; 16 dep = null; 17 src = null; 18 snd = null; 19 fn = null; 20 return d.postFire(a, b, mode); 21 } 22 } 23 24 abstract static class BiCompletion<T, U, V> extends UniCompletion<T, V> { 25 CompletableFuture<U> snd; // second source for action 26 27 BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd) { 28 super(executor, dep, src); 29 this.snd = snd; 30 } 31 }
thenAcceptBoth/thenAcceptBothAsync -> biAcceptStage -> biAccept -> tryFire -> postFire
acceptEither
1 public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) { 2 return orAcceptStage(null, other, action); 3 } 4 5 public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) { 6 return orAcceptStage(asyncPool, other, action); 7 } 8 9 private <U extends T> CompletableFuture<Void> orAcceptStage(Executor e, CompletionStage<U> o, 10 Consumer<? super T> f) { 11 CompletableFuture<U> b; 12 if (f == null || (b = o.toCompletableFuture()) == null) 13 throw new NullPointerException(); 14 CompletableFuture<Void> d = new CompletableFuture<Void>(); 15 if (e != null || !d.orAccept(this, b, f, null)) { 16 OrAccept<T, U> c = new OrAccept<T, U>(e, d, this, b, f); 17 orpush(b, c); 18 c.tryFire(SYNC); 19 } 20 return d; 21 } 22 23 final <R, S extends R> boolean orAccept(CompletableFuture<R> a, CompletableFuture<S> b, Consumer<? super R> f, 24 OrAccept<R, S> c) { 25 Object r; 26 Throwable x; 27 if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null) || f == null) 28 return false; // a和b有一個完成了就往下走 29 tryComplete: if (result == null) { 30 try { 31 if (c != null && !c.claim()) 32 return false; 33 if (r instanceof AltResult) { // 異常 34 if ((x = ((AltResult) r).ex) != null) { 35 completeThrowable(x, r); 36 break tryComplete; 37 } 38 r = null; 39 } 40 @SuppressWarnings("unchecked") 41 R rr = (R) r; 42 f.accept(rr); // 執行 43 completeNull(); 44 } catch (Throwable ex) { 45 completeThrowable(ex); 46 } 47 } 48 return true; 49 } 50 51 static final class OrAccept<T, U extends T> extends BiCompletion<T, U, Void> { 52 Consumer<? super T> fn; 53 54 OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, 55 Consumer<? super T> fn) { 56 super(executor, dep, src, snd); 57 this.fn = fn; 58 } 59 60 final CompletableFuture<Void> tryFire(int mode) { 61 CompletableFuture<Void> d; 62 CompletableFuture<T> a; 63 CompletableFuture<U> b; 64 if ((d = dep) == null || !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this)) 65 return null; 66 dep = null; 67 src = null; 68 snd = null; 69 fn = null; 70 return d.postFire(a, b, mode); 71 } 72 } 73 74 final void orpush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) { 75 if (c != null) { 76 while ((b == null || b.result == null) && result == null) { // a和b的result都沒好,才會考慮入棧 77 if (tryPushStack(c)) { // 先入a的棧 78 if (b != null && b != this && b.result == null) { // 入a的棧成功,b的result還沒好 79 Completion q = new CoCompletion(c); // a還未結束,用c構建CoCompletion 80 while (result == null && b.result == null && !b.tryPushStack(q)) // 再次判斷,a和b的result都沒好,才會考慮入棧 81 lazySetNext(q, null); // 失敗置空q的next域 82 } 83 break; 84 } 85 lazySetNext(c, null); // 失敗置空c的next域 86 } 87 } 88 }
acceptEither/acceptEitherAsync -> orAcceptStage -> orAccept -> tryFire -> postFire
allOf
1 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { 2 return andTree(cfs, 0, cfs.length - 1); 3 } 4 5 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 將一個數組構建成一棵樹,二叉樹,動態規划 6 CompletableFuture<Void> d = new CompletableFuture<Void>(); 7 if (lo > hi) // empty 8 d.result = NIL; 9 else { 10 CompletableFuture<?> a, b; 11 int mid = (lo + hi) >>> 1; 12 if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null 13 || (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : andTree(cfs, mid + 1, hi))) == null) 14 throw new NullPointerException(); 15 if (!d.biRelay(a, b)) { 16 BiRelay<?, ?> c = new BiRelay<>(d, a, b); 17 a.bipush(b, c); // both 18 c.tryFire(SYNC); 19 } 20 } 21 return d; 22 } 23 24 static final class BiRelay<T, U> extends BiCompletion<T, U, Void> { // for And 25 BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd) { 26 super(null, dep, src, snd); 27 } 28 29 final CompletableFuture<Void> tryFire(int mode) { 30 CompletableFuture<Void> d; 31 CompletableFuture<T> a; 32 CompletableFuture<U> b; 33 if ((d = dep) == null || !d.biRelay(a = src, b = snd)) 34 return null; 35 src = null; 36 snd = null; 37 dep = null; 38 return d.postFire(a, b, mode); 39 } 40 } 41 42 boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) { 43 Object r, s; 44 Throwable x; 45 if (a == null || (r = a.result) == null || b == null || (s = b.result) == null) 46 return false; // a和b都結束了才往下執行 47 if (result == null) { 48 if (r instanceof AltResult && (x = ((AltResult) r).ex) != null) 49 completeThrowable(x, r); 50 else if (s instanceof AltResult && (x = ((AltResult) s).ex) != null) 51 completeThrowable(x, s); 52 else 53 completeNull(); // 輔助結點,什么都不做 54 } 55 return true; 56 }
allOf -> andTree -> biRelay -> tryFire -> postFire
anyOf
1 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { 2 return orTree(cfs, 0, cfs.length - 1); 3 } 4 5 static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 將一個數組構建成一棵樹,二叉樹,動態規划 6 CompletableFuture<Object> d = new CompletableFuture<Object>(); 7 if (lo <= hi) { 8 CompletableFuture<?> a, b; 9 int mid = (lo + hi) >>> 1; 10 if ((a = (lo == mid ? cfs[lo] : orTree(cfs, lo, mid))) == null 11 || (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : orTree(cfs, mid + 1, hi))) == null) 12 throw new NullPointerException(); 13 if (!d.orRelay(a, b)) { 14 OrRelay<?, ?> c = new OrRelay<>(d, a, b); 15 a.orpush(b, c); 16 c.tryFire(SYNC); 17 } 18 } 19 return d; 20 } 21 22 static final class OrRelay<T, U> extends BiCompletion<T, U, Object> { // for Or 23 OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, CompletableFuture<U> snd) { 24 super(null, dep, src, snd); 25 } 26 27 final CompletableFuture<Object> tryFire(int mode) { 28 CompletableFuture<Object> d; 29 CompletableFuture<T> a; 30 CompletableFuture<U> b; 31 if ((d = dep) == null || !d.orRelay(a = src, b = snd)) 32 return null; 33 src = null; 34 snd = null; 35 dep = null; 36 return d.postFire(a, b, mode); 37 } 38 } 39 40 final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) { 41 Object r; 42 if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null)) 43 return false; // a和b有一個結束就往下進行 44 if (result == null) 45 completeRelay(r); 46 return true; 47 }
anyOf -> orTree -> orRelay -> tryFire -> postFire
數組構建樹
allOf和anyOf都用到了數組構建成樹的策略。
假設有一個任務Z(虛擬的,什么都不做),依賴一組任務[A, B, C, D, E, F, G, H]
對於allOf, 當這組任務都完成時,才會執行Z;對於anyOf, 當這組任務中有任何一個完成,就執行任務Z。
如果這組任務是數組結構或者鏈表結構,我們該如何解決呢?遍歷數組或者是鏈表,當任務都完成或者有一個完成時,就執行Z,需要不停地遍歷,這是輪詢的方法,不合適。
整個基調是回調,是指,當一個任務完成時,會接着執行所有依賴於它的任務。
作為一個數組或者鏈表,該如何應用回調呢?誰在先,誰在后呢?因為不知道哪個任務會先完成,所以沒法確定次序。而且這組任務之間也不應該相互依賴,它們只不過都是被Z依賴。
如果這組任務只有一個的話,那就演變成了X.thenXXX(Z), 如果這組任務有兩個的話,allOf -> Both,anyOf -> Either
如果Z依賴Z1,Z2兩個個任務,Z1和Z2依賴Z11,Z12和Z21,Z22四個任務,依次類推,當虛擬的任務的個數達到真實任務的個數的一半時,就讓虛擬任務監聽真實的任務,動態規划加二叉樹,時間復雜度也只是logn級別的。
1 static String array2Tree(String[] cfs, int lo, int hi) { 2 String d = new String(cfs[lo] + cfs[hi]); 3 if (lo <= hi) { 4 String a, b; 5 int mid = (lo + hi) >>> 1; // 二分 6 if (lo == mid) { // a作為左半部分的的結果 7 a = cfs[lo]; // 當只有不超過兩個元素時,a直接取第一個值 8 } else { 9 a = array2Tree(cfs, lo, mid); 10 } 11 if (lo == hi) { // 當只有一個元素的時候,b取a的值 12 b = a; 13 } else { 14 if (hi == mid + 1) { // 右半部分只有兩個元素時,b取第二個元素的值 15 b = cfs[hi]; 16 } else { 17 b = array2Tree(cfs, mid + 1, hi); 18 } 19 } 20 if (a == null || b == null) { 21 throw new NullPointerException(); 22 } 23 System.out.println("[" + a + "][" + b + "]->[" + d + "]"); 24 } 25 return d; 26 }
Console
[A][B]->[AB]
[C][D]->[CD]
[AB][CD]->[AD]
[E][F]->[EF]
[G][H]->[GH]
[EF][GH]->[EH]
[AD][EH]->[AH]
如下圖
對於allOf, Z只要保證Z1和Z2都完成了就行,Z1和Z2分別保證Z11,Z12 和 Z21,Z22都完成了就像,而Z11,Z12,Z21,Z22則分別保證了A-H任務都完成。
對應anyOf, Z 只要保證Z1和Z2有一個完成了就像,Z1和Z2聯合保證了Z11,Z12,Z21,Z22這4個任務只要有一個完成了就行,同理,Z11,Z12,Z21,Z22則聯合保證了A-H中有一個任務完成了就行。
然后,Z就可以執行了,其實Z什么也沒做,只是從這組任務里得出一個結果。
行文至此結束。
尊重他人的勞動,轉載請注明出處:http://www.cnblogs.com/aniao/p/aniao_cf.html