Future用於獲取異步操作的結果,而Promise則比較抽象,無法直接猜測出其功能。
Future
Future最早來源於JDK的java.util.concurrent.Future,它用於代表異步操作的結果。
可以通過get方法獲取操作結果,如果操作尚未完成,則會同步阻塞當前調用的線程;如果不允許阻塞太長時間或者無限期阻塞,可以通過帶超時時間的get方法獲取結果;如果到達超時時間操作仍然沒有完成,則拋出TimeoutException。通過isDone()方法可以判斷當前的異步操作是否完成,如果完成,無論成功與否,都返回true,否則返回false。通過cancel可以嘗試取消異步操作,它的結果是未知的,如果操作已經完成,或者發生其他未知的原因拒絕取消,取消操作將會失敗。
由於Netty的Future都是與異步I/O操作相關的,因此,命名為ChannelFuture,代表它與Channel操作相關。
在Netty中,所有的I/O操作都是異步的,這意味着任何I/O調用都會立即返回,而不是像傳統BIO那樣同步等待操作完成。異步操作會帶來一個問題:調用者如何獲取異步操作的結果?ChannelFuture就是為了解決這個問題而專門設計的。
ChannelFuture有兩種狀態:uncompleted和completed。當開始一個I/O操作時,一個新的ChannelFuture被創建,此時它處於uncompleted狀態——非失敗、非成功、非取消,因為I/O操作此時還沒有完成。一旦I/O操作完成,ChannelFuture將會被設置成completed,它的結果有如下三種可能。
- 操作成功;
- 操作失敗;
- 操作被取消。
ChannelFuture提供了一系列新的API,用於獲取操作結果、添加事件監聽器、取消I/O操作、同步等待等。
Netty強烈建議直接通過添加監聽器的方式獲取I/O操作結果,或者進行后續的相關操作。
ChannelFuture可以同時增加一個或者多個GenericFutureListener,也可以通過remove方法刪除GenericFutureListener。
當I/O操作完成之后,I/O線程會回調ChannelFuture中GenericFutureListener的operationComplete方法,並把ChannelFuture對象當作方法的入參。如果用戶需要做上下文相關的操作,需要將上下文信息保存到對應的ChannelFuture中。
推薦通過GenericFutureListener代替ChannelFuture的get等方法的原因是:當我們進行異步I/O操作時,完成的時間是無法預測的,如果不設置超時時間,它會導致調用線程長時間被阻塞,甚至掛死。而設置超時時間,時間又無法精確預測。利用異步通知機制回調GenericFutureListener是最佳的解決方案,它的性能最優。
ps:不要在ChannelHandler中調用ChannelFuture的await()方法,這會導致死鎖。原因是發起I/O操作之后,由I/O線程負責異步通知發起I/O操作的用戶線程,如果I/O線程和用戶線程是同一個線程,就會導致I/O線程等待自己通知操作完成,這就導致了死鎖,這跟經典的兩個線程互等待死鎖不同,屬於自己把自己掛死。
異步I/O操作有兩類超時:一個是TCP層面的I/O超時,另一個是業務邏輯層面的操作超時。兩者沒有必然的聯系,但是通常情況下業務邏輯超時時間應該大於I/O超時時間,它們兩者是包含的關系。
ps:ChannelFuture超時並不代表I/O超時,這意味着ChannelFuture超時后,如果沒有關閉連接資源,隨后連接依舊可能會成功,這會導致嚴重的問題。所以通常情況下,必須要考慮究竟是設置I/O超時還是ChannelFuture超時。
ChannelFuture源碼分析
AbstractFuture實現Future接口,它不允許I/O操作被取消。
@Override public V get() throws InterruptedException, ExecutionException { //調用await()方法進行無限期阻塞,當I/O操作完成后會被notify()。 await(); Throwable cause = cause(); //程序繼續向下執行,檢查I/O操作是否發生了異常 if (cause == null) { //如果沒有異常,則通過getNow()方法獲取結果並返回。 return getNow(); } //否則,將異常堆棧進行包裝,拋出ExecutionException。 throw new ExecutionException(cause); } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { //調用await(long timeout, TimeUnit unit)方法即可 if (await(timeout, unit)) { Throwable cause = cause(); //如果沒有超時,則依次判斷是否發生了I/O異常等情況 if (cause == null) { //操作與無參數的get方法相同。 return getNow(); } throw new ExecutionException(cause); } //如果超時,則拋出TimeoutException。 throw new TimeoutException(); }
Promise
Promise是可寫的Future,Future自身並沒有寫操作相關的接口,Netty通過Promise對Future進行擴展,用於設置I/O操作的結果。
Promise相關的寫操作接口定義如圖:
Netty發起I/O操作的時候,會創建一個新的Promise對象,例如調用ChannelHandlerContext的write(Object object)方法時,會創建一個新的ChannelPromise。
@Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); } @Override public ChannelFuture write(Object msg, ChannelPromise promise) { DefaultChannelHandlerContext next = findContextOutbound(MASK_WRITE); next.invoker.invokeWrite(next, msg, promise); return promise; } @Override public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); }
Promise源碼分析
分析一個它的實現子類的源碼DefaultPromise 。
setSuccess方法的實現
@Override public Promise<V> setSuccess(V result) { //調用setSuccess0方法並對其操作結果進行判斷,如果操作成功,則調用notifyListeners方法通知listener。 if (setSuccess0(result)) { notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(V result) { //首先判斷當前Promise的操作結果是否已經被設置,如果已經被設置,則不允許重復設置,返回設置失敗。 if (isDone()) { return false; } //由於可能存在I/O線程和用戶線程同時操作Promise,所以設置操作結果的時候需要加鎖保護,防止並發操作。 synchronized (this) { //對操作結果是否被設置進行二次判斷(為了提升並發性能的二次判斷),如果已經被設置,則返回操作失敗。 if (isDone()) { return false; } //對操作結果result進行判斷,如果為空,說明僅僅需要notify在等待的業務線程,不包含具體的業務邏輯對象。 //因此,將result設置為系統默認的SUCCESS。 if (result == null) { this.result = SUCCESS; } else { //如果操作結果非空,將結果設置為result。 this.result = result; } //如果有正在等待異步I/O操作完成的用戶線程或者其他系統線程 if (hasWaiters()) { //調用notifyAll方法喚醒所有正在等待的線程。注意,notifyAll和wait方法都必須在同步塊內使用。 notifyAll(); } } return true; }
await方法的實現
@Override public Promise<V> await() throws InterruptedException { //如果當前的Promise已經被設置,則直接返回。 if (isDone()) { return this; } //如果線程已經被中斷,則拋出中斷異常。 if (Thread.interrupted()) { throw new InterruptedException(toString()); } //通過同步關鍵字鎖定當前Promise對象 synchronized (this) { //使用循環判斷對isDone結果進行判斷,進行循環判斷的原因是防止線程被意外喚醒導致的功能異常。 while (!isDone()) { checkDeadLock(); incWaiters(); try { wait(); } finally { decWaiters(); } } } return this; }
由於在I/O線程中調用Promise的await或者sync方法會導致死鎖,所以在循環體中需要對死鎖進行保護性校驗,防止I/O線程被掛死,最后調用java.lang.Object.wait()方法進行無限期等待,直到I/O線程調用setSuccess方法、trySuccess方法、setFailure或者tryFailure方法。