【JUC源碼解析】CompletableFuture


簡介

先說Future, 它用來描述一個異步計算的結果。isDone方法可以用來檢查計算是否完成,get方法可以用來獲取結果,直到完成前一直阻塞當前線程,cancel方法可以取消任務。而對於結果的獲取,只能通過阻塞(get())或者輪詢的方式[while(!isDone)]. 阻塞的方式違背了異步編程的理念,輪詢的方式耗費無謂的CPU資源(CPU空轉)。於是,CompletableFuture應運而生。

樣例

后面介紹的源碼都會以下面的用例為切入點,循着調用軌跡理解源碼。如果任務很耗時,記得傳Executor, 或者方法末尾加上future.get(); 因為CompletableFuture默認使用ForkJoinPool, 而ForkJoinPool里面的線程都是daemon線程,主線程跑完了,虛擬機也就over了。

 1     public void whenComplete() {
 2         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
 3         future.whenComplete((l, r) -> System.out.println(l));
 4     }
 5 
 6     public void thenApply() {
 7         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
 8         future.thenApply(i -> -i);
 9     }
10 
11     public void thenAccept() {
12         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
13         future.thenAccept(System.out::println);
14     }
15 
16     public void thenRun() {
17         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
18         future.thenRun(() -> System.out.println("Done"));
19     }
20 
21     public void thenAcceptBoth() {
22         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
23         CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
24         future.thenAcceptBoth(other, (x, y) -> System.out.println(x + y));
25     }
26 
27     public void acceptEither() {
28         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
29         CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200);
30         future.acceptEither(other, System.out::println);
31 
32     }
33 
34     public void allOf() {
35         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
36         CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
37         CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
38         CompletableFuture.allOf(future, second, third);
39 
40     }
41 
42     public void anyOf() throws InterruptedException, ExecutionException {
43         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
44         CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200);
45         CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300);
46         CompletableFuture.anyOf(future, second, third);
47     }

 

源碼分析

supplyAsync

supplyAsync(Supplier<U> supplier)

1     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2         return asyncSupplyStage(asyncPool, supplier); // asyncPool, ForkJoinPool.commonPool()或者ThreadPerTaskExecutor(實現了Executor接口,里面的內容是{new Thread(r).start();})
3     }

 

asyncSupplyStage(Executor e, Supplier<U> f)

1     static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
2         if (f == null)
3             throw new NullPointerException();
4         CompletableFuture<U> d = new CompletableFuture<U>(); // 構建一個新的CompletableFuture, 以此構建AsyncSupply作為Executor的執行參數
5         e.execute(new AsyncSupply<U>(d, f)); // AsyncSupply繼承了ForkJoinTask, 實現了Runnable, AsynchronousCompletionTask接口
6         return d; // 返回d,立返
7     }

 

AsyncSupply

 1     // CompletableFuture的靜態內部類,作為一個ForkJoinTask
 2     static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
 3         CompletableFuture<T> dep; // AsyncSupply作為一個依賴Task,dep作為這個Task的Future
 4         Supplier<T> fn; // fn作為這個Task的具體執行邏輯,函數式編程
 5 
 6         AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
 7             this.dep = dep;
 8             this.fn = fn;
 9         }
10 
11         public final Void getRawResult() {
12             return null;
13         }
14 
15         public final void setRawResult(Void v) {
16         }
17 
18         public final boolean exec() {
19             run();
20             return true;
21         }
22 
23         public void run() {
24             CompletableFuture<T> d;
25             Supplier<T> f;
26             if ((d = dep) != null && (f = fn) != null) { // 非空判斷
27                 dep = null;
28                 fn = null;
29                 if (d.result == null) { // 查看任務是否結束,如果已經結束(result != null),直接調用postComplete()方法
30                     try {
31                         d.completeValue(f.get()); // 等待任務結束,並設置結果
32                     } catch (Throwable ex) {
33                         d.completeThrowable(ex); // 異常
34                     }
35                 }
36                 d.postComplete(); // 任務結束后,會執行所有依賴此任務的其他任務,這些任務以一個無鎖並發棧的形式存在
37             }
38         }
39     }

 

postComplete()

 1     final void postComplete() {
 2         CompletableFuture<?> f = this; // 當前CompletableFuture
 3         Completion h; // 無鎖並發棧,(Completion next), 保存的是依靠當前的CompletableFuture一串任務,完成即觸發(回調)
 4         while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { // 當f的stack為空時,使f重新指向當前的CompletableFuture,繼續后面的結點
 5             CompletableFuture<?> d;
 6             Completion t;
 7             if (f.casStack(h, t = h.next)) { // 從頭遍歷stack,並更新頭元素
 8                 if (t != null) {
 9                     if (f != this) { // 如果f不是當前CompletableFuture,則將它的頭結點壓入到當前CompletableFuture的stack中,使樹形結構變成鏈表結構,避免遞歸層次過深
10                         pushStack(h);
11                         continue; // 繼續下一個結點,批量壓入到當前棧中
12                     }
13                     h.next = null; // 如果是當前CompletableFuture, 解除頭節點與棧的聯系
14                 }
15                 f = (d = h.tryFire(NESTED)) == null ? this : d; // 調用頭節點的tryFire()方法,該方法可看作Completion的鈎子方法,執行完邏輯后,會向后傳播的
16             }
17         }
18     }

 

示意圖

每個CompletableFuture持有一個Completion棧stack, 每個Completion持有一個CompletableFuture -> dep, 如此遞歸循環下去,是層次很深的樹形結構,所以想辦法將其變成鏈表結構。

首先取出頭結點,下圖中灰色Completion結點,它會返回一個CompletableFuture, 同樣也擁有一個stack,策略是遍歷這個CompletableFuture的stack的每個結點,依次壓入到當前CompletableFuture的stack中,關系如下箭頭所示,灰色結點指的是處理過的結點。

第一個Completion結點返回的CompletableFuture, 將擁有的stack里面的所有結點都壓入了當前CompletableFuture的stack里面

 

后續的Completion結點返回的CompletableFuture, 將擁有的stack里面的所有結點都壓入了當前CompletableFuture的stack里面,重新構成了一個鏈表結構,后續也按照前面的邏輯操作,如此反復,便會遍歷完所有的CompletableFuture, 這些CompletableFuture(葉子結點)的stack為空,也是結束條件。

 

postComplete()最后調用的是Completion#tryFire()方法,先看下Completion的數據結構

 

Completion

 1     abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
 2         volatile Completion next; // 無鎖並發棧
 3 
 4         /**
 5          * 鈎子方法,有三種模式,postComplete()方法里面使用的是NESTED模式,避免過深的遞歸調用 SYNC, ASYNC, or NESTED
 6          */
 7         abstract CompletableFuture<?> tryFire(int mode); // run()和exec()都調用了這個鈎子方法
 8 
 9         /** cleanStack()方法里有用到 */
10         abstract boolean isLive();
11 
12         public final void run() {
13             tryFire(ASYNC);
14         }
15 
16         public final boolean exec() {
17             tryFire(ASYNC);
18             return true;
19         }
20 
21         public final Void getRawResult() {
22             return null;
23         }
24 
25         public final void setRawResult(Void v) {
26         }
27     }

static final int SYNC = 0;       同步
static final int ASYNC = 1;    異步
static final int NESTED = -1; 嵌套

 

繼承了ForkJoinTask, 實現了Runnable, AsynchronousCompletionTask接口,它有諸多子類,如下圖

后面的方法都對應着不同的子類。 

先看一個子類UniCompletion

 1     abstract static class UniCompletion<T,V> extends Completion {
 2         Executor executor;                 // 執行器
 3         CompletableFuture<V> dep;          // 依賴的任務
 4         CompletableFuture<T> src;          // 被依賴的任務
 5 
 6         UniCompletion(Executor executor, CompletableFuture<V> dep,
 7                       CompletableFuture<T> src) {
 8             this.executor = executor; this.dep = dep; this.src = src;
 9         }
10 
11         final boolean claim() { // 如果當前任務可以被執行,返回true,否則,返回false; 保證任務只被執行一次
12             Executor e = executor;
13             if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
14                 if (e == null)
15                     return true;
16                 executor = null; // 設置為不可用
17                 e.execute(this);
18             }
19             return false;
20         }
21 
22         final boolean isLive() { return dep != null; }
23     }

 claim()方法保證任務只被執行一次。

 

whenComplete

whenComplete()/whenCompleteAsync()

1     public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
2         return uniWhenCompleteStage(null, action);
3     }
4 
5     public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
6         return uniWhenCompleteStage(asyncPool, action);
7     }

xxx和xxxAsync方法的區別是,有沒有asyncPool作為入參,有的話,任務直接入參,不檢查任務是否完成。uniWhenCompleteStage方法有說明。

 

uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f)

 1     private CompletableFuture<T> uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f) {
 2         if (f == null)
 3             throw new NullPointerException();
 4         CompletableFuture<T> d = new CompletableFuture<T>(); // 構建future
 5         if (e != null || !d.uniWhenComplete(this, f, null)) { // 如果線程池不為空,直接構建任務入棧,並調用tryFire()方法;否則,調用uniWhenComplete()方法,檢查依賴的那個任務是否完成,沒有完成返回false,
 6                                                                 // 完成了返回true, 以及后續一些操作。
 7             UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); // UniWhenComplete繼承了UniCompletion
 8             push(c);
 9             c.tryFire(SYNC); // 先調一下鈎子方法,檢查一下任務是否結束
10         }
11         return d;
12     }

 

uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c)

 1     final boolean uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c) {
 2         Object r;
 3         T t;
 4         Throwable x = null;
 5         if (a == null || (r = a.result) == null || f == null) // 被依賴的任務還未完成
 6             return false;
 7         if (result == null) { // 被依賴的任務完成了
 8             try {
 9                 if (c != null && !c.claim()) // 判斷任務是否能被執行
10                     return false;
11                 if (r instanceof AltResult) { // 判斷異常,AltResult類型很簡單,里面只有一個屬性Throwable ex; 
12                     x = ((AltResult) r).ex;
13                     t = null;
14                 } else {
15                     @SuppressWarnings("unchecked")
16                     T tr = (T) r; // 正常的結果
17                     t = tr;
18                 }
19                 f.accept(t, x); // 執行任務
20                 if (x == null) {
21                     internalComplete(r); // 任務的結果設置為被依賴任務的結果
22                     return true;
23                 }
24             } catch (Throwable ex) {
25                 if (x == null)
26                     x = ex; // 記錄異常
27             }
28             completeThrowable(x, r); // 設置異常和結果
29         }
30         return true;
31     }

 

push()

 1     final void push(UniCompletion<?, ?> c) {
 2         if (c != null) {
 3             while (result == null && !tryPushStack(c))
 4                 lazySetNext(c, null); // 失敗重置c的next域
 5         }
 6     }
 7     
 8     final boolean tryPushStack(Completion c) {
 9         Completion h = stack;
10         lazySetNext(c, h);
11         return UNSAFE.compareAndSwapObject(this, STACK, h, c);
12     }
13     
14     static void lazySetNext(Completion c, Completion next) {
15         UNSAFE.putOrderedObject(c, NEXT, next);
16     }

 

UniWhenComplete

 1     static final class UniWhenComplete<T> extends UniCompletion<T, T> {
 2         BiConsumer<? super T, ? super Throwable> fn;
 3 
 4         UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src,
 5                 BiConsumer<? super T, ? super Throwable> fn) {
 6             super(executor, dep, src);
 7             this.fn = fn;
 8         }
 9 
10         final CompletableFuture<T> tryFire(int mode) { // 鈎子方法
11             CompletableFuture<T> d; // 依賴的任務
12             CompletableFuture<T> a; // 被依賴的任務
13             if ((d = dep) == null || !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) // 如果是異步模式(mode = 1),就不判斷任務是否結束
14                 return null; // dep為空,說明已經調用過了
15             dep = null;
16             src = null;
17             fn = null;
18             return d.postFire(a, mode); // 鈎子方法之后的處理
19         }
20     }

 

postFire(CompletableFuture<?> a, int mode)

 1     final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
 2         if (a != null && a.stack != null) { // 被依賴的任務存在,且stack不為空,先處理它
 3             if (mode < 0 || a.result == null) // 如果是嵌套模式(mode = -1), 或者任務的結果為空,直接清空棧
 4                 a.cleanStack();
 5             else
 6                 a.postComplete(); // 否則,調用postComplete()方法
 7         }
 8         if (result != null && stack != null) { // 再處理當前任務
 9             if (mode < 0) // 嵌套模式,直接返回自身(樹 -> 鏈表,避免過深的遞歸調用)
10                 return this;
11             else
12                 postComplete(); // 調用postComplete()方法
13         }
14         return null;
15     }

 

 cleanStack()

 1     final void cleanStack() { // 過濾掉已經死掉的結點(Not isLive)
 2         for (Completion p = null, q = stack; q != null;) { // q指針從頭節點開始,向右移動,s一直執行q的下一個結點,p要么為空,要么指向遍歷過的最后一個活着的結點,一旦發現q死掉了,就斷開q, 連接p, s
 3             Completion s = q.next;
 4             if (q.isLive()) { // 還活着,p指向遍歷過的最后一個結點,q向右移動
 5                 p = q;
 6                 q = s;
 7             } else if (p == null) { // 說明第一個結點就是死掉的,cas stack, q指向stack
 8                 casStack(q, s);
 9                 q = stack;
10             } else { // 否則的話,連接p, s
11                 p.next = s;
12                 if (p.isLive()) // 再次判斷p結點是否還或者(在這期間是否有別的線程改動了)
13                     q = s; // 還活着,q繼續向右移動
14                 else {
15                     p = null; // 過期的值,從新開始
16                     q = stack;
17                 }
18             }
19         }
20     }

 

 如下圖

1. 第1個結點是無效結點,更新stack,更新指針

2. 第2個結點是有效結點,更新指針

3. 第3個結點是無效結點,更新指針

4. 第4個結點是有效結點,更新指針

 

 

thenApply

 1     public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
 2         return uniApplyStage(null, fn);
 3     }
 4 
 5     public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
 6         return uniApplyStage(asyncPool, fn);
 7     }
 8 
 9     private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T, ? extends V> f) {
10         if (f == null)
11             throw new NullPointerException();
12         CompletableFuture<V> d = new CompletableFuture<V>();
13         if (e != null || !d.uniApply(this, f, null)) {
14             UniApply<T, V> c = new UniApply<T, V>(e, d, this, f);
15             push(c);
16             c.tryFire(SYNC);
17         }
18         return d;
19     }
20 
21     final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S, ? extends T> f, UniApply<S, T> c) {
22         Object r;
23         Throwable x;
24         if (a == null || (r = a.result) == null || f == null)
25             return false;
26         tryComplete: if (result == null) {
27             if (r instanceof AltResult) {
28                 if ((x = ((AltResult) r).ex) != null) {
29                     completeThrowable(x, r); // 有異常,直接跳出
30                     break tryComplete;
31                 }
32                 r = null;
33             }
34             try {
35                 if (c != null && !c.claim())
36                     return false;
37                 @SuppressWarnings("unchecked")
38                 S s = (S) r;
39                 completeValue(f.apply(s));
40             } catch (Throwable ex) {
41                 completeThrowable(ex);
42             }
43         }
44         return true;
45     }
46 
47     static final class UniApply<T, V> extends UniCompletion<T, V> {
48         Function<? super T, ? extends V> fn;
49 
50         UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src,
51                 Function<? super T, ? extends V> fn) {
52             super(executor, dep, src);
53             this.fn = fn;
54         }
55 
56         final CompletableFuture<V> tryFire(int mode) {
57             CompletableFuture<V> d;
58             CompletableFuture<T> a;
59             if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this))
60                 return null;
61             dep = null;
62             src = null;
63             fn = null;
64             return d.postFire(a, mode);
65         }
66     }

一樣的套路,thenApply/thenApplyAsync -> uniApplyStage -> uniApply -> tryFire -> postFire

thenAccept

 1     public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
 2         return uniAcceptStage(null, action);
 3     }
 4 
 5     public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
 6         return uniAcceptStage(asyncPool, action);
 7     }
 8 
 9     private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
10         if (f == null)
11             throw new NullPointerException();
12         CompletableFuture<Void> d = new CompletableFuture<Void>();
13         if (e != null || !d.uniAccept(this, f, null)) {
14             UniAccept<T> c = new UniAccept<T>(e, d, this, f);
15             push(c);
16             c.tryFire(SYNC);
17         }
18         return d;
19     }
20 
21     final <S> boolean uniAccept(CompletableFuture<S> a, Consumer<? super S> f, UniAccept<S> c) {
22         Object r;
23         Throwable x;
24         if (a == null || (r = a.result) == null || f == null)
25             return false;
26         tryComplete: if (result == null) {
27             if (r instanceof AltResult) {
28                 if ((x = ((AltResult) r).ex) != null) {
29                     completeThrowable(x, r); // 有異常直接跳出
30                     break tryComplete;
31                 }
32                 r = null;
33             }
34             try {
35                 if (c != null && !c.claim())
36                     return false;
37                 @SuppressWarnings("unchecked")
38                 S s = (S) r;
39                 f.accept(s);
40                 completeNull();
41             } catch (Throwable ex) {
42                 completeThrowable(ex);
43             }
44         }
45         return true;
46     }
47 
48     static final class UniAccept<T> extends UniCompletion<T, Void> {
49         Consumer<? super T> fn;
50 
51         UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn) {
52             super(executor, dep, src);
53             this.fn = fn;
54         }
55 
56         final CompletableFuture<Void> tryFire(int mode) {
57             CompletableFuture<Void> d;
58             CompletableFuture<T> a;
59             if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this))
60                 return null;
61             dep = null;
62             src = null;
63             fn = null;
64             return d.postFire(a, mode);
65         }
66     }

thenAccept/thenAcceptAsync -> uniAcceptStage -> uniAccept -> tryFire -> postFire

 

thenRun

 

 1     public CompletableFuture<Void> thenRun(Runnable action) {
 2         return uniRunStage(null, action);
 3     }
 4 
 5     public CompletableFuture<Void> thenRunAsync(Runnable action) {
 6         return uniRunStage(asyncPool, action);
 7     }
 8 
 9     private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
10         if (f == null)
11             throw new NullPointerException();
12         CompletableFuture<Void> d = new CompletableFuture<Void>();
13         if (e != null || !d.uniRun(this, f, null)) {
14             UniRun<T> c = new UniRun<T>(e, d, this, f);
15             push(c);
16             c.tryFire(SYNC);
17         }
18         return d;
19     }
20 
21     final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {
22         Object r;
23         Throwable x;
24         if (a == null || (r = a.result) == null || f == null)
25             return false;
26         if (result == null) {
27             if (r instanceof AltResult && (x = ((AltResult) r).ex) != null)
28                 completeThrowable(x, r);
29             else
30                 try {
31                     if (c != null && !c.claim())
32                         return false;
33                     f.run();
34                     completeNull();
35                 } catch (Throwable ex) {
36                     completeThrowable(ex);
37                 }
38         }
39         return true;
40     }
41 
42     static final class UniRun<T> extends UniCompletion<T, Void> {
43         Runnable fn;
44 
45         UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn) {
46             super(executor, dep, src);
47             this.fn = fn;
48         }
49 
50         final CompletableFuture<Void> tryFire(int mode) {
51             CompletableFuture<Void> d;
52             CompletableFuture<T> a;
53             if ((d = dep) == null || !d.uniRun(a = src, fn, mode > 0 ? null : this))
54                 return null;
55             dep = null;
56             src = null;
57             fn = null;
58             return d.postFire(a, mode);
59         }
60     }

thenRun/thenRunAsync -> uniRunStage -> uniRun -> tryFire -> postFire 

thenAcceptBoth

thenAcceptBoth

    public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }

    public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
            BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }

 

biAcceptStage

    private <U> CompletableFuture<Void> biAcceptStage(Executor e, CompletionStage<U> o,
            BiConsumer<? super T, ? super U> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.biAccept(this, b, f, null)) {
            BiAccept<T, U> c = new BiAccept<T, U>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

 

bipush

 1     final void bipush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) {
 2         if (c != null) {
 3             Object r;
 4             while ((r = result) == null && !tryPushStack(c)) // a的result還沒准備好,c壓入棧
 5                 lazySetNext(c, null); // 失敗重置c的next域
 6             if (b != null && b != this && b.result == null) { // b的result也還沒准備好
 7                 Completion q = (r != null) ? c : new CoCompletion(c); // 根據a的result決定是否構建CoCompletion, 如果a未結束,則構建一個CoCompletion, CoCompletion最后調用的也是BiCompletion的tryFire
 8                 while (b.result == null && !b.tryPushStack(q)) // 將q壓入棧
 9                     lazySetNext(q, null); // 失敗重置q的next域
10             }
11         }
12     }

 

CoCompletion

 1     static final class CoCompletion extends Completion {
 2         BiCompletion<?, ?, ?> base;
 3 
 4         CoCompletion(BiCompletion<?, ?, ?> base) {
 5             this.base = base;
 6         }
 7 
 8         final CompletableFuture<?> tryFire(int mode) {
 9             BiCompletion<?, ?, ?> c;
10             CompletableFuture<?> d;
11             if ((c = base) == null || (d = c.tryFire(mode)) == null) // 調用的還是BiCompletion的tryFire方法
12                 return null;
13             base = null;
14             return d;
15         }
16 
17         final boolean isLive() {
18             BiCompletion<?, ?, ?> c;
19             return (c = base) != null && c.dep != null;
20         }
21     }

 

biAccept

 1     final <R, S> boolean biAccept(CompletableFuture<R> a, CompletableFuture<S> b, BiConsumer<? super R, ? super S> f,
 2             BiAccept<R, S> c) {
 3         Object r, s;
 4         Throwable x;
 5         if (a == null || (r = a.result) == null || b == null || (s = b.result) == null || f == null)
 6             return false; // a和b都完成了,才會往下走
 7         tryComplete: if (result == null) {
 8             if (r instanceof AltResult) {
 9                 if ((x = ((AltResult) r).ex) != null) { // a的異常檢查
10                     completeThrowable(x, r);
11                     break tryComplete;
12                 }
13                 r = null;
14             }
15             if (s instanceof AltResult) {
16                 if ((x = ((AltResult) s).ex) != null) { // b的異常檢查
17                     completeThrowable(x, s);
18                     break tryComplete;
19                 }
20                 s = null;
21             }
22             try {
23                 if (c != null && !c.claim())
24                     return false;
25                 @SuppressWarnings("unchecked")
26                 R rr = (R) r;
27                 @SuppressWarnings("unchecked")
28                 S ss = (S) s;
29                 f.accept(rr, ss); // 執行任務
30                 completeNull();
31             } catch (Throwable ex) {
32                 completeThrowable(ex);
33             }
34         }
35         return true;
36     }

 

BiAccept

 1     static final class BiAccept<T, U> extends BiCompletion<T, U, Void> {
 2         BiConsumer<? super T, ? super U> fn;
 3 
 4         BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd,
 5                 BiConsumer<? super T, ? super U> fn) {
 6             super(executor, dep, src, snd);
 7             this.fn = fn;
 8         }
 9 
10         final CompletableFuture<Void> tryFire(int mode) {
11             CompletableFuture<Void> d;
12             CompletableFuture<T> a;
13             CompletableFuture<U> b;
14             if ((d = dep) == null || !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this))
15                 return null;
16             dep = null;
17             src = null;
18             snd = null;
19             fn = null;
20             return d.postFire(a, b, mode);
21         }
22     }
23 
24     abstract static class BiCompletion<T, U, V> extends UniCompletion<T, V> {
25         CompletableFuture<U> snd; // second source for action
26 
27         BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
28             super(executor, dep, src);
29             this.snd = snd;
30         }
31     }

 thenAcceptBoth/thenAcceptBothAsync -> biAcceptStage -> biAccept -> tryFire -> postFire

 

acceptEither

 1     public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
 2         return orAcceptStage(null, other, action);
 3     }
 4 
 5     public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
 6         return orAcceptStage(asyncPool, other, action);
 7     }
 8 
 9     private <U extends T> CompletableFuture<Void> orAcceptStage(Executor e, CompletionStage<U> o,
10             Consumer<? super T> f) {
11         CompletableFuture<U> b;
12         if (f == null || (b = o.toCompletableFuture()) == null)
13             throw new NullPointerException();
14         CompletableFuture<Void> d = new CompletableFuture<Void>();
15         if (e != null || !d.orAccept(this, b, f, null)) {
16             OrAccept<T, U> c = new OrAccept<T, U>(e, d, this, b, f);
17             orpush(b, c);
18             c.tryFire(SYNC);
19         }
20         return d;
21     }
22 
23     final <R, S extends R> boolean orAccept(CompletableFuture<R> a, CompletableFuture<S> b, Consumer<? super R> f,
24             OrAccept<R, S> c) {
25         Object r;
26         Throwable x;
27         if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null) || f == null)
28             return false; // a和b有一個完成了就往下走
29         tryComplete: if (result == null) {
30             try {
31                 if (c != null && !c.claim())
32                     return false;
33                 if (r instanceof AltResult) { // 異常
34                     if ((x = ((AltResult) r).ex) != null) {
35                         completeThrowable(x, r);
36                         break tryComplete;
37                     }
38                     r = null;
39                 }
40                 @SuppressWarnings("unchecked")
41                 R rr = (R) r;
42                 f.accept(rr); // 執行
43                 completeNull();
44             } catch (Throwable ex) {
45                 completeThrowable(ex);
46             }
47         }
48         return true;
49     }
50 
51     static final class OrAccept<T, U extends T> extends BiCompletion<T, U, Void> {
52         Consumer<? super T> fn;
53 
54         OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd,
55                 Consumer<? super T> fn) {
56             super(executor, dep, src, snd);
57             this.fn = fn;
58         }
59 
60         final CompletableFuture<Void> tryFire(int mode) {
61             CompletableFuture<Void> d;
62             CompletableFuture<T> a;
63             CompletableFuture<U> b;
64             if ((d = dep) == null || !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this))
65                 return null;
66             dep = null;
67             src = null;
68             snd = null;
69             fn = null;
70             return d.postFire(a, b, mode);
71         }
72     }
73 
74     final void orpush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) {
75         if (c != null) {
76             while ((b == null || b.result == null) && result == null) { // a和b的result都沒好,才會考慮入棧
77                 if (tryPushStack(c)) { // 先入a的棧
78                     if (b != null && b != this && b.result == null) { // 入a的棧成功,b的result還沒好
79                         Completion q = new CoCompletion(c); // a還未結束,用c構建CoCompletion
80                         while (result == null && b.result == null && !b.tryPushStack(q)) // 再次判斷,a和b的result都沒好,才會考慮入棧
81                             lazySetNext(q, null); // 失敗置空q的next域
82                     }
83                     break;
84                 }
85                 lazySetNext(c, null); // 失敗置空c的next域
86             }
87         }
88     }

acceptEither/acceptEitherAsync -> orAcceptStage -> orAccept -> tryFire -> postFire

 

allOf

 1     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
 2         return andTree(cfs, 0, cfs.length - 1);
 3     }
 4 
 5     static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 將一個數組構建成一棵樹,二叉樹,動態規划
 6         CompletableFuture<Void> d = new CompletableFuture<Void>();
 7         if (lo > hi) // empty
 8             d.result = NIL;
 9         else {
10             CompletableFuture<?> a, b;
11             int mid = (lo + hi) >>> 1;
12             if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null
13                     || (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : andTree(cfs, mid + 1, hi))) == null)
14                 throw new NullPointerException();
15             if (!d.biRelay(a, b)) {
16                 BiRelay<?, ?> c = new BiRelay<>(d, a, b);
17                 a.bipush(b, c); // both
18                 c.tryFire(SYNC);
19             }
20         }
21         return d;
22     }
23 
24     static final class BiRelay<T, U> extends BiCompletion<T, U, Void> { // for And
25         BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
26             super(null, dep, src, snd);
27         }
28 
29         final CompletableFuture<Void> tryFire(int mode) {
30             CompletableFuture<Void> d;
31             CompletableFuture<T> a;
32             CompletableFuture<U> b;
33             if ((d = dep) == null || !d.biRelay(a = src, b = snd))
34                 return null;
35             src = null;
36             snd = null;
37             dep = null;
38             return d.postFire(a, b, mode);
39         }
40     }
41 
42     boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
43         Object r, s;
44         Throwable x;
45         if (a == null || (r = a.result) == null || b == null || (s = b.result) == null)
46             return false; // a和b都結束了才往下執行
47         if (result == null) {
48             if (r instanceof AltResult && (x = ((AltResult) r).ex) != null)
49                 completeThrowable(x, r);
50             else if (s instanceof AltResult && (x = ((AltResult) s).ex) != null)
51                 completeThrowable(x, s);
52             else
53                 completeNull(); // 輔助結點,什么都不做
54         }
55         return true;
56     }

allOf -> andTree -> biRelay -> tryFire -> postFire

 

anyOf

 1     public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
 2         return orTree(cfs, 0, cfs.length - 1);
 3     }
 4 
 5     static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 將一個數組構建成一棵樹,二叉樹,動態規划
 6         CompletableFuture<Object> d = new CompletableFuture<Object>();
 7         if (lo <= hi) {
 8             CompletableFuture<?> a, b;
 9             int mid = (lo + hi) >>> 1;
10             if ((a = (lo == mid ? cfs[lo] : orTree(cfs, lo, mid))) == null
11                     || (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : orTree(cfs, mid + 1, hi))) == null)
12                 throw new NullPointerException();
13             if (!d.orRelay(a, b)) {
14                 OrRelay<?, ?> c = new OrRelay<>(d, a, b);
15                 a.orpush(b, c);
16                 c.tryFire(SYNC);
17             }
18         }
19         return d;
20     }
21 
22     static final class OrRelay<T, U> extends BiCompletion<T, U, Object> { // for Or
23         OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, CompletableFuture<U> snd) {
24             super(null, dep, src, snd);
25         }
26 
27         final CompletableFuture<Object> tryFire(int mode) {
28             CompletableFuture<Object> d;
29             CompletableFuture<T> a;
30             CompletableFuture<U> b;
31             if ((d = dep) == null || !d.orRelay(a = src, b = snd))
32                 return null;
33             src = null;
34             snd = null;
35             dep = null;
36             return d.postFire(a, b, mode);
37         }
38     }
39 
40     final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
41         Object r;
42         if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null))
43             return false; // a和b有一個結束就往下進行
44         if (result == null)
45             completeRelay(r);
46         return true;
47     }

anyOf -> orTree -> orRelay -> tryFire -> postFire

數組構建樹

allOf和anyOf都用到了數組構建成樹的策略。

假設有一個任務Z(虛擬的,什么都不做),依賴一組任務[A, B, C, D, E, F, G, H]

對於allOf, 當這組任務都完成時,才會執行Z;對於anyOf, 當這組任務中有任何一個完成,就執行任務Z。

如果這組任務是數組結構或者鏈表結構,我們該如何解決呢?遍歷數組或者是鏈表,當任務都完成或者有一個完成時,就執行Z,需要不停地遍歷,這是輪詢的方法,不合適。

整個基調是回調,是指,當一個任務完成時,會接着執行所有依賴於它的任務。

作為一個數組或者鏈表,該如何應用回調呢?誰在先,誰在后呢?因為不知道哪個任務會先完成,所以沒法確定次序。而且這組任務之間也不應該相互依賴,它們只不過都是被Z依賴。

如果這組任務只有一個的話,那就演變成了X.thenXXX(Z), 如果這組任務有兩個的話,allOf -> Both,anyOf -> Either

 

如果Z依賴Z1,Z2兩個個任務,Z1和Z2依賴Z11,Z12和Z21,Z22四個任務,依次類推,當虛擬的任務的個數達到真實任務的個數的一半時,就讓虛擬任務監聽真實的任務,動態規划加二叉樹,時間復雜度也只是logn級別的。

 1     static String array2Tree(String[] cfs, int lo, int hi) {
 2         String d = new String(cfs[lo] + cfs[hi]);
 3         if (lo <= hi) {
 4             String a, b;
 5             int mid = (lo + hi) >>> 1; // 二分
 6             if (lo == mid) { // a作為左半部分的的結果
 7                 a = cfs[lo]; // 當只有不超過兩個元素時,a直接取第一個值
 8             } else {
 9                 a = array2Tree(cfs, lo, mid);
10             }
11             if (lo == hi) { // 當只有一個元素的時候,b取a的值
12                 b = a;
13             } else {
14                 if (hi == mid + 1) { // 右半部分只有兩個元素時,b取第二個元素的值
15                     b = cfs[hi];
16                 } else {
17                     b = array2Tree(cfs, mid + 1, hi);
18                 }
19             }
20             if (a == null || b == null) {
21                 throw new NullPointerException();
22             }
23             System.out.println("[" + a + "][" + b + "]->[" + d + "]");
24         }
25         return d;
26     }

 

Console

[A][B]->[AB]
[C][D]->[CD]
[AB][CD]->[AD]
[E][F]->[EF]
[G][H]->[GH]
[EF][GH]->[EH]
[AD][EH]->[AH]

 

如下圖

 

對於allOf, Z只要保證Z1和Z2都完成了就行,Z1和Z2分別保證Z11,Z12 和 Z21,Z22都完成了就像,而Z11,Z12,Z21,Z22則分別保證了A-H任務都完成。

對應anyOf, Z 只要保證Z1和Z2有一個完成了就像,Z1和Z2聯合保證了Z11,Z12,Z21,Z22這4個任務只要有一個完成了就行,同理,Z11,Z12,Z21,Z22則聯合保證了A-H中有一個任務完成了就行。

然后,Z就可以執行了,其實Z什么也沒做,只是從這組任務里得出一個結果。

 

行文至此結束。

 

尊重他人的勞動,轉載請注明出處:http://www.cnblogs.com/aniao/p/aniao_cf.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM