Netty writeAndFlush()方法分為兩步, 先 write 再 flush
@Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { DefaultChannelHandlerContext next; next = findContextOutbound(MASK_WRITE); ReferenceCountUtil.touch(msg, next); next.invoker.invokeWrite(next, msg, promise); next = findContextOutbound(MASK_FLUSH); next.invoker.invokeFlush(next); return promise; }
以上是DefaultChannelHandlerContext中的writeAndFlush方法, 可見實際上是先調用了write, 然后調用flush
1. write
write方法從TailHandler開始, 穿過中間自定義的各種handler以后到達HeadHandler, 然后調用了HeadHandler的成員變量Unsafe的write
如下
@Override public void write(Object msg, ChannelPromise promise) { ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, promise); }
最終會把需要write的msg和promise(也就是一個future, 我們拿到手的future, 添加Listener的也是這個)放入到outboundBuffer中, msg和promise在outboundBuffer中的存在形式是一個自定義的結構體Entry.
也就是說調用write方法實際上並不是真的將消息寫出去, 而是將消息和此次操作的promise放入到了一個隊列中
2. flush
flush也是從Tail開始, 最后到Head, 最終調用的也是Head里的unsafe的flush0()方法, 然后flush0()里再調用doWrite()方法, 如下:
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. clearOpWrite(); break; } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); if (readableBytes == 0) { in.remove(); continue; } boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); // 這里才是實際將數據寫出去的地方if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); if (done) { in.remove(); } else { incompleteWrite(setOpWrite); break; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i --) { long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (region.transfered() >= region.count()) { done = true; break; } } in.progress(flushedAmount); if (done) { in.remove(); // 根據寫出的數據的數量情況, 來判斷操作是否完成, 如果完成則調用 in.remove() } else { incompleteWrite(setOpWrite); break; } } else { throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); } } }
紅字部分就是最后將數據寫出去的地方, 這里寫數據最終調用的是 GatheringByteChannel 的 write() 方法, 這是個原生Java接口, 具體實現依賴於實現這個接口的Java類, 例如會調用 NIO 的 SocketChannel 的write()方法, 至此, 實際寫數據的過程出現了, SocketChannel可以運行在non-blocking模式, 也就是非阻塞異步模式, write數據會馬上返回寫入的數據數量 (並不一定是所有數據都寫入成功, 對於是否寫入了所有數據, Netty有自己的處理邏輯, 也就是上面代碼中的紅字的那段for循環, 具體參看下SocketChannel的javadoc和netty源碼).
當所有數據寫入SocketChannel成功, 開始調用in.remove(), 這個 in 就是第一步 1. write 里的那個 outboundBuffer, 他的類型是 ChannelOutboundBuffer, 代碼如下:
public final boolean remove() { if (isEmpty()) { return false; } Entry e = buffer[flushed]; Object msg = e.msg; if (msg == null) { return false; } ChannelPromise promise = e.promise; int size = e.pendingSize; e.clear(); flushed = flushed + 1 & buffer.length - 1; if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. safeRelease(msg); safeSuccess(promise); // 這里, 調用了promise的trySuccess()方法, 觸發Listener decrementPendingOutboundBytes(size); } return true; }
最后會調用Promise的notifyListeners()操作, 觸發Listener完成整個異步流程
---------
最后, 回到我們應用netty的時候的代碼
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.writeAndFlush(new Object()).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { // do sth } else { // do sth } } }); }
這就是整個流程
最后提一下, Netty的AbstractNioChannel里封裝了selectionKey, 在accept socket的時候, socket會被注冊到eventLoop()的Selector, 這個selectionKey就會被賦值, 如下
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
在以后Selector的select()的時候, 則會通過這個key來獲取到channel, 然后調用 AbstractChannel 里的 DefaultChannelPipeline 來觸發 Handler 的 connect, read, write 等等事件...