在上一篇文章中,我們已經了解了pipeline在netty中所處的角色,像是一條流水線,控制着字節流的讀寫,本文,我們在這個基礎上繼續深挖pipeline在事件傳播
Unsafe
顧名思義,unsafe是不安全的意思,就是告訴你不要在應用程序里面直接使用Unsafe以及他的衍生類對象。
netty官方的解釋如下
Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread
Unsafe 在Channel定義,屬於Channel的內部類,表明Unsafe和Channel密切相關
下面是unsafe接口的所有方法
interface Unsafe { RecvByteBufAllocator.Handle recvBufAllocHandle(); SocketAddress localAddress(); SocketAddress remoteAddress(); void register(EventLoop eventLoop, ChannelPromise promise); void bind(SocketAddress localAddress, ChannelPromise promise); void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); void disconnect(ChannelPromise promise); void close(ChannelPromise promise); void closeForcibly(); void beginRead(); void write(Object msg, ChannelPromise promise); void flush(); ChannelPromise voidPromise(); ChannelOutboundBuffer outboundBuffer(); }
按功能可以分為分配內存,Socket四元組信息,注冊事件循環,綁定網卡端口,Socket的連接和關閉,Socket的讀寫,看的出來,這些操作都是和jdk底層相關
Unsafe 繼承結構
NioUnsafe
在 Unsafe
基礎上增加了以下幾個接口
public interface NioUnsafe extends Unsafe { SelectableChannel ch(); void finishConnect(); void read(); void forceFlush(); }
從增加的接口以及類名上來看,NioUnsafe
增加了可以訪問底層jdk的SelectableChannel
的功能,定義了從SelectableChannel
讀取數據的read
方法
Unsafe的分類
從以上繼承結構來看,我們可以總結出兩種類型的Unsafe分類,一個是與連接的字節數據讀寫相關的NioByteUnsafe
,一個是與新連接建立操作相關的NioMessageUnsafe
NioByteUnsafe
中的讀:委托到外部類NioSocketChannel
protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
最后一行已經與jdk底層以及netty中的ByteBuf相關,將jdk的 SelectableChannel
的字節數據讀取到netty的ByteBuf
中
NioMessageUnsafe
中的讀:委托到外部類NioSocketChannel
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } return 0; }
NioMessageUnsafe
的讀操作很簡單,就是調用jdk的accept()
方法,新建立一條連接
NioByteUnsafe
中的寫:委托到外部類NioSocketChannel
@Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
最后一行已經與jdk底層以及netty中的ByteBuf相關,將netty的ByteBuf
中的字節數據寫到jdk的 SelectableChannel
中
pipeline中的head
NioEventLoop
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //新連接的已准備接入或者已存在的連接有數據可讀 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } }
NioByteUnsafe
@Override public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); // 創建ByteBuf分配器 final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; do { // 分配一個ByteBuf byteBuf = allocHandle.allocate(allocator); // 將數據讀取到分配的ByteBuf中去 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } // 觸發事件,將會引發pipeline的讀事件傳播 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); pipeline.fireChannelReadComplete(); }
同樣,我抽出了核心代碼,細枝末節先剪去,NioByteUnsafe
要做的事情可以簡單地分為以下幾個步驟
- 拿到Channel的config之后拿到ByteBuf分配器,用分配器來分配一個ByteBuf,ByteBuf是netty里面的字節數據載體,后面讀取的數據都讀到這個對象里面
- 將Channel中的數據讀取到ByteBuf
- 數據讀完之后,調用
pipeline.fireChannelRead(byteBuf);
從head節點開始傳播至整個pipeline - 最后調用fireChannelReadComplete();
這里,我們的重點其實就是 pipeline.fireChannelRead(byteBuf);
DefaultChannelPipeline
final AbstractChannelHandlerContext head; //... head = new HeadContext(this); public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; }
結合這幅圖
可以看到,數據從head節點開始流入,在進行下一步之前,我們先把head節點的功能過一遍
HeadContext
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public ChannelHandler handler() { return this; } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // NOOP } @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } @Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.connect(remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.disconnect(promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.close(promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { unsafe.deregister(promise); } @Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx) throws Exception { unsafe.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelUnregistered(); // Remove all handlers sequentially if channel is closed and unregistered. if (!channel.isOpen()) { destroy(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelInactive(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); readIfIsAutoRead(); } private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { channel.read(); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ctx.fireUserEventTriggered(evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelWritabilityChanged(); } }
從head節點繼承的兩個接口看,TA既是一個ChannelHandlerContext,同時又屬於inBound和outBound Handler
在傳播讀寫事件的時候,head的功能只是簡單地將事件傳播下去,如ctx.fireChannelRead(msg);
在真正執行讀寫操作的時候,例如在調用writeAndFlush()
等方法的時候,最終都會委托到unsafe執行,而當一次數據讀完,channelReadComplete
方法會被調用
pipeline中的inBound事件傳播
我們接着上面的 AbstractChannelHandlerContext.invokeChannelRead(head, msg); 這個靜態方法看,參數傳入了 head,我們知道入站數據都是從 head 開始的,以保證后面所有的 handler 都由機會處理數據流。
我們看看這個靜態方法內部是怎么樣的:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRead(m); } }); } }
調用這個 Context (也就是 head) 的 invokeChannelRead 方法,並傳入數據。我們再看看head中 invokeChannelRead 方法的實現,實際上是在headContext的父類AbstractChannelHandlerContext中:
AbstractChannelHandlerContext
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } public ChannelHandler handler() { return this; }
上面 handler()
就是
headContext中的handler,也就是headContext自身,也就是調用 head 的 channelRead 方法。那么這個方法是怎么實現的呢?
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); }
什么都沒做,調用 Context 的 fire 系列方法,將請求轉發給下一個節點。我們這里是 fireChannelRead 方法,注意,這里方法名字都挺像的。需要細心區分。下面我們看看 Context 的成員方法 fireChannelRead:
AbstractChannelHandlerContext
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; }
這個是 head 的抽象父類 AbstractChannelHandlerContext 的實現,該方法再次調用了靜態 fire 系列方法,但和上次不同的是,不再放入 head 參數了,而是使用 findContextInbound 方法的返回值。從這個方法的名字可以看出,是找到入站類型的 handler。我們看看方法實現:
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
該方法很簡單,找到當前 Context 的 next 節點(inbound 類型的)並返回。這樣就能將請求傳遞給后面的 inbound handler 了。我們來看看 invokeChannelRead(findContextInbound(), msg);
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRead(m); } }); } }
上面我們找到了next節點(inbound類型的),然后直接調用 next.invokeChannelRead(m);如果這個next是我們自定義的handler,此時我們自定義的handler的父類是AbstractChannelHandlerContext,則又回到了AbstractChannelHandlerContext中實現的invokeChannelRead,代碼如下:
AbstractChannelHandlerContext
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } public ChannelHandler handler() { return this; }
此時的handler()就是我們自定義的handler了,然后調用我們自定義handler中的 channelRead(this, msg);
請求進來時,pipeline 會從 head 節點開始輸送,通過配合 invoker 接口的 fire 系列方法,實現 Context 鏈在 pipeline 中的完美傳遞。最終到達我們自定義的 handler。
注意:此時如果我們想繼續向后傳遞該怎么辦呢?我們前面說過,可以調用 Context 的 fire 系列方法,就像 head 的 channelRead 方法一樣,調用 fire 系列方法,直接向后傳遞就 ok 了。
如果所有的handler都調用了fire系列方法,則會傳遞到最后一個inbound類型的handler,也就是——tail節點,那我們就來看看tail節點
pipeline中的tail
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } @Override public ChannelHandler handler() { return this; } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // This may not be a configuration error and so don't log anything. // The event may be superfluous for the current pipeline configuration. ReferenceCountUtil.release(evt); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { onUnhandledInboundException(cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } }
正如我們前面所提到的,tail節點的大部分作用即終止事件的傳播(方法體為空)
channelRead
protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }
tail節點在發現字節數據(ByteBuf)或者decoder之后的業務對象在pipeline流轉過程中沒有被消費,落到tail節點,tail節點就會給你發出一個警告,告訴你,我已經將你未處理的數據給丟掉了
總結一下,tail節點的作用就是結束事件傳播,並且對一些重要的事件做一些善意提醒
pipeline中的outBound事件傳播
上一節中,我們在闡述tail節點的功能時,忽略了其父類AbstractChannelHandlerContext
所具有的功能,這一節中,我們以最常見的writeAndFlush操作來看下pipeline中的outBound事件是如何向外傳播的
典型的消息推送系統中,會有類似下面的一段代碼
Channel channel = getChannel(userInfo);
channel.writeAndFlush(pushInfo);
這段代碼的含義就是根據用戶信息拿到對應的Channel,然后給用戶推送消息,跟進 channel.writeAndFlush
NioSocketChannel
public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); }
從pipeline開始往外傳播
public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); }
Channel 中大部分outBound事件都是從tail開始往外傳播, writeAndFlush()
方法是tail繼承而來的方法,我們跟進去
AbstractChannelHandlerContext
public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); } public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { write(msg, true, promise); return promise; }
AbstractChannelHandlerContext
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
先調用findContextOutbound()
方法找到下一個outBound()
節點
AbstractChannelHandlerContext
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
找outBound節點的過程和找inBound節點類似,反方向遍歷pipeline中的雙向鏈表,直到第一個outBound節點next
,然后調用next.invokeWriteAndFlush(m, promise)
AbstractChannelHandlerContext
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
調用該節點的ChannelHandler的write方法,flush方法我們暫且忽略,后面會專門講writeAndFlush的完整流程
AbstractChannelHandlerContext
private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
可以看到,數據開始出站,從后向前開始流動,和入站的方向是反的。那么最后會走到哪里呢,當然是走到 head 節點,因為 head 節點就是 outbound 類型的 handler。
HeadContext
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
調用了 底層的 unsafe 操作數據,這里,加深了我們對head節點的理解,即所有的數據寫出都會經過head節點
當執行完這個 write 方法后,方法開始退棧。逐步退到 unsafe 的 read 方法,回到最初開始的地方,然后繼續調用 pipeline.fireChannelReadComplete() 方法
總結
總結一下一個請求在 pipeline 中的流轉過程:
- 調用 pipeline 的 fire 系列方法,這些方法是接口 invoker 設計的,pipeline 實現了 invoker 的所有方法,inbound 事件從 head 開始流入,outbound 事件從 tail 開始流出。
- pipeline 會將請求交給 Context,然后 Context 通過抽象父類 AbstractChannelHandlerContext 的 invoke 系列方法(靜態和非靜態的)配合 AbstractChannelHandlerContext 的 fire 系列方法再配合 findContextInbound 和 findContextOutbound 方法完成各個 Context 的數據流轉。
- 當入站過程中,調用 了出站的方法,那么請求就不會向后走了。后面的處理器將不會有任何作用。想繼續相會傳遞就調用 Context 的 fire 系列方法,讓 Netty 在內部幫你傳遞數據到下一個節點。如果你想在整個通道傳遞,就在 handler 中調用 channel 或者 pipeline 的對應方法,這兩個方法會將數據從頭到尾或者從尾到頭的流轉一遍。