並發編程(三)Promise, Future 和 Callback


並發編程(三)Promise, Future 和 Callback

異步操作的有兩個經典接口:Future 和 Promise,其中的 Future 表示一個可能還沒有實際完成的異步任務的結果,針對這個結果可以添加 Callback 以便在任務執行成功或失敗后做出對應的操作,而 Promise 交由任務執行者,任務執行者通過 Promise 可以標記任務完成或者失敗。 可以說這一套模型是很多異步非阻塞架構的基礎。

這一套經典的模型在 Scala、C# 中得到了原生的支持,但 JDK 中暫時還只有無 Callback 的 Future 出現,當然也並非在 Java 界就沒有發展了,比如 Guava 就提供了 ListenableFuture 接口,而 Netty 4+ 更是提供了完整的 Promise、Future 和 Listener 機制。

一、Future 模式 - 將來式(JDK)

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(() -> {
    TimeUnit.SECONDS.sleep(5);
    return 5;
});
Integer result = future.get();

二、Future 模式--回調式(Guava)

Future 模式的第二種用法便是回調。很不幸的事,JDK 實現的 Future 並沒有實現 callback, addListener 這樣的方法,想要在 JAVA 中體驗到 callback 的特性,得引入一些額外的框架。

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>21.0</version>
</dependency>
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(5);
        return 100;
    }
});
Futures.addCallback(future, new FutureCallback<Integer>() {
    public void onSuccess(Integer result) {
        System.out.println("success:" + result);
    }

    public void onFailure(Throwable throwable) {
        System.out.println("fail, e = " + throwable);
    }
});

Thread.currentThread().join();

三、Future 模式--回調式(Netty4)

Netty 除了是一個高性能的網絡通信框架之外,還對 jdk 的Future 做了擴展,引入 Netty 的 maven 依賴

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.22.Final</version>
</dependency>
EventExecutorGroup group = new DefaultEventExecutorGroup(4); // 4 threads
io.netty.util.concurrent.Future<Integer> f = group.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(5);
        return 100;
    }
});
f.addListener(new FutureListener<Object>() {
    @Override
    public void operationComplete(io.netty.util.concurrent.Future<Object> objectFuture) throws Exception {
        System.out.println("計算結果::"+objectFuture.get());
    }
});

四、由 Callback Hell 引出 Promise 模式

同樣的如果你對 ES6 有所接觸,就不會對 Promise 這個模式感到陌生,如果你對前端不熟悉,也不要緊,我們先來看看回調地獄(Callback Hell)是個什么概念。

回調是一種我們推崇的異步調用方式,但也會遇到問題,也就是回調的嵌套。當需要多個異步回調一起書寫時,就會出現下面的代碼(以 js 為例):

asyncFunc1(opt, (...args1) => {
   asyncFunc2(opt, (...args2) => {
       asyncFunc3(opt, (...args3) => {
            asyncFunc4(opt, (...args4) => {
                // some operation
            });
        });
    });
});

雖然在 Java 業務代碼中很少出現回調的多層嵌套,這樣的代碼不易讀,嵌套太深修改也麻煩。於是 ES6 提出了 Promise 模式來解決回調地獄的問題。可能就會有人想問:Java 中存在 Promise 模式嗎?答案是肯定的。

前面提到了 Netty 和 Guava 的擴展都提供了 addListener 這樣的接口,用於處理 Callback 調用,但其實 jdk1.8 已經提供了一種更為高級的回調方式:CompletableFuture。首先嘗試用 CompletableFuture 來解決回調的問題。

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    TimeUnit.SECONDS.sleep(5);
    return 100;
});
completableFuture.whenComplete((result, e) -> {
    System.out.println("結果:" + result);
});
Thread.currentThread().join();

五、Future 和 Promise

Netty 文檔說明 Netty 的網絡操作都是異步的, 在源碼上大量使用了 Future/Promise 模型,在 Netty 里面也是這樣定義的:

  • Future 接口定義了 isSuccess(), isCancellable(), cause() 這些判斷異步執行狀態的方法。(read-only)
  • Promise 接口在 extends Future 的基礎上增加了 setSuccess(), setFailure() 來標記任務完成或者失敗。(writable)

Future

JDK 的 Future

public interface Future<V> {
    // 取消異步操作
    boolean cancel(boolean mayInterruptIfRunning);
    // 異步操作是否取消
    boolean isCancelled();
    // 異步操作是否完成,正常終止、異常、取消都是完成
    boolean isDone();

    // 阻塞直到取得異步操作結果
    V get() throws InterruptedException, ExecutionException;
    // 同上,但最長阻塞時間為timeout
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Netty 對 JDK 的 Future 進行了擴展

public interface Future<V> extends java.util.concurrent.Future<V> {
    // 異步操作完成且正常終止
    boolean isSuccess();
    // 異步操作是否可以取消
    boolean isCancellable();
    // 異步操作失敗的原因
    Throwable cause();
    // 添加一個監聽者,異步操作完成時回調,類比javascript的回調函數
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 阻塞直到異步操作完成
    Future<V> await() throws InterruptedException;
    // 同上,但異步操作失敗時拋出異常
    Future<V> sync() throws InterruptedException;
    // 非阻塞地返回異步結果,如果尚未完成返回null
    V getNow();
}

Netty 的 Promise 對又對 Future 進行了擴展

public interface Promise<V> extends Future<V> {    
    Promise<V> setSuccess(V result);
    boolean trySuccess(V result);
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    boolean setUncancellable();
}

DefaultChannelPromise 是 ChannelPromise 的實現類,它是實際運行時的 Promoise 實例。

參考:

  1. 《並發編程 Promise, Future 和 Callback》:https://ifeve.com/promise-future-callback/

每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM