前言
回調,顧名思義,回過頭來調用,詳細的說來就是用戶無需關心內部實現的具體邏輯,只需要在暴露出的回調函數中放入自己的業務邏輯即可。由於回調機制解耦了框架代碼和業務代碼,所以可以看做是對面向對象解耦的具體實踐之一。由於本文的側重點在於講解后端回調,所以對於前端回調甚至於類似JSONP的回調函數類的,利用本章講解的知識進行代入的時候,請斟酌一二,畢竟后端和前端還是有一定的區別,所謂差之毫厘,可能謬以千里,慎之。所以本章對回調的講解側重於后端,請知悉。
回調定義
說到回調,其實我的理解類似於函數指針的功能,怎么講呢?因為一個方法,一旦附加了回調入參,那么用戶在進行調用的時候,這個回調入參是可以用匿名方法直接替代的。回調的使用必須和方法的簽名保持一致性,下面我們來看一個JDK實現的例子:
default boolean removeIf(Predicate<? super E> filter) { Objects.requireNonNull(filter); boolean removed = false; final Iterator<E> each = iterator(); while (each.hasNext()) { if (filter.test(each.next())) { each.remove(); removed = true; } } return removed; }
在JDK中,List結構有一個removeIf的方法,其實現方式如上所示。由於附帶了具體的注釋講解,我這里就不再進行過多的講述。我們需要着重關注的是其入參:Predicate,因為他就是一個函數式接口,入參為泛型E,出參為boolean,其實和Function<? super E, boolean>是等價的。由於List是一個公共的框架代碼,里面不可能糅合業務代碼,所以為了解耦框架代碼和業務代碼,JDK使用了內置的各種函數式接口作為方法的回調,將具體的業務實踐拋出去,讓用戶自己實現,而它自己只接受用戶返回的結果就行了:只要用戶處理返回true(filter.test(each.next()返回true),那么我就刪掉當前遍歷的數據;如果用戶處理返回false(filter.test(each.next()返回false),那么我就保留當前遍歷的數據。是不是非常的nice?
其實這種完美的協作關系,在JDK類庫中隨處可見,在其他經常用到的框架中也很常見,諸如Guava,Netty,實在是太多了(這也從側面說明,利用函數式接口解耦框架和業務,是正確的做法),我擷取了部分片段如下:
//將map中的所有entry進行替換 void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) //將map中的entry進行遍歷 void forEach(BiConsumer<? super K, ? super V> action) //map中的entry值如果有,則用新值重新建立映射關系 V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) //Deque遍歷元素 void forEach(Consumer<? super T> action) //Deque按給定條件移除元素 boolean removeIf(Predicate<? super E> filter) //Guava中獲取特定元素 <T> T get(Object key, final Callable<T> valueLoader) //Netty中設置監聽 ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)
那么,回過頭來想想,如果我們封裝自己的組件,想要封裝的很JDK Style,該怎么做呢?如果上來直接理解Predicate,Function,Callable,Consumer,我想很多人是有困難的,那就聽我慢慢道來吧。
我們先假設如下一段代碼,這段代碼我相信很多人會很熟悉,很多人也封裝過,這就是我們的大名鼎鼎的RedisUtils封裝類:
/** * key遞增特定的num * @param key Redis中保存的key * @param num 遞增的值 * @return key計算完畢后返回的結果值 */ public Long incrBy(String key,long num) { CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true); Long result; try { result = jrc.incrBy(key, num); } catch (Exception e) { logger.error("incrBy", null, "sendCouponService.redis.incrBy異常,key={},value={}", e, key, num); Profiler.functionError(callerInfo); throw new RuntimeException("sendCouponService.redis.incrBy異常,key=" + key, e); } finally { Profiler.registerInfoEnd(callerInfo); } return result; }
上面這段代碼只是一個示例,其實還有上百個方法基本上都是這種封裝結構,這種封裝有問題嗎?沒問題!而且封裝方式辛辣老道,一看就是高手所為,因為既加了監控,又做了異常處理,而且還有錯誤日志記錄,一旦發生問題,我們能夠第一時間知道哪里出了問題,哪個方法出了問題,然后設定對應的應對方法。
這種封裝方式,如果當做普通的Util來用,完全沒有問題,但是如果想封裝成組件,則欠缺點什么,我列舉如下:
1. 當前代碼寫死了用jrc操作,如果后期切換到jimdb,是不是還得為jimdb專門寫一套呢?
2. 當前代碼,上百個方法,其實很多地方都是重復的,唯有redis操作那塊不同,代碼重復度特別高,一旦擴展新方法,基本上是剖解原有代碼,然后拷貝現有方法,最后改成新方法。
3. 當前方法,包含的都是redis單操作,如果遇到那種涉及到多個操作組合的(比如先set,然后expire或者更復雜一點),需要添加新方法,本質上這種新方法其實和業務性有關了。
從上面列出的這幾點來看,其實我們可以完全將其打造成一個兼容jrc操作和cluster操作,同時具有良好框架擴展性(策略模式+模板模式)和良好代碼重復度控制(函數式接口回調)的框架。由於本章涉及內容為異步回調,所以這里我們將講解這種代碼如何保持良好的代碼重復度控制上。至於良好的框架擴展性,如果感興趣的話,我會在后面的章節進行講解。那么我們開始進行優化吧。
首先,找出公共操作部分(白色)和非公共操作部分(黃色):
/** * key遞增特定的num * @param key Redis中保存的key * @param num 遞增的值 * @return key計算完畢后返回的結果值 */ public Long incrBy(String key,long num) { CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true); Long result;
try { result = jrc.incrBy(key, num);
} catch (Exception e) { logger.error("incrBy", null, "sendCouponService.redis.incrBy異常,key={},value={}", e, key, num);
Profiler.functionError(callerInfo);
return null;
} finally { Profiler.registerInfoEnd(callerInfo); } return result; }
通過上面的標記,我們發現非公共操作部分,有兩類:
1. ump提示語和日志提示語不一致
2. 操作方法不一致
標記出來了公共操作部分,之后我們開始封裝公共部分:
/** * 公共模板抽取 * * @param method * @param callable * @param <T> * @return */ public static <T> T invoke(String method) { CallerInfo info = Profiler.registerInfo(method, false, true); try { //TODO 這里放置不同的redis操作方法 } catch (Exception e) { logger.error(method, e); AlarmUtil.alarm(method + e.getCause()); reutrn null; } finally { Profiler.registerInfoEnd(info); } }
但是這里有個問題,我們雖然把公共模板抽取出來了,但是TODO標簽里面的內容怎么辦呢? 如何把不同的redis操作方法傳遞進來呢?
其實在java中,我們可以利用接口的方式,將具體的操作代理出去,由外部調用者來實現,聽起來是不是感覺又和IOC搭上了點關系,不錯,你想的沒錯,這確實是控制反轉依賴注入的一種做法,通過接口方式將具體的實踐代理出去,這也是進行回調操作的原理。接下來看我們的改造:
/** * redis操作接口 */ public interface RedisOperation<T>{ //調用redis方法,入參為空,出參為T泛型 T invoke(); } /** * redis操作公共模板 * @param method * @param redisOperation * @param <T> * @return */ public static <T> T invoke(String method,RedisOperation redisOperation) { CallerInfo info = Profiler.registerInfo(method, false, true); try { return redisOperation.invoke(); } catch (Exception e) { logger.error(method, e); AlarmUtil.alarm(method + e.getCause()); reutrn null; } finally { Profiler.registerInfoEnd(info); } }
這樣,我們就打造好了一個公共的redis操作模板,之后就可以像下面的方式來使用了:
@Override public Long incrby(String key, long val){ String method = "com.jd.marketing.util.RedisUtil.incrby"; RedisOperation<Long> process = () -> { return redisUtils.incrBy(key, val); }; return CacheHelper.invoke(method, process); }
之后的一百多個方法,你也可以使用這樣的方式來一一進行包裝,之后你會發現原來RedisUtils封裝完畢,代碼寫了2000行,但是用這種方式之后,代碼只寫了1000行,而且后續有新的聯合操作過來,你只需要在如下代碼段里面直接把級聯操作添加進去即可:
RedisOperation<Long> process = () -> { //TODO other methods //TODO other methods return redisUtils.incrBy(key, val); };
是不是很方便快捷?在這里我需要所以下的是,由於RedisOperation里面的invoke方法是沒有入參,帶有一個出參結果的調用。所以在回調這里,我用了匿名表達式來()->{}來match這種操作。但是如果回調這里,一個入參,一個出參的話,那么我的匿名表達式需要這樣寫 param->{}, 多個入參,那就變成了這樣 (param1, param2, param3)->{} 。由於這里並非重點,我不想過多講解,如果對這種使用方式不熟悉,可以完全使用如下的方式來進行書寫也行:
@Override public Long incrby(String key, long val){ String method = "com.jd.marketing.util.RedisUtil.incrby"; RedisOperation<Long> process = () -> incrByOperation(key, val); return CacheHelper.invoke(method, process); } private Long incrByOperation(String key, long val){ return redisUtils.incrBy(key, val); }
其實說到這里的時候,我就有必要提一下開頭的埋下的線索了。其實之前演示的Netty的代碼:
//Netty中設置監聽
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)
GenericFutureListener這個接口
就是按照上面的寫法來做的,是不是豁然開朗呢?至於其調用方式,也和上面講解的一致,只要符合接口里面方法的調用標准就行(入參和出參符合就行), 比如 future –> {}。
說到這里,我們可能認為這樣太麻煩了,自己定義接口,然后注入到框架中,最后用戶自己實現調用方法,一長串。是的,你說的沒錯,這樣確實太麻煩了,JDK於是專門用了一個 FunctionalInterface的annotation來幫我們做了,所以在JDK中,如果你看到Consumer,Function,Supplier等,帶有@FunctionalInterface標注的接口,那么就說明他是一個函數式接口,而這種接口是干什么的,具體的原理就是我上面講的。下面我們來梳理梳理這些接口吧。
先看一下我們的RedisUtils使用JDK自帶的函數式接口的最終封裝效果:
從圖示代碼可以看出,整體封裝變得簡潔許多,而且我們用了JDK內置的函數式接口,所以也無需寫其他多余的代碼,看上去很清爽,重復代碼基本不見了。而且,由於JDK提供的其他的函數式接口有運算操作,比如Predicate.or, Predicate.and操作等,大大加強了封裝的趣味性和樂趣。
下面我將JDK中涉及的常用的函數式接口列舉一遍,然后來詳細講解講解吧,列表如下:
Consumer, 提供void accept(T t)回調 Runnable, 提供void run()回調 Callable, 提供V call() throws Exception回調 Supplier, 提供T get()回調 Function, 提供R apply(T t)回調, 有andThen接續操作 Predicate, 提供boolean test(T t)回調, 等價於 Function<T, boolean> BiConsumer, 提供void accept(T t, U u)回調,注意帶Bi的回調接口,表明入參都是雙參數,比如BiPredicate
......
其實還有很多,我這里就不一一列舉了。感興趣的朋友可以在這里找到JDK提供的所有函數式接口。
接下來,我們來講解其使用示范,以便於明白怎么樣去使用它。
對於Consumer函數式接口,內部的void accept(T t)回調方法,表明了它只能回調有一個入參,沒有返參的方法。示例如下:
/** * Consumer調用的例子 */ public void ConsumerSample() { LinkedHashMap linkedHashMap = new LinkedHashMap(); linkedHashMap.put("key", "val"); linkedHashMap.forEach((k, v) -> { System.out.println("key" + k + ",val" + v); }); }
對於Callable接口,其實和Supplier接口是一樣的,只是有無Exception拋出的區別,示例如下:
/** * Callable調用的例子 */ public Boolean setnx(String key, String val){ String method = "com.jd.marketing.util.RedisUtil.setnx"; Callable<Boolean> process = () -> { Long rst = redisUtils.setnx(key, val); if(rst == null || rst == 0){ return false; } return true; }; return CacheHelper.invoke(method, process); }
對於Predicate<T>接口,等價於Function<T, Boolean>, 示例如下:
/** * Precidate調用的例子 */ public void PredicateSample() { List list = new ArrayList(); list.add("a") list.removeIf(item -> { return item.equals("a"); }); }
說明一下,Predicate的入參為一個參數,出參為boolean,很適合進行條件判斷的場合。在JDK的List數據結構中,由於removeIf方法無法耦合進去業務代碼,所以利用Predicate函數式接口將業務邏輯實現部分拋給了用戶自行處理,用戶處理完畢,只要返回給我true,我就刪掉當前的item;返回給我false,我就保留當前的item。解耦做的非常漂亮。那么List的removeIf實現方式你覺得是怎樣實現的呢?如果我不看JDK代碼的話,我覺得實現方式如下:
public boolean removeIf(Predicate<T> predicate){ final Iterator<T> iterator = getIterator(); while (iterator.hasNext()) { T current = iterator.next(); boolean result = predicate.test(current); if(result){ iterator.remove(); return true; } } return false; }
但是實際你去看看List默認的removeIf實現,源碼大概和我寫的差不多。所以只要理解了函數式接口,我們也能寫出JDK Style的代碼,酷吧。
CompletableFuture實現異步處理
好了,上面就是函數式接口的整體介紹和使用簡介,不知道你看了之后,理解了多少呢?接下來我們要講解的異步,完全基於上面的函數式接口回調,如果之前的都看懂了,下面的講解你將豁然開朗;反之則要悟了。但是正確的方向都已經指出來了,所以入門應該是沒有難度的。
CompletableFuture,很長的一個名字,我對他的印象停留在一次代碼評審會上,當時有人提到了這個類,我只是簡單的記錄下來了,之后去JDK源碼中搜索了一下,看看主要干什么的,也沒有怎么想去看它。結果當我搜到這個類,然后看到Author的時候,我覺得我發現了金礦一樣,於是我決定深入的研究下去,那個作者的名字就是:
/** * A {@link Future} that may be explicitly completed (setting its * value and status), and may be used as a {@link CompletionStage}, * supporting dependent functions and actions that trigger upon its * completion. * * <p>When two or more threads attempt to * {@link #complete complete}, * {@link #completeExceptionally completeExceptionally}, or * {@link #cancel cancel} * a CompletableFuture, only one of them succeeds. * * <p>In addition to these and related methods for directly * manipulating status and results, CompletableFuture implements * interface {@link CompletionStage} with the following policies: <ul> * * <li>Actions supplied for dependent completions of * <em>non-async</em> methods may be performed by the thread that * completes the current CompletableFuture, or by any other caller of * a completion method.</li> * * <li>All <em>async</em> methods without an explicit Executor * argument are performed using the {@link ForkJoinPool#commonPool()} * (unless it does not support a parallelism level of at least two, in * which case, a new Thread is created to run each task). To simplify * monitoring, debugging, and tracking, all generated asynchronous * tasks are instances of the marker interface {@link * AsynchronousCompletionTask}. </li> * * <li>All CompletionStage methods are implemented independently of * other public methods, so the behavior of one method is not impacted * by overrides of others in subclasses. </li> </ul> * * <p>CompletableFuture also implements {@link Future} with the following * policies: <ul> * * <li>Since (unlike {@link FutureTask}) this class has no direct * control over the computation that causes it to be completed, * cancellation is treated as just another form of exceptional * completion. Method {@link #cancel cancel} has the same effect as * {@code completeExceptionally(new CancellationException())}. Method * {@link #isCompletedExceptionally} can be used to determine if a * CompletableFuture completed in any exceptional fashion.</li> * * <li>In case of exceptional completion with a CompletionException, * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an * {@link ExecutionException} with the same cause as held in the * corresponding CompletionException. To simplify usage in most * contexts, this class also defines methods {@link #join()} and * {@link #getNow} that instead throw the CompletionException directly * in these cases.</li> </ul> * * @author Doug Lea * @since 1.8 */
Doug Lea,Java並發編程的大神級人物,整個JDK里面的並發編程包,幾乎都是他的作品,很務實的一個老爺子,目前在紐約州立大學奧斯威戈分校執教。比如我們異常熟悉的AtomicInteger類也是其作品:
/** * An {@code int} value that may be updated atomically. See the * {@link java.util.concurrent.atomic} package specification for * description of the properties of atomic variables. An * {@code AtomicInteger} is used in applications such as atomically * incremented counters, and cannot be used as a replacement for an * {@link java.lang.Integer}. However, this class does extend * {@code Number} to allow uniform access by tools and utilities that * deal with numerically-based classes. * * @since 1.5 * @author Doug Lea */ public class AtomicInteger extends Number implements java.io.Serializable { // ignore code }
想查閱老爺子的最新資料,建議到Wikipedia上查找,里面有他的博客鏈接等,我這里就不再做過多介紹,回到正題上來,我們繼續談談CompletableFuture吧。我剛才貼的關於這個類的描述,都是英文的,而且特別長,我們不妨貼出中文釋義來,看看具體是個什么玩意兒:
繼承自Future,帶有明確的結束標記;同時繼承自CompletionStage,支持多函數調用行為直至完成態。
當兩個以上的線程對CompletableFuture進行complete調用,completeExceptionally調用或者cancel調用,只有一個會成功。
為了直觀的保持相關方法的狀態和結果,CompletableFuture按照如下原則繼承並實現了CompletionStage接口:
1. 多個同步方法的級聯調用,可能會被當前的CompletableFuture置為完成態,也可能會被級聯函數中的任何一個方法置為完成態。
2. 異步方法的執行,默認使用ForkJoinPool來進行(如果當前的並行標記不支持多並發,那么將會為每個任務開啟一個新的線程來進行)。
為了簡化監控,調試,代碼跟蹤等,所有的異步任務必須繼承自AsynchronousCompletionTask。
3. 所有的CompletionStage方法都是獨立的,overrid子類中的其他的方法並不會影響當前方法行為。
CompletableFuture同時也按照如下原則繼承並實現了Future接口:
1. 由於此類無法控制完成態(一旦完成,直接返回給調用方),所以cancellation被當做是另一種帶有異常的完成狀態. 在這種情況下cancel方法和CancellationException是等價的。
方法isCompletedExceptionally可以用來監控CompletableFuture在一些異常調用的場景下是否完成。
2. get方法和get(long, TimeUint)方法將會拋出ExecutionException異常,一旦計算過程中有CompletionException的話。
為了簡化使用,這個類同時也定義了join()方法和getNow()方法來避免CompletionException的拋出(在CompletionException拋出之前就返回了結果)。
由於沒有找到中文文檔,所以這里自行勉強解釋了一番,有些差強人意。
在我們日常生活中,我們的很多行為其實都是要么有結果的,要么無結果的。比如說做蛋糕,做出來的蛋糕就是結果,那么一般我們用Callable或者Supplier來代表這個行為,因為這兩個函數式接口的執行,是需要有返回結果的。再比如說吃蛋糕,吃蛋糕這個行為,是無結果的。因為他僅僅代表我們去干了一件事兒,所以會用Consumer或者Runnable來代表吃飯這個行為。因為這兩個函數式接口的執行,是不返回結果的。有時候我發現家里沒有做蛋糕的工具,於是我便去外面的蛋糕店委托蛋糕師傅給我做一個,那么這種委托行為,其實就是一種異步行為,會用Future來描述。因為Future神奇的地方在於,可以讓一個同步執行的方法編程異步的,就好似委托蛋糕師傅做蛋糕一樣。這樣我們就可以在蛋糕師傅給我們做蛋糕期間去做一些其他的事兒,比如聽音樂等等。但是由於Future不具有事件完成告知的能力,所以得需要自己去一遍一遍的問師傅,做好了沒有。而CompletableFuture則具有這種能力,所以總結起來如下:
- Callable,有結果的同步行為,比如做蛋糕
- Runnable,無結果的同步行為,比如吃蛋糕
- Future,異步封裝Callable/Runnable,比如委托給蛋糕師傅(其他線程)去做
- CompletableFuture,封裝Future,使其擁有回調功能,比如讓師傅主動告訴我蛋糕做好了
那么上面描述的場景,我們用代碼封裝一下吧:
public static void main(String... args) throws Exception { CompletableFuture .supplyAsync(() -> makeCake()) .thenAccept(cake -> eatCake(cake)); System.out.println("先回家聽音樂,蛋糕做好后給我打電話,我來取..."); Thread.currentThread().join(); } private static Cake makeCake() { System.out.println("我是蛋糕房,開始為你制作蛋糕..."); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } Cake cake = new Cake(); cake.setName("草莓蛋糕"); cake.setShape("圓形"); cake.setPrice(new BigDecimal(99)); System.out.println("蛋糕制作完畢,請取回..."); return cake; } private static void eatCake(Cake cake) { System.out.println("這個蛋糕是" + cake.getName() + ",我喜歡,開吃..."); }
最后執行結果如下:
我是蛋糕房,開始為你制作蛋糕... 先回家聽音樂,蛋糕做好后給我打電話,我來取... 蛋糕制作完畢,請取回... 這個蛋糕是草莓蛋糕,我喜歡,開吃...
由於CompletableFuture的api有50幾個,數量非常多,我們可以先將其划分為若干大類(摘自理解CompletableFuture,總結的非常好,直接拿來用):
創建類:用於CompletableFuture對象創建,比如:
- completedFuture
- runAsync
- supplyAsync
- anyOf
- allOf
狀態取值類:用於判斷當前狀態和同步等待取值,比如:
- join
- get
- getNow
- isCancelled
- isCompletedExceptionally
- isDone
控制類:可用於主動控制CompletableFuture完成行為,比如:
- complete
- completeExceptionally
- cancel
接續類:CompletableFuture最重要的特性,用於注入回調行為,比如:
- thenApply, thenApplyAsync
- thenAccept, thenAcceptAsync
- thenRun, thenRunAsync
- thenCombine, thenCombineAsync
- thenAcceptBoth, thenAcceptBothAsync
- runAfterBoth, runAfterBothAsync
- applyToEither, applyToEitherAsync
- acceptEither, acceptEitherAsync
- runAfterEither, runAfterEitherAsync
- thenCompose, thenComposeAsync
- whenComplete, whenCompleteAsync
- handle, handleAsync
- exceptionally
上面的方法非常多,而大多具有相似性,我們大可不必馬上記憶。先來看看幾個一般性的規律,便可輔助記憶(重要):
- 以Async后綴結尾的方法,均是異步方法,對應無Async則是同步方法。
- 以Async后綴結尾的方法,一定有兩個重載方法。其一是采用內部forkjoin線程池執行異步,其二是指定一個Executor去運行。
- 以run開頭的方法,其方法入參的lambda表達式一定是
無參數
,並且無返回值
的,其實就是指定Runnable - 以supply開頭的方法,其方法入參的lambda表達式一定是
無參數
,並且有返回值
,其實就是指Supplier - 以Accept為開頭或結尾的方法,其方法入參的lambda表達式一定是
有參數
,但是無返回值
,其實就是指Consumer - 以Apply為開頭或者結尾的方法,其方法入參的lambda表達式一定是
有參數
,但是有返回值
,其實就是指Function - 帶有either后綴的表示誰先完成則消費誰。
以上6條記住之后,就可以記住60%以上的API了。
先來看一下其具體的使用方式吧(網上有個外國人寫了CompletableFuture的20個例子,我看有中文版了,放到這里,大家可以參考下)。
/** * CompletableFuture調用completedFuture方法,表明執行完畢 */ static void sample1() { CompletableFuture cf = CompletableFuture.completedFuture("message"); Assert.assertTrue(cf.isDone()); Assert.assertEquals("message", cf.getNow(null)); }
sample1代碼,可以看出,如果想讓一個ComopletableFuture執行完畢,最簡單的方式就是調用其completedFuture方法即可。之后就可以用getNow對其結果進行獲取,如果獲取不到就返回默認值null。
/** * 兩個方法串行執行,后一個方法依賴前一個方法的返回 */ static void sample2() { CompletableFuture cf = CompletableFuture .completedFuture("message") .thenApply(message -> { Assert.assertFalse(Thread.currentThread().isDaemon()); return message.toUpperCase(); }); Assert.assertEquals("MESSAGE", cf.getNow(null)); }
sample2代碼,利用thenApply實現兩個函數串行執行,后一個函數的執行以來前一個函數的返回結果。
/** * 兩個方法並行執行,兩個都執行完畢后,在進行匯總 */ static void sample3() { long start = System.currentTimeMillis(); CompletableFuture cf = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture cf1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture.allOf(cf, cf1).whenComplete((v,t)->{ System.out.println("都完成了"); }).join(); long end = System.currentTimeMillis(); System.out.println((end-start)); }
sample3最后的執行結果為:
都完成了
2087
可以看到,耗時為2087毫秒,如果是串行執行,需要耗時3000毫秒,但是並行執行,則以最長執行時間為准,其實這個特性在進行遠程RPC/HTTP服務調用的時候,將會非常有用,我們一會兒再進行講解如何用它來反哺業務。
/** * 方法執行取消 */ static void sample4(){ CompletableFuture cf = CompletableFuture.supplyAsync(()->{ try { System.out.println("開始執行函數..."); Thread.sleep(2000); System.out.println("執行函數完畢..."); } catch (InterruptedException e) { e.printStackTrace(); } return "ok"; }); CompletableFuture cf2 = cf.exceptionally(throwable -> { return throwable; }); cf2.cancel(true); if(cf2.isCompletedExceptionally()){ System.out.println("成功取消了函數的執行"); } cf2.join(); }
調用結果如下:
開始執行函數... 成功取消了函數的執行 Exception in thread "main" java.util.concurrent.CancellationException at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263) at com.jd.jsmartredis.article.Article.sample4(Article.java:108) at com.jd.jsmartredis.article.Article.main(Article.java:132)
可以看到我們成功的將函數執行中斷,同時由於cf2返回的會一個throwable的Exception,所以我們的console界面將其也原封不動的打印了出來。
講解了基本使用之后,如何使用其來反哺我們的業務呢?我們就以通用下單為例吧,來看看通用下單有哪些可以優化的點。
上圖就是我們在通用下單接口經常調用的接口,分為下單地址接口,商品信息接口,京豆接口,由於這三個接口沒有依賴關系,所以可以並行的來執行。如果換做是目前的做法,那么肯定是順序執行,假如三個接口獲取都耗時1s的話,那么三個接口獲取完畢,我們的耗時為3s。但是如果改成異步方式執行的話,那么將會簡單很多,接下來,我們開始改造吧。
public Result submitOrder(String pin, CartVO cartVO) { //獲取下單地址 CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> { AddressResult addressResult = addressRPC.getAddressListByPin(pin); return addressResult; }); //獲取商品信息 CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> { GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO); return goodsResult; }); //獲取京豆信息 CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> { JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin); return jinbeanResult; }); CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> { if (throwable == null) { logger.error("獲取地址,商品,京豆信息失敗", throwable); //TODO 嘗試重新獲取 } else { logger.error("獲取地址,商品,京豆信息成功"); } }).join(); AddressResult addressResult = addressFuture.getNow(null); GoodsResult goodsResult = goodsFuture.getNow(null); JinbeanResult jinbeanResult = beanFuture.getNow(null); //TODO 后續處理 }
這樣,我們利用將普通的RPC執行編程了異步,而且附帶了強大的錯誤處理,是不是很簡單?
但是如果遇到如下圖示的調用結構,CompletableFuture能否很輕松的應對呢?
由於業務變更,需要附帶延保信息,為了后續重新計算價格,所以必須將延保商品獲取出來,然后計算價格。其實這種既有同步,又有異步的做法,利用CompletableFuture來handle,也是輕松自然,代碼如下:
public Result submitOrder(String pin, CartVO cartVO) { //獲取下單地址 CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> { AddressResult addressResult = addressRPC.getAddressListByPin(pin); return addressResult; }); //獲取商品信息 CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> { GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO); return goodsResult; }).thenApplyAsync((goodsResult, Map)->{ YanbaoResult yanbaoResult = yanbaoRPC.getYanbaoInfoByGoodID(goodsResult.getGoodId, pin); Map<String, Object> map = new HashMap<>(); map.put("good", goodsResult); map.put("yanbao",yanbaoResult); return map; }); //獲取京豆信息 CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> { JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin); return jinbeanResult; }); CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> { if (throwable == null) { logger.error("獲取地址,商品-延保,京豆信息失敗", throwable); //TODO 嘗試重新獲取 } else { logger.error("獲取地址,商品-延保,京豆信息成功"); } }).join(); AddressResult addressResult = addressFuture.getNow(null); GoodsResult goodsResult = goodsFuture.getNow(null); JinbeanResult jinbeanResult = beanFuture.getNow(null); //TODO 后續處理 }
這樣我們就可以了,當然這種改造給我們帶來的好處也是顯而易見的,我們不需要針對所有的接口進行OPS優化,而是針對性能最差的接口進行OPS優化,只要提升了性能最差的接口,那么整體的性能就上去了。
洋洋灑灑寫了這么多,希望對大家有用,謝謝。
參考資料: