從netty-example分析Netty組件續


上文我們從netty-example的Discard服務器端示例分析了netty的組件,今天我們從另一個簡單的示例Echo客戶端分析一下上個示例中沒有出現的netty組件。

1. 服務端的連接處理,讀寫處理

echo客戶端代碼:

/**
 * Sends one message when a connection is open and echoes back any received
 * data to the server.  Simply put, the echo client initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
 Bootstrap b = new Bootstrap(); b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

從上面的代碼可以看出,discard的服務端代碼和echo的客戶端代碼基本相似,不同的是一個使用ServerBootStrap,另一個使用BootStrap而已。先看一下連接過程

NioEventLoop處理key的過程,

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore  unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write  ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }

2.1 連接流程

調用AbstractNioByteChannel的finishConnect()方法

        @Override
        public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); try { boolean wasActive = isActive(); doFinishConnect(); fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }

觸發channelActive操作:

        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) { // Closed via cancellation and the promise has been notified already. return; } // trySuccess() will return false if a user cancelled the connection attempt. boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && isActive()) { pipeline().fireChannelActive(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { close(voidPromise()); } }

 

2.2 讀操作流程

調用AbstractNioByteChannel的read()方法,

  典型的autoRead流程如下:

  1. 當socket建立連接時,Netty觸發一個inbound事件channelActive,然后提交一個read()請求給本身(參考DefaultChannelPipeline.fireChannelActive())

  2. 接收到read()請求后,Netty從socket讀取消息。

  3. 當讀取到消息時,Netty觸發channelRead()。

  4. 當讀取不到消息后,Netty觸發ChannelReadCompleted().

  5. Netty提交另外一個read()請求來繼續從socket中讀取消息。

@Override
        public final void read() { final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime  removeReadOp(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; try { boolean needReadPendingReset = true; do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer.  byteBuf.release(); byteBuf = null; break; } allocHandle.incMessagesRead(1); if (needReadPendingReset) { needReadPendingReset = false; setReadPending(false); } pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (allocHandle.lastBytesRead() < 0) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } }

 觸發讀操作

    @Override
    public ChannelHandlerContext fireChannelRead(Object msg) { AbstractChannelHandlerContext next = findContextInbound(); next.invoker().invokeChannelRead(next, pipeline.touch(msg, next)); return this; }

讀完觸發完成事件

    @Override
    public ChannelPipeline fireChannelReadComplete() { head.fireChannelReadComplete(); if (channel.config().isAutoRead()) { read(); } return this; } @Override public ChannelHandlerContext fireChannelReadComplete() { AbstractChannelHandlerContext next = findContextInbound(); next.invoker().invokeChannelReadComplete(next); return this; }

2.3 寫操作流程

寫操作

 @SuppressWarnings("deprecation")
        protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ close(voidPromise(), t, false); } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }

寫操作具體實現(以NioSocketChannel為例):

 @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { int size = in.size(); if (size == 0) { // All written so clear OP_WRITE  clearOpWrite(); break; } long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); long expectedWrittenBytes = in.nioBufferSize(); SocketChannel ch = javaChannel(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. super.doWrite(in); return; case 1: // Only one ByteBuf so use non-gathering write ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; default: for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; } // Release the fully written buffers, and update the indexes of the partially written buffer.  in.removeBytes(writtenBytes); if (!done) { // Did not write all buffers completely.  incompleteWrite(setOpWrite); break; } } }

 

2. ChannelInboundHandler和ChannelInboundHandler

Echo的handler代碼如下:

/**
 * Handler implementation for the echo client.  It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
        for (int i = 0; i < firstMessage.capacity(); i ++) {
            firstMessage.writeByte((byte) i);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }

上面的代碼出現了兩個重要的netty組件:ChannelInboundHandlerAdapter和ByteBuf。其中ByteBuf在另一篇文章已經講到。我們這次重點分析一下    ChannelInboundHandlerAdapter及其相關類。

  ChannelInboundHandlerAdapter繼承了ChannelInboundHandler,它的作用是將operation轉到ChannelPipeline中的下一個ChannelHandler。子類可以重寫一個方法的實現來改變。注意:在方法#channelRead(ChannelHandlerContext, Object)自動返回前,message不會釋放。若需要一個可以自動釋放接收消息的ChannelInboundHandler實現時,請考慮SimpleChannelInboundHandler。

  ChannelOutboundHandlerAdapter繼承了ChannelOutboundHandler,它僅通過調用ChannelHandlerContext跳轉到每個方法。

  ChannelInboundHandler處理輸入的事件,事件由外部事件源產生,例如從一個socket接收到數據。 

  ChannelOutboundHandler解析你自己應用提交的操作。

 2.1 ChannelInboundHandler.channelActive() 

從源碼角度看一下,Netty觸發一個inbound事件channelActive(以LoggingHandler為例):

   @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "ACTIVE"));
        }
        ctx.fireChannelActive();
    }

觸發操作如下:

     @Override
    public ChannelHandlerContext fireChannelActive() {
        AbstractChannelHandlerContext next = findContextInbound();
        next.invoker().invokeChannelActive(next);
        return this;
    }

   private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

 invokeChannelActive方法實現:

    @Override
    public void invokeChannelActive(final ChannelHandlerContext ctx) {
        if (executor.inEventLoop()) {
            invokeChannelActiveNow(ctx);
        } else {
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    invokeChannelActiveNow(ctx);
                }
            });
        }
    }
    public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {
        try {
            ((ChannelInboundHandler) ctx.handler()).channelActive(ctx);
        } catch (Throwable t) {
            notifyHandlerException(ctx, t);
        }
    }

2.2 ChannelOutboundHandler.Read()

讀的流程:

    @Override
    public ChannelHandlerContext read() {
        AbstractChannelHandlerContext next = findContextOutbound();
        next.invoker().invokeRead(next);
        return this;
    }

查找outbound的過程:

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

觸發讀操作:

    @Override
    public void invokeRead(final ChannelHandlerContext ctx) {
        if (executor.inEventLoop()) {
            invokeReadNow(ctx);
        } else {
            AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
            Runnable task = dctx.invokeReadTask;
            if (task == null) {
                dctx.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        invokeReadNow(ctx);
                    }
                };
            }
            executor.execute(task);
        }
    }

2.3 ChannelOutboundHandler.write()

以實現類LoggingHandler為例:

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "WRITE", msg));
        }
        ctx.write(msg, promise);
    }

具體實現:

    @Override
    public ChannelFuture write(Object msg, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        next.invoker().invokeWrite(next, pipeline.touch(msg, next), promise); return promise;
    }

寫操作的觸發

    @Override
    public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }
        if (!validatePromise(ctx, promise, true)) {
            // promise cancelled
            ReferenceCountUtil.release(msg);
            return;
        }

        if (executor.inEventLoop()) {
            invokeWriteNow(ctx, msg, promise);
        } else {
            safeExecuteOutbound(WriteTask.newInstance(ctx, msg, promise), promise, msg);
        }
    }

立刻觸發

    public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

小結:

   Netty中,可以注冊多個handler。ChannelInboundHandler按照注冊的先后順序執行;ChannelOutboundHandler按照注冊的先后順序逆序執行,如下圖所示,按照注冊的先后順序對Handler進行排序,request進入Netty后的執行順序為:

 

 參考文獻

【1】http://blog.csdn.net/u013252773/article/details/21195593

【2】http://stackoverflow.com/questions/22354135/in-netty4-why-read-and-write-both-in-outboundhandler

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM