Future和Promise


Future用於獲取異步操作的結果,而Promise則比較抽象,無法直接猜測出其功能。

Future

Future最早來源於JDK的java.util.concurrent.Future,它用於代表異步操作的結果。

可以通過get方法獲取操作結果,如果操作尚未完成,則會同步阻塞當前調用的線程;如果不允許阻塞太長時間或者無限期阻塞,可以通過帶超時時間的get方法獲取結果;如果到達超時時間操作仍然沒有完成,則拋出TimeoutException。通過isDone()方法可以判斷當前的異步操作是否完成,如果完成,無論成功與否,都返回true,否則返回false。通過cancel可以嘗試取消異步操作,它的結果是未知的,如果操作已經完成,或者發生其他未知的原因拒絕取消,取消操作將會失敗。

由於Netty的Future都是與異步I/O操作相關的,因此,命名為ChannelFuture,代表它與Channel操作相關。

在Netty中,所有的I/O操作都是異步的,這意味着任何I/O調用都會立即返回,而不是像傳統BIO那樣同步等待操作完成。異步操作會帶來一個問題:調用者如何獲取異步操作的結果?ChannelFuture就是為了解決這個問題而專門設計的。

ChannelFuture有兩種狀態:uncompleted和completed。當開始一個I/O操作時,一個新的ChannelFuture被創建,此時它處於uncompleted狀態——非失敗、非成功、非取消,因為I/O操作此時還沒有完成。一旦I/O操作完成,ChannelFuture將會被設置成completed,它的結果有如下三種可能。

  1. 操作成功;
  2. 操作失敗;
  3. 操作被取消。

 

ChannelFuture提供了一系列新的API,用於獲取操作結果、添加事件監聽器、取消I/O操作、同步等待等。

Netty強烈建議直接通過添加監聽器的方式獲取I/O操作結果,或者進行后續的相關操作。

ChannelFuture可以同時增加一個或者多個GenericFutureListener,也可以通過remove方法刪除GenericFutureListener。

當I/O操作完成之后,I/O線程會回調ChannelFuture中GenericFutureListener的operationComplete方法,並把ChannelFuture對象當作方法的入參。如果用戶需要做上下文相關的操作,需要將上下文信息保存到對應的ChannelFuture中。

推薦通過GenericFutureListener代替ChannelFuture的get等方法的原因是:當我們進行異步I/O操作時,完成的時間是無法預測的,如果不設置超時時間,它會導致調用線程長時間被阻塞,甚至掛死。而設置超時時間,時間又無法精確預測。利用異步通知機制回調GenericFutureListener是最佳的解決方案,它的性能最優。

ps:不要在ChannelHandler中調用ChannelFuture的await()方法,這會導致死鎖。原因是發起I/O操作之后,由I/O線程負責異步通知發起I/O操作的用戶線程,如果I/O線程和用戶線程是同一個線程,就會導致I/O線程等待自己通知操作完成,這就導致了死鎖,這跟經典的兩個線程互等待死鎖不同,屬於自己把自己掛死。

 

異步I/O操作有兩類超時:一個是TCP層面的I/O超時,另一個是業務邏輯層面的操作超時。兩者沒有必然的聯系,但是通常情況下業務邏輯超時時間應該大於I/O超時時間,它們兩者是包含的關系。

ps:ChannelFuture超時並不代表I/O超時,這意味着ChannelFuture超時后,如果沒有關閉連接資源,隨后連接依舊可能會成功,這會導致嚴重的問題。所以通常情況下,必須要考慮究竟是設置I/O超時還是ChannelFuture超時。

ChannelFuture源碼分析

AbstractFuture實現Future接口,它不允許I/O操作被取消。

    @Override
    public V get() throws InterruptedException, ExecutionException {
        //調用await()方法進行無限期阻塞,當I/O操作完成后會被notify()。
        await();
        Throwable cause = cause();
        //程序繼續向下執行,檢查I/O操作是否發生了異常
        if (cause == null) {
            //如果沒有異常,則通過getNow()方法獲取結果並返回。
            return getNow();
        }
        //否則,將異常堆棧進行包裝,拋出ExecutionException。
        throw new ExecutionException(cause);
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        //調用await(long timeout, TimeUnit unit)方法即可
        if (await(timeout, unit)) {
            Throwable cause = cause();
            //如果沒有超時,則依次判斷是否發生了I/O異常等情況
            if (cause == null) {
                //操作與無參數的get方法相同。
                return getNow();
            }
            throw new ExecutionException(cause);
        }
        //如果超時,則拋出TimeoutException。
        throw new TimeoutException();
    }

Promise

Promise是可寫的Future,Future自身並沒有寫操作相關的接口,Netty通過Promise對Future進行擴展,用於設置I/O操作的結果。

Promise相關的寫操作接口定義如圖:

Netty發起I/O操作的時候,會創建一個新的Promise對象,例如調用ChannelHandlerContext的write(Object object)方法時,會創建一個新的ChannelPromise。

    @Override
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }
    @Override
    public ChannelFuture write(Object msg, ChannelPromise promise) {
        DefaultChannelHandlerContext next = findContextOutbound(MASK_WRITE);
        next.invoker.invokeWrite(next, msg, promise);
        return promise;
    }
    @Override
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel(), executor());
    }

Promise源碼分析

分析一個它的實現子類的源碼DefaultPromise 。

setSuccess方法的實現

    @Override
    public Promise<V> setSuccess(V result) {
        //調用setSuccess0方法並對其操作結果進行判斷,如果操作成功,則調用notifyListeners方法通知listener。
        if (setSuccess0(result)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
    private boolean setSuccess0(V result) {
        //首先判斷當前Promise的操作結果是否已經被設置,如果已經被設置,則不允許重復設置,返回設置失敗。
        if (isDone()) {
            return false;
        }
        //由於可能存在I/O線程和用戶線程同時操作Promise,所以設置操作結果的時候需要加鎖保護,防止並發操作。
        synchronized (this) {
            //對操作結果是否被設置進行二次判斷(為了提升並發性能的二次判斷),如果已經被設置,則返回操作失敗。
            if (isDone()) {
                return false;
            }
            //對操作結果result進行判斷,如果為空,說明僅僅需要notify在等待的業務線程,不包含具體的業務邏輯對象。
            //因此,將result設置為系統默認的SUCCESS。
            if (result == null) {
                this.result = SUCCESS;
            } else {
                //如果操作結果非空,將結果設置為result。
                this.result = result;
            }
            //如果有正在等待異步I/O操作完成的用戶線程或者其他系統線程
            if (hasWaiters()) {
                //調用notifyAll方法喚醒所有正在等待的線程。注意,notifyAll和wait方法都必須在同步塊內使用。
                notifyAll();
            }
        }
        return true;
    }

await方法的實現

    @Override
    public Promise<V> await() throws InterruptedException {
        //如果當前的Promise已經被設置,則直接返回。
        if (isDone()) {
            return this;
        }
        //如果線程已經被中斷,則拋出中斷異常。
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        //通過同步關鍵字鎖定當前Promise對象
        synchronized (this) {
            //使用循環判斷對isDone結果進行判斷,進行循環判斷的原因是防止線程被意外喚醒導致的功能異常。
            while (!isDone()) {
                checkDeadLock();
                incWaiters();
                try {
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }

由於在I/O線程中調用Promise的await或者sync方法會導致死鎖,所以在循環體中需要對死鎖進行保護性校驗,防止I/O線程被掛死,最后調用java.lang.Object.wait()方法進行無限期等待,直到I/O線程調用setSuccess方法、trySuccess方法、setFailure或者tryFailure方法。

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM