從CompletableFuture到異步編程設計


從CompletableFuture到異步編程設計,筆者就分為2部分來分享CompletableFuture異步編程設計,前半部分總結下CompletableFuture使用實踐,后半部分分享下CompletableFuture實現原理和異步編程設計機制。

(ps:本文內容較多,請耐心閱讀。如果讀者了解CompletableFuture使用的話,可以直接看后半部分內容;如果熟悉CompletableFuture及異步編程設計的話,可以直接翻到文檔末尾點個“推薦”就好了,因為你已經掌握了Java異步設計精髓了 :) ,若有不正確地方,感謝評論區指正交流~ )

Java8新增了CompletableFuture類,該類實現了CompletionStage和Future接口,簡化了Java異步編程能力,該類方法較多,其實套路只有一個,那就是任務執行完成之后執行“回調”。

CompletableFuture使用實踐

Java8新增的CompletableFuture 提供對異步計算的支持,可以通過回調的方式處理計算結果。CompletableFuture 類實現了CompletionStage和Future接口,所以還可以像之前使用Future那樣使用CompletableFuture ,盡管已不再推薦這樣用了。

CompletableFuture的創建

// 創建一個帶result的CompletableFuture
CompletableFuture<String> future = CompletableFuture.completedFuture("result");
future.get();
 
// 默認創建的CompletableFuture是沒有result的,這時調用future.get()會一直阻塞下去知道有result或者出現異常
future = new CompletableFuture<>();
try {
    future.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
    // no care
}
 
// 給future填充一個result
future.complete("result");
assert "result".equals(future.get());
 
// 給future填充一個異常
future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("exception"));
try {
    future.get();
} catch (Exception e) {
    assert "exception".equals(e.getCause().getMessage());
}

上面的示例是自己設置future的result,一般情況下我們都是讓其他線程或者線程池來執行future這些異步任務。除了直接創建CompletableFuture 對象外(不推薦這樣使用),還可以使用如下4個方法創建CompletableFuture 對象:

// runAsync是Runnable任務,不帶返回值的,如果入參有executor,則使用executor來執行異步任務
public static CompletableFuture<Void>  runAsync(Runnable runnable)
public static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
// supplyAsync是待返回結果的異步任務
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

// 使用示例
CompletableFuture.runAsync(() -> {
    System.out.println("hello world");
}, executor);
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
});

如果入參不帶executor,則默認使用ForkJoinPool.commonPool()作為執行異步任務的線程池;否則使用executor執行任務。

CompletableFuture的完成動作

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

// 使用示例
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).whenCompleteAsync((result, e) -> {
    System.out.println(result + " " + e);
}).exceptionally((e) -> {
    System.out.println("exception " + e);
    return "exception";
});

action是Action類型,從上面可以看出它既可以處理正常返回值也可以處理異常,whenComplete會在任務執行完成后直接在當前線程內執行action動作,后綴帶Async的方法是交給其他線程執行action(如果是線程池,執行action的可能和之前執行異步任務的是同一個線程),入參帶executor的交給executor線程池來執行action動作,當發生異常時,會在當前線程內執行exceptionally方法。

除了用上面的whenComplete來執行完成動作之外,還可以使用handle方法,該方法可以返回一個新的CompletableFuture的返回類型。

public <U> CompletableFuture<U>  handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

// handle方法示例:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
});
CompletableFuture<Integer> f2 = f1.handle((r, e) -> {
    System.out.println("handle");
    return 1;
});

 除了使用handle方法來執行CompletableFuture返回類型轉換之外,還可以使用thenApply方法,二者不同的是前者會處理正常返回值和異常,因此可以屏蔽異常,避免繼續拋出;而后者只能處理正常返回值,一旦有異常就會拋出。

public <U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

// thenApply方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenApply((r) -> {
    System.out.println(r);
    return "aaa";
}).thenApply((r) -> {
    System.out.println(r);
    return 1;
});

注意,上面的handle、thenApply都是返回新的CompletableFuture類型,如果只是為了在CompletableFuture完成之后執行某些消費動作,而不返回新的CompletableFuture類型,則可以使用thenAccept方法。

public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

// thenAccept方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenAccept(r -> {
    System.out.println(r);
}).thenAccept(r -> {
    // 這里的r為Void(null)了
    System.out.println(r);
});

上面的handle、thenApply和thenAppept都是對上一個CompletableFuture執行完的結果進行某些操作。那么可不可以同時對2個CompletableFuture執行結果執行某些操作呢?其實也是可以的,使用thenAppeptBoth方法即可。注意,thenAppeptBoth和handle/thenApply/thenAppep的流程是一樣的,只不過thenAppeptBoth中包含了另一個CompletableFuture對象(注意,這里另一個CompletableFuture對象的執行可並不是上一個CompletableFuture執行結束才開始執行的)。

public <U> CompletableFuture<Void>   thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void>  runAfterBoth(CompletionStage<?> other,  Runnable action)


// thenAcceptBoth方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenAcceptBoth(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
    System.out.println(r1 + "-" + r2);
});

注意,thenAcceptBoth方法是沒有返回值的(CompletableFuture<Void>),如果想用thenAcceptBoth這樣的功能並且還帶有返回值的CompletableFuture,那么thenCombine方法就該上場了。

public <U,V> CompletableFuture<V>    thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>    thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>    thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

// thenCombine方法示例
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenCombine(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
    System.out.println(r1 + "-" + r2);
    return r1 + "-" + r2;
});

thenAcceptBoth和runAfterBoth是當兩個CompletableFuture都計算完成,而下面的方法是當任意一個CompletableFuture計算完成的時候就會執行。

public CompletableFuture<Void>  acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
 
public <U> CompletableFuture<U>  applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>  applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>  applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

如果當想在多個CompletableFuture都計算完成或者多個CompletableFuture中的一個計算完成后執行某個動作,可使用方法 allOf 和 anyOf。

public static CompletableFuture<Void>      allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object>    anyOf(CompletableFuture<?>... cfs)

如果當任務完成時並不想用CompletableFuture的結果,可以使用thenRun方法來執行一個Runnable。

public CompletableFuture<Void>  thenRun(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action, Executor executor)

以上方法都是在方法中返回一個值(或者不返回值),其實還可以返回一個CompletableFuture,是不是很像類的組合一樣。

public <U> CompletableFuture<U>  thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

// thenCompose方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenCompose(r -> {
    System.out.println(r);
    return CompletableFuture.supplyAsync(() -> {
        System.out.println(r + " result2");
        return r + " result2";
    });
});

// 上面的代碼和下面的代碼效果是一樣的
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenApply(r -> {
    System.out.println(r);
    return r;
}).thenApplyAsync(r -> {
    System.out.println(r + " result2");
    return r + " result2";
});

 CompletableFuture實現機制

先拋開 CompletableFuture 不談,如果程序中使用了線程池,如何才能在某個任務執行完成之后執行某些動作呢?其實Java線程池本身已經提供了任務執行前后的hook方法(beforeExecute和afterExecute),如下:

public class ThreadPoolExecutor extends AbstractExecutorService {
    // ...
    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
    // ...
}

我們只需要自定義線程池繼承ThreadPoolExecutor ,然后重寫beforeExecute和afterExecute方法即可,在afterExecute里可以執行一些動作。關於重寫ThreadPoolExecutor 的一個示例可點擊ListenableThreadPoolExecutor查看。

那么CompletableFuture 的實現機制是怎樣的呢?其實,和上面的所說的“afterExecute機制”是類似的(本質是一樣的,回調機制),也是在任務執行完成后執行某些動作,如下代碼:

CompletableFuture.supplyAsync(() -> {
    // callable任務
    System.out.println("hello world");
    return "result";
}).thenApply(r -> {
    // 任務完成之后的動作(回調方法),類似於ThreadPoolExecutor.afterExecute方法
    System.out.println(r);
    return r;
});

上面的示例代碼其實主要完成了3個步驟,這3個步驟其實也是CompletableFuture的實現流程:

  1. 執行任務
  2. 添加任務完成之后的動作(回調方法)
  3. 執行回調

下面筆者就以上面的示例代碼,按照這3個步驟依次進行分析,此時建議讀者打開idea,寫個demo進行debug,這里篇幅有限,筆者就只講解主要流程代碼,其他代碼自行閱讀即可 :) 

1、執行任務

 執行任務的主要邏輯就是 AsyncSupply.run 方法:

public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    // dep是當前CompletableFuture,fn是任務執行邏輯
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        if (d.result == null) {
            try {
                // 1 任務執行 & result cas設置
                d.completeValue(f.get());
            } catch (Throwable ex) {
                // 1.1 result cas異常設置
                d.completeThrowable(ex);
            }
        }
        // 2 任務完成,可能涉及到回調的執行
        d.postComplete();
    }
}

2、添加回調

添加回調方法的流程是從 thenApply 開始的:

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d =  new CompletableFuture<V>();
    if (e != null || !d.uniApply(this, f, null)) {
        // 當上一個CompletableFuture未完成時,將該CompletableFuture添加
        // 到上一個CompletableFuture的statck中
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

CompletableFuture.statck 是 UniCompletion 類型的,該類型如下:

UniCompletion<T,V> {
    volatile Completion next;      // Treiber stack link
    Executor executor;                 // executor to use (null if none)
    CompletableFuture<V> dep;          // the dependent to complete
    CompletableFuture<T> src;          // source for action
}

3、執行回調

執行回調是從CompletableFuture.postComplete 開始的:

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        // cas設置h.next到當前CompletableFuture.statck
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

// UniAccept
final CompletableFuture<Void> tryFire(int mode) {
    CompletableFuture<Void> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        !d.uniAccept(a = src, fn, mode > 0 ? null : this)) // 執行回調
        return null;
    dep = null; src = null; fn = null;
    // 返回當前CompletableFuture 或者 遞歸調用postComplete
    return d.postFire(a, mode);
}

看完上面3個步驟,是不是還不太清楚多個CompletableFuture之間的執行流程呢,說實話筆者第一次看的時候也是這樣的 :(,下面我們換個例子並給出圖示來看:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world f1");
    sleep(1); // TimeUnit.SECONDS.sleep(1)
    return "result f1";
});
CompletableFuture<String> f2 = f1.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});
CompletableFuture<String> f3 = f2.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});

CompletableFuture<String> f4 = f1.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});
CompletableFuture<String> f5 = f4.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});
CompletableFuture<String> f6 = f5.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});

上面代碼對應的CompletableFuture及其Completion關系如下圖:

結合上圖和postComplete流程,可以看出執行回調的順序是:f1 -> f4 -> f5 -> f6 -> f2 -> f3。(如果這里沒看懂,可以回過頭再看下postComplete方法的源碼~)

異步編程設計

分析完了CompletableFuture,相信大家都已經對CompletableFuture的設計與實現有了進一步的理解。那么對於異步編程有哪些實際應用場景,其本質到底是什么呢?

異步處理的本質其實就是回調(系統層借助於指針來實現,准確來說是函數指針),用戶提供一個回調方法,回調函數不是由該函數的實現方直接調用,而是在特定的事件或條件發生時由另外的一方調用的,用於對該事件或條件進行響應。從“宏觀”來看,CompletableFuture的實現其實很簡單,就是回調,即在任務執行完成之后進行回調,回調中可能涉及到其他操作,比如下一個回調或者執行下一個任務。

異步編程在應用場景較多,很多語言,比如Node.js,采用回調的方式實現異步編程。Java的一些框架,比如Netty,自己擴展了Java的 Future接口,提供了addListener等多個擴展方法:

ServerBootstrap boot = new ServerBootstrap();
boot.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .localAddress(8080)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new EchoHandler());
        }
    });

dubbo中consumer對於RPC response的處理是基於回調機制的,Google guava也提供了通用的擴展Future:ListenableFuture、SettableFuture 以及輔助類Futures等,方便異步編程。

final String name = ...;
inFlight.add(name);
ListenableFuture<Result> future = service.query(name);
future.addListener(new Runnable() {
  public void run() {
    processedCount.incrementAndGet();
    inFlight.remove(name);
    lastProcessed.set(name);
    logger.info("Done with {0}", name);
  }
}, executor);

 

參考資料:

1、Java CompletableFuture 詳解

2、https://www.cnblogs.com/aniao/p/aniao_cf.html

3、https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM