淺析Netty的異步事件驅動(二)


上一篇文件淺析了Netty中的事件驅動過程,這篇主要寫一下異步相關的東東。

首先,什么是異步了?

異步的概念和同步相對。當一個異步過程調用發出后,調用者不能立刻得到結果。實際處理這個調用的部件在完成后,通過狀態、通知和回調來通知調用者。

異步的好處是不會造成阻塞,在高並發情形下會更穩定和更高的吞吐量。

 

說到Netty中的異步,就不得不提ChannelFuture。Netty中的IO操作是異步的,包括bind、write、connect等操作會簡單的返回一個ChannelFuture,調用者並不能立刻獲得結果。

當future對象剛剛創建時,處於非完成狀態。可以通過isDone()方法來判斷當前操作是否完成。通過isSuccess()判斷已完成的當前操作是否成功,getCause()來獲取已完成的當前操作失敗的原因,isCancelled()來判斷已完成的當前操作是否被取消。

調用者可以通過返回的ChannelFuture來獲取操作執行的狀態,注冊監聽函數來執行完成后的操作。

其實同步的阻塞和異步的非阻塞可以直接通過代碼看出:

這是一段阻塞的代碼:

        printTime("開始connect: ");
        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

        // Wait until the connection is closed or the connection attempt fails.
        future.getChannel().getCloseFuture().awaitUninterruptibly();

        printTime("connect結束: ");
        // Shut down thread pools to exit.
        bootstrap.releaseExternalResources();

這段代碼的輸出結果是:

開始connect: 2013-07-17 14:45:28

connect結束: 2013-07-17 14:45:29

很明顯的可以看出,connect操作導致整段代碼阻塞了大概1秒。

 

以下這段是異步非阻塞的代碼:

        printTime("開始connect: ");
        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

        future.addListener(new ChannelFutureListener()
        {
            public void operationComplete(final ChannelFuture future)
                throws Exception
            {
                printTime("connect結束: ");
            }
        });

        printTime("異步時間: ");

        // Shut down thread pools to exit.
        bootstrap.releaseExternalResources();

輸出結果是:

開始connect: 2013-07-17 14:50:09
異步時間: 2013-07-17 14:50:09
connect結束: 2013-07-17 14:50:09

可以明顯的看出,在異步模式下,上面這段代碼沒有阻塞,在執行connect操作后直接執行到printTime("異步時間: "),隨后connect完成,future的監聽函數輸出connect操作完成。

關於同步的阻塞和異步的非阻塞可以打一個很簡單的比方,A向B打電話,通知B做一件事。

在同步模式下,A告訴B做什么什么事,然后A依然拿着電話,等待B做完,才可以做下一件事;

在異步模式下,A告訴B做什么什么事,A掛電話,做自己的事。B做完后,打電話通知A做完了。

 

如上面代碼所顯示的,ChannelFuture同時提供了阻塞和非阻塞方法,接下來就簡單的分析一下各自是怎么實現的。

阻塞方法是await系列,這些方法要小心翼翼的使用,不可以在handler內調用這些方法,否則會造成死鎖。

public ChannelFuture awaitUninterruptibly() {
        boolean interrupted = false;
        synchronized (this) {
            //循環等待到完成
            while (!done) {
                checkDeadLock();
                waiters++;
                try {
                    wait();
                } catch (InterruptedException e) {
                    //不允許中斷
                    interrupted = true;
                } finally {
                    waiters--;
                }
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return this;
    }

一個標志位,一個while循環,代碼簡潔明了。

非阻塞則是添加監聽類ChannelFutureListener,通過覆蓋ChannelFutureListener的operationComplete執行業務邏輯。

public void addListener(final ChannelFutureListener listener) {
        if (listener == null) {
            throw new NullPointerException("listener");
        }

        boolean notifyNow = false;
        synchronized (this) {
            if (done) {
                notifyNow = true;
            } else {
                if (firstListener == null) {
                    //listener鏈表頭
                    firstListener = listener;
                } else {
                    if (otherListeners == null) {
                        otherListeners = new ArrayList<ChannelFutureListener>(1);
                    }
                    //添加到listener鏈表中,以便操作完成后遍歷操作
                    otherListeners.add(listener);
                }

               ......

        if (notifyNow) {
            //通知listener進行處理
            notifyListener(listener);
        }
    }

然后當操作完成后直接遍歷listener鏈表,把每個listener取出來執行。以setSuccess為例,如下:

public boolean setSuccess() {
        synchronized (this) {
            // Allow only once.
            if (done) {
                return false;
            }

            done = true;
            //喚醒所有等待
            if (waiters > 0) {
                notifyAll();
            }
        }

        //通知所有listener
        notifyListeners();
        return true;
    }
private void notifyListeners() {
        if (firstListener != null) {
            //執行listener表頭
            notifyListener(firstListener);
            firstListener = null;

            //挨個執行其余的listener
            if (otherListeners != null) {
                for (ChannelFutureListener l: otherListeners) {
                    notifyListener(l);
                }
                otherListeners = null;
            }
        }
    }

其實這部分代碼的邏輯很簡單,就是注冊回調函數,當操作完成后自動調用回調函數,就達到了異步的效果。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM