淺析Netty的異步事件驅動(一)


本篇文章着重於淺析一下Netty的事件處理流程,Netty版本為netty-3.6.6.Final。

Netty定義了非常豐富的事件類型,代表了網絡交互的各個階段。並且當各個階段發生時,觸發相應的事件交給pipeline中定義的handler處理。

舉個例子,如下一段簡單的代碼:

ChannelFactory factory =
            new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        ServerBootstrap bootstrap = new ServerBootstrap(factory);

        bootstrap.setPipelineFactory(new PipelineFactory());

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        bootstrap.bind(new InetSocketAddress(7080));

Netty中觸發事件幾乎都是靠Channels類中的幾個靜態fire函數,因此通過在這些函數中加上Sysout方法,就可以看出這一個簡單的bind方法觸發了多少事件,如下:

fireChannelOpen(final Channel channel) upstream
bind(final Channel channel, final SocketAddress localAddress) downstream
fireChannelBound(final Channel channel, final SocketAddress localAddress) upstream

由此可見由這幾個函數觸發了OPEN、BOUND和BIND事件。

 

Netty中的事件大致可以分為upstream事件和downstream事件。簡單的說,upstream事件是內獲取外資源時觸發的事件如messageReceived等等,而downstream事件則是內向外發送資源時觸發的事件如write、connect等等。

與之相對應的,處理upstream事件的是upstreamhandler,處理downstream事件的是downstreamhandler,也有可以處理兩類事件的channelhandler。我們可以通過繼承handler來實現自己的業務邏輯。

Upstream事件的典型是messageReceived,在Netty中抽象為MessageEvent,即接收到了消息。而downstream事件的典型是write,在Netty中也抽象為MessageEvent,即發送消息。一個比較完整的事件表如下:

upstream事件包括:

downstream事件包括

 

Netty通過pipeline來存放upstreamhandler和downstreamhandler,在pipeline中添加handler的源代碼如下:

public class PipelineFactory implements ChannelPipelineFactory
{
    public ChannelPipeline getPipeline()
        throws Exception
    {
        ChannelPipeline pipeline = Channels.pipeline();

        //並不具體處理事件,只是輸出相關事件的string
        pipeline.addLast("1", new UpStreamHandler1());
        //單純的丟棄事件
        pipeline.addLast("2", new DiscardServer());
        return pipeline;
    }
}

在Netty中,upstreamhandler的處理順序是從前向后,而downstreamhandler的順序是從后往前。根本原因是pipeline中維護了一個雙向鏈表,handler的處理順序不同是因為upstream是從head->tail遍歷,而downstream事件是從tail->head遍歷。

以DefaultChannelPipeline為例,以下分別是添加handler至鏈表的代碼和訪問upstreamhandler的代碼

    public synchronized void addLast(String name, ChannelHandler handler) {
        if (name2ctx.isEmpty()) {
            init(name, handler);
        } else {
            checkDuplicateName(name);
            //一段典型的插入到鏈表尾部並更新尾指針的代碼
            DefaultChannelHandlerContext oldTail = tail;
            DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);

            callBeforeAdd(newTail);

            oldTail.next = newTail;
            tail = newTail;
            name2ctx.put(name, newTail);

            callAfterAdd(newTail);
        }
    }
    public void sendUpstream(ChannelEvent e) {
        //從頭部開始遍歷,相對的是,downstream則是從尾部開始遍歷
        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
        if (head == null) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "The pipeline contains no upstream handlers; discarding: " + e);
            }

            return;
        }

        sendUpstream(head, e);
    }

 

前文已經說過,Netty中通過Channels中的靜態方法來觸發事件,這些靜態函數列舉如下:

 1.fireChannelOpen;2.fireChannelBound;3.fireChannelConnected等等。

直接來看fireChannelOpen的源碼,看看Netty到底是怎么做的。

    public static void fireChannelOpen(final Channel channel) {
        // Notify the parent handler.
        if (channel.getParent() != null) {
            fireChildChannelStateChanged(channel.getParent(), channel);
        }
        channel.getPipeline().sendUpstream(
                new UpstreamChannelStateEvent(
                        channel, ChannelState.OPEN, Boolean.TRUE));
    }

這個sendUpstream到底是干嘛的了?

    void sendUpstream(final DefaultChannelHandlerContext ctx, final ChannelEvent e) {
        try {
            //從鏈表頭部開始,取出每個節點中的handler直接對channelevent進行處理
            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
        } catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    }

然后具體到handler又是怎么處理各個事件的了?以SimpleChannelUpstreamHandler為例,如下:

    public void handleUpstream(
            final ChannelHandlerContext ctx, final ChannelEvent e) throws Exception {
        //根據事件類型進行不同的處理
        if (e instanceof MessageEvent) {
            messageReceived(ctx, (MessageEvent) e);
        } else if (e instanceof WriteCompletionEvent) {
            WriteCompletionEvent evt = (WriteCompletionEvent) e;
            writeComplete(ctx, evt);
        } else if (e instanceof ChildChannelStateEvent) {
            ......
        }
    }
   
    public void messageReceived(
            final ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
        //直接將事件傳至下一個handler進行處理
        ctx.sendUpstream(e);
    }

源碼看到現在已經很明顯了,在Netty里,pipeline中維護了一個handler的鏈表。每當事件觸發時,就會從雙向鏈表的頭部(對於downstream事件則是尾部)開始遍歷,這樣每個handler都會對事件進行處理。在handler里,可以根據事件類型做相應的處理后傳至下一個handler繼續處理(甚至可以截斷處理鏈)。

需要注意的是,單次流程是在一個線程中實現的,是串行的。因此如果其中一個handler是阻塞的,就會影響整體的效果。

當然netty也已經提供了解決方案,可以通過繼承ExecutionHandler的handler來處理這類耗時的操作。而這么做的原理是什么,請期待下一篇文章。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM