上文我們從netty-example的Discard服務器端示例分析了netty的組件,今天我們從另一個簡單的示例Echo客戶端分析一下上個示例中沒有出現的netty組件。
1. 服務端的連接處理,讀寫處理
echo客戶端代碼:
/** * Sends one message when a connection is open and echoes back any received * data to the server. Simply put, the echo client initiates the ping-pong * traffic between the echo client and server by sending the first message to * the server. */ public final class EchoClient { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { // Configure SSL.git final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } } }
從上面的代碼可以看出,discard的服務端代碼和echo的客戶端代碼基本相似,不同的是一個使用ServerBootStrap,另一個使用BootStrap而已。先看一下連接過程
NioEventLoop處理key的過程,
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
2.1 連接流程
調用AbstractNioByteChannel的finishConnect()方法
@Override
public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); try { boolean wasActive = isActive(); doFinishConnect(); fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }
觸發channelActive操作:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) { // Closed via cancellation and the promise has been notified already. return; } // trySuccess() will return false if a user cancelled the connection attempt. boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, // because what happened is what happened. if (!wasActive && isActive()) { pipeline().fireChannelActive(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { close(voidPromise()); } }
2.2 讀操作流程
調用AbstractNioByteChannel的read()方法,
典型的autoRead流程如下:
1. 當socket建立連接時,Netty觸發一個inbound事件channelActive,然后提交一個read()請求給本身(參考DefaultChannelPipeline.fireChannelActive())
2. 接收到read()請求后,Netty從socket讀取消息。
3. 當讀取到消息時,Netty觸發channelRead()。
4. 當讀取不到消息后,Netty觸發ChannelReadCompleted().
5. Netty提交另外一個read()請求來繼續從socket中讀取消息。
@Override
public final void read() { final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; try { boolean needReadPendingReset = true; do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; break; } allocHandle.incMessagesRead(1); if (needReadPendingReset) { needReadPendingReset = false; setReadPending(false); } pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (allocHandle.lastBytesRead() < 0) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } }
觸發讀操作
@Override
public ChannelHandlerContext fireChannelRead(Object msg) { AbstractChannelHandlerContext next = findContextInbound(); next.invoker().invokeChannelRead(next, pipeline.touch(msg, next)); return this; }
讀完觸發完成事件
@Override
public ChannelPipeline fireChannelReadComplete() { head.fireChannelReadComplete(); if (channel.config().isAutoRead()) { read(); } return this; } @Override public ChannelHandlerContext fireChannelReadComplete() { AbstractChannelHandlerContext next = findContextInbound(); next.invoker().invokeChannelReadComplete(next); return this; }
2.3 寫操作流程
寫操作
@SuppressWarnings("deprecation")
protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ close(voidPromise(), t, false); } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; } }
寫操作具體實現(以NioSocketChannel為例):
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { int size = in.size(); if (size == 0) { // All written so clear OP_WRITE clearOpWrite(); break; } long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); long expectedWrittenBytes = in.nioBufferSize(); SocketChannel ch = javaChannel(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. super.doWrite(in); return; case 1: // Only one ByteBuf so use non-gathering write ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; default: for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; } // Release the fully written buffers, and update the indexes of the partially written buffer. in.removeBytes(writtenBytes); if (!done) { // Did not write all buffers completely. incompleteWrite(setOpWrite); break; } } }
2. ChannelInboundHandler和ChannelInboundHandler
Echo的handler代碼如下:
/** * Handler implementation for the echo client. It initiates the ping-pong * traffic between the echo client and server by sending the first message to * the server. */ public class EchoClientHandler extends ChannelInboundHandlerAdapter { private final ByteBuf firstMessage; /** * Creates a client-side handler. */ public EchoClientHandler() { firstMessage = Unpooled.buffer(EchoClient.SIZE); for (int i = 0; i < firstMessage.capacity(); i ++) { firstMessage.writeByte((byte) i); } } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); }
上面的代碼出現了兩個重要的netty組件:ChannelInboundHandlerAdapter和ByteBuf。其中ByteBuf在另一篇文章已經講到。我們這次重點分析一下 ChannelInboundHandlerAdapter及其相關類。
ChannelInboundHandlerAdapter繼承了ChannelInboundHandler,它的作用是將operation轉到ChannelPipeline中的下一個ChannelHandler。子類可以重寫一個方法的實現來改變。注意:在方法#channelRead(ChannelHandlerContext, Object)自動返回前,message不會釋放。若需要一個可以自動釋放接收消息的ChannelInboundHandler實現時,請考慮SimpleChannelInboundHandler。
ChannelOutboundHandlerAdapter繼承了ChannelOutboundHandler,它僅通過調用ChannelHandlerContext跳轉到每個方法。
ChannelInboundHandler處理輸入的事件,事件由外部事件源產生,例如從一個socket接收到數據。
ChannelOutboundHandler解析你自己應用提交的操作。
2.1 ChannelInboundHandler.channelActive()
從源碼角度看一下,Netty觸發一個inbound事件channelActive(以LoggingHandler為例):
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "ACTIVE")); } ctx.fireChannelActive(); }
觸發操作如下:
@Override public ChannelHandlerContext fireChannelActive() { AbstractChannelHandlerContext next = findContextInbound(); next.invoker().invokeChannelActive(next); return this; } private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
invokeChannelActive方法實現:
@Override public void invokeChannelActive(final ChannelHandlerContext ctx) { if (executor.inEventLoop()) { invokeChannelActiveNow(ctx); } else { executor.execute(new OneTimeTask() { @Override public void run() { invokeChannelActiveNow(ctx); } }); } } public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) { try { ((ChannelInboundHandler) ctx.handler()).channelActive(ctx); } catch (Throwable t) { notifyHandlerException(ctx, t); } }
2.2 ChannelOutboundHandler.Read()
讀的流程:
@Override public ChannelHandlerContext read() { AbstractChannelHandlerContext next = findContextOutbound(); next.invoker().invokeRead(next); return this; }
查找outbound的過程:
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
觸發讀操作:
@Override public void invokeRead(final ChannelHandlerContext ctx) { if (executor.inEventLoop()) { invokeReadNow(ctx); } else { AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx; Runnable task = dctx.invokeReadTask; if (task == null) { dctx.invokeReadTask = task = new Runnable() { @Override public void run() { invokeReadNow(ctx); } }; } executor.execute(task); } }
2.3 ChannelOutboundHandler.write()
以實現類LoggingHandler為例:
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "WRITE", msg)); } ctx.write(msg, promise); }
具體實現:
@Override public ChannelFuture write(Object msg, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); next.invoker().invokeWrite(next, pipeline.touch(msg, next), promise); return promise; }
寫操作的觸發
@Override public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(ctx, promise, true)) { // promise cancelled ReferenceCountUtil.release(msg); return; } if (executor.inEventLoop()) { invokeWriteNow(ctx, msg, promise); } else { safeExecuteOutbound(WriteTask.newInstance(ctx, msg, promise), promise, msg); } }
立刻觸發
public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
小結:
Netty中,可以注冊多個handler。ChannelInboundHandler按照注冊的先后順序執行;ChannelOutboundHandler按照注冊的先后順序逆序執行,如下圖所示,按照注冊的先后順序對Handler進行排序,request進入Netty后的執行順序為:
參考文獻
【1】http://blog.csdn.net/u013252773/article/details/21195593
【2】http://stackoverflow.com/questions/22354135/in-netty4-why-read-and-write-both-in-outboundhandler