今天是猿燈塔“365篇原創計划”第三篇。
接下來的時間燈塔君持續更新Netty系列一共九篇
Netty 源碼解析(一): 開始
Netty 源碼解析(二): Netty 的 Channel
當前:Netty 源碼解析(三): Netty 的 Future 和 Promise
Netty 源碼解析(四): Netty 的 ChannelPipeline
Netty 源碼解析(五): Netty 的線程池分析
Netty 源碼解析(六): Channel 的 register 操作
Netty 源碼解析(七): NioEventLoop 工作流程
Netty 源碼解析(八): 回到 Channel 的 register 操作
Netty 源碼解析(九): connect 過程和 bind 過程分析
今天呢!燈塔君跟大家講:
Netty 的 Future 和 Promise
Netty 中的異步編程: Future 和 Promise
Netty 中非常多的異步調用,所以在介紹更多 NIO 相關的內容之前,我們來看看它的異步接口是怎么使用的。前面我們在介紹 Echo 例子的時候,已經用過了 ChannelFuture 這個接口了:
爭取在看完本節后,讀者能搞清楚上面的這幾行划線部分是怎么走的。
關於 Future 接口,我想大家應該都很熟悉,用得最多的就是在使用 Java 的線程池 ThreadPoolExecutor 的時候了。在 submit 一個任務到線程池中的時候,返回的就是一個 Future 實例,通過它來獲取提交的任務的執行狀態和最終的執行結果,我們最常用它的 isDone()
和 get()
方法。下面是 JDK 中的 Future 接口 java.util.concurrent.Future:
public interface Future<V> {
// 取消該任務
boolean cancel(boolean mayInterruptIfRunning);
// 任務是否已取消
boolean isCancelled();
// 任務是否已完成
boolean isDone();
// 阻塞獲取任務執行結果
V get() throws InterruptedException, ExecutionException;
// 帶超時參數的獲取任務執行結果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Netty 中的 Future 接口(同名)繼承了 JDK 中的 Future 接口,然后添加了一些方法:
// io.netty.util.concurrent.Future
public interface Future<V> extends java.util.concurrent.Future<V> {
// 是否成功
boolean isSuccess();
// 是否可取消
boolean isCancellable();
// 如果任務執行失敗,這個方法返回異常信息
Throwable cause();
// 添加 Listener 來進行回調
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 阻塞等待任務結束,如果任務失敗,將“導致失敗的異常”重新拋出來
Future<V> sync() throws InterruptedException;
// 不響應中斷的 sync(),這個大家應該都很熟了
Future<V> syncUninterruptibly();
// 阻塞等待任務結束,和 sync() 功能是一樣的,不過如果任務失敗,它不會拋出執行過程中的異常
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 獲取執行結果,不阻塞。我們都知道 java.util.concurrent.Future 中的 get() 是阻塞的
V getNow();
// 取消任務執行,如果取消成功,任務會因為 CancellationException 異常而導致失敗
// 也就是 isSuccess()==false,同時上面的 cause() 方法返回 CancellationException 的實例。
// mayInterruptIfRunning 說的是:是否對正在執行該任務的線程進行中斷(這樣才能停止該任務的執行),
// 似乎 Netty 中 Future 接口的各個實現類,都沒有使用這個參數
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
看完上面的 Netty 的 Future 接口,我們可以發現,它加了 sync() 和 await() 用於阻塞等待,還加了 Listeners,只要任務結束去回調 Listener 們就可以了,那么我們就不一定要主動調用 isDone() 來獲取狀態,或通過 get() 阻塞方法來獲取值。
所以它其實有兩種使用范式
順便說下 sync() 和 await() 的區別:sync() 內部會先調用 await() 方法,等 await() 方法返回后,會檢查下這個任務是否失敗,如果失敗,重新將導致失敗的異常拋出來。也就是說,如果使用 await(),任務拋出異常后,await() 方法會返回,但是不會拋出異常,而 sync() 方法返回的同時會拋出異常。
我們也可以看到,Future 接口沒有和 IO 操作關聯在一起,還是比較 純凈的接口。
接下來,我們來看 Future 接口的子接口 ChannelFuture,這個接口用得最多,它將和 IO 操作中的 Channel 關聯在一起了,用於異步處理 Channel 中的事件。
public interface ChannelFuture extends Future<Void> {
// ChannelFuture 關聯的 Channel
Channel channel();
// 覆寫以下幾個方法,使得它們返回值為 ChannelFuture 類型
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
// 用來標記該 future 是 void 的,
// 這樣就不允許使用 addListener(...), sync(), await() 以及它們的幾個重載方法
boolean isVoid();
}
我們看到,ChannelFuture 接口相對於 Future 接口,除了將 channel 關聯進來沒有增加什么東西。還有個 isVoid() 方法算是不那么重要的存在吧。其他幾個都是方法覆寫,為了讓返回值類型變為 ChannelFuture,而不是原來的 Future。
這里有點跳,我們來介紹下 Promise 接口,它和 ChannelFuture 接口無關,而是和前面的 Future 接口相關,Promise 這個接口非常重要。Promise 接口和 ChannelFuture 一樣,也繼承了 Netty 的 Future 接口,然后加了一些 Promise 的內容:
public interface Promise<V> extends Future<V> {
// 標記該 future 成功及設置其執行結果,並且會通知所有的 listeners。
// 如果該操作失敗,將拋出異常(失敗指的是該 future 已經有了結果了,成功的結果,或者失敗的結果)
Promise<V> setSuccess(V result);
// 和 setSuccess 方法一樣,只不過如果失敗,它不拋異常,返回 false
boolean trySuccess(V result);
// 標記該 future 失敗,及其失敗原因。
// 如果失敗,將拋出異常(失敗指的是已經有了結果了)
Promise<V> setFailure(Throwable cause);
// 標記該 future 失敗,及其失敗原因。
// 如果已經有結果,返回 false,不拋出異常
boolean tryFailure(Throwable cause);
// 標記該 future 不可以被取消
boolean setUncancellable();
// 這里和 ChannelFuture 一樣,對這幾個方法進行覆寫,目的是為了返回 Promise 類型的實例
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
可能有些讀者對 Promise 的概念不是很熟悉,這里簡單說兩句。
我覺得只要明白一點,Promise 實例內部是一個任務,任務的執行往往是異步的,通常是一個線程池來處理任務。Promise 提供的 setSuccess(V result) 或 setFailure(Throwable t) 將來會被某個執行任務的線程在執行完成以后調用,同時那個線程在調用 setSuccess(result) 或 setFailure(t) 后會回調 listeners 的回調函數(當然,回調的具體內容不一定要由執行任務的線程自己來執行,它可以創建新的線程來執行,也可以將回調任務提交到某個線程池來執行)。而且,一旦 setSuccess(...) 或 setFailure(...) 后,那些 await() 或 sync() 的線程就會從等待中返回。所以這里就有兩種編程方式,一種是用 await(),等 await() 方法返回后,得到 promise 的執行結果,然后處理它;另一種就是提供 Listener 實例,我們不太關心任務什么時候會執行完,只要它執行完了以后會去執行 listener 中的處理方法就行。接下來,我們再來看下 ChannelPromise,它繼承了前面介紹的 ChannelFuture 和 Promise 接口
ChannelPromise 接口在 Netty 中使用得比較多,因為它綜合了 ChannelFuture 和 Promise 兩個接口:
/**
* Special {@link ChannelFuture} which is writable.
*/
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
// 覆寫 ChannelFuture 中的 channel() 方法,其實這個方法一點沒變
@Override
Channel channel();
// 下面幾個方法是覆寫 Promise 中的接口,為了返回值類型是 ChannelPromise
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);
// 到這里大家應該都熟悉了,下面幾個方法的覆寫也是為了得到 ChannelPromise 類型的實例
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();
/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
// 我們忽略這個方法吧。
ChannelPromise unvoid();
}
我們可以看到,它綜合了 ChannelFuture 和 Promise 中的方法,只不過通過覆寫將返回值都變為 ChannelPromise 了而已,沒有增加什么新的功能。
小結一下,我們上面介紹了幾個接口,Future 以及它的子接口 ChannelFuture 和 Promise,然后是 ChannelPromise 接口同時繼承了 ChannelFuture 和 Promise。我把這幾個接口的主要方法列在一起,這樣大家看得清晰些:
接下來,我們需要來一個實現類,這樣才能比較直觀地看出它們是怎么使用的,因為上面的這些都是接口定義,具體還得看實現類是怎么工作的。下面,我們來介紹下 DefaultPromise 這個實現類,這個類很常用,它的源碼也不短,我們先介紹幾個關鍵的內容,然后介紹一個示例使用。首先,我們看下它有哪些屬性:
可以看出,此類實現了 Promise,但是沒有實現 ChannelFuture,所以它和 Channel 聯系不起來。別急,我們后面會碰到另一個類 DefaultChannelPromise 的使用,這個類是綜合了 ChannelFuture 和 Promise 的,但是它的實現其實大部分都是繼承自這里的 DefaultPromise 類的。public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 保存執行結果
private volatile Object result;
// 執行任務的線程池,promise 持有 executor 的引用,這個其實有點奇怪了
// 因為“任務”其實沒必要知道自己在哪里被執行的
private final EventExecutor executor;
// 監聽者,回調函數,任務結束后(正常或異常結束)執行
private Object listeners;
// 等待這個 promise 的線程數(調用sync()/await()進行等待的線程數量)
private short waiters;
// 是否正在喚醒等待線程,用於防止重復執行喚醒,不然會重復執行 listeners 的回調方法
private boolean notifyingListeners;
......
}
說完上面的屬性以后,大家可以看下 setSuccess(V result)
、trySuccess(V result)
和 setFailure(Throwable cause)
、 tryFailure(Throwable cause)
這幾個方法:
看出 setSuccess(result) 和 trySuccess(result) 的區別了嗎?
上面幾個方法都非常簡單,先設置好值,然后執行監聽者們的回調方法。notifyListeners() 方法感興趣的讀者也可以看一看,不過它還涉及到 Netty 線程池的一些內容,我們還沒有介紹到線程池,這里就不展開了。上面的代碼,在 setSuccess0 或 setFailure0 方法中都會喚醒阻塞在 sync() 或 await() 的線程另外,就是可以看下 sync() 和 await() 的區別,其他的我覺得隨便看看就好了。
@Override
public Promise<V> sync() throws InterruptedException {
await();
// 如果任務是失敗的,重新拋出相應的異常
rethrowIfFailed();
return this;
}
接下來,我們來寫個實例代碼吧:
public static void main(String[] args) {
// 構造線程池
EventExecutor executor = new DefaultEventExecutor();
// 創建 DefaultPromise 實例
Promise promise = new DefaultPromise(executor);
// 下面給這個 promise 添加兩個 listener
promise.addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
System.out.println("任務結束,結果:" + future.get());
} else {
System.out.println("任務失敗,異常:" + future.cause());
}
}
}).addListener(new GenericFutureListener<Future<Integer>>() {
@Override
public void operationComplete(Future future) throws Exception {
System.out.println("任務結束,balabala...");
}
});
// 提交任務到線程池,五秒后執行結束,設置執行 promise 的結果
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
// 設置 promise 的結果
// promise.setFailure(new RuntimeException());
promise.setSuccess(123456);
}
});
// main 線程阻塞等待執行結果
try {
promise.sync();
} catch (InterruptedException e) {
}
}
運行代碼,兩個 listener 將在 5 秒后將輸出:任務結束,結果:123456
任務結束,balabala...
讀者這里可以試一下 sync() 和 await() 的區別,在任務中調用 promise.setFailure(new RuntimeException()) 試試看。
上面的代碼中,大家可能會對線程池 executor 和 promise 之間的關系感到有點迷惑。讀者應該也要清楚,具體的任務不一定就要在這個 executor 中被執行。任務結束以后,需要調用 promise.setSuccess(result) 作為通知。
通常來說,promise 代表的 future 是不需要和線程池攪在一起的,future 只關心任務是否結束以及任務的執行結果,至於是哪個線程或哪個線程池執行的任務,future 其實是不關心的。不過 Netty 畢竟不是要創建一個通用的線程池實現,而是和它要處理的 IO 息息相關的,所以我們只不過要理解它就好了。
這節就說這么多吧,我們回過頭來再看一下這張圖,看看大家是不是看懂了這節內容:
我們就說說上圖左邊的部分吧,雖然我們還不知道 bind() 操作中具體會做什么工作,但是我們應該可以猜出一二。顯然,main 線程調用 b.bind(port) 這個方法會返回一個 ChannelFuture,bind() 是一個異步方法,當某個執行線程執行了真正的綁定操作后,那個執行線程一定會標記這個 future 為成功(我們假定 bind 會成功),然后這里的 sync() 方法(main 線程)就會返回了。
如果 bind(port) 失敗,我們知道,sync() 方法會將異常拋出來,然后就會執行到 finally 塊了。
一旦綁定端口 bind 成功,進入下面一行,f.channel() 方法會返回該 future 關聯的 channel。channel.closeFuture() 也會返回一個 ChannelFuture,然后調用了 sync() 方法,這個 sync() 方法返回的條件是:有其他的線程關閉了 NioServerSocketChannel,往往是因為需要停掉服務了,然后那個線程會設置 future 的狀態( setSuccess(result) 或 setFailure(cause) ),這個 sync() 方法才會返回。這篇文章就到這里,希望大家對 Netty 中的異步編程有些了解,后續碰到源碼的時候能知道是怎么使用的了。
365天干貨不斷,可以微信搜索「 猿燈塔」第一時間閱讀,回復【資料】【面試】【簡歷】有我准備的一線大廠面試資料和簡歷模板