從源碼上理解Netty並發工具-Promise


前提

最近一直在看Netty相關的內容,也在編寫一個輕量級的RPC框架來練手,途中發現了Netty的源碼有很多亮點,某些實現甚至可以用苛刻來形容。另外,Netty提供的工具類也是相當優秀,可以開箱即用。這里分析一下個人比較喜歡的領域,並發方面的一個Netty工具模塊 - Promise

環境版本:

  • Netty:4.1.44.Final
  • JDK1.8

Promise簡介

Promise,中文翻譯為承諾或者許諾,含義是人與人之間,一個人對另一個人所說的具有一定憧憬的話,一般是可以實現的。

io.netty.util.concurrent.Promise在注釋中只有一句話:特殊的可寫的io.netty.util.concurrent.FuturePromise接口是io.netty.util.concurrent.Future的子接口)。而io.netty.util.concurrent.Futurejava.util.concurrent.Future的擴展,表示一個異步操作的結果。我們知道,JDK並發包中的Future是不可寫,也沒有提供可監聽的入口(沒有應用觀察者模式),而Promise很好地彌補了這兩個問題。另一方面從繼承關系來看,DefaultPromise是這些接口的最終實現類,所以分析源碼的時候需要把重心放在DefaultPromise類。一般一個模塊提供的功能都由接口定義,這里分析一下兩個接口的功能列表:

  • io.netty.util.concurrent.Promise
  • io.netty.util.concurrent.Future

先看io.netty.util.concurrent.Future接口:

public interface Future<V> extends java.util.concurrent.Future<V> {

    // I/O操作是否執行成功
    boolean isSuccess();

    // 標記是否可以通過下面的cancel(boolean mayInterruptIfRunning)取消I/O操作
    boolean isCancellable();

    // 返回I/O操作的異常實例 - 如果I/O操作本身是成功的,此方法返回null
    Throwable cause();

    // 為當前Future實例添加監聽Future操作完成的監聽器 - isDone()方法激活之后所有監聽器實例會得到回調
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    
    // 為當前Future移除監聽Future操作完成的監聽器
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // 同步等待Future完成得到最終結果(成功)或者拋出異常(失敗),響應中斷
    Future<V> sync() throws InterruptedException;

    // 同步等待Future完成得到最終結果(成功)或者拋出異常(失敗),不響應中斷
    Future<V> syncUninterruptibly();

    // 等待Future完成,響應中斷
    Future<V> await() throws InterruptedException;

    // 等待Future完成,不響應中斷
    Future<V> awaitUninterruptibly();

    // 帶超時時限的等待Future完成,響應中斷
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    
    // 帶超時時限的等待Future完成,不響應中斷
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);

    // 非阻塞馬上返回Future的結果,如果Future未完成,此方法一定返回null;有些場景下如果Future成功獲取到的結果是null則需要二次檢查isDone()方法是否為true
    V getNow();

    // 取消當前Future實例的執行,如果取消成功會拋出CancellationException異常
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

sync()await()方法類似,只是sync()會檢查異常執行的情況,一旦發現執行異常馬上把異常實例包裝拋出,而await()方法對異常無感知。

接着看io.netty.util.concurrent.Promise接口:

public interface Promise<V> extends Future<V> {
   
    // 標記當前Future成功,設置結果,如果設置成功,則通知所有的監聽器,如果Future已經成功或者失敗,則拋出IllegalStateException
    Promise<V> setSuccess(V result);

    // 標記當前Future成功,設置結果,如果設置成功,則通知所有的監聽器並且返回true,否則返回false
    boolean trySuccess(V result);

    // 標記當前Future失敗,設置結果為異常實例,如果設置成功,則通知所有的監聽器,如果Future已經成功或者失敗,則拋出IllegalStateException
    Promise<V> setFailure(Throwable cause);

    // 標記當前Future失敗,設置結果為異常實例,如果設置成功,則通知所有的監聽器並且返回true,否則返回false
    boolean tryFailure(Throwable cause);
    
    // 標記當前的Promise實例為不可取消,設置成功返回true,否則返回false
    boolean setUncancellable();

    // 下面的方法和io.netty.util.concurrent.Future中的方法基本一致,只是修改了返回類型為Promise

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

到此,Promise接口的所有功能都分析完畢,接下來從源碼角度詳細分析Promise的實現。

Promise源碼實現

Promise的實現類為io.netty.util.concurrent.DefaultPromise(其實DefaultPromise還有很多子類,某些實現是為了定制特定的場景做了擴展),而DefaultPromise繼承自io.netty.util.concurrent.AbstractFuture

public abstract class AbstractFuture<V> implements Future<V> {

    // 永久阻塞等待獲取結果的方法
    @Override
    public V get() throws InterruptedException, ExecutionException {
        // 調用響應中斷的永久等待方法進行阻塞
        await();
        // 從永久阻塞中喚醒后,先判斷Future是否執行異常
        Throwable cause = cause();
        if (cause == null) {
            // 異常為空說明執行成功,調用getNow()方法返回結果
            return getNow();
        }
        // 異常為空不為空,這里區分特定的取消異常則轉換為CancellationException拋出
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        // 非取消異常的其他所有異常都被包裝為執行異常ExecutionException拋出
        throw new ExecutionException(cause);
    }
    
    // 帶超時阻塞等待獲取結果的方法
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // 調用響應中斷的帶超時時限等待方法進行阻塞
        if (await(timeout, unit)) {
             // 從帶超時時限阻塞中喚醒后,先判斷Future是否執行異常
            Throwable cause = cause();
            if (cause == null) {
                // 異常為空說明執行成功,調用getNow()方法返回結果
                return getNow();
            }
            // 異常為空不為空,這里區分特定的取消異常則轉換為CancellationException拋出
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            // 在非等待超時的前提下,非取消異常的其他所有異常都被包裝為執行異常ExecutionException拋出
            throw new ExecutionException(cause);
        }
        // 方法步入此處說明等待超時,則拋出超時異常TimeoutException
        throw new TimeoutException();
    }
}

AbstractFuture僅僅對get()get(long timeout, TimeUnit unit)兩個方法進行了實現,其實這兩處的實現和java.util.concurrent.FutureTask中的實現方式十分相似。

DefaultPromise的源碼比較多,這里分開多個部分去閱讀,先看它的屬性和構造函數:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // 正常日志的日志句柄,InternalLogger是Netty內部封裝的日志接口
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);

    // 任務拒絕執行時候的日志句柄 - Promise需要作為一個任務提交到線程中執行,如果任務拒絕則使用此日志句柄打印日志
    private static final InternalLogger rejectedExecutionLogger =
            InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");

    // 監聽器的最大棧深度,默認值為8,這個值是防止嵌套回調調用的時候棧深度過大導致內存溢出,后面會舉個例子說明它的用法
    private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
            SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
    
    // 結果更新器,用於CAS更新結果result的值
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    
    // 用於填充result的值,當設置結果result傳入null,Promise執行成功,用這個值去表示成功的結果
    private static final Object SUCCESS = new Object();
    
    // 用於填充result的值,表示Promise不能被取消
    private static final Object UNCANCELLABLE = new Object();
    
    // CancellationException實例的持有器,用於判斷Promise取消狀態和拋出CancellationException
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
            new CancellationException(), DefaultPromise.class, "cancel(...)"));
    
    // CANCELLATION_CAUSE_HOLDER的異常棧信息元素數組
    private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
    
    // 真正的結果對象,使用Object類型,最終有可能為null、真正的結果實例、SUCCESS、UNCANCELLABLE或者CANCELLATION_CAUSE_HOLDER等等
    private volatile Object result;
    
    // 事件執行器,這里暫時不做展開,可以理解為單個調度線程
    private final EventExecutor executor;
    
     // 監聽器集合,可能是單個GenericFutureListener實例或者DefaultFutureListeners(監聽器集合)實例
    private Object listeners;
    
    // 等待獲取結果的線程數量
    private short waiters;

    // 標記是否正在回調監聽器
    private boolean notifyingListeners;

    // 構造函數依賴於EventExecutor
    public DefaultPromise(EventExecutor executor) {
        this.executor = checkNotNull(executor, "executor");
    }

    protected DefaultPromise() {
        // only for subclasses - 這個構造函數預留給子類
        executor = null;
    }

    // ... 省略其他代碼 ...

    // 私有靜態內部類,用於存放Throwable實例,也就是持有異常的原因實例
    private static final class CauseHolder {
        final Throwable cause;
        CauseHolder(Throwable cause) {
            this.cause = cause;
        }
    }

    // 私有靜態內部類,用於覆蓋CancellationException的棧信息為前面定義的CANCELLATION_STACK,同時覆蓋了toString()返回CancellationException的全類名
    private static final class LeanCancellationException extends CancellationException {
        private static final long serialVersionUID = 2794674970981187807L;

        @Override
        public Throwable fillInStackTrace() {
            setStackTrace(CANCELLATION_STACK);
            return this;
        }

        @Override
        public String toString() {
            return CancellationException.class.getName();
        }
    }
    // ... 省略其他代碼 ...
}

Promise目前支持兩種類型的監聽器:

  • GenericFutureListener:支持泛型的Future監聽器。
  • GenericProgressiveFutureListener:它是GenericFutureListener的子類,支持進度表示和支持泛型的Future監聽器(有些場景需要多個步驟實現,類似於進度條那樣)。
// GenericFutureListener
public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    void operationComplete(F future) throws Exception;
}

// GenericProgressiveFutureListener
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
    
    void operationProgressed(F future, long progress, long total) throws Exception;
}

為了讓Promise支持多個監聽器,Netty添加了一個默認修飾符修飾的DefaultFutureListeners類用於保存監聽器實例數組:

// DefaultFutureListeners
final class DefaultFutureListeners {

    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; // the number of progressive listeners
    
    // 這個構造相對特別,是為了讓Promise中的listeners(Object類型)實例由單個GenericFutureListener實例轉換為DefaultFutureListeners類型
    @SuppressWarnings("unchecked")
    DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        // 注意這里,每次擴容數組長度是原來的2倍
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        // 把當前的GenericFutureListener加入數組中
        listeners[size] = l;
        // 監聽器總數量加1
        this.size = size + 1;
        // 如果為GenericProgressiveFutureListener,則帶進度指示的監聽器總數量加1
        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void remove(GenericFutureListener<? extends Future<?>> l) {
        final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        int size = this.size;
        for (int i = 0; i < size; i ++) {
            if (listeners[i] == l) {
                // 計算需要需要移動的監聽器的下標
                int listenersToMove = size - i - 1;
                if (listenersToMove > 0) {
                    // listenersToMove后面的元素全部移動到數組的前端
                    System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                }
                // 當前監聽器總量的最后一個位置設置為null,數量減1
                listeners[-- size] = null;
                this.size = size;
                // 如果監聽器是GenericProgressiveFutureListener,則帶進度指示的監聽器總數量減1
                if (l instanceof GenericProgressiveFutureListener) {
                    progressiveSize --;
                }
                return;
            }
        }
    }
    
    // 返回監聽器實例數組
    public GenericFutureListener<? extends Future<?>>[] listeners() {
        return listeners;
    }
    
    // 返回監聽器總數量
    public int size() {
        return size;
    }
    
    // 返回帶進度指示的監聽器總數量
    public int progressiveSize() {
        return progressiveSize;
    }
}

接下來看DefaultPromise的剩余方法實現,筆者覺得DefaultPromise方法實現在代碼順序上是有一定的藝術的。先看幾個判斷Promise執行狀態的方法:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ... 省略其他代碼 ...

    @Override
    public boolean setUncancellable() {
        // 通過結果更新器CAS更新result為UNCANCELLABLE,期望舊值為null,更新值為UNCANCELLABLE屬性,如果成功則返回true
        if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
            return true;
        }
        Object result = this.result;
        // 步入這里說明result當前值不為null,isDone0()和isCancelled0()都是終態,這里如果命中終態就返回false
        //(筆者注:其實可以這樣認為,這里result不能為null,如果不為終態,它只能是UNCANCELLABLE屬性實例)
        return !isDone0(result) || !isCancelled0(result);
    }

    @Override
    public boolean isSuccess() {
        Object result = this.result;
        // 如果執行成功,則結果不為null,同時不為UNCANCELLABLE,同時不為CauseHolder類型
        //(筆者注:其實可以這樣認為,Promise為成功,則result只能是一個開發者定義的實例或者SUCCESS屬性實例)
        return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
    }

    @Override
    public boolean isCancellable() {
        // 是否可取消的,result為null說明Promise處於初始化狀態尚未執行,則認為可以取消
        return result == null;
    }

    @Override
    public Throwable cause() {
        // 通過當前result獲取Throwable實例
        return cause0(result);
    }

    private Throwable cause0(Object result) {
        // result非CauseHolder類型,則直接返回null
        if (!(result instanceof CauseHolder)) {
            return null;
        }
        // 如果result為CANCELLATION_CAUSE_HOLDER(靜態CancellationException的持有)
        if (result == CANCELLATION_CAUSE_HOLDER) {
            // 則新建一個自定義LeanCancellationException實例
            CancellationException ce = new LeanCancellationException();
            // 如果CAS更新結果result為LeanCancellationException新實例則返回
            if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
                return ce;
            }
            // 走到這里說明了result是非CANCELLATION_CAUSE_HOLDER的自定義CauseHolder實例
            result = this.result;
        }
        // 兜底返回CauseHolder持有的cause
        return ((CauseHolder) result).cause;
    }
      
    // 靜態方法,判斷Promise是否為取消,依據是result必須是CauseHolder類型,同時CauseHolder中的cause必須為CancellationException類型或者其子類
    private static boolean isCancelled0(Object result) {
        return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
    }
    
    // 靜態方法,判斷Promise是否完成,依據是result不為null同時不為UNCANCELLABLE屬性實例
    private static boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
    }

    // 判斷Promise實例是否取消
    @Override
    public boolean isCancelled() {
        return isCancelled0(result);
    }
    
    // 判斷Promise實例是否完成
    @Override
    public boolean isDone() {
        return isDone0(result);
    }
    // ... 省略其他代碼 ...
}    

接着看監聽器的添加和移除方法(這其中也包含了通知監聽器的邏輯):

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ... 省略其他代碼 ...
    @Override
    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        // 入參非空校驗
        checkNotNull(listener, "listener");
        // 加鎖,鎖定的對象是Promise實例自身
        synchronized (this) {
            // 添加監聽器
            addListener0(listener);
        }
        // 如果Promise實例已經執行完畢,則通知監聽器進行回調
        if (isDone()) {
            notifyListeners();
        }
        return this;
    }

    @Override
    public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
        // 入參非空校驗
        checkNotNull(listeners, "listeners");
        // 加鎖,鎖定的對象是Promise實例自身
        synchronized (this) {
            // 遍歷入參數組添加監聽器,有空元素直接跳出
            for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                if (listener == null) {
                    break;
                }
                addListener0(listener);
            }
        }
        // 如果Promise實例已經執行完畢,則通知監聽器進行回調
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

    @Override
    public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
        // 入參非空校驗
        checkNotNull(listener, "listener");
        // 加鎖,鎖定的對象是Promise實例自身
        synchronized (this) {
            // 移除監聽器
            removeListener0(listener);
        }
        return this;
    }

    @Override
    public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
        // 入參非空校驗
        checkNotNull(listeners, "listeners");
        // 加鎖,鎖定的對象是Promise實例自身
        synchronized (this) {
            // 遍歷入參數組移除監聽器,有空元素直接跳出
            for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                if (listener == null) {
                    break;
                }
                removeListener0(listener);
            }
        }
        return this;
    }

    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        // 如果Promise實例持有listeners為null,則直接設置為入參listener
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
             // 如果當前Promise實例持有listeners的是DefaultFutureListeners類型,則調用它的add()方法進行添加
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            // 步入這里說明當前Promise實例持有listeners為單個GenericFutureListener實例,需要轉換為DefaultFutureListeners實例
            listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
        }
    }

    private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        // 如果當前Promise實例持有listeners的是DefaultFutureListeners類型,則調用它的remove()方法進行移除
        if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).remove(listener);
        } else if (listeners == listener) {
            // 如果當前Promise實例持有listeners不為DefaultFutureListeners類型,也就是單個GenericFutureListener並且和傳入的listener相同,
            // 則Promise實例持有listeners置為null
            listeners = null;
        }
    }

    private void notifyListeners() {
        EventExecutor executor = executor();
        // 當前執行線程是事件循環線程,那么直接同步調用,簡單來說就是調用notifyListeners()方法的線程和EventExecutor是同一個線程
        if (executor.inEventLoop()) {
            // 下面的ThreadLocal和listenerStackDepth是調用棧深度保護相關,博文會另起一個章節專門講解這個問題,這里可以暫時忽略
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }
        // 當前執行線程不是事件循環線程,則把notifyListenersNow()包裝為Runnable實例放到EventExecutor中執行
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }
    
    // 使用EventExecutor進行任務執行,execute()方法拋出的異常會使用rejectedExecutionLogger句柄打印
    private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }
   
    // 馬上通知所有監聽器進行回調
    private void notifyListenersNow() {
        Object listeners;
        // 這里加鎖,在鎖的保護下設置notifyingListeners的值,如果多個線程調用同一個Promise實例的notifyListenersNow()方法
        // 命中notifyingListeners的線程可以直接返回
        synchronized (this) {
            // Only proceed if there are listeners to notify and we are not already notifying listeners.
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            // 臨時變量listeners存放瞬時的監聽器實例,方便下一步設置Promise實例的listeners為null
            listeners = this.listeners;
            // 重置當前Promise實例的listeners為null
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) {
                // 多個監聽器情況下的通知
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                // 單個監聽器情況下的通知
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // 這里因為沒有異常拋出的可能,不用在finally塊中編寫,重置notifyingListeners為false並且返回跳出循環
                    notifyingListeners = false;
                    return;
                }
                  // 臨時變量listeners存放瞬時的監聽器實例,回調操作判斷是基於臨時實例去做 - 這里可能由另一個線程更新了listeners的值
                listeners = this.listeners;
                // 重置當前Promise實例的listeners為null,確保監聽器只會被回調一次,下一次跳出for死循環
                this.listeners = null;
            }
        }
    }
   
    // 遍歷DefaultFutureListeners中的listeners數組,調用靜態方法notifyListener0()
    private void notifyListeners0(DefaultFutureListeners listeners) {
        GenericFutureListener<?>[] a = listeners.listeners();
        int size = listeners.size();
        for (int i = 0; i < size; i ++) {
            notifyListener0(this, a[i]);
        }
    }
    
    // 這個靜態方法是最終監聽器回調的方法,也就是簡單調用GenericFutureListener#operationComplete()傳入的是當前的Promise實例,捕獲一切異常打印warn日志
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }
}    

然后看wait()sync()方法體系:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ... 省略其他代碼 ...

    @Override
    public Promise<V> await() throws InterruptedException {
        // 如果Promise執行完畢,直接返回
        if (isDone()) {
            return this;
        }
        // 如果當前線程中斷則直接拋出InterruptedException
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        // 死鎖檢測
        checkDeadLock();
        // 加鎖,加鎖對象是當前Promise實例
        synchronized (this) {
            // 這里設置一個死循環,終止條件是isDone()為true
            while (!isDone()) {
                // 等待線程數加1
                incWaiters();
                try {
                    // 這里調用的是Object#wait()方法進行阻塞,如果線程被中斷會拋出InterruptedException
                    wait();
                } finally {
                    // 解除阻塞后等待線程數減1
                    decWaiters();
                }
            }
        }
        return this;
    }

    @Override
    public Promise<V> awaitUninterruptibly() {
        // 如果Promise執行完畢,直接返回
        if (isDone()) {
            return this;
        }
        // 死鎖檢測
        checkDeadLock();
        boolean interrupted = false;
        // 加鎖,加鎖對象是當前Promise實例
        synchronized (this) {
            // 這里設置一個死循環,終止條件是isDone()為true
            while (!isDone()) {
                 // 等待線程數加1
                incWaiters();
                try {
                    // 這里調用的是Object#wait()方法進行阻塞,捕獲了InterruptedException異常,如果拋出InterruptedException記錄線程的中斷狀態到interrupted
                    wait();
                } catch (InterruptedException e) {
                    // Interrupted while waiting.
                    interrupted = true;
                } finally {
                    // 解除阻塞后等待線程數減1
                    decWaiters();
                }
            }
        }
        // 如果線程被中斷跳出等待阻塞,則清除線程的中斷標志位
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return this;
    }

    // 后面的幾個帶超時時限的wait()方法都是調用await0()

    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return await0(unit.toNanos(timeout), true);
    }

    @Override
    public boolean await(long timeoutMillis) throws InterruptedException {
        return await0(MILLISECONDS.toNanos(timeoutMillis), true);
    }

    @Override
    public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
        try {
            return await0(unit.toNanos(timeout), false);
        } catch (InterruptedException e) {
            // Should not be raised at all.
            throw new InternalError();
        }
    }

    @Override
    public boolean awaitUninterruptibly(long timeoutMillis) {
        try {
            return await0(MILLISECONDS.toNanos(timeoutMillis), false);
        } catch (InterruptedException e) {
            // Should not be raised at all.
            throw new InternalError();
        }
    }
    
    // 檢查死鎖,這里判斷了等待線程是事件循環線程則直接拋出BlockingOperationException異常
    // 簡單來說就是:Promise的執行線程和等待結果的線程,不能是同一個線程,否則依賴會成環
    protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
    }

    @Override
    public Promise<V> sync() throws InterruptedException {
        // 同步永久阻塞等待
        await();
        // 阻塞等待解除,如果執行存在異常,則直接拋出
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise<V> syncUninterruptibly() {
        // 同步永久阻塞等待 - 響應中斷
        awaitUninterruptibly();
        // 塞等待解除,如果執行存在異常,則直接拋出
        rethrowIfFailed();
        return this;
    }
    
    // waiters加1,如果超過Short.MAX_VALUE則拋出IllegalStateException
    private void incWaiters() {
        if (waiters == Short.MAX_VALUE) {
            throw new IllegalStateException("too many waiters: " + this);
        }
        ++waiters;
    }
    
    // waiters減1
    private void decWaiters() {
        --waiters;
    }
    
    // cause不為null則拋出
    private void rethrowIfFailed() {
        Throwable cause = cause();
        if (cause == null) {
            return;
        }
        PlatformDependent.throwException(cause);
    }

    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        // 如果Promise執行完畢,直接返回
        if (isDone()) {
            return true;
        }
        // 如果超時時限小於0那么返回isDone()的結果
        if (timeoutNanos <= 0) {
            return isDone();
        }
        // 如果允許中斷,當前線程的中斷標志位為true,則拋出InterruptedException
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        // 死鎖檢測
        checkDeadLock();
        // 記錄當前的納秒時間戳
        long startTime = System.nanoTime();
        // 等待時間的長度 - 單位為納秒
        long waitTime = timeoutNanos;
        // 記錄線程是否被中斷
        boolean interrupted = false;
        try {
            // 死循環
            for (;;) {
                synchronized (this) {
                    // 如果Promise執行完畢,直接返回true - 這一步是先驗判斷,命中了就不需要阻塞等待
                    if (isDone()) {
                        return true;
                    }
                    // 等待線程數加1
                    incWaiters();
                    try {
                        // 這里調用的是帶超時時限的Object#wait()方法進行阻塞
                        wait(waitTime / 1000000, (int) (waitTime % 1000000));
                    } catch (InterruptedException e) {
                        // 線程被中斷並且外部允許中斷,那么直接拋出InterruptedException
                        if (interruptable) {
                            throw e;
                        } else {
                            // 否則只記錄中斷過的狀態
                            interrupted = true;
                        }
                    } finally {
                        // 解除阻塞后等待線程數減1
                        decWaiters();
                    }
                }
                // 解除阻塞后,如果Promise執行完畢,直接返回true
                if (isDone()) {
                    return true;
                } else {
                    // 步入這里說明Promise尚未執行完畢,則重新計算等待時間間隔的長度數量(修正),如果大於0則進入下一輪循環
                    waitTime = timeoutNanos - (System.nanoTime() - startTime);
                    if (waitTime <= 0) {
                        return isDone();
                    }
                }
            }
        } finally {
            // 如果線程被中斷跳出等待阻塞,則清除線程的中斷標志位
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
    // ... 省略其他代碼 ...
}

最后是幾個設置結果和獲取結果的方法:

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    // ... 省略其他代碼 ...
    @Override
    public Promise<V> setSuccess(V result) {
        // 設置成功結果,如果設置成功則返回當前Promise實例
        if (setSuccess0(result)) {
            return this;
        }
        // 設置失敗說明了多次設置,Promise已經執行完畢,則拋出異常
        throw new IllegalStateException("complete already: " + this);
    }

    @Override
    public boolean trySuccess(V result) {
        // 設置成功結果,返回的布爾值表示成功或失敗
        return setSuccess0(result);
    }

    @Override
    public Promise<V> setFailure(Throwable cause) {
        // 設置失敗結果,如果設置成功則返回當前Promise實例
        if (setFailure0(cause)) {
            return this;
        }
        // 設置失敗說明了多次設置,Promise已經執行完畢,則拋出異常
        throw new IllegalStateException("complete already: " + this, cause);
    }

    @Override
    public boolean tryFailure(Throwable cause) {
        // 設置失敗結果,返回的布爾值表示成功或失敗
        return setFailure0(cause);
    }

    @SuppressWarnings("unchecked")
    @Override
    public V getNow() {
        // 非阻塞獲取結果,如果result是CauseHolder類型、SUCCESS屬性實例或者UNCANCELLABLE實行實例則返回null,否則返回轉換類型后的result值
        // 對異常無感知,如果CauseHolder包裹了異常,此方法依然返回null
        Object result = this.result;
        if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        return (V) result;
    }

    @SuppressWarnings("unchecked")
    @Override
    public V get() throws InterruptedException, ExecutionException {
        // 永久阻塞獲取結果
        Object result = this.result;
        // 如果Promise未執行完畢則進行永久阻塞等待
        if (!isDone0(result)) {
            await();
            // 更新結果臨時變量
            result = this.result;
        }
        // result為SUCCESS屬性實例或者UNCANCELLABLE屬性實例的時候直接返回null
        if (result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        // 如果result為CauseHolder類型,則獲取其中持有的cause屬性,也有可能為null
        Throwable cause = cause0(result);
        if (cause == null) {
            // 執行成功的前提下轉換類型后的result值返回
            return (V) result;
        }
        // 取消的情況,拋出CancellationException
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        // 剩余的情況一律封裝為ExecutionException異常
        throw new ExecutionException(cause);
    }

    @SuppressWarnings("unchecked")
    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // 帶超時時限的阻塞獲取結果
        Object result = this.result;
        // 如果Promise未執行完畢則進行帶超時時限的阻塞等待
        if (!isDone0(result)) {
            if (!await(timeout, unit)) {
                // 等待超時直接拋出TimeoutException
                throw new TimeoutException();
            }
            // 更新結果臨時變量
            result = this.result;
        }
        // result為SUCCESS屬性實例或者UNCANCELLABLE屬性實例的時候直接返回null
        if (result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        // 如果result為CauseHolder類型,則獲取其中持有的cause屬性,也有可能為null
        Throwable cause = cause0(result);
        if (cause == null) {
            // 執行成功的前提下轉換類型后的result值返回
            return (V) result;
        }
        // 取消的情況,拋出CancellationException
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        // 剩余的情況一律封裝為ExecutionException異常
        throw new ExecutionException(cause);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // CAS更新result為CANCELLATION_CAUSE_HOLDER,result的期望值必須為null
        if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
            // 判斷是否需要進行等待線程的通知
            if (checkNotifyWaiters()) {
                // 通知監聽器進行回調
                notifyListeners();
            }
            return true;
        }
        return false;
    }

    private boolean setSuccess0(V result) {
        // 設置執行成功的結果,如果入參result為null,則選用SUCCESS屬性,否則使用result
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setFailure0(Throwable cause) {
        // 設置執行失敗的結果,入參是Throwable類型,封裝為CauseHolder,存放在CauseHolder實例的cause屬性
        return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
    }

    private boolean setValue0(Object objResult) {
        // CAS更新result為入參objResult,result的期望值必須為null或者UNCANCELLABLE才能更新成功
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            // 判斷是否需要進行等待線程的通知
            if (checkNotifyWaiters()) {
                // 通知監聽器進行回調
                notifyListeners();
            }
            return true;
        }
        return false;
    }
    
    // 判斷是否需要進行等待線程的通知 - 其實是判斷是否需要通知監聽器回調
    private synchronized boolean checkNotifyWaiters() {
        // 如果等待線程數量大於0則調用Object#notifyAll()喚醒所有等待線程
        if (waiters > 0) {
            notifyAll();
        }
        // 如果listeners不為空(也就是存在監聽器)的時候才返回true
        return listeners != null;
    }
    // ... 省略其他代碼 ...
}    

Promise的基本使用

要使用NettyPromise模塊,並不需要引入Netty的所有依賴,這里只需要引入netty-common

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.44.Final</version>
</dependency>

EventExecutor選取方面,Netty已經准備了一個GlobalEventExecutor用於全局事件處理,這里可以直接選用(當然也可以自行實現EventExecutor或者用EventExecutor的其他實現類):

EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<String> promise = new DefaultPromise<>(executor);

這里設計一個場景:異步下載一個鏈接的資源到磁盤上,下載完成之后需要異步通知下載完的磁盤文件路徑,得到通知之后打印下載結果到控制台中。

public class PromiseMain {

    public static void main(String[] args) throws Exception {
        String url = "http://xxx.yyy.zzz";
        EventExecutor executor = GlobalEventExecutor.INSTANCE;
        Promise<DownloadResult> promise = new DefaultPromise<>(executor);
        promise.addListener(new DownloadResultListener());
        Thread thread = new Thread(() -> {
            try {
                System.out.println("開始下載資源,url:" + url);
                long start = System.currentTimeMillis();
                // 模擬下載耗時
                Thread.sleep(2000);
                String location = "C:\\xxx\\yyy\\z.md";
                long cost = System.currentTimeMillis() - start;
                System.out.println(String.format("下載資源成功,url:%s,保存到:%s,耗時:%d ms", url, location, cost));
                DownloadResult result = new DownloadResult();
                result.setUrl(url);
                result.setFileDiskLocation(location);
                result.setCost(cost);
                // 通知結果
                promise.setSuccess(result);
            } catch (Exception ignore) {

            }
        }, "Download-Thread");
        thread.start();
        Thread.sleep(Long.MAX_VALUE);
    }

    @Data
    private static class DownloadResult {

        private String url;

        private String fileDiskLocation;

        private long cost;
    }

    private static class DownloadResultListener implements GenericFutureListener<Future<DownloadResult>> {

        @Override
        public void operationComplete(Future<DownloadResult> future) throws Exception {
            if (future.isSuccess()) {
                DownloadResult downloadResult = future.getNow();
                System.out.println(String.format("下載完成通知,url:%s,文件磁盤路徑:%s,耗時:%d ms", downloadResult.getUrl(),
                        downloadResult.getFileDiskLocation(), downloadResult.getCost()));
            }
        }
    }
}

執行后控制台輸出:

開始下載資源,url:http://xxx.yyy.zzz
下載資源成功,url:http://xxx.yyy.zzz,保存到:C:\xxx\yyy\z.md,耗時:2000 ms
下載完成通知,url:http://xxx.yyy.zzz,文件磁盤路徑:C:\xxx\yyy\z.md,耗時:2000 ms

Promise適用的場景很多,除了異步通知的場景也能用於同步調用,它在設計上比JUCFuture靈活很多,基於Future擴展出很多新的特性,有需要的可以單獨引入此依賴直接使用。

Promise監聽器棧深度的問題

有些時候,由於封裝或者人為編碼異常等原因,監聽器的回調可能出現基於多個Promise形成的鏈(參考Issue-5302a promise listener chain),這樣子有可能出現遞歸調用深度過大而導致棧溢出,因此需要設置一個閾值,限制遞歸調用的最大棧深度,這個深度閾值暫且稱為棧深度保護閾值,默認值是8,可以通過系統參數io.netty.defaultPromise.maxListenerStackDepth覆蓋設置。這里貼出前面提到過的代碼塊:

private void notifyListeners() {
    EventExecutor executor = executor();
    // 事件執行器必須是事件循環類型,也就是executor.inEventLoop()為true的時候才啟用遞歸棧深度保護
    if (executor.inEventLoop()) {
        // 獲取當前線程綁定的InternalThreadLocalMap實例,這里類似於ThreadLocal
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        // 獲取當前線程的監聽器調用棧深度
        final int stackDepth = threadLocals.futureListenerStackDepth();
        // 監聽器調用棧深度如果不超過閾值MAX_LISTENER_STACK_DEPTH
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            // 調用notifyListenersNow()前先設置監聽器調用棧深度 + 1
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                // 調用notifyListenersNow()完畢后設置監聽器調用棧深度為調用前的數值,也就是恢復線程的監聽器調用棧深度
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    // 如果監聽器調用棧深度超過閾值MAX_LISTENER_STACK_DEPTH,則直接每次通知監聽器當成一個新的異步任務處理
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

如果我們想模擬一個例子觸發監聽器調用棧深度保護,那么只需要想辦法在同一個EventLoop類型的線程中遞歸調用notifyListeners()方法即可。

最典型的例子就是在上一個Promise監聽器回調的方法里面觸發下一個Promise的監聽器的setSuccess()(簡單理解就是套娃),畫個圖理解一下:

測試代碼:

public class PromiseListenerMain {

    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        EventExecutor executor = ImmediateEventExecutor.INSTANCE;
        // root
        Promise<String> root = new DefaultPromise<>(executor);
        Promise<String> p1 = new DefaultPromise<>(executor);
        Promise<String> p2 = new DefaultPromise<>(executor);
        Promise<String> p3 = new DefaultPromise<>(executor);
        Promise<String> p4 = new DefaultPromise<>(executor);
        Promise<String> p5 = new DefaultPromise<>(executor);
        Promise<String> p6 = new DefaultPromise<>(executor);
        Promise<String> p7 = new DefaultPromise<>(executor);
        Promise<String> p8 = new DefaultPromise<>(executor);
        Promise<String> p9 = new DefaultPromise<>(executor);
        Promise<String> p10 = new DefaultPromise<>(executor);
        p1.addListener(new Listener(p2));
        p2.addListener(new Listener(p3));
        p3.addListener(new Listener(p4));
        p4.addListener(new Listener(p5));
        p5.addListener(new Listener(p6));
        p6.addListener(new Listener(p7));
        p7.addListener(new Listener(p8));
        p8.addListener(new Listener(p9));
        p9.addListener(new Listener(p10));
        root.addListener(new Listener(p1));
        root.setSuccess("success");
        Thread.sleep(Long.MAX_VALUE);
    }

    private static class Listener implements GenericFutureListener<Future<String>> {

        private final String name;
        private final Promise<String> promise;

        public Listener(Promise<String> promise) {
            this.name = "listener-" + COUNTER.getAndIncrement();
            this.promise = promise;
        }

        @Override
        public void operationComplete(Future<String> future) throws Exception {
            System.out.println(String.format("監聽器[%s]回調成功...", name));
            if (null != promise) {
                promise.setSuccess("success");
            }
        }
    }
}

因為有safeExecute()兜底執行,上面的所有Promise都會回調,這里可以采用IDEA的高級斷點功能,在步入斷點的地方添加額外的日志,輸出如下:

MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-9]回調成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-0]回調成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-1]回調成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-2]回調成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-3]回調成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-4]回調成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-5]回調成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-6]回調成功...
safeExecute(notifyListenersNow)執行----------
監聽器[listener-7]回調成功...
safeExecute(notifyListenersNow)執行----------
監聽器[listener-8]回調成功...

這里筆者有點疑惑,如果調用棧深度大於8,超出的部分會包裝為Runnable實例提交到事件執行器執行,豈不是把遞歸棧溢出的隱患變成了內存溢出的隱患(因為異步任務也有可能積壓,除非拒絕任務提交,那么具體要看EventExecutor的實現了)?

小結

Netty提供的Promise工具的源碼和使用方式都分析完了,設計理念和代碼都是十分值得借鑒,同時能夠開箱即用,可以在日常編碼中直接引入,減少重復造輪子的勞動和風險。

個人博客

(本文完 e-a-20200123 c-3-d)

技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):

娛樂公眾號(《天天沙雕》),甄選奇趣沙雕圖文和視頻不定期推送,緩解生活工作壓力:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM