netty中的Future、ChannelFuture與ChannelPromise詳解


  對於jdk底層已經有對Future的實現,用來執行異步操作並且提供相應對結果操作的方法。但是,在netty內部也同樣實現了自己的Future,並且繼承了jdk中的Future接口,提供了一些額外的方法來針對在netty中相關的異步I/O操作來進行處理。

1、jdk中的Future

  該接口表示的是異步計算的結果,提供若干方法來監測計算是否完成、等待計算完成、獲取計算的結果。下面舉例其使用方法:

{
  interface ArchiveSearcher { String search(String target); }
  class App {
    ExecutorService executor = ...
    ArchiveSearcher searcher = ...
    void showSearch(final String target)
        throws InterruptedException {
      Future<String> future
        = executor.submit(new Callable<String>() {
          public String call() {
              return searcher.search(target);
          }});
      displayOtherThings(); // do other things while searching
      try {
        displayText(future.get()); // 通過future的get來獲取異步計算的結果,該方法會阻塞直到計算完成
      } catch (ExecutionException ex) { cleanup(); return; }
    }
  }}

  上述submit方法也可以被以下替換:

{
  FutureTask<String> future =
    new FutureTask<String>(new Callable<String>() {
      public String call() {
        return searcher.search(target);
    }});
  executor.execute(future);}

2、netty中的Future

public interface Future<V> extends java.util.concurrent.Future<V>

  下面是一些比較重要方法的定義,其中addListener方法非常重要:

(1)cause方法表示如果I/O操作失敗,返回異常信息
(2)cancel方法的boolean參數表示是否對已經開始執行的操作進行中斷
(3)isSuccess方法表示I/O操作是否已經成功的完成。對於上述jdk中Future申明的isDone方法,只能知道I/O是否結束,有可能是成功完成、被取消、異常中斷。netty中Future的此(4)isSuccess方法能夠很好的判斷出操作是否正真地成功完成
(5)sync方法阻塞直到future完成操作,如果操作失敗會重新拋出異常
(6)addListener方法會添加特定的監聽器到future,這些監聽器會在future isDone返回true的時候立刻被通知。這是netty中很重要的擴展方法,這里用到了觀察者模式
(7)addListener方法傳入的監聽器會實現以下接口,也就是被通知的時候operationComplete方法會被調用:

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

  為什么future中有get方法來獲取異步的結果,這里又擴展了監聽器這種方法呢?因為如果使用get,我們會考慮到底在什么時候使用,因為該方法會阻塞后續邏輯代碼,如果我們使用監聽器,毫無疑問,會更加優雅地在合理的時間來處理我們的邏輯代碼。

3、ChannelFuture

  netty中的ChannelFuture繼承來netty中的自己的Future

public interface ChannelFuture extends Future<Void>

  ChannelFuture表示Channel中異步I/O操作的結果,在netty中所有的I/O操作都是異步的,I/O的調用會直接返回,可以通過ChannelFuture來獲取I/O操作的結果狀態。對於多種狀態的表示如下:

* <pre>
*                                      +---------------------------+
*                                      | Completed successfully    |
*                                      +---------------------------+
*                                 +---->      isDone() = <b>true</b>      |
* +--------------------------+    |    |   isSuccess() = <b>true</b>      |
* |        Uncompleted       |    |    +===========================+
* +--------------------------+    |    | Completed with failure    |
* |      isDone() = <b>false</b>    |    |    +---------------------------+
* |   isSuccess() = false    |----+---->   isDone() = <b>true</b>         |
* | isCancelled() = false    |    |    | cause() = <b>non-null</b>     |
* |    cause() = null     |    |    +===========================+
* +--------------------------+    |    | Completed by cancellation |
*                                 |    +---------------------------+
*                                 +---->      isDone() = <b>true</b>      |
*                                      | isCancelled() = <b>true</b>      |
*                                      +---------------------------+
* </pre>

  需要注意的是failure和cancellation都會表示操作完成,但是對應的狀態是不同的。與Future類似,可以通過添加ChannelFutureListener監聽器,當I/O操作完成的時候來通知調用。相比於wait()方式也更推薦這種方式來獲取結果狀態或者執行后續操作。
此外,不建議在ChannelHandler中調用await(),因為ChannelHandler中事件驅動的方法被一個I/O線程調用,可能一直不回完成,那么await()也可能被I/O線程調用,同樣會一直block,因此會產生死鎖。例如:

//永遠不要這樣做
 * // BAD - NEVER DO THIS
 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
 *     {@link ChannelFuture} future = ctx.channel().close();
 *     future.awaitUninterruptibly();
 *     // Perform post-closure operation
 *     // ...
 * }

 //而應該這樣做:
 * // GOOD
 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
 *     {@link ChannelFuture} future = ctx.channel().close();
 *     future.addListener(new {@link ChannelFutureListener}() {
 *         public void operationComplete({@link ChannelFuture} future) {
 *             // Perform post-closure operation
 *             // ...
 *         }
 *     });
 * }

  對於I/O超時和await()超時的區別:

//永遠不要這樣做
 * // BAD - NEVER DO THIS
 * {@link Bootstrap} b = ...;
 * {@link ChannelFuture} f = b.connect(...);
 * f.awaitUninterruptibly(10, TimeUnit.SECONDS);
 * if (f.isCancelled()) {
 *     // Connection attempt cancelled by user
 * } else if (!f.isSuccess()) {
 *     // You might get a NullPointerException here because the future
 *     // might not be completed yet.
 *     f.cause().printStackTrace();
 * } else {
 *     // Connection established successfully
 * }
//當awaitUninterruptibly也就是await超時之后,ChannelFuture對應的連接是可能沒有完成,那么執行后續的操作就會異常

 //而應該這樣做
 * // GOOD
 * {@link Bootstrap} b = ...;
 * // Configure the connect timeout option.
 * <b>b.option({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000);</b>
 * {@link ChannelFuture} f = b.connect(...);
 * f.awaitUninterruptibly();
 *
 * // Now we are sure the future is completed.
 * assert f.isDone();
 *
 * if (f.isCancelled()) {
 *     // Connection attempt cancelled by user
 * } else if (!f.isSuccess()) {
 *     f.cause().printStackTrace();
 * } else {
 *     // Connection established successfully
 * }

  ChannelFuture中需要注意的是添加了channel方法來獲取Channel:

 /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     */
    Channel channel();

  JDK所提供的Future只能通過手工方式檢查執行結果,而這個操作是會阻塞的;Netty則對ChannelFuture進行來增強,通過ChannelFutureListener以回調的方式來獲取執行結果,去除來手工檢查阻塞的操作。需要注意的是ChannelFutureListener的operationComplete方法是由I/O線程執行的,因此要注意的是不要在這里執行耗時操作,否則需要通過另外的線程或線程池來執行

4、ChannelPromise

  ChannelPromise是一種可寫的特殊ChannelFuture

public interface ChannelPromise extends ChannelFuture, Promise<Void>

  對於Promise:

public interface Promise<V> extends Future<V>

  定義了可以標識Future成功或者失敗的方法,並且每一個Future只能夠被標識一次,如果成功將會去通知之前所定義的listeners

  /**
     * Marks this future as a success and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setSuccess(V result);

    /**
     * Marks this future as a success and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a success. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     */
    boolean trySuccess(V result);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a failure. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     */
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
     *         without being cancelled.  {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable();

  在DefaultChannelPromise默認實現中,當表示為成功時會通知相應listeners

@Override
    public ChannelPromise setSuccess(Void result) {
        super.setSuccess(result);
        return this;
    }

  在setSuccess方法中:

private void notifyListenersNow() {
        ...
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) {
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // Nothing can throw from within this method, so setting notifyingListeners back to false does not
                    // need to be in a finally block.
                    notifyingListeners = false;
                    return;
                }
                listeners = this.listeners;
                this.listeners = null;
            }
        }
    }

 

————————————————
版權聲明:本文為CSDN博主「BigMan-Hui」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/u011262847/article/details/78208583

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM