Netty之大動脈Pipeline


Pipeline 設計原理

Channel 與ChannelPipeline:

  相信大家都已經知道,在Netty 中每個Channel 都有且僅有一個ChannelPipeline 與之對應,它們的組成關系如下:

  通過上圖我們可以看到, 一個Channel 包含了一個ChannelPipeline , 而ChannelPipeline 中又維護了一個由ChannelHandlerContext 組成的雙向鏈表。這個鏈表的頭是HeadContext,鏈表的尾是TailContext,並且每個ChannelHandlerContext 中又關聯着一個ChannelHandler。圖示給了我們一個對ChannelPipeline 的直觀認識,但是實際上Netty 實現的Channel 是否真的是這樣的呢?我們繼續用源碼說話。在前我們已經知道了一個Channel 的初始化的基本過程,下面我們再回顧一下。下面的代碼是AbstractChannel 構造器:

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

  AbstractChannel 有一個pipeline 字段,在構造器中會初始化它為DefaultChannelPipeline 的實例。這里的代碼就印證了一點:每個Channel 都有一個ChannelPipeline。接着我們跟蹤一下DefaultChannelPipeline 的初始化過程,首先進入到DefaultChannelPipeline 構造器中:

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }

  在DefaultChannelPipeline 構造器中, 首先將與之關聯的Channel 保存到字段channel 中。然后實例化兩個ChannelHandlerContext:一個是HeadContext 實例head,另一個是TailContext 實例tail。接着將head 和tail 互相指向, 構成一個雙向鏈表。

  特別注意的是:我們在開始的示意圖中head 和tail 並沒有包含ChannelHandler,這是因為HeadContext 和TailContext繼承於AbstractChannelHandlerContext 的同時也實現了ChannelHandler 接口了,因此它們有Context 和Handler的雙重屬性。

再探ChannelPipeline 的初始化:

  前面的學習我們已經對ChannelPipeline 的初始化有了一個大致的了解,不過當時重點沒有關注ChannelPipeline,因此沒有深入地分析它的初始化過程。那么下面我們就來看一下具體的ChannelPipeline 的初始化都做了哪些工作吧。先回顧一下,在實例化一個Channel 時,會伴隨着一個ChannelPipeline 的實例化,並且此Channel 會與這個ChannelPipeline相互關聯,這一點可以通過NioSocketChannel 的父類AbstractChannel 的構造器予以佐證:

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

  當實例化一個NioSocketChannel 是,其pipeline 字段就是我們新創建的DefaultChannelPipeline 對象。可以看到,在DefaultChannelPipeline 的構造方法中,將傳入的channel 賦值給字段this.channel,接着又實例化了兩個特殊的字段:tail 與head,這兩個字段是一個雙向鏈表的頭和尾。其實在DefaultChannelPipeline 中,維護了一個以AbstractChannelHandlerContext 為節點的雙向鏈表,這個鏈表是Netty 實現Pipeline 機制的關鍵。再回顧一下head和tail 的類層次結構:

  從類層次結構圖中可以很清楚地看到,head 實現了ChannelInboundHandler與ChannelOutboundHandler,而tail 實現了ChannelOutboundHandler 接口,並且它們都實現了ChannelHandlerContext 接口, 因此可以說head 和tail 即是一個ChannelHandler,又是一個ChannelHandlerContext。接着看HeadContext與TailContext 構造器中的代碼:

HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true); this.unsafe = pipeline.channel().unsafe(); this.setAddComplete(); } TailContext(DefaultChannelPipeline pipeline) { super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, true, false); this.setAddComplete(); }

  我們可以看到,鏈表中head 是一個ChannelOutboundHandler,而tail 則是一個ChannelInboundHandler。它調用了父類AbstractChannelHandlerContext 的構造器,並傳入參數inbound = false,outbound = true。而TailContext 的構造器與HeadContext 的相反,它調用了父類AbstractChannelHandlerContext 的構造器,並傳入參數inbound = true,outbound = false。即header 是一個OutBoundHandler,而tail 是一個InBoundHandler。

ChannelInitializer 的添加:

  前面我們已經分析過Channel 的組成,其中我們了解到,最開始的時候ChannelPipeline 中含有兩個ChannelHandlerContext(同時也是ChannelHandler),但是這個Pipeline 並不能實現什么特殊的功能,因為我們還沒有給它添加自定義的ChannelHandler。通常來說,我們在初始化Bootstrap,會添加我們自定義的ChannelHandler,就以我們具體的客戶端啟動代碼片段來舉例:

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
  @Override
  protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new ChatClientHandler(nickName));
  }
});

  上面代碼的初始化過程,相信大家都不陌生。在調用handler 時,傳入了ChannelInitializer 對象,它提供了一個initChannel()方法給我我們初始化ChannelHandler。最后將這個匿名的Handler保存到AbstractBootstrap中。那么這個初始化過程是怎樣的呢?下面我們來揭開它的神秘面紗。

  ChannelInitializer 實現了ChannelHandler,那么它是在什么時候添加到ChannelPipeline 中的呢?通過代碼跟蹤,我們發現它是在Bootstrap 的init()方法中添加到ChannelPipeline 中的,其代碼如下(以客戶端為例):

void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(new ChannelHandler[]{this.config.handler()});
     。。。。。。
}
//AbstractBootstrapConfig
public final ChannelHandler handler() {
  return this.bootstrap.handler();
}
//AbstractBootstrap
final ChannelHandler handler() {
        return this.handler;
}

  從上面的代碼可見,將handler()返回的ChannelHandler 添加到Pipeline 中,而handler()返回的其實就是我們在初始化Bootstrap 時通過handler()方法設置的ChannelInitializer 實例,因此這里就是將ChannelInitializer 插入到了Pipeline的末端。此時Pipeline 的結構如下圖所示:

  這時候,有小伙伴可能就有疑惑了,我明明插入的是一個ChannelInitializer 實例,為什么在ChannelPipeline 中的雙向鏈表中的元素卻是一個ChannelHandlerContext 呢?我們繼續去源碼中尋找答案。

  剛才,我們提到,在Bootstrap 的init()方法中會調用p.addLast()方法,將ChannelInitializer 插入到鏈表的末端:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized(this) {
            checkMultiplicity(handler);
            newCtx = this.newContext(group, this.filterName(name, handler), handler);
       
this.addLast0(newCtx);
 }
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
}

  addLast()有很多重載的方法,我們只需關注這個比較重要的方法就行。上面的addLast()方法中,首先檢查ChannelHandler 的名字是否是重復,如果不重復,則調用newContex()方法為這個Handler 創建一個對應的DefaultChannelHandlerContext 實例,並與之關聯起來(Context 中有一個handler 屬性保存着對應的Handler 實例)。為了添加一個handler 到pipeline 中,必須把此handler 包裝成ChannelHandlerContext。因此在上面的代碼中我們可以看到新實例化了一個newCtx 對象,並將handler 作為參數傳遞到構造方法中。那么我們來看一下實例化的DefaultChannelHandlerContext 到底有什么玄機吧。首先看它的構造器:

DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        } else {
            this.handler = handler;
        }
    }

  在DefaultChannelHandlerContext 的構造器中,調用了兩個很有意思的方法:isInbound()與isOutbound(),這兩個方法是做什么的呢?從源碼中可以看到,當一個handler 實現了ChannelInboundHandler 接口,則isInbound 返回true;類似地,當一個handler 實現了ChannelOutboundHandler 接口,則isOutbound 就返回true。而這兩個boolean 變量會傳遞到父類AbstractChannelHandlerContext 中,並初始化父類的兩個字段:inbound 與outbound。那么這里的ChannelInitializer 所對應的DefaultChannelHandlerContext 的inbound 與outbound 字段分別是什么呢? 那就看一下ChannelInitializer 到底實現了哪個接口不就行了?如下是ChannelInitializer 的類層次結構圖:

  從類圖中可以清楚地看到,ChannelInitializer 僅僅實現了ChannelInboundHandler 接口,因此這里實例化的DefaultChannelHandlerContext 的inbound = true,outbound = false。兜了一圈,不就是inbound 和outbound 兩個字段嘛,為什么需要這么大費周折地分析一番?其實這兩個字段關系到pipeline 的事件的流向與分類,因此是十分關鍵的,不過我在這里先賣個關子, 后面我們再來詳細分析這兩個字段所起的作用。至此, 我們暫時先記住一個結論:ChannelInitializer 所對應的DefaultChannelHandlerContext 的inbound =true,outbound = false。當創建好Context 之后,就將這個Context 插入到Pipeline 的雙向鏈表中

private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = this.tail.prev;
        newCtx.prev = prev;
        newCtx.next = this.tail;
        prev.next = newCtx;
        this.tail.prev = newCtx;
    }

   添加完ChannelInitializer的Pipeline現在是長這樣的:

 自定義ChannelHandler 的添加過程:

  前面我們已經分析了ChannelInitializer 是如何插入到Pipeline 中的,接下來就來探討ChannelInitializer 在哪里被調用,ChannelInitializer 的作用以及我們自定義的ChannelHandler 是如何插入到Pipeline 中的。先簡單復習一下Channel 的注冊過程:

  1. 首先在AbstractBootstrap 的initAndRegister()中,通過group().register(channel),調用MultithreadEventLoopGroup 的register()方法。
  2. 在MultithreadEventLoopGroup 的register()中調用next()獲取一個可用的SingleThreadEventLoop,然后調用它的register()方法。
  3. 在SingleThreadEventLoop 的register()方法中,通過channel.unsafe().register(this, promise)方法獲取channel的unsafe()底層IO 操作對象,然后調用它的register()。
  4. 在AbstractUnsafe 的register()方法中,調用register0()方法注冊Channel 對象。
  5. 在AbstractUnsafe 的register0()方法中,調用AbstractNioChannel 的doRegister()方法。
  6. AbstractNioChannel 的doRegister()方法調用javaChannel().register(eventLoop().selector, 0, this)將Channel對應的Java NIO 的SockerChannel 對象注冊到一個eventLoop 的Selector 中,並且將當前Channel 作為attachment。

  而我們自定義ChannelHandler 的添加過程,發生在AbstractUnsafe 的register0()方法中,在這個方法中調用了pipeline.fireChannelRegistered()方法,其代碼實現如下:

private void register0(ChannelPromise promise) {
        boolean firstRegistration = this.neverRegistered;
        AbstractChannel.this.doRegister();
        this.neverRegistered = false;
        AbstractChannel.this.registered = true;
        AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
        this.safeSetSuccess(promise);
        AbstractChannel.this.pipeline.fireChannelRegistered();
}
public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(this.head);
        return this;
}

  再看AbstractChannelHandlerContext 的invokeChannelRegistered()方法:

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

  很顯然,這個代碼會從head 開始遍歷Pipeline 的雙向鏈表,然后 findContextInbound()  找到第一個屬性inbound 為true 的ChannelHandlerContext 實例。看代碼:

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
}
public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(this.findContextInbound());
        return this;
}

  想起來了沒?我們在前面分析ChannelInitializer 時,花了大量的篇幅來分析了inbound和outbound 屬性,現在這里就用上了。回想一下,ChannelInitializer 實現了ChannelInboudHandler,因此它所對應的ChannelHandlerContext 的inbound 屬性就是true,因此這里返回就是ChannelInitializer 實例所對應的ChannelHandlerContext 對象,如下圖所示:

  當獲取到inbound 的Context 后,就調用它的invokeChannelRegistered()方法:

private void invokeChannelRegistered() {
        if (this.invokeHandler()) {
            try {
                ((ChannelInboundHandler)this.handler()).channelRegistered(this);
            } catch (Throwable var2) {
                this.notifyHandlerException(var2);
            }
        } else {
            this.fireChannelRegistered();
        }
}

  我們已經知道,每個ChannelHandler 都和一個ChannelHandlerContext 關聯,我們可以通過ChannelHandlerContext獲取到對應的ChannelHandler。因此很明顯,這里handler()返回的對象其實就是一開始我們實例化的ChannelInitializer 對象,並接着調用了ChannelInitializer 的channelRegistered()方法。看到這里, 應該會覺得有點眼熟了。ChannelInitializer 的channelRegistered()這個方法我們在一開始的時候已經接觸到了,但是我們並沒有深入地分析這個方法的調用過程。下面我們來看這個方法中到底有什么玄機,繼續看代碼:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
  protected abstract void initChannel(C ch) throws Exception;
    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (initChannel(ctx)) {
            ctx.pipeline().fireChannelRegistered();
            removeState(ctx);
        } else {
            ctx.fireChannelRegistered();
        }
    }private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }
}

  initChannel((C) ctx.channel())這個方法我們也很熟悉,它就是我們在初始化Bootstrap 時,調用handler 方法傳入的匿名內部類所實現的方法:

protected void initChannel(SocketChannel ch) throws Exception {
      ChannelPipeline pipeline = ch.pipeline();
      pipeline.addLast("handler", new MyClient());
}

  因此,當調用這個方法之后, 我們自定義的ChannelHandler 就插入到了Pipeline,此時Pipeline 的狀態如下圖所示:

 

  當添加完成自定義的ChannelHandler 后,在finally 代碼塊會刪除自定義的ChannelInitializer,也就是remove(ctx)最終調用ctx.pipeline().remove(this),因此最后的Pipeline 的狀態如下:

  至此,自定義ChannelHandler 的添加過程也分析得差不多了。

ChannelHandler 默認命名規則

  不知道大家注意到沒有,pipeline.addXXX 都有一個重載的方法,例如addLast()它有一個重載的版本是:ChannelPipeline addLast(String name, ChannelHandler handler);第一個參數指定添加的handler 的名字(更准確地說是ChannelHandlerContext 的名字,說成handler 的名字更便於理解)。那么handler 的名字有什么用呢?如果我們不設置name,那么handler 默認的名字是怎樣呢?帶着這些疑問,我們依舊還是去源碼中找到答案。還是以addLast()方法為例:

public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

  這個方法會調用重載的addLast()方法:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);return this;
    }

  第一個參數被設置為null,我們不用關心它。第二參數就是這個handler 的名字。看代碼可知,在添加一個handler之前,需要調用checkMultiplicity()方法來確定新添加的handler 名字是否與已添加的handler 名字重復。

  如果我們調用的是如下的addLast()方法:ChannelPipeline addLast(ChannelHandler... handlers);那么Netty 就會調用generateName()方法為新添加的handler 自動生成一個默認的名字:

private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);
        }
        checkDuplicateName(name);
        return name;
    }
private String generateName(ChannelHandler handler) {
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
if (name == null) {
name = generateName0(handlerType);
cache.put(handlerType, name);
}
if (context0(name) != null) {
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
 

  而generateName()方法會接着調用generateName0()方法來實際生成一個新的handler 名字:

private static String generateName0(Class<?> handlerType) {
        return StringUtil.simpleClassName(handlerType) + "#0";
}

  默認命名的規則很簡單,就是用反射獲取handler 的simpleName 加上"#0",因此我們自定義ChatClientHandler 的名字就是"ChatClientHandler#0"。

Pipeline 的事件傳播機制

  前面章節中,我們已經知道AbstractChannelHandlerContext 中有inbound 和outbound 兩個boolean 變量,分別用於標識Context 所對應的handler 的類型,即:

  1. inbound 為true 是,表示其對應的ChannelHandler 是ChannelInboundHandler 的子類。
  2. outbound 為true 時,表示對應的ChannelHandler 是ChannelOutboundHandler 的子類。

  這里大家肯定還有很多疑惑,不知道這兩個字段到底有什么作用? 這還要從ChannelPipeline 的事件傳播類型說起。Netty 中的傳播事件可以分為兩種:Inbound 事件和Outbound 事件。如下是從Netty 官網針對這兩個事件的說明:

  從上圖可以看出,inbound 事件和outbound 事件的流向是不一樣的,inbound 事件的流行是從下至上,而outbound剛好相反,是從上到下。並且inbound 的傳遞方式是通過調用相應的ChannelHandlerContext.fireIN_EVT()方法,而outbound 方法的的傳遞方式是通過調用ChannelHandlerContext.OUT_EVT()方法。例如:ChannelHandlerContext的fireChannelRegistered()調用會發送一個ChannelRegistered 的inbound 給下一個ChannelHandlerContext,而ChannelHandlerContext 的bind()方法調用時會發送一個bind 的outbound 事件給下一個ChannelHandlerContext。

    Inbound 事件傳播方法有:

public interface ChannelInboundHandler extends ChannelHandler {
    void channelRegistered(ChannelHandlerContext var1) throws Exception;

    void channelUnregistered(ChannelHandlerContext var1) throws Exception;

    void channelActive(ChannelHandlerContext var1) throws Exception;

    void channelInactive(ChannelHandlerContext var1) throws Exception;

    void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;

    void channelReadComplete(ChannelHandlerContext var1) throws Exception;

    void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;

    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}

  Outbound 事件傳播方法有:

public interface ChannelOutboundHandler extends ChannelHandler {
    void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;

    void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;

    void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void read(ChannelHandlerContext var1) throws Exception;

    void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;

    void flush(ChannelHandlerContext var1) throws Exception;
}

  大家應該發現了規律:inbound 類似於是事件回調(響應請求的事件),而outbound 類似於主動觸發(發起請求的事件)。注意,如果我們捕獲了一個事件,並且想讓這個事件繼續傳遞下去,那么需要調用Context 對應的傳播方法 fireXXX,例如:

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("連接成功");
    ctx.fireChannelActive();
  }
}

Outbound 事件傳播方式:

  Outbound 事件都是請求事件(request event),即請求某件事情的發生,然后通過Outbound 事件進行通知。Outbound 事件的傳播方向是tail -> customContext -> head。我們接下來以connect 事件為例,分析一下Outbound 事件的傳播機制。首先,當用戶調用了Bootstrap 的connect()方法時,就會觸發一個Connect 請求事件,我們就發現AbstractChannel 的connect()其實由調用了DefaultChannelPipeline 的connect()方法:

public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}

  而pipeline.connect()方法的實現如下:

public final ChannelFuture connect(SocketAddress remoteAddress) {
        return tail.connect(remoteAddress);
    }

  可以看到,當outbound 事件(這里是connect 事件)傳遞到Pipeline 后,它其實是以tail 為起點開始傳播的。而tail.connect()其實調用的是AbstractChannelHandlerContext 的connect()方法:

public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);return promise;
    }

  findContextOutbound()方法顧名思義,它的作用是以當前Context 為起點,向Pipeline 中的Context 雙向鏈表的前端尋找第一個outbound 屬性為true 的Context(即關聯ChannelOutboundHandler 的Context),然后返回。findContextOutbound()方法代碼實現如下:

private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }

  當我們找到了一個outbound 的Context 后,就調用它的invokeConnect()方法,這個方法中會調用Context 其關聯的ChannelHandler 的connect()方法

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }

  如果用戶沒有重寫ChannelHandler 的connect()方法,那么會調用ChannelOutboundHandlerAdapter 的connect()實現:

public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

  我們看到,ChannelOutboundHandlerAdapter 的connect()僅僅調用了ctx.connect(),而這個調用又回到了:Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect這樣的循環中,直到connect 事件傳遞到DefaultChannelPipeline 的雙向鏈表的頭節點,即head 中。為什么會傳遞到head 中呢?回想一下,head 實現了ChannelOutboundHandler,因此它的outbound 屬性是true。因為head 本身既是一個ChannelHandlerContext,又實現了ChannelOutboundHandler 接口,因此當connect()消息傳遞到head 后,會將消息轉遞到對應的ChannelHandler 中處理,而head 的handler()方法返回的就是head 本身:

public ChannelHandler handler() {
      return this;
}

  因此最終connect()事件是在head 中被處理。head 的connect()事件處理邏輯如下:

public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) {
            unsafe.connect(remoteAddress, localAddress, promise);
        }

  到這里, 整個connect()請求事件就結束了。下圖中描述了整個connect()請求事件的處理過程:

  我們僅僅以connect()請求事件為例,分析了outbound 事件的傳播過程,但是其實所有的outbound 的事件傳播都遵循着一樣的傳播規律,小伙伴們可以試着分析一下其他的outbound 事件,體會一下它們的傳播過程。

Inbound 事件傳播方式:

  Inbound 事件和Outbound 事件的處理過程是類似的,只是傳播方向不同。Inbound 事件是一個通知事件,即某件事已經發生了,然后通過Inbound 事件進行通知。Inbound 通常發生在Channel的狀態的改變或IO 事件就緒。Inbound 的特點是它傳播方向是head -> customContext -> tail。上面我們分析了connect()這個Outbound 事件,那么接着分析connect()事件后會發生什么Inbound 事件,並最終找到Outbound 和Inbound 事件之間的聯系。當connect()這個Outbound 傳播到unsafe 后,其實是在AbstractNioUnsafe的connect()方法中進行處理的:

public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
       boolean wasActive = isActive();
       if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
       } else {
       }
}

  在AbstractNioUnsafe 的connect()方法中,首先調用doConnect()方法進行實際上的Socket 連接,當連接上后會調用fulfillConnectPromise()方法:

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (!wasActive && active) {
                pipeline().fireChannelActive();
            }
        }

  我們看到,在fulfillConnectPromise()中,會通過調用pipeline().fireChannelActive()方法將通道激活的消息(即Socket 連接成功)發送出去。而這里,當調用pipeline.fireXXX 后,就是Inbound 事件的起點。因此當調用pipeline().fireChannelActive()后,就產生了一個ChannelActive Inbound 事件,我們就從這里開始看看這個Inbound事件是怎么傳播的?

public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }

  果然, 在fireChannelActive()方法中,調用的是head.invokeChannelActive(),因此可以證明Inbound 事件在Pipeline中傳輸的起點是head。那么,在head.invokeChannelActive()中又做了什么呢?

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

  接下去的調用流程是:

private void invokeChannelActive() {
        if (this.invokeHandler()) {
            try {
                ((ChannelInboundHandler)this.handler()).channelActive(this);
            } catch (Throwable var2) {
                this.notifyHandlerException(var2);
            }
        } else {
            this.fireChannelActive();
        }

}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
            this.readIfIsAutoRead();
        }
public ChannelHandlerContext fireChannelActive() {
        AbstractChannelHandlerContext next = this.findContextInbound();
        invokeChannelActive(next);
        return this;
    }

  上面的代碼應該很熟悉了。回想一下在Outbound 事件(例如connect()事件)的傳輸過程中時,我們也有類似的操作:

  1. 首先調用findContextInbound(),從Pipeline 的雙向鏈表中中找到第一個屬性inbound 為true 的Context,然后將其返回。
  2. 調用Context 的invokeChannelActive()方法.

  invokeChannelActive()方法源碼如下:

private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

  這個方法和Outbound 的對應方法(如:invokeConnect()方法)如出一轍。與Outbound 一樣,如果用戶沒有重寫channelActive() 方法,那就會調用ChannelInboundHandlerAdapter 的channelActive()方法:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

  同樣地, 在ChannelInboundHandlerAdapter 的channelActive()中,僅僅調用了ctx.fireChannelActive()方法,因此就會進入Context.fireChannelActive() -> Connect.findContextInbound() -> nextContext.invokeChannelActive() ->nextHandler.channelActive() -> nextContext.fireChannelActive()這樣的循環中。同理,tail 本身既實現了ChannelInboundHandler 接口,又實現了ChannelHandlerContext 接口,因此當channelActive()消息傳遞到tail 后,會將消息轉遞到對應的ChannelHandler 中處理,而tail 的handler()返回的就是tail 本身:

public ChannelHandler handler() {
            return this;
        }

  因此channelActive Inbound 事件最終是在tail 中處理的,我們看一下它的處理方法:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }

  TailContext 的channelActive()方法是空的。如果大家自行查看TailContext 的Inbound 處理方法時就會發現,它們的實現都是空的。可見,如果是Inbound,當用戶沒有實現自定義的處理器時,那么默認是不處理的。下圖描述了Inbound事件的傳輸過程:

Pipeline 事件傳播小結:

  Outbound 事件總結:

Outbound 事件是請求事件(由connect()發起一個請求,並最終由unsafe 處理這個請求)。

Outbound 事件的發起者是Channel。

Outbound 事件的處理者是unsafe。

Outbound 事件在Pipeline 中的傳輸方向是tail -> head。

在ChannelHandler 中處理事件時,如果這個Handler 不是最后一個Handler,則需要調用ctx 的方法(如:ctx.connect()方法)將此事件繼續傳播下去。如果不這樣做,那么此事件的傳播會提前終止。

Outbound 事件流:Context.OUT_EVT() -> Connect.findContextOutbound() -> nextContext.invokeOUT_EVT()-> nextHandler.OUT_EVT() -> nextContext.OUT_EVT()

  Inbound 事件總結:

Inbound 事件是通知事件,當某件事情已經就緒后,通知上層。

Inbound 事件發起者是unsafe。

Inbound 事件的處理者是Channel,如果用戶沒有實現自定義的處理方法,那么Inbound 事件默認的處理者是TailContext,並且其處理方法是空實現。Inbound 事件在Pipeline 中傳輸方向是head -> tail。

在ChannelHandler 中處理事件時,如果這個Handler 不是最后一個Handler,則需要調用ctx.fireIN_EVT()事件(如:ctx.fireChannelActive()方法)將此事件繼續傳播下去。如果不這樣做,那么此事件的傳播會提前終止。

Outbound 事件流:Context.fireIN_EVT() -> Connect.findContextInbound() -> nextContext.invokeIN_EVT() ->nextHandler.IN_EVT() -> nextContext.fireIN_EVT().

  outbound 和inbound 事件設計上十分相似,並且Context 與Handler 直接的調用關系也容易混淆,因此我們在閱讀這里的源碼時,需要特別的注意。

Handler 的各種姿勢:

  ChannelHandlerContext

  每個ChannelHandler 被添加到ChannelPipeline 后,都會創建一個ChannelHandlerContext 並與之創建的ChannelHandler 關聯綁定。ChannelHandlerContext 允許ChannelHandler 與其他的ChannelHandler 實現進行交互。ChannelHandlerContext 不會改變添加到其中的ChannelHandler,因此它是安全的。下圖描述了ChannelHandlerContext、ChannelHandler、ChannelPipeline 的關系:

 

Channel 的生命周期:

  Netty 有一個簡單但強大的狀態模型,並完美映射到ChannelInboundHandler 的各個方法。下面是Channel 生命周期中四個不同的狀態:

  1. channelUnregistered() Channel已創建,還未注冊到一個EventLoop上
  2. channelRegistered() Channel已經注冊到一個EventLoop上
  3. channelActive() Channel是活躍狀態(連接到某個遠端),可以收發數據
  4. channelInactive() Channel未連接到遠端

  一個Channel 正常的生命周期如下圖所示。隨着狀態發生變化相應的事件產生。這些事件被轉發到ChannelPipeline中的ChannelHandler 來觸發相應的操作。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM