對於jdk底層已經有對Future的實現,用來執行異步操作並且提供相應對結果操作的方法。
但是,在netty內部也同樣實現了自己的Future,
並且繼承了jdk中的Future接口,提供了一些額外的方法來針對在netty中相關的異步I/O操作來進行處理
jdk中的Future
該接口表示的是異步計算的結果,提供若干方法來監測計算是否完成、等待計算完成、獲取計算的結果。下面舉例其使用方法:
{ interface ArchiveSearcher { String search(String target); } class App { ExecutorService executor = ... ArchiveSearcher searcher = ... void showSearch(final String target) throws InterruptedException { Future<String> future = executor.submit(new Callable<String>() { public String call() { return searcher.search(target); }}); displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } }
}
future的get來獲取異步計算的結果,該方法會阻塞直到計算完成。
上述submit方法也可以被以下替換:
{ FutureTask<String> future = new FutureTask<String>(new Callable<String>() { public String call() { return searcher.search(target); }}); executor.execute(future); }
netty中的Future
public interface Future<V> extends java.util.concurrent.Future<V>
下面是一些比較重要方法的定義,其中addListener方法非常重要:
- cause方法表示如果I/O操作失敗,返回異常信息
- cancel方法的boolean參數表示是否對已經開始執行的操作進行中斷
- isSuccess方法表示I/O操作是否已經成功的完成。對於上述jdk中Future申明的isDone方法,只能知道I/O是否結束,有可能是成功完成、被取消、異常中斷。netty中Future的此isSuccess方法能夠很好的判斷出操作是否正真地成功完成
- sync方法阻塞直到future完成操作,如果操作失敗會重新拋出異常
- addListener方法會添加特定的監聽器到future,這些監聽器會在future isDone返回true的時候立刻被通知。這是netty中很重要的擴展方法,這里用到了觀察者模式
public interface GenericFutureListener<F extends Future<?>> extends EventListener { /** * Invoked when the operation associated with the {@link Future} has been completed. * * @param future the source {@link Future} which called this callback */ void operationComplete(F future) throws Exception; }
為什么future中有get方法來獲取異步的結果,這里又擴展了監聽器這種方法。如果使用get,我們會考慮到底在什么時候使用,
因為該方法會阻塞后續邏輯代碼,如果我們使用監聽器,毫無疑問,會更加優雅地在合理的時間來處理我們的邏輯代碼
ChannelFuture
netty中的ChannelFuture繼承來netty中的自己的Future
addListener方法傳入的監聽器會實現以下接口,也就是被通知的時候operationComplete方法會被調用:
public interface ChannelFuture extends Future<Void>
ChannelFuture表示Channel中異步I/O操作的結果,在netty中所有的I/O操作都是異步的,I/O的調用會直接返回,可以通過ChannelFuture來獲取I/O操作的結果狀態。對於多種狀態的表示如下:
* | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+
需要注意的是failure和cancellation都會表示操作完成,但是對應的狀態是不同的。與Future類似,可以通過添加ChannelFutureListener監聽器,
當I/O操作完成的時候來通知調用。相比於wait()方式也更推薦這種方式來獲取結果狀態或者執行后續操作。
此外,不建議在ChannelHandler中調用await(),因為ChannelHandler中事件驅動的方法被一個I/O線程調用,
可能一直不回完成,那么await()也可能被I/O線程調用,同樣會一直block,因此會產生死鎖。例如:
//永遠不要這樣做 * // BAD - NEVER DO THIS * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) { * {@link ChannelFuture} future = ctx.channel().close(); * future.awaitUninterruptibly(); * // Perform post-closure operation * // ... * } //而應該這樣做: * // GOOD * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) { * {@link ChannelFuture} future = ctx.channel().close(); * future.addListener(new {@link ChannelFutureListener}() { * public void operationComplete({@link ChannelFuture} future) { * // Perform post-closure operation * // ... * } * }); * }
對於I/O超時和await()超時的區別:
//永遠不要這樣做 * // BAD - NEVER DO THIS * {@link Bootstrap} b = ...; * {@link ChannelFuture} f = b.connect(...); * f.awaitUninterruptibly(10, TimeUnit.SECONDS); * if (f.isCancelled()) { * // Connection attempt cancelled by user * } else if (!f.isSuccess()) { * // You might get a NullPointerException here because the future * // might not be completed yet. * f.cause().printStackTrace(); * } else { * // Connection established successfully * } //當awaitUninterruptibly也就是await超時之后,ChannelFuture對應的連接是可能沒有完成,那么執行后續的操作就會異常 //而應該這樣做 * // GOOD * {@link Bootstrap} b = ...; * // Configure the connect timeout option. * <b>b.option({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000);</b> * {@link ChannelFuture} f = b.connect(...); * f.awaitUninterruptibly(); * * // Now we are sure the future is completed. * assert f.isDone(); * * if (f.isCancelled()) { * // Connection attempt cancelled by user * } else if (!f.isSuccess()) { * f.cause().printStackTrace(); * } else { * // Connection established successfully * } //當通過option的方式添加超時時間,如果超時則會被當做failure結果返回,同時再調用awaitUninterruptibly的時候一定是future已經操作完成
ChannelFuture中需要注意的是添加了channel方法來獲取Channel:
/** * Returns a channel where the I/O operation associated with this * future takes place. */ Channel channel();
JDK所提供的Future只能通過手工方式檢查執行結果,而這個操作是會阻塞的;Netty則對ChannelFuture進行來增強,通過ChannelFutureListener以回調的方式來獲取執行結果,
去除來手工檢查阻塞的操作。需要注意的是ChannelFutureListener的operationComplete方法是由I/O線程執行的,因此要注意的是不要在這里執行耗時操作,否則需要通過另外的線程或線程池來執行
ChannelPromise
ChannelPromise是一種可寫的特殊ChannelFuture
public interface ChannelPromise extends ChannelFuture, Promise<Void>
對於Promise:
public interface Promise<V> extends Future<V>
定義了可以標識Future成功或者失敗的方法,並且每一個Future只能夠被標識一次,如果成功將會去通知之前所定義的listeners
/** * Marks this future as a success and notifies all * listeners. * * If it is success or failed already it will throw an {@link IllegalStateException}. */ Promise<V> setSuccess(V result); /** * Marks this future as a success and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a success. Otherwise {@code false} because this future is * already marked as either a success or a failure. */ boolean trySuccess(V result); /** * Marks this future as a failure and notifies all * listeners. * * If it is success or failed already it will throw an {@link IllegalStateException}. */ Promise<V> setFailure(Throwable cause); /** * Marks this future as a failure and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a failure. Otherwise {@code false} because this future is * already marked as either a success or a failure. */ boolean tryFailure(Throwable cause); /** * Make this future impossible to cancel. * * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done * without being cancelled. {@code false} if this future has been cancelled already. */ boolean setUncancellable();
在DefaultChannelPromise默認實現中,當表示為成功時會通知相應listeners
@Override public ChannelPromise setSuccess(Void result) { super.setSuccess(result); return this; }
在setSuccess方法中:
private void notifyListenersNow() { ... for (;;) { if (listeners instanceof DefaultFutureListeners) { notifyListeners0((DefaultFutureListeners) listeners); } else { notifyListener0(this, (GenericFutureListener<?>) listeners); } synchronized (this) { if (this.listeners == null) { // Nothing can throw from within this method, so setting notifyingListeners back to false does not // need to be in a finally block. notifyingListeners = false; return; } listeners = this.listeners; this.listeners = null; } } }