Java並發包異步執行器CompletableFuture


前言

CompletableFuture是對Future的一種強有力的擴展,Future只能通過輪詢isDone()方法或者調用get()阻塞等待獲取一個異步任務的結果,才能繼續執行下一步,當我們執行的異步任務很多,而且相互之前還要依賴結果的時候,可能會創建很多這樣的Future,並通過get或者輪詢等待執行結果返回之后繼續執行,這樣的代碼顯得很不方便而且也不高效。

通過前面的CompletionStage接口給我們提供了一系列將多個階段(甚至是異步的)的結果相互關聯執行的方法,如果把它和Future結合起來,那么可將這種便利與高效編程方式用於異步任務的執行。CompletableFuture就是這樣的一個類,同時繼承了CompletionStage和Future,可以幫助我們簡化異步編程的復雜性,並且提供了函數式編程的能力,可以通過lambda表達式的風格處理各個執行階段的結果。

實現

CompletableFuture通過以下策略實現了接口CompletionStage:

  1. 依賴的非異步階段提供的操作可以由完成當前CompletableFuture的線程執行,也可以由完成方法的任何其他調用者執行。
  2. 所有沒有顯式指定Executor參數的異步方法都使用ForkJoinPool.commonPool執行(除非它不支持至少兩個並行級別,否則將創建一個新線程來運行每個任務).為了簡化監視、調試和跟蹤,所有生成的異步任務都是CompletableFuture.AsynchronousCompletionTask的實例。
  3. 所有實現CompletionStage的接口方法都是獨立於其他公共方法實現的,因此一個方法的行為不會受到子類中其他方法重寫的影響。

CompletableFuture通過以下策略實現了接口Future:

  1. CompletableFuture將取消看作是異常完成的另一種形式。cancel方法具有與completeExceptionally(new CancellationException())相同的效果。
  2. 在以CompletionException異常完成的情況下,get()和get(long, TimeUnit)方法拋出一個ExecutionException,其異常原因與對應的CompletionException中的原因相同。為了簡化在大多數上下文中的使用,這個類還定義了join()和getNow方法,它們在這種情況下直接拋出CompletionException。

以下是CompletableFuture的內部實現概述:

由於CompletableFuture可以依賴其他一個甚至多個CompletableFuture,所以在內部實現的時候,每一個CompletableFuture都擁有一個依賴操作棧,棧中的元素是Completion的子類,它包含相關的操作、CompletableFuture以及源操作。當一個CompletableFuture完成之后會從棧中彈出並遞歸執行那些依賴它的CompletableFuture。由於依賴棧中的那些Completion元素也包含CompletableFuture對象,其CompletableFuture對象可能也擁有一個依賴棧,因此將形成一個非常復雜的依賴樹。

CompletableFuture對每一種形式的實現使用了不同的Completion子類,例如:單輸入(UniCompletion)、雙輸入(BiCompletion)、投影(使用BiCompletion兩個輸入中的任何一個(而不是兩個)的雙輸入)、共享(CoCompletion,由兩個源中的第二個使用)、零輸入(不消費不產出的Runnable)操作和解除阻塞等待(get()、join()方法)的信號器Signallers。Completion類擴展了ForkJoinTask來啟用異步執行(不增加空間開銷,因為我們利用它的“標記”方法來維護聲明).它還被聲明為Runnable,可以被任意線程池調度執行。

CompletableFuture又在UniCompletion、BiCompletion、CoCompletion等這幾種Completion子類的基礎上擴展出了實現CompletionStage具體接口方法的前綴為"Uni", "Bi", "Or"的子類。例如實現單個輸入、兩個輸入、兩者之一的thenApply對應的就是UniApply、BiApply、OrApply。CompletableFuture在實現CompletionStage接口方法甚至自己獨有的方法使都采用了相同的模式,以及調度策略,因此只要立即了一種方法的實現,其他方法都是類似的原理。

源碼簡述

runAsync/supplyAsync

雖然CompletableFuture提供了無參的構造方法,但我們一般從它的靜態方法開始,根據是否有返回值,它對外提供了兩種形式的執行異步任務的方法:

1 //執行無返回值的異步任務
2 public static CompletableFuture<Void> runAsync(Runnable runnable)
3 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) 
4 
5 //執行有返回值的異步任務
6 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

它們都以async為后綴,根據CompletionStage的接口定義規律也可以知道是通過異步安排執行,又比如方法中帶有run表示不消費也不產出型方法,再如,參數帶有Executor的用自定義的線程池調度執行,否則使用默認的ForkJoinPool.commonPool執行。對於不支持並行運算的環境,例如單核CPU,CompletableFuture默認將采用一個任務創建一個Thread實例的方式執行。

我們以supplyAsync(Supplier<U> supplier)方法為例,繼續向下分析:

 1 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
 2     return asyncSupplyStage(asyncPool, supplier);
 3 }
 4 static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
 5                                                  Supplier<U> f) {
 6     if (f == null) throw new NullPointerException();
 7     CompletableFuture<U> d = new CompletableFuture<U>(); //新創建一個CompletableFuture
 8     e.execute(new AsyncSupply<U>(d, f)); //安排異步執行
 9     return d; //立即返回
10 }
View Code

 可見,supplyAsync具體的實現調用了asyncSupplyStage,這也是CompletableFuture的內部實現慣例,每一種方法的實現都對應一個xStage方法。用於創建stage對象(這里就是實現了CompletionStage接口的CompletableFuture),並安排任務的執行。這里由於是異步任務,所以直接創建了異步任務實例AsyncSupply,然后交給線程池執行。接着看AsyncSupply實現:

 1 //實現了ForkJoinTask,Runnable可以被ForkJoinPool,或者其他實現Executor的自定義線程池調度
 2 static final class AsyncSupply<T> extends ForkJoinTask<Void> 
 3         implements Runnable, AsynchronousCompletionTask {
 4     CompletableFuture<T> dep; //依賴的CompletableFuture
 5     Supplier<T> fn;              //具體的任務執行邏輯
 6     AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
 7         this.dep = dep; this.fn = fn;
 8     }
 9 
10     public final Void getRawResult() { return null; }
11     public final void setRawResult(Void v) {}
12     //exec由ForkJoinPool調度執行,最終直接調用run
13     public final boolean exec() { run(); return true; }
14 
15     //由ForkJoinPool間接調度,或其他自定義線程池直接調用
16     public void run() {
17         CompletableFuture<T> d; Supplier<T> f;
18         if ((d = dep) != null && (f = fn) != null) {
19             dep = null; fn = null;
20             if (d.result == null) { //result為空表示任務還沒完成
21                 try {
22                     //執行任務並將結果設置到依賴的CompletableFuture
23                     d.completeValue(f.get());
24                 } catch (Throwable ex) {
25                     //異常完成的情況
26                     d.completeThrowable(ex);
27                 }
28             }
29             //從依賴棧中彈出並觸發執行依賴當前CompletableFuture的其他階段
30             d.postComplete();    
31         }
32     }
33 }
View Code

 AsyncSupply實現了ForkJoinTask,Runnable是為了兼容ForkJoinPool線程池和其他自定義的Executor線程池實現,run方法就是線程調度時執行任務的邏輯,就是執行給定的操作,並將結果設置到當前任務對應的CompletableFuture對象d(也就是依賴該任務的階段),最后通過d.postComplete觸發其他依賴階段d的其他任務執行。postComplete的邏輯如下:

 1 //遞歸觸發其他依賴當前階段的其他階段執行
 2 final void postComplete() {
 3     /*
 4      * On each step, variable f holds current dependents to pop and run.
 5      * It is extended along only one path at a time, pushing others to avoid unbounded recursion.
 6      */
 7     CompletableFuture<?> f = this; Completion h;
 8     while ((h = f.stack) != null ||        //f對應的棧不為空
 9            (f != this && (h = (f = this).stack) != null)) { //f對應的棧為空了,重新回到this,繼續另一條路徑
10         CompletableFuture<?> d; Completion t;
11         if (f.casStack(h, t = h.next)) { //將f的棧頂元素h出棧
12             if (t != null) { //表示出棧的h不是最后一個元素
13                 if (f != this) { //f不是this,即不是當前棧
14                     pushStack(h); //將f出棧的元素h壓入當前的棧,這里是為了避免遞歸層次太深
15                     continue;
16                 }
17                 h.next = null;    // detach 輔助GC
18             }
19             //tryFire就是觸發當前棧的棧頂h被執行,完成之后又返回依賴h的其它CompletableFuture,
20             //使其通過while循環繼續觸發依賴它的其余階段任務的執行
21             f = (d = h.tryFire(NESTED)) == null ? this : d; 
22         }
23     }
24 }
View Code

 postComplete的代碼很簡短,但是其代表的邏輯含義確非常不容易立即,這主要是因為形成的依賴樹結構復雜,總之,postComplete就是遞歸的觸發依賴當前階段的其他任務的執行,它一次只沿着一條路徑將其壓入當前棧,避免遞歸調用的層次太深。具體的觸發其他任務的執行是通過內嵌模式的tryFire方法來完成的,嵌套模式只用於這里。為了理解tryFire,我們再以thenApply為例。

CompletionStage實現之thenApply

這是實現CompletionStage的接口方法thenApply,它包含三種形式(同步、默認線程池的異步、指定線程池的異步)這里我以同步模式為例,一般來說,我們都會以下面這種形式使用thenApply:

1 CompletableFuture.supplyAsync(() -> {
2             try {
3                 Thread.sleep(3000);
4             } catch (InterruptedException e) {
5                 e.printStackTrace();
6             }
7             return "hello";
8         }).thenApply(s -> s + " world")
9           .thenAccept(System.out::println);

 按照CompletableFuture的慣例,thenApply由一個uniApplyStage方法實現,創建一個新的CompletableFuture,並安排任務執行:

 1 private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {
 2     if (f == null) throw new NullPointerException();
 3     CompletableFuture<V> d =  new CompletableFuture<V>(); //創建CompletableFuture實例
 4     if (e != null         //Executor不為空,表示需要安排異步執行
 5             || !d.uniApply(this, f, null)) {    //嘗試立即同步執行
 6         //需要被安排異步執行,或者依賴的上一個階段this還沒完成,需要等待
 7         UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);    //構造UniApply實例
 8         push(c);    //將當前任務入棧,注意這里入的上一個階段即this的棧,作為依賴其的階段
 9         c.tryFire(SYNC);    //
10     }
11     return d;
12 }
View Code

 對於同步任務,在入棧等待前會通過一個布爾型的uniApply方法先嘗試安排執行這個任務,這個布爾型的方法也是CompletableFuture實現其他多種形式的方法的慣例,對應每一種形式的方法實現都有一個這樣的返回布爾型的方法:uniAccept、uniRun、uniWhenComplete、uniHandle、uniExceptionally、uniCompose、biApply等等。

 1 //根據依賴的上一個階段a是否完成,看要不要立即安排當前任務執行
 2 //返回true表示已經同步完成執行了當前任務。為false表示依賴的階段a還沒完成,需要等待,或者已經安排異步執行(如果是異步任務的話)
 3 final <S> boolean uniApply(CompletableFuture<S> a,
 4                            Function<? super S,? extends T> f,
 5                            UniApply<S,T> c) {
 6     Object r; Throwable x;
 7     if (a == null || (r = a.result) == null || f == null)
 8         return false;    //表示依賴的階段a還沒完成,還不能執行當前階段
 9     tryComplete: if (result == null) { //依賴的階段a已經完成,當前階段還沒完成
10         if (r instanceof AltResult) {
11             //如果依賴的階段a是異常結束,那么當前階段也異常結束
12             if ((x = ((AltResult)r).ex) != null) {
13                 completeThrowable(x, r);
14                 break tryComplete;
15             }
16             r = null;
17         }
18         //到這里表示依賴的階段a是正常結束
19         try {
20             if (c != null && !c.claim())
21                 return false; //只有在c不為空,並且不能被執行或者已經安排異步執行才會返回false
22                 
23             //拿到已經完成的依賴階段a的結果,執行同步執行當前任務,並把結果設置到當前CompletableFuture階段
24             @SuppressWarnings("unchecked") S s = (S) r;
25             completeValue(f.apply(s));
26         } catch (Throwable ex) {
27             //異常完成的處理
28             completeThrowable(ex);
29         }
30     }
31     return true;
32 }
33 
34 
35 //通過自定義TAG,標記任務正在被執行,保證任務只會被執行一次。
36 //該方法只會在不能被執行或者已經安排異步執行才會返回false
37 final boolean claim() {
38     Executor e = executor;
39     //解鎖成功,表示可以執行了
40     if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
41         if (e == null)    //需要被安排同步執行,立即返回true
42             return true;
43         executor = null; // disable 賦值GC
44         e.execute(this);    //否則立即安排異步執行
45     }
46     return false;
47 }
View Code

 這個布爾型的方法返回true表示已經同步完成執行了當前任務。為false表示依賴的上一個階段a還沒完成,需要等待,或者已經安排異步執行(如果是異步任務的話),其中的claim方法通過CAS加鎖保證任務只會被執行一次,同時還可以安排異步任務的執行。

回到uniApplyStage,如果是異步任務,或者還不能立即執行的同步任務(因為上一個階段還沒結束),則創建UniApply實例,並入棧,但在入棧之后,還會通過tryFire進行一次嘗試同步執行,下面來看其tryFire實現:

 1 //UniCompletion是Completion的子類,Completion繼承了ForkJoinTask,實現了Runnable, AsynchronousCompletionTask 
 2 //UniCompletion的子類,主要就是實現tryFire
 3 static final class UniApply<T,V> extends UniCompletion<T,V> {
 4     Function<? super T,? extends V> fn;
 5     UniApply(Executor executor, CompletableFuture<V> dep,
 6              CompletableFuture<T> src,
 7              Function<? super T,? extends V> fn) {
 8         super(executor, dep, src); this.fn = fn;
 9     }
10     
11     //根據不同的模式,嘗試觸發執行
12     final CompletableFuture<V> tryFire(int mode) {
13         CompletableFuture<V> d; CompletableFuture<T> a;
14         if ((d = dep) == null || //已經執行過的階段的dep才會為null
15             !d.uniApply(a = src, fn, mode > 0 ? null : this)) //嘗試執行
16             return null;    //已經安排異步任務異步執行,或者同步任務需要等待,返回null
17         dep = null; src = null; fn = null;
18         return d.postFire(a, mode);    //同步任務已經執行完成,觸發依賴它的其他階段執行
19     }
20 }
View Code

 首先,UniApply是UniCompletion的子類,UniCompletion是Completion的子類,Completion繼承了ForkJoinTask,實現了Runnable, AsynchronousCompletionTask ,所以不論是同步任務還是異常任務其實都是 AsynchronousCompletionTask 的實現類。

可見tryFire還是調用uniApply方法嘗試執行的,不過這時候其第三個參數c不再是null,而是當前任務。因此會有機會執行claim方法來安排異步任務被線程池調度執行。在同步任務完成之后,通過postFire清理棧,並觸發其他依賴該階段的其他階段執行。

 1 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
 2     if (a != null && a.stack != null) {     //源棧不為空
 3         if (mode < 0 || a.result == null) //是內嵌模式或者源階段a還沒完成,清理一下其棧
 4             a.cleanStack();
 5         else
 6             a.postComplete(); //否則不是內嵌模式,並且源階段a已經結束,繼續觸發依賴該階段的其他階段
 7     }
 8     if (result != null && stack != null) { //當前階段已經完成,並且有依賴它的其他階段
 9         if (mode < 0)    //內嵌模式,返回當前階段
10             return this;
11         else
12             postComplete(); //同步或者異常模式,觸發依賴它的其他階段執行
13     }
14     return null;
15 }
View Code

 這里的postFire中mode小於0的內嵌模式就是上面supplyAsync中postComplete的while循環中傳遞給tryFire的參數,它會返回this,避免了遞歸太深。

 通過以上源碼的分析,可見跟在supplyAsync/runAsync異步階段后面的同步階段的執行可能會是調用整個階段的外部主線程,也可能是執行異步階段的線程池中的線程。如果在安排同步任務的時候,剛好上一個異步階段已經結束,那么就會使用外部主線程執行,否則入棧之后,在異步任務完成后,會通過內嵌的方式由執行異步任務的線程池中的線程調度執行,而異步任務則始終會被線程池中的線程調度執行。

實現CompletionStage的其他接口方法都是類似thenApply相同的套路模式,就不一一列舉了。其中有一個與用戶方法不對應的“Relay”類/方法.它們將結果從一個階段復制到另一個階段,用於輔助實現其中的一些方法。

Future實現

cancel和isCancelled

說完了關於CompletionStage的實現,接下來介紹一下實現Future的接口,首先從最簡單的取消開始,cancel / isCancelled:

 1 //如果尚未完成,則使用CancellationException異常完成此CompletableFuture。
 2 //尚未完成的依賴CompletableFutures也將以一個CancellationException導致的CompletionException異常完成。
 3 public boolean cancel(boolean mayInterruptIfRunning) {
 4     boolean cancelled = (result == null) &&             //還沒完成
 5         internalComplete(new AltResult(new CancellationException()));    //嘗試以CancellationException異常完成
 6     postComplete();     //觸發依賴它的其他階段也異常結束
 7     return cancelled || isCancelled();
 8 }
 9 
10 //如果此CompletableFuture在正常完成之前被取消,則返回true。
11 public boolean isCancelled() {
12     Object r;
13     //若結果是CancellationException異常,則表示是被取消的
14     return ((r = result) instanceof AltResult) && 
15         (((AltResult)r).ex instanceof CancellationException);
16 }
View Code

 可見,cancel只會嘗試取消還沒完成的CompletableFuture(即還沒有設置結果字段result),由於cancel 的參數mayInterruptIfRunning並沒有使用,一旦任務已經在執行了,則不會中斷其執行。取消行為只會在其未完成之前修改其結果指示其已經被取消,即使已經處於執行中的任務最后成功完成也不能再修改其結果。取消成功(即把result成功修改為被取消狀態)的將會以CancellationException異常完成。不論取消是否成功,都會通過postComplete遞歸的觸發依賴當前階段的其他任務的嘗試執行。

isDone()方法只要有了結果(result字段不為null),不論是正常還是異常結束都會返回true,由於CompletableFutures內部實現的時候將本身返回null的結果包裝成了AltResult對象,所以當返回null結果時也不例外。

get()

對Future的實現最重要的就是非get莫屬了:

 1 //阻塞等待結果,可以被中斷。
 2 public T get() throws InterruptedException, ExecutionException {
 3     Object r;
 4     return reportGet((r = result) == null ? waitingGet(true) : r);
 5 }
 6 //在等待之后返回原始結果,如果是可中斷或已經中斷的,則返回null。
 7 private Object waitingGet(boolean interruptible) {
 8     Signaller q = null;
 9     boolean queued = false;
10     int spins = -1;
11     Object r;
12     while ((r = result) == null) { //只要沒完成就繼續
13         if (spins < 0) //設置多處理上的自旋次數
14             spins = (Runtime.getRuntime().availableProcessors() > 1) ?
15                 1 << 8 : 0; // Use brief spin-wait on multiprocessors
16         else if (spins > 0) { //自旋
17             if (ThreadLocalRandom.nextSecondarySeed() >= 0)
18                 --spins;
19         } //自旋結束,還沒完成,初始化Signaller
20         else if (q == null) 
21             q = new Signaller(interruptible, 0L, 0L);
22         else if (!queued) //將Signaller入棧
23             queued = tryPushStack(q);
24         else if (interruptible && q.interruptControl < 0) { //可以被中斷,並且已經被中斷
25             q.thread = null;    //輔助GC
26             cleanStack(); //清理一下其棧
27             return null;  返回null
28         }
29         else if (q.thread != null && result == null) {
30             try {
31                 ForkJoinPool.managedBlock(q); //阻塞等待
32             } catch (InterruptedException ie) {
33                 q.interruptControl = -1;  //被中斷了
34             }
35         }
36     }
37     
38     //到這里說明result已經有了結果了
39     if (q != null) {
40         q.thread = null; //輔助GC
41         if (q.interruptControl < 0) { //有被中斷過
42             if (interruptible)
43                 r = null; // report interruption    //若支持中斷,則返回null
44             else
45                 Thread.currentThread().interrupt(); //不支持中斷,則補償中斷標記
46         }
47     }
48     postComplete(); //遞歸觸發其他依賴當前階段的其他階段執行
49     return r;    //支持中斷並且被中斷過,返回null,否則返回原始結果
50 }
51     
52 //使用Future報告結果。
53 private static <T> T reportGet(Object r)
54     throws InterruptedException, ExecutionException {
55     if (r == null) // 結果為null表示可以被中斷,並且被中斷了,立即拋出中斷異常
56         throw new InterruptedException();
57     if (r instanceof AltResult) { //空結果或者異常結果
58         Throwable x, cause;
59         if ((x = ((AltResult)r).ex) == null) //空結果返回null
60             return null;
61         if (x instanceof CancellationException) //被取消的,返回CancellationException異常
62             throw (CancellationException)x;
63         if ((x instanceof CompletionException) &&
64             (cause = x.getCause()) != null)        //CompletionException異常結果的,返回導致其異常結束的異常
65             x = cause;
66         throw new ExecutionException(x); //其他運行時異常,包裝成CompletionException異常拋出
67     }
68     @SuppressWarnings("unchecked") T t = (T) r; //非異常結束,返回正常結果
69     return t;
70 }
View Code

 先說結果:get()方法是可以被中斷的,因此發生中斷的話將會拋出InterruptedException(即使已經拿到結果),被取消的則會拋出CancellationException異常,任務執行產生的其他異常導致異常結束的,其異常會被封裝成ExecutionException拋出。

其實現過程首先使用waitingGet方法返回獲得的結果,如果支持中斷並且被中斷過則返回null,否則返回響應的結果,等待的過程采用了自旋 + ForkJoinPool.managedBlock方式,它將調用get方法等待結果也視為一種任務,構造成Completion的子類Signaller進入被等待執行結束的CompletableFutures依賴棧,一旦它完成就會通過postComplete方法的tryFire觸發執行,其tryFire會喚醒阻塞的線程,從而使get方法返回,值得注意的是,這里阻塞使用了 ForkJoinPool.managedBlock的方法,因為阻塞的線程可能是ForkJoinPool線程池中的工作線程,為了不讓線程池中的任務由於過多的工作線程被阻塞導致飢餓堆積等待, ForkJoinPool.managedBlock在阻塞的時候會激活新的線程補償當前阻塞的線程,保證線程池的並行性。

最后,reportGet方法根據waitingGet方法返回的結果,該跑異常的跑異常,該返回正常結果的,返回正常結果。

超時版本的get方法,其實現阻塞等待的方法timedGet方法與waitingGet原理差不多,除了因為也支持中斷可能拋出InterruptedException之外,還可能會因超時拋出TimeoutException,就不再詳細說明了。

CompletableFutures自己的方法

T join()

join方法不是實現Future接口的方法,是CompletableFutures自己的方法,它與get方法的作用都是等待執行返回結果,但是它不支持中斷,必須要等到執行結果:

 1 public T join() {
 2     Object r;
 3     return reportJoin((r = result) == null ? waitingGet(false) : r);
 4 }
 5 
 6 //解析結果,或者拋出未捕獲的異常
 7 private static <T> T reportJoin(Object r) {
 8     if (r instanceof AltResult) { //空結果或者異常結果
 9         Throwable x;
10         if ((x = ((AltResult)r).ex) == null) //空結果返回null
11             return null;
12         if (x instanceof CancellationException)
13             throw (CancellationException)x; //被取消的,返回CancellationException異常
14         if (x instanceof CompletionException)
15             throw (CompletionException)x;  //CompletionException異常
16         throw new CompletionException(x);    //其他異常也被包裝成CompletionException異常
17     }
18     @SuppressWarnings("unchecked") T t = (T) r;
19     return t;
20 }
View Code

可見,join與get方法的區別有兩點:一、不支持中斷,一定要等到執行有了結果才會返回,但若被中斷過,在返回前會恢復中斷標記;二、除了被取消拋出CancellationException,其他異常一律被包裝成CompletionException拋出。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

當所有給定的CompletableFutures都執行結束時,它就完成,或者其中任何一個異常結束,它也會立即以同樣的異常結束。列表中任何一個CompletableFuture的結果都不會反映到返回的CompletableFuture中,但可以遍歷列表通過其get方法獲取。如果一個CompletableFuture都沒指定,即cfs為長度為0的空數組,那么將返回一個完成結果為null的CompletableFuture。該方法的應用之一是,在繼續一個程序之前,等待一組相互獨立的任務完成,例如:CompletableFuture.allOf(c1, c2, c3).join();

由於allOf返回的是無返回結果的CompletableFuture,因此不能直接獲取那些列表中每一個CompletableFuture的結果,例如這樣將打印出null:

System.out.println(CompletableFuture.allOf(a,b,c,....).thenApply(V -> V).join());

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

當給定的CompletableFutures其中任何一個執行結束時,不論是正常還是異常結束,它就以相同的結果或者異常結束。注意異常結束的會被包裝成CompletionException異常。如果一個CompletableFuture都沒指定,即cfs為長度為0的空數組,那么將返回一個執行沒有結束的CompletableFuture。注意,如果給定的cfs數組中的CompletableFuture其返回類型不一致,那么anyOf的最終返回的CompletableFuture的結果的類型也將不確定。不像allOf返回的Void的結果,anyOf可以用返回的結果繼續處理,例如:

1 CompletableFuture.anyOf(
2                 CompletableFuture.supplyAsync(() -> "Tom"),
3                 CompletableFuture.supplyAsync(() -> "John"),
4                 CompletableFuture.supplyAsync(() -> "Jack")
5         ).thenApply(name -> "hello "+ name)
6         .thenAccept(System.out::println);
View Code

 T getNow(T valueIfAbsent)

1 public T getNow(T valueIfAbsent) {
2     Object r;
3     return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
4 }
View Code

如果已經執行完成,就返回與執行join時相同的結果:即被取消的拋出CancellationException,正常結果的返回結果,其他異常一律被包裝成CompletionException拋出。如果調用該方法的時候還沒執行結束,則返回指定的結果。注意,在發現執行還沒有結束時,該方法並沒有強制使執行結束。

public boolean complete(T value)

1 public boolean complete(T value) {
2     //以指定的值設置到結果result字段,只會在result還是null的時候成功
3     boolean triggered = completeValue(value);
4     postComplete();
5     return triggered;
6 }
View Code

如果執行還沒結束,就以指定的值作為其執行結果,並觸發依賴它的其他階段執行。由於該方法直接修改了執行的結果,即使后面該任務真正的邏輯執行完之后也不能再更新該result,所以如果此方法成功,那么調用該階段的get、join方法返回的值就是該方法指定的value值。

public boolean completeExceptionally(Throwable ex) 

如果還沒執行結果,就以指定的異常作為其執行結果,並觸發依賴它的其他階段執行。由於該方法直接修改了執行的結果,即使后面該任務真正的邏輯執行完之后也不能再更新該result,所以如果此方法成功,那么調用該階段的get、join方法也將拋出異常,不同的是,get可能拋出被封裝成ExecutionException的異常,join可能拋出被封裝成CompletionException的異常。

public void obtrudeValue(T value) 和 public void obtrudeException(Throwable ex)

強制以指定的值或者異常作為當前階段的執行結果,不論其是否已經完成。與complete和completeExceptionally不同,complete和completeExceptionally只會在執行還沒結束的情況,更新結果。在這只后返回的get、join也將返回相應的結果或異常。這兩方法一般用於錯誤恢復,但也不一定有用。

public boolean isCompletedExceptionally() 

如果執行以異常完成,則該方法返回true,這里的異常結束包括被取消,被completeExceptionally和obtrudeException方法主動異常結束,當然也包括任務執行過程中異常結束的情況。

public int getNumberOfDependents() 

返回等待該CompletableFutures完成的其他CompletableFutures的估計個數,只是一個瞬態值,一般用於監控系統狀態。

構造方法 

除了使用靜態方法runAsync,supplyAsync之外,構造一個CompletableFutures還可以通過以下幾種途徑,首先就是構造方法:

CompletableFutures提供了兩個構造方法,一個無參,一個可以指定結果:

1 public CompletableFuture() {
2 }
3 
4 private CompletableFuture(Object r) {
5     this.result = r;
6 }
View Code

 另一種就是使用靜態方法completedFuture:

public static <U> CompletableFuture<U> completedFuture(U value)

其內部還是使用的有參的構造方法返回一個CompletableFuture實例。利用構造無參的構造方法可以這些實現異步任務的執行:

 1 public static CompletableFuture<String> asyncDoSomething(String a, String b){
 2         CompletableFuture future = new CompletableFuture();
 3         new Thread(() -> {
 4             try{
 5                 Thread.sleep(TimeUnit.SECONDS.toSeconds(10000));
 6                 future.complete(a + b);
 7             }catch (Exception e){
 8                 future.completeExceptionally(e);
 9             }
10         }).start();
11         return future;
12     }
13     public static void main(String[] args) {
14         asyncDoSomething("hello"," world").thenAccept(System.out::println);
15     }
View Code

 但,一般我們也不會這樣用了,我這里只是為了舉一個例子而已。 

 

結束語

本文簡要介紹了CompletableFuture的實現源碼,其是CompletionStage與Future接口的實現類,它提供了大量的用於異步或同步執行任務的方法,這些方法的使用大都在上一篇CompletionStage的講述中做了介紹,其通過鏈表棧形成的樹形結構組織那些具有依賴關系的各個階段CompletableFuture,異步任務的執行默認通過ForkJoinPool.commonPool執行(其創建的工作線程都是守護線程,不用擔心JVM掛起),除非指定了Executor參數,因為默認的線程池並行度為CPU核心數並發度並不高,因此大多數時候我們都會指定自定義的Executor

實現類。同步任務的執行有可能是被主線程執行,也可能是被完成上一個階段的線程池中的線程執行。

另外所有實現CompletionStage的接口方法都是獨立於其他公共方法實現的,基本上每一個方法都對應了一個內部類,因此一個方法的行為不會受到子類中其他方法重寫的影響。

實現Future的cancel方法不會中中斷已經被安排執行的任務,僅僅是在任務的結果還沒被回寫之前,更新其結果為被取消狀態,一旦將其結果設置為被取消狀態,還沒有開始仔執行的將不會被調度執行,已經在執行的,最后就算正常完成也不能再修改結果。

Future.get與CompletableFuture的join方法除了join不能被中斷之外,對異常結束分別會將異常包裝成ExecutionException、CompletionException,當然InterruptedException、TimeoutException、CancellationException除外,通常我們都會使用join而不是get,大概是join方法不需要處理異常,而get方法有InterruptedException、TimeoutException異常需要處理吧。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM