微言異步回調


前言

回調,顧名思義,回過頭來調用,詳細的說來就是用戶無需關心內部實現的具體邏輯,只需要在暴露出的回調函數中放入自己的業務邏輯即可。由於回調機制解耦了框架代碼和業務代碼,所以可以看做是對面向對象解耦的具體實踐之一。由於本文的側重點在於講解后端回調,所以對於前端回調甚至於類似JSONP的回調函數類的,利用本章講解的知識進行代入的時候,請斟酌一二,畢竟后端和前端還是有一定的區別,所謂差之毫厘,可能謬以千里,慎之。所以本章對回調的講解側重於后端,請知悉。

回調定義

說到回調,其實我的理解類似於函數指針的功能,怎么講呢?因為一個方法,一旦附加了回調入參,那么用戶在進行調用的時候,這個回調入參是可以用匿名方法直接替代的。回調的使用必須和方法的簽名保持一致性,下面我們來看一個JDK實現的例子:

    default boolean removeIf(Predicate<? super E> filter) {
        Objects.requireNonNull(filter);
        boolean removed = false;
        final Iterator<E> each = iterator();
        while (each.hasNext()) {
            if (filter.test(each.next())) {
                each.remove();
                removed = true;
            }
        }
        return removed;
    }

在JDK中,List結構有一個removeIf的方法,其實現方式如上所示。由於附帶了具體的注釋講解,我這里就不再進行過多的講述。我們需要着重關注的是其入參:Predicate,因為他就是一個函數式接口,入參為泛型E,出參為boolean,其實和Function<? super E, boolean>是等價的。由於List是一個公共的框架代碼,里面不可能糅合業務代碼,所以為了解耦框架代碼和業務代碼,JDK使用了內置的各種函數式接口作為方法的回調,將具體的業務實踐拋出去,讓用戶自己實現,而它自己只接受用戶返回的結果就行了:只要用戶處理返回true(filter.test(each.next()返回true),那么我就刪掉當前遍歷的數據;如果用戶處理返回false(filter.test(each.next()返回false),那么我就保留當前遍歷的數據。是不是非常的nice?

其實這種完美的協作關系,在JDK類庫中隨處可見,在其他經常用到的框架中也很常見,諸如Guava,Netty,實在是太多了(這也從側面說明,利用函數式接口解耦框架和業務,是正確的做法),我擷取了部分片段如下:

//將map中的所有entry進行替換
void replaceAll(BiFunction<? super K, ? super V, ? extends V> function)
//將map中的entry進行遍歷
void forEach(BiConsumer<? super K, ? super V> action)
//map中的entry值如果有,則用新值重新建立映射關系
V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)

//Deque遍歷元素
void forEach(Consumer<? super T> action)
//Deque按給定條件移除元素
boolean removeIf(Predicate<? super E> filter)

//Guava中獲取特定元素
<T> T get(Object key, final Callable<T> valueLoader) 

//Netty中設置監聽
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)

那么,回過頭來想想,如果我們封裝自己的組件,想要封裝的很JDK Style,該怎么做呢?如果上來直接理解Predicate,Function,Callable,Consumer,我想很多人是有困難的,那就聽我慢慢道來吧。

我們先假設如下一段代碼,這段代碼我相信很多人會很熟悉,很多人也封裝過,這就是我們的大名鼎鼎的RedisUtils封裝類:

  /**
     * key遞增特定的num
     * @param key Redis中保存的key
     * @param num 遞增的值
     * @return key計算完畢后返回的結果值
     */
    public Long incrBy(String key,long num) {
        CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true);
        Long result;
        try {
            result = jrc.incrBy(key, num);
        } catch (Exception e) {
            logger.error("incrBy", null, "sendCouponService.redis.incrBy異常,key={},value={}", e, key, num);
            Profiler.functionError(callerInfo);
            throw new RuntimeException("sendCouponService.redis.incrBy異常,key=" + key, e);
        } finally {
            Profiler.registerInfoEnd(callerInfo);
        }
        return result;
    }

上面這段代碼只是一個示例,其實還有上百個方法基本上都是這種封裝結構,這種封裝有問題嗎?沒問題!而且封裝方式辛辣老道,一看就是高手所為,因為既加了監控,又做了異常處理,而且還有錯誤日志記錄,一旦發生問題,我們能夠第一時間知道哪里出了問題,哪個方法出了問題,然后設定對應的應對方法。

這種封裝方式,如果當做普通的Util來用,完全沒有問題,但是如果想封裝成組件,則欠缺點什么,我列舉如下:

1. 當前代碼寫死了用jrc操作,如果后期切換到jimdb,是不是還得為jimdb專門寫一套呢?

2. 當前代碼,上百個方法,其實很多地方都是重復的,唯有redis操作那塊不同,代碼重復度特別高,一旦擴展新方法,基本上是剖解原有代碼,然后拷貝現有方法,最后改成新方法。

3. 當前方法,包含的都是redis單操作,如果遇到那種涉及到多個操作組合的(比如先set,然后expire或者更復雜一點),需要添加新方法,本質上這種新方法其實和業務性有關了。

從上面列出的這幾點來看,其實我們可以完全將其打造成一個兼容jrc操作和cluster操作,同時具有良好框架擴展性(策略模式+模板模式)和良好代碼重復度控制(函數式接口回調)的框架。由於本章涉及內容為異步回調,所以這里我們將講解這種代碼如何保持良好的代碼重復度控制上。至於良好的框架擴展性,如果感興趣的話,我會在后面的章節進行講解。那么我們開始進行優化吧。

首先,找出公共操作部分(白色)和非公共操作部分(黃色):

/**
     * key遞增特定的num
     * @param key Redis中保存的key
     * @param num 遞增的值
     * @return key計算完畢后返回的結果值
     */
    public Long incrBy(String key,long num) {
        CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true);
        Long result;
        try {
            result = jrc.incrBy(key, num);
        } catch (Exception e) {
            logger.error("incrBy", null, "sendCouponService.redis.incrBy異常,key={},value={}", e, key, num);
            Profiler.functionError(callerInfo);
            return null;
        } finally {
            Profiler.registerInfoEnd(callerInfo);
        }
        return result;
    }

通過上面的標記,我們發現非公共操作部分,有兩類:

1. ump提示語和日志提示語不一致

2. 操作方法不一致

標記出來了公共操作部分,之后我們開始封裝公共部分:

/**
     * 公共模板抽取
     *
     * @param method
     * @param callable
     * @param <T>
     * @return
     */
    public static <T> T invoke(String method) {
        CallerInfo info = Profiler.registerInfo(method, false, true);
        try {
            //TODO 這里放置不同的redis操作方法
        } catch (Exception e) {
            logger.error(method, e);
            AlarmUtil.alarm(method + e.getCause());
            reutrn null;
        } finally {
            Profiler.registerInfoEnd(info);
        }
    }

但是這里有個問題,我們雖然把公共模板抽取出來了,但是TODO標簽里面的內容怎么辦呢? 如何把不同的redis操作方法傳遞進來呢?

其實在java中,我們可以利用接口的方式,將具體的操作代理出去,由外部調用者來實現,聽起來是不是感覺又和IOC搭上了點關系,不錯,你想的沒錯,這確實是控制反轉依賴注入的一種做法,通過接口方式將具體的實踐代理出去,這也是進行回調操作的原理。接下來看我們的改造:

    /**
     * redis操作接口
     */
    public interface RedisOperation<T>{
        //調用redis方法,入參為空,出參為T泛型
        T invoke();
    }

    /**
     *  redis操作公共模板
     * @param method
     * @param  redisOperation
     * @param <T>
     * @return
     */
    public static <T> T invoke(String method,RedisOperation redisOperation) {
        CallerInfo info = Profiler.registerInfo(method, false, true);
        try {
           return  redisOperation.invoke();
        } catch (Exception e) {
            logger.error(method, e);
            AlarmUtil.alarm(method + e.getCause());
            reutrn null;
        } finally {
            Profiler.registerInfoEnd(info);
        }
    }

這樣,我們就打造好了一個公共的redis操作模板,之后就可以像下面的方式來使用了:

    @Override
    public Long incrby(String key, long val){
        String method = "com.jd.marketing.util.RedisUtil.incrby";
        RedisOperation<Long> process = () -> {
            return redisUtils.incrBy(key, val);
        };
        return CacheHelper.invoke(method, process);
    }

之后的一百多個方法,你也可以使用這樣的方式來一一進行包裝,之后你會發現原來RedisUtils封裝完畢,代碼寫了2000行,但是用這種方式之后,代碼只寫了1000行,而且后續有新的聯合操作過來,你只需要在如下代碼段里面直接把級聯操作添加進去即可:

 RedisOperation<Long> process = () -> {
           //TODO other methods
           //TODO other methods
            return redisUtils.incrBy(key, val);
};

是不是很方便快捷?在這里我需要所以下的是,由於RedisOperation里面的invoke方法是沒有入參,帶有一個出參結果的調用。所以在回調這里,我用了匿名表達式來()->{}來match這種操作。但是如果回調這里,一個入參,一個出參的話,那么我的匿名表達式需要這樣寫 param->{}, 多個入參,那就變成了這樣 (param1, param2, param3)->{} 。由於這里並非重點,我不想過多講解,如果對這種使用方式不熟悉,可以完全使用如下的方式來進行書寫也行:

 @Override
    public Long incrby(String key, long val){
        String method = "com.jd.marketing.util.RedisUtil.incrby";
        RedisOperation<Long> process = () -> incrByOperation(key, val);
        return CacheHelper.invoke(method, process);
    }
   
   private Long incrByOperation(String key, long val){
       return redisUtils.incrBy(key, val);
   }

其實說到這里的時候,我就有必要提一下開頭的埋下的線索了。其實之前演示的Netty的代碼:

//Netty中設置監聽
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)

GenericFutureListener這個接口

就是按照上面的寫法來做的,是不是豁然開朗呢?至於其調用方式,也和上面講解的一致,只要符合接口里面方法的調用標准就行(入參和出參符合就行), 比如 future –> {}。

說到這里,我們可能認為這樣太麻煩了,自己定義接口,然后注入到框架中,最后用戶自己實現調用方法,一長串。是的,你說的沒錯,這樣確實太麻煩了,JDK於是專門用了一個 FunctionalInterface的annotation來幫我們做了,所以在JDK中,如果你看到Consumer,Function,Supplier等,帶有@FunctionalInterface標注的接口,那么就說明他是一個函數式接口,而這種接口是干什么的,具體的原理就是我上面講的。下面我們來梳理梳理這些接口吧。

先看一下我們的RedisUtils使用JDK自帶的函數式接口的最終封裝效果:

image

從圖示代碼可以看出,整體封裝變得簡潔許多,而且我們用了JDK內置的函數式接口,所以也無需寫其他多余的代碼,看上去很清爽,重復代碼基本不見了。而且,由於JDK提供的其他的函數式接口有運算操作,比如Predicate.or, Predicate.and操作等,大大加強了封裝的趣味性和樂趣。

下面我將JDK中涉及的常用的函數式接口列舉一遍,然后來詳細講解講解吧,列表如下:

Consumer, 提供void accept(T t)回調

Runnable, 提供void run()回調

Callable, 提供V call() throws Exception回調

Supplier, 提供T get()回調

Function, 提供R apply(T t)回調, 有andThen接續操作

Predicate, 提供boolean test(T t)回調, 等價於 Function<T, boolean>

BiConsumer, 提供void accept(T t, U u)回調,注意帶Bi的回調接口,表明入參都是雙參數,比如BiPredicate    
......


其實還有很多,我這里就不一一列舉了。感興趣的朋友可以在這里找到JDK提供的所有函數式接口

接下來,我們來講解其使用示范,以便於明白怎么樣去使用它。

對於Consumer函數式接口,內部的void accept(T t)回調方法,表明了它只能回調有一個入參,沒有返參的方法。示例如下:

/**
     * Consumer調用的例子
     */
    public void ConsumerSample() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("key", "val");
        linkedHashMap.forEach((k, v) -> {
            System.out.println("key" + k + ",val" + v);
        });
    }

對於Callable接口,其實和Supplier接口是一樣的,只是有無Exception拋出的區別,示例如下:

 /**
     * Callable調用的例子
     */
    public Boolean setnx(String key, String val){
        String method = "com.jd.marketing.util.RedisUtil.setnx";
        Callable<Boolean> process = () -> {
            Long rst = redisUtils.setnx(key, val);
            if(rst == null || rst == 0){
                return false;
            }
            return true;
        };
        return CacheHelper.invoke(method, process);
    }

對於Predicate<T>接口,等價於Function<T, Boolean>, 示例如下:

 /**
     * Precidate調用的例子
     */
    public void PredicateSample() {
        List list = new ArrayList();
        list.add("a")
        list.removeIf(item -> {
            return item.equals("a");
        });
    }

說明一下,Predicate的入參為一個參數,出參為boolean,很適合進行條件判斷的場合。在JDK的List數據結構中,由於removeIf方法無法耦合進去業務代碼,所以利用Predicate函數式接口將業務邏輯實現部分拋給了用戶自行處理,用戶處理完畢,只要返回給我true,我就刪掉當前的item;返回給我false,我就保留當前的item。解耦做的非常漂亮。那么List的removeIf實現方式你覺得是怎樣實現的呢?如果我不看JDK代碼的話,我覺得實現方式如下:

 public boolean removeIf(Predicate<T> predicate){
        final Iterator<T> iterator = getIterator();
        while (iterator.hasNext()) {
            T current = iterator.next();
            boolean result = predicate.test(current);
            if(result){
                iterator.remove();
                return true;
            }
        }
        return false;
    }

但是實際你去看看List默認的removeIf實現,源碼大概和我寫的差不多。所以只要理解了函數式接口,我們也能寫出JDK Style的代碼,酷吧。

CompletableFuture實現異步處理

好了,上面就是函數式接口的整體介紹和使用簡介,不知道你看了之后,理解了多少呢?接下來我們要講解的異步,完全基於上面的函數式接口回調,如果之前的都看懂了,下面的講解你將豁然開朗;反之則要悟了。但是正確的方向都已經指出來了,所以入門應該是沒有難度的。

CompletableFuture,很長的一個名字,我對他的印象停留在一次代碼評審會上,當時有人提到了這個類,我只是簡單的記錄下來了,之后去JDK源碼中搜索了一下,看看主要干什么的,也沒有怎么想去看它。結果當我搜到這個類,然后看到Author的時候,我覺得我發現了金礦一樣,於是我決定深入的研究下去,那個作者的名字就是:

/**
 * A {@link Future} that may be explicitly completed (setting its
 * value and status), and may be used as a {@link CompletionStage},
 * supporting dependent functions and actions that trigger upon its
 * completion.
 *
 * <p>When two or more threads attempt to
 * {@link #complete complete},
 * {@link #completeExceptionally completeExceptionally}, or
 * {@link #cancel cancel}
 * a CompletableFuture, only one of them succeeds.
 *
 * <p>In addition to these and related methods for directly
 * manipulating status and results, CompletableFuture implements
 * interface {@link CompletionStage} with the following policies: <ul>
 *
 * <li>Actions supplied for dependent completions of
 * <em>non-async</em> methods may be performed by the thread that
 * completes the current CompletableFuture, or by any other caller of
 * a completion method.</li>
 *
 * <li>All <em>async</em> methods without an explicit Executor
 * argument are performed using the {@link ForkJoinPool#commonPool()}
 * (unless it does not support a parallelism level of at least two, in
 * which case, a new Thread is created to run each task).  To simplify
 * monitoring, debugging, and tracking, all generated asynchronous
 * tasks are instances of the marker interface {@link
 * AsynchronousCompletionTask}. </li>
 *
 * <li>All CompletionStage methods are implemented independently of
 * other public methods, so the behavior of one method is not impacted
 * by overrides of others in subclasses.  </li> </ul>
 *
 * <p>CompletableFuture also implements {@link Future} with the following
 * policies: <ul>
 *
 * <li>Since (unlike {@link FutureTask}) this class has no direct
 * control over the computation that causes it to be completed,
 * cancellation is treated as just another form of exceptional
 * completion.  Method {@link #cancel cancel} has the same effect as
 * {@code completeExceptionally(new CancellationException())}. Method
 * {@link #isCompletedExceptionally} can be used to determine if a
 * CompletableFuture completed in any exceptional fashion.</li>
 *
 * <li>In case of exceptional completion with a CompletionException,
 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
 * {@link ExecutionException} with the same cause as held in the
 * corresponding CompletionException.  To simplify usage in most
 * contexts, this class also defines methods {@link #join()} and
 * {@link #getNow} that instead throw the CompletionException directly
 * in these cases.</li> </ul>
 *
 * @author Doug Lea
 * @since 1.8
 */

Doug Lea,Java並發編程的大神級人物,整個JDK里面的並發編程包,幾乎都是他的作品,很務實的一個老爺子,目前在紐約州立大學奧斯威戈分校執教。比如我們異常熟悉的AtomicInteger類也是其作品:

/**
 * An {@code int} value that may be updated atomically.  See the
 * {@link java.util.concurrent.atomic} package specification for
 * description of the properties of atomic variables. An
 * {@code AtomicInteger} is used in applications such as atomically
 * incremented counters, and cannot be used as a replacement for an
 * {@link java.lang.Integer}. However, this class does extend
 * {@code Number} to allow uniform access by tools and utilities that
 * deal with numerically-based classes.
 *
 * @since 1.5
 * @author Doug Lea
*/
public class AtomicInteger extends Number implements java.io.Serializable {
  // ignore code 
}

想查閱老爺子的最新資料,建議到Wikipedia上查找,里面有他的博客鏈接等,我這里就不再做過多介紹,回到正題上來,我們繼續談談CompletableFuture吧。我剛才貼的關於這個類的描述,都是英文的,而且特別長,我們不妨貼出中文釋義來,看看具體是個什么玩意兒:

 繼承自Future,帶有明確的結束標記;同時繼承自CompletionStage,支持多函數調用行為直至完成態。
 當兩個以上的線程對CompletableFuture進行complete調用,completeExceptionally調用或者cancel調用,只有一個會成功。
 
 為了直觀的保持相關方法的狀態和結果,CompletableFuture按照如下原則繼承並實現了CompletionStage接口:

 1. 多個同步方法的級聯調用,可能會被當前的CompletableFuture置為完成態,也可能會被級聯函數中的任何一個方法置為完成態。

 2. 異步方法的執行,默認使用ForkJoinPool來進行(如果當前的並行標記不支持多並發,那么將會為每個任務開啟一個新的線程來進行)。
    為了簡化監控,調試,代碼跟蹤等,所有的異步任務必須繼承自AsynchronousCompletionTask。

 3. 所有的CompletionStage方法都是獨立的,overrid子類中的其他的方法並不會影響當前方法行為。

 CompletableFuture同時也按照如下原則繼承並實現了Future接口:

 1. 由於此類無法控制完成態(一旦完成,直接返回給調用方),所以cancellation被當做是另一種帶有異常的完成狀態. 在這種情況下cancel方法和CancellationException是等價的。
    方法isCompletedExceptionally可以用來監控CompletableFuture在一些異常調用的場景下是否完成。

 2. get方法和get(long, TimeUint)方法將會拋出ExecutionException異常,一旦計算過程中有CompletionException的話。
    為了簡化使用,這個類同時也定義了join()方法和getNow()方法來避免CompletionException的拋出(在CompletionException拋出之前就返回了結果)。
 

由於沒有找到中文文檔,所以這里自行勉強解釋了一番,有些差強人意。

在我們日常生活中,我們的很多行為其實都是要么有結果的,要么無結果的。比如說做蛋糕,做出來的蛋糕就是結果,那么一般我們用Callable或者Supplier來代表這個行為,因為這兩個函數式接口的執行,是需要有返回結果的。再比如說吃蛋糕,吃蛋糕這個行為,是無結果的。因為他僅僅代表我們去干了一件事兒,所以會用Consumer或者Runnable來代表吃飯這個行為。因為這兩個函數式接口的執行,是不返回結果的。有時候我發現家里沒有做蛋糕的工具,於是我便去外面的蛋糕店委托蛋糕師傅給我做一個,那么這種委托行為,其實就是一種異步行為,會用Future來描述。因為Future神奇的地方在於,可以讓一個同步執行的方法編程異步的,就好似委托蛋糕師傅做蛋糕一樣。這樣我們就可以在蛋糕師傅給我們做蛋糕期間去做一些其他的事兒,比如聽音樂等等。但是由於Future不具有事件完成告知的能力,所以得需要自己去一遍一遍的問師傅,做好了沒有。而CompletableFuture則具有這種能力,所以總結起來如下:

  • Callable,有結果的同步行為,比如做蛋糕
  • Runnable,無結果的同步行為,比如吃蛋糕
  • Future,異步封裝Callable/Runnable,比如委托給蛋糕師傅(其他線程)去做
  • CompletableFuture,封裝Future,使其擁有回調功能,比如讓師傅主動告訴我蛋糕做好了

那么上面描述的場景,我們用代碼封裝一下吧:

 public static void main(String... args) throws Exception {
        CompletableFuture
                .supplyAsync(() -> makeCake())
                .thenAccept(cake -> eatCake(cake));
        System.out.println("先回家聽音樂,蛋糕做好后給我打電話,我來取...");
        Thread.currentThread().join();
    }

    private static Cake makeCake() {
        System.out.println("我是蛋糕房,開始為你制作蛋糕...");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Cake cake = new Cake();
        cake.setName("草莓蛋糕");
        cake.setShape("圓形");
        cake.setPrice(new BigDecimal(99));
        System.out.println("蛋糕制作完畢,請取回...");
        return cake;
    }

    private static void eatCake(Cake cake) {
        System.out.println("這個蛋糕是" + cake.getName() + ",我喜歡,開吃...");
    }

最后執行結果如下:

我是蛋糕房,開始為你制作蛋糕...
先回家聽音樂,蛋糕做好后給我打電話,我來取...
蛋糕制作完畢,請取回...
這個蛋糕是草莓蛋糕,我喜歡,開吃...

由於CompletableFuture的api有50幾個,數量非常多,我們可以先將其划分為若干大類(摘自理解CompletableFuture,總結的非常好,直接拿來用):

創建類:用於CompletableFuture對象創建,比如:

  • completedFuture
  • runAsync
  • supplyAsync
  • anyOf
  • allOf

狀態取值類:用於判斷當前狀態和同步等待取值,比如:

  • join
  • get
  • getNow
  • isCancelled
  • isCompletedExceptionally
  • isDone

控制類:可用於主動控制CompletableFuture完成行為,比如:

  • complete
  • completeExceptionally
  • cancel

接續類:CompletableFuture最重要的特性,用於注入回調行為,比如:

  • thenApply, thenApplyAsync
  • thenAccept, thenAcceptAsync
  • thenRun, thenRunAsync
  • thenCombine, thenCombineAsync
  • thenAcceptBoth, thenAcceptBothAsync
  • runAfterBoth, runAfterBothAsync
  • applyToEither, applyToEitherAsync
  • acceptEither, acceptEitherAsync
  • runAfterEither, runAfterEitherAsync
  • thenCompose, thenComposeAsync
  • whenComplete, whenCompleteAsync
  • handle, handleAsync
  • exceptionally

上面的方法非常多,而大多具有相似性,我們大可不必馬上記憶。先來看看幾個一般性的規律,便可輔助記憶(重要):

  1. 以Async后綴結尾的方法,均是異步方法,對應無Async則是同步方法。
  2. 以Async后綴結尾的方法,一定有兩個重載方法。其一是采用內部forkjoin線程池執行異步,其二是指定一個Executor去運行。
  3. 以run開頭的方法,其方法入參的lambda表達式一定是無參數,並且無返回值的,其實就是指定Runnable
  4. 以supply開頭的方法,其方法入參的lambda表達式一定是無參數,並且有返回值,其實就是指Supplier
  5. 以Accept為開頭或結尾的方法,其方法入參的lambda表達式一定是有參數,但是無返回值,其實就是指Consumer
  6. 以Apply為開頭或者結尾的方法,其方法入參的lambda表達式一定是有參數,但是有返回值,其實就是指Function
  7. 帶有either后綴的表示誰先完成則消費誰。

以上6條記住之后,就可以記住60%以上的API了。

先來看一下其具體的使用方式吧(網上有個外國人寫了CompletableFuture的20個例子,我看有中文版了,放到這里,大家可以參考下)。

  /**
     * CompletableFuture調用completedFuture方法,表明執行完畢
     */
    static void sample1() {
        CompletableFuture cf = CompletableFuture.completedFuture("message");
        Assert.assertTrue(cf.isDone());
        Assert.assertEquals("message", cf.getNow(null));
    }

sample1代碼,可以看出,如果想讓一個ComopletableFuture執行完畢,最簡單的方式就是調用其completedFuture方法即可。之后就可以用getNow對其結果進行獲取,如果獲取不到就返回默認值null。

 /**
     * 兩個方法串行執行,后一個方法依賴前一個方法的返回
     */
    static void sample2() {
        CompletableFuture cf = CompletableFuture
                .completedFuture("message")
                .thenApply(message -> {
                    Assert.assertFalse(Thread.currentThread().isDaemon());
                    return message.toUpperCase();
                });
        Assert.assertEquals("MESSAGE", cf.getNow(null));
    }

sample2代碼,利用thenApply實現兩個函數串行執行,后一個函數的執行以來前一個函數的返回結果。

/**
     * 兩個方法並行執行,兩個都執行完畢后,在進行匯總
     */
    static void sample3() {

        long start = System.currentTimeMillis();

        CompletableFuture cf = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        CompletableFuture cf1 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        CompletableFuture.allOf(cf, cf1).whenComplete((v,t)->{
            System.out.println("都完成了");
        }).join();
        long end = System.currentTimeMillis();
        System.out.println((end-start));
    }

sample3最后的執行結果為:

都完成了
2087

可以看到,耗時為2087毫秒,如果是串行執行,需要耗時3000毫秒,但是並行執行,則以最長執行時間為准,其實這個特性在進行遠程RPC/HTTP服務調用的時候,將會非常有用,我們一會兒再進行講解如何用它來反哺業務。

 /**
     * 方法執行取消
     */
    static void sample4(){
        CompletableFuture cf = CompletableFuture.supplyAsync(()->{
            try {
                System.out.println("開始執行函數...");
                Thread.sleep(2000);
                System.out.println("執行函數完畢...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "ok";
        });
        CompletableFuture cf2 = cf.exceptionally(throwable -> {
            return throwable;
        });
        cf2.cancel(true);
        if(cf2.isCompletedExceptionally()){
            System.out.println("成功取消了函數的執行");
        }
        cf2.join();

    }

調用結果如下:

開始執行函數...
成功取消了函數的執行
Exception in thread "main" java.util.concurrent.CancellationException
    at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263)
    at com.jd.jsmartredis.article.Article.sample4(Article.java:108)
    at com.jd.jsmartredis.article.Article.main(Article.java:132)

可以看到我們成功的將函數執行中斷,同時由於cf2返回的會一個throwable的Exception,所以我們的console界面將其也原封不動的打印了出來。

講解了基本使用之后,如何使用其來反哺我們的業務呢?我們就以通用下單為例吧,來看看通用下單有哪些可以優化的點。

image

上圖就是我們在通用下單接口經常調用的接口,分為下單地址接口,商品信息接口,京豆接口,由於這三個接口沒有依賴關系,所以可以並行的來執行。如果換做是目前的做法,那么肯定是順序執行,假如三個接口獲取都耗時1s的話,那么三個接口獲取完畢,我們的耗時為3s。但是如果改成異步方式執行的話,那么將會簡單很多,接下來,我們開始改造吧。

public Result submitOrder(String pin, CartVO cartVO) {

        //獲取下單地址
        CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> {
            AddressResult addressResult = addressRPC.getAddressListByPin(pin);
            return addressResult;
        });

        //獲取商品信息
        CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> {
            GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO);
            return goodsResult;
        });

        //獲取京豆信息
        CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> {
            JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin);
            return jinbeanResult;
        });

        CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> {
            if (throwable == null) {
                logger.error("獲取地址,商品,京豆信息失敗", throwable);
                //TODO 嘗試重新獲取
            } else {
                logger.error("獲取地址,商品,京豆信息成功");
            }
        }).join();

        AddressResult addressResult = addressFuture.getNow(null);
        GoodsResult goodsResult = goodsFuture.getNow(null);
        JinbeanResult jinbeanResult = beanFuture.getNow(null);
        
        //TODO 后續處理
    }

這樣,我們利用將普通的RPC執行編程了異步,而且附帶了強大的錯誤處理,是不是很簡單?

但是如果遇到如下圖示的調用結構,CompletableFuture能否很輕松的應對呢?

image

由於業務變更,需要附帶延保信息,為了后續重新計算價格,所以必須將延保商品獲取出來,然后計算價格。其實這種既有同步,又有異步的做法,利用CompletableFuture來handle,也是輕松自然,代碼如下:

 public Result submitOrder(String pin, CartVO cartVO) {

        //獲取下單地址
        CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> {
            AddressResult addressResult = addressRPC.getAddressListByPin(pin);
            return addressResult;
        });

        //獲取商品信息
        CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> {
            GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO);
            return goodsResult;
        }).thenApplyAsync((goodsResult, Map)->{
            YanbaoResult yanbaoResult = yanbaoRPC.getYanbaoInfoByGoodID(goodsResult.getGoodId, pin);
            Map<String, Object> map = new HashMap<>();
            map.put("good", goodsResult);
            map.put("yanbao",yanbaoResult);
            return map;
        });

        //獲取京豆信息
        CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> {
            JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin);
            return jinbeanResult;
        });

        CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> {
            if (throwable == null) {
                logger.error("獲取地址,商品-延保,京豆信息失敗", throwable);
                //TODO 嘗試重新獲取
            } else {
                logger.error("獲取地址,商品-延保,京豆信息成功");
            }
        }).join();

        AddressResult addressResult = addressFuture.getNow(null);
        GoodsResult goodsResult = goodsFuture.getNow(null);
        JinbeanResult jinbeanResult = beanFuture.getNow(null);

        //TODO 后續處理
    }

這樣我們就可以了,當然這種改造給我們帶來的好處也是顯而易見的,我們不需要針對所有的接口進行OPS優化,而是針對性能最差的接口進行OPS優化,只要提升了性能最差的接口,那么整體的性能就上去了。

洋洋灑灑寫了這么多,希望對大家有用,謝謝。

參考資料:

理解CompletableFuture

CompletableFuture 詳解

JDK中CompletableFuture的源碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM