首先我們知道,在NIO網絡編程模型中,IO操作直接和channel相關,比如客戶端的請求連接,或者向服務端發送數據, 服務端都要從客戶端的channel獲取這個數據
那么channelPipeline是什么?
其實,這個channelPepiline是Netty增加給原生的channel的組件,在ChannelPipeline
接口的上的注解闡述了channelPipeline
的作用,這個channelPipeline是高級過濾器的實現,netty將chanenl中數據導向channelPipeline,進而給了用戶對channel中數據的百分百的控制權, 此外,channelPipeline數據結構是雙向鏈表,每一個節點都是channelContext
,channelContext
里面維護了對應的handler和pipeline的引用, 大概總結一下: 通過chanelPipeline,用戶客戶輕松的往channel寫數據,從channel讀數據
創建pipeline
通過前面幾篇博客的追蹤,我們知道無論我們是通過反射創建出服務端的channel也好,還是直接new創建客戶端的channel也好,隨着父類構造函數的逐層調用,最終我們都會在Channel體系的頂級抽象類AbstractChannel
中,創建出Channel的一大組件 channelPipeline
於是我們程序的入口,AbstractChannel
的 pipeline = newChannelPipeline();
,跟進去,看到他的源碼如下:
protected DefaultChannelPipeline newChannelPipeline() {
// todo 跟進去
return new DefaultChannelPipeline(this);
}
可以看到,它創建了一個DefaultChannelPipeline(thisChannel)
DefaultChannelPipeline
是channelPipeline的默認實現,他有着舉足輕重的作用,我們看一下下面的 Channel
ChannelContext
ChannelPipeline
的繼承體系圖,我們可以看到圖中兩個類,其實都很重要,
他們之間有什么關系呢?
當我們看完了DefaultChannelPipeline()
構造中做了什么自然就知道了
// todo 來到這里
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
// todo 把當前的Channel 保存起來
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// todo 這兩方法很重要
// todo 設置尾
tail = new TailContext(this);
// todo 設置頭
head = new HeadContext(this);
// todo 雙向鏈表關聯
head.next = tail;
tail.prev = head;
}
主要做了如下幾件事:
- 初始化succeededFuture
- 初始化voidPromise
- 創建尾節點
- 創建頭節點
- 關聯頭尾節點
其實,到現在為止,pipiline的初始化已經完成了,我們接着往下看
此外,我們看一下DefaultChannelPipeline
的內部類和方法,如下圖()
我們關注我圈出來的幾部分
- 兩個重要的內部類
- 頭結點 HeaderContext
- 尾節點 TailContext
- PendingHandlerAddedTask 添加完handler之后處理的任務
- PendingHandlerCallBack 添加完handler的回調
- PengdingHandlerRemovedTask 移除Handler之后的任務
- 大量的addXXX方法,
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
跟進它的封裝方法:
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
// todo 來到這里
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
// todo 為ChannelContext的pipeline附上值了
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
如下圖是HeaderContext和TailContext的聲明截圖:
我們可以看到,這個tail節點是inbound類型的處理器,一開始確實很納悶,難道header不應該是Inbound類型的嗎?我也不買關子了,直接說為啥
是的,header確實是Inbound類型的處理器, 同時也是出站處理器 (評論區有個老哥說的也很清楚,可以瞅瞅)
因為,對netty來說用發送過來的數據,要就從header節點開始往后傳播,怎么傳播呢? 因為是雙向鏈表,直接找后一個節點,什么類型的節點呢? inbound類型的,於是數據msg就從header之后的第一個結點往后傳播,如果說,一直到最后,都只是傳播數據而沒有任何處理就會傳播到tail節點,因為tail也是inbound類型的, tail節點會替我們釋放掉這個msg,防止內存泄露,當然如果我們自己使用了msg,而沒往后傳播,也沒有釋放,內存泄露是早晚的時,這就是為啥tail是Inbound類型的, header節點和它相反,在下面說
ok,現在知道了ChannelPipeline的創建了吧
Channelpipeline與ChannelHandler和ChannelHandlerContext之間的關系
它三者的關系也直接說了, 在上面pipeline
的創建的過程中, DefaultChannelPipeline
中的頭尾節點都是ChannelHandlerContext
, 這就意味着, 在pipeline雙向鏈表的結構中,每一個節點都是一個ChannelHandlerContext
, 而且每一個 ChannelHandlerContext
維護一個handler
,這一點不信可以看上圖,ChannelHandlerContext
的實現類DefaultChannelHandlerContext
的實現類, 源碼如下:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
// todo Context里面有了 handler的引用
private final ChannelHandler handler;
// todo 創建默認的 ChannelHandlerContext,
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
ChannelHandlerContext
接口同時繼承ChannelOutBoundInvoker
和和ChannelInBoundInvoker
使得他同時擁有了傳播入站事件和出站事件的能力, ChannelHandlerContext
把事件傳播之后,是誰處理的呢? 當然是handler
下面給出ChannelHandler
的繼承體系圖,可以看到針對入站出來和出站處理ChannelHandler
有不同的繼承分支應對
添加一個新的節點:
一般我們都是通過ChanelInitialezer
動態的一次性添加多個handler, 下面就去看看,在服務端啟動過程中,ServerBootStrap
的init()
,如下源碼:解析我寫在代碼下面
// todo 這是ServerBootStrapt對 他父類初始化 channel的實現, 用於初始化 NioServerSocketChannel
@Override
void init(Channel channel) throws Exception {
// todo ChannelOption 是在配置 Channel 的 ChannelConfig 的信息
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
// todo 把 NioserverSocketChannel 和 options Map傳遞進去, 給Channel里面的屬性賦值
// todo 這些常量值全是關於和諸如TCP協議相關的信息
setChannelOptions(channel, options, logger);
}
// todo 再次一波 給Channel里面的屬性賦值 attrs0()是獲取到用戶自定義的業務邏輯屬性 -- AttributeKey
final Map<AttributeKey<?>, Object> attrs = attrs0();
// todo 這個map中維護的是 程序運行時的 動態的 業務數據 , 可以實現讓業務數據隨着netty的運行原來存進去的數據還能取出來
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// todo------- options attrs : 都可以在創建BootStrap時動態的傳遞進去
// todo ChannelPipeline 本身 就是一個重要的組件, 他里面是一個一個的處理器, 說他是高級過濾器,交互的數據 會一層一層經過它
// todo 下面直接就調用了 p , 說明,在channel調用pipeline方法之前, pipeline已經被創建出來了!,
// todo 到底是什么時候創建出來的 ? 其實是在創建NioServerSocketChannel這個通道對象時,在他的頂級抽象父類(AbstractChannel)中創建了一個默認的pipeline對象
/// todo 補充: ChannelHandlerContext 是 ChannelHandler和Pipeline 交互的橋梁
ChannelPipeline p = channel.pipeline();
// todo workerGroup 處理IO線程
final EventLoopGroup currentChildGroup = childGroup;
// todo 我們自己添加的 Initializer
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
// todo 這里是我們在Server類中添加的一些針對新連接channel的屬性設置, 這兩者屬性被acceptor使用到!!!
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// todo 默認 往NioServerSocketChannel的管道里面添加了一個 ChannelInitializer ,
// todo ( 后來我們自己添加的ChildHandler 就繼承了的這個ChannelInitializer , 而這個就繼承了的這個ChannelInitializer 實現了ChannelHandler)
p.addLast(new ChannelInitializer<Channel>() { // todo 進入addlast
// todo 這個ChannelInitializer 方便我們一次性往pipeline中添加多個處理器
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// todo 獲取bootStrap的handler 對象, 沒有返回空
// todo 這個handler 針對bossgroup的Channel , 給他添加上我們在server類中添加的handler()里面添加處理器
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// todo ServerBootstrapAcceptor 接收器, 是一個特殊的chanelHandler
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// todo !!! -- 這個很重要,在ServerBootStrap里面,netty已經為我們生成了接收器 --!!!
// todo 專門處理新連接的接入, 把新連接的channel綁定在 workerGroup中的某一條線程上
// todo 用於處理用戶的請求, 但是還有沒搞明白它是怎么觸發執行的
pipeline.addLast(new ServerBootstrapAcceptor(
// todo 這些參數是用戶自定義的參數
// todo NioServerSocketChannel, worker線程組 處理器 關系的事件
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
這個函數真的是好長,但是我們的重點放在ChannelInitializer
身上, 現在的階段, 當前的channel還沒有注冊上EventLoop上的Selector中
還有不是分析怎么添加handler? 怎么來這里了? 其實下面的 ServerBootstrapAcceptor就是一個handler
我們看一下上面的代碼做了啥
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// todo !!! -- 這個很重要,在ServerBootStrap里面,netty已經為我們生成了接收器 --!!!
// todo 專門處理新連接的接入, 把新連接的channel綁定在 workerGroup中的某一條線程上
// todo 用於處理用戶的請求, 但是還有沒搞明白它是怎么觸發執行的
pipeline.addLast(new ServerBootstrapAcceptor(
// todo 這些參數是用戶自定義的參數
// todo NioServerSocketChannel, worker線程組 處理器 關系的事件
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
懵逼不? 當時真的是給我整蒙圈了, 還沒有關聯上 EventLoop呢!!! 哪來的ch.eventLoop()....
后來整明白了,這其實是一個回調,netty提供給用戶在任意時刻都可以往pipeline中添加handler的實現手段
那么在哪里回調呢? 其實是在 jdk原生的channel注冊進EventLoop中的Selector后緊接着回調的,源碼如下
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// todo 進入這個方法doRegister()
// todo 它把系統創建的ServerSocketChannel 注冊進了選擇器
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// todo 確保在 notify the promise前調用 handlerAdded(...)
// todo 這是必需的,因為用戶可能已經通過ChannelFutureListener中的管道觸發了事件。
// todo 如果需要的話,執行HandlerAdded()方法
// todo 正是這個方法, 回調了前面我們添加 Initializer 中添加 Accpter的重要方法
pipeline.invokeHandlerAddedIfNeeded();
回調函數在 pipeline.invokeHandlerAddedIfNeeded();
, 看它的命名, 如果需要的話,執行handler已經添加完成了操作 哈哈,我們現在當然需要,剛添加了個ServerBootstrapAcceptor
在跟進入看源碼之間,注意,方法是pipeline調用的, 哪個pipeline呢? 就是上面我們說的DefaultChannelPipeline
, ok,跟進源碼,進入 DefaultChannelPipeline
// todo 執行handler的添加,如果 需要的話
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// todo 現在我們的channel已經注冊在bossGroup中的eventLoop上了, 是時候回調執行那些在注冊前添加的 handler了
callHandlerAddedForAllHandlers();
}
}
調用本類方法 callHandlerAddedForAllHandlers();
繼續跟進下
// todo 回調原來在沒有注冊完成之前添加的handler
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
我們它的動作task.execute();
其中的task是誰? pendingHandlerCallbackHead
這是DefaultChannelPipeline
的內部類, 它的作用就是輔助完成 添加handler之后的回調, 源碼如下:
private abstract static class PendingHandlerCallback implements Runnable {
final AbstractChannelHandlerContext ctx;
PendingHandlerCallback next;
PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
this.ctx = ctx;
}
abstract void execute();
}
我們跟進上一步的task.execute()
就會看到它的抽象方法,那么是誰實現的呢? 實現類是PendingHandlerAddedTask
同樣是DefaultChannelPipeline
的內部類, 既然不是抽象類了, 就得同時實現他父類PendingHandlerCallback
的抽象方法,其實有兩個一是個excute()
另一個是run()
--Runable
我們進入看它是如何實現excute
,源碼如下:
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
remove0(ctx);
ctx.setRemoved();
}
}
HandlerAdded()
的回調時機
我們往下追蹤, 調用類本類方法callHandlerAdded0(ctx);
源碼如下:
// todo 重點看看這個方法 , 入參是剛才添加的 Context
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// todo 在channel關聯上handler之后並且把Context添加到了 Pipeline之后進行調用!!!
ctx.handler().handlerAdded(ctx); // todo 他是諸多的回調方法中第一個被調用的
ctx.setAddComplete(); // todo 修改狀態
}
...
繼續往下追蹤
- ctx.handler() -- 獲取到了當前的channel
- 調用channel的
.handlerAdded(ctx);
這個handlerAdded()
是定義在ChannelHandler中的回調方法, 什么時候回調呢? 當handler添加后回調, 因為我們知道,當服務端的channel在啟動時,會通過 channelInitializer 添加那個ServerBootstrapAcceptor
,所以ServerBootstrapAcceptor
的handlerAdded()
的回調時機就在上面代碼中的ctx.handler().handlerAdded(ctx);
如果直接點擊去這個函數,肯定就是ChannelHandler
接口中去; 那么 新的問題來了,誰是實現類? 答案是抽象類 ChannelInitializer`` 就在上面我們添加ServerBootstrapAcceptor
就創建了一個ChannelInitializer
的匿名對象
它的繼承體系圖如下:
介紹這個ChannelInitializer
他是Netty提供的輔助類,用於提供針對channel的初始化工作,什么工作呢? 批量初始化channel
這個中有三個重要方法,如下
- 重寫的channel的
handlerAdded()
, 這其實也是handlerAdded()
的回調的體現 - 自己的
initChannel()
- 自己的
remove()
繼續跟進我們上面的handlerAdded(ChannelHandlerContext ctx)
源碼如下:
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
initChannel(ctx); // todo 這個方法在上面, 進入 可以在 finally中 找到移除Initializer的邏輯
}
}
調用本類的 initChannel(ctx);
源碼如下:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
// todo remove(ctx); 刪除 ChannelInitializer
remove(ctx);
}
return true;
}
return false;
}
兩個點
- 第一: 繼續調用本類的抽象方法
initChannel((C) ctx.channel());
- 第二: 移除了
remove(ctx);
分開進行第一步
initChannel((C) ctx.channel());
初始化channel,這個函數被設計成了抽象的, 問題來了, 實現類是誰? 實現類其實剛才說了,就是netty在添加ServerBootStrapAcceptor
時創建的那個匿名內部類,我們跟進去看他的實現: 源碼如下:
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// todo 獲取bootStrap的handler 對象, 沒有返回空
// todo 這個handler 針對bossgroup的Channel , 給他添加上我們在server類中添加的handler()里面添加處理器
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// todo ServerBootstrapAcceptor 接收器, 是一個特殊的chanelHandler
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// todo !!! -- 這個很重要,在ServerBootStrap里面,netty已經為我們生成了接收器 --!!!
// todo 專門處理新連接的接入, 把新連接的channel綁定在 workerGroup中的某一條線程上
// todo 用於處理用戶的請求, 但是還有沒搞明白它是怎么觸發執行的
pipeline.addLast(new ServerBootstrapAcceptor(
// todo 這些參數是用戶自定義的參數
// todo NioServerSocketChannel, worker線程組 處理器 關系的事件
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
實際上就是完成了一次方法的回調,成功添加了ServerBootstrapAcceptor
處理器
刪除一個節點
回來看第二步
remove(ctx);
刪除一個節點, 把Initializer
刪除了? 是的, 把這個初始化器刪除了, 為啥要把它刪除呢, 說了好多次, 其實他是一個輔助類, 目的就是通過他往channel中一次性添加多個handler, 現在handler也添加完成了, 留着他也沒啥用,直接移除了
我們接着看它的源碼
// todo 刪除當前ctx 節點
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}
從pipeline中移除, 一路看過去,就會發現底層刪除鏈表節點的操作
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
inbound事件的傳播
什么是inbound事件
inbound事件其實就是客戶端主動發起事件,比如說客戶端請求連接,連接后客戶端有主動的給服務端發送需要處理的有效數據等,只要是客戶端主動發起的事件,都算是Inbound事件,特征就是事件觸發類型,當channel處於某個節點,觸發服務端傳播哪些動作
netty如何對待inbound
netty為了更好的處理channel中的數據,給jdk原生的channel添加了pipeline組件,netty會把原生jdk的channel中的數據導向這個pipeline,從pipeline中的header開始 往下傳播, 用戶對這個過程擁有百分百的控制權,可以把數據拿出來處理, 也可以往下傳播,一直傳播到tail節點,tail節點會進行回收,如果在傳播的過程中,最終沒到尾節點,自己也沒回收,就會面臨內存泄露的問題
一句話總結,面對Inbound的數據, 被動傳播
netty知道客戶端發送過來的數據是啥類型嗎?
比如一個聊天程序,客戶端可能發送的是心跳包,也可能發送的是聊天的內容,netty又不是人,他是不知道數據是啥的,他只知道來了數據,需要進一步處理,怎么處理呢? 把數據導向用戶指定的handler鏈條
開始讀源碼
這里書接上一篇博客的尾部,事件的傳播
重點步驟如下
第一步: 等待服務端啟動完成
第二步: 使用telnet模擬發送請求 --- > 新連接的接入邏輯
第三步: register0(ChannelPromise promise)
方法中會傳播channel激活事件 --> 目的是二次注冊端口,
第三個也是我們程序的入手點: fireChannelActive()
源碼如下:
@Override
public final ChannelPipeline fireChannelActive() {
// todo ChannelActive從head開始傳播
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
調用了AbstractChannelHandlerContext
的invokeChannelActive
方法
在這里,我覺得特別有必須有必要告訴自己
AbstractChannelHandlerContext
的重要性,現在的DefaultChannelPipeline
中的每一個節點,包括header,tail,以及我們自己的添加的,都是AbstractChannelHandlerContext
類型,事件的傳播圍繞着AbstractChannelHandlerContext
的方法開始,溫習它的繼承體系如下圖
接着回到AbstractChannelHandlerContext.invokeChannelActive(head);
, 很顯然,這是個靜態方法, 跟進去,源碼如下:
// todo 來這
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
...
}
- 第一點: inbound類型的事件是從header開始傳播的 , next --> HeaderContext
- 第二點: HeaderContext其實就是
AbstractChannelHandlerContext
類型的,所以invokeChannelActive()
其實是當前類的方法
ok,跟進入看看他干啥了,源碼:
// todo 使Channel活躍
private void invokeChannelActive() {
// todo 繼續進去
((ChannelInboundHandler) handler()).channelActive(this);
}
我們看, 上面的代碼做了如下幾件事
- handler() -- 返回當前的 handler, 就是從HandlerContext中拿出handler
- 強轉成
ChannelInboundHandler
類型的,因為他是InBound類型的處理器
如果我們用鼠標點擊channelActive(this)
, 毫無疑問會進入ChannelInboundHandler
,看到的是抽象方法
那么問題來了, 誰實現的它?
其實是headerContext
頭結點做的, 之前說過,Inbound事件,是從header開始傳播的,繼續跟進去, 看源碼:
// todo 來到這里, 分兩步, 1. 把ChannelActive事件繼續往下傳播, 傳播結束之后,做第二件事
// todo 2. readIfIsAutoRead();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// todo fireChannelActive是在做了實際的端口綁定之后才觸發回調
ctx.fireChannelActive();
// todo 默認方式注冊一個read事件
// todo 跟進去, readIfIsAutoRead 作用是 把已經注冊進selector上的事件, 重新注冊綁定上在初始化NioServerSocketChannel時添加的Accept事件
// todo 目的是 新連接到來時, selector輪詢到accept事件, 使得netty可以進一步的處理這個事件
readIfIsAutoRead();
}
其實這里有兩種重要的事情 , 上面我們也看到了:
- 向下傳播
channelActive()
目的是讓header后面的用戶添加的handler中的channelActive()
被回調 readIfIsAutoRead();
就是去注冊Netty能看懂的感興趣的事件
下面我們看它的事件往下傳播, 於是重新回到了AbstractChannelHandlerContext
, 源碼如下:
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound());
return this;
}
findContextInbound()
找出下一個Inbound類型的處理器, 我們去看他的實現,源碼如下:
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
是不是明明白白的? 從當前節點開始,往后變量整個鏈表, 下一個節點是誰呢? 在新鏈接接入的邏輯中,調用的ChannelInitializer
我手動 批量添加了三個InboundHandler,按照我添加的順序,他們會依次被找到
繼續跟進本類方法 invokeChannelActive(findContextInbound())
,源碼如下
// todo 來這
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
...
一開始的next--> HeaderContext
現在的 next就是header之后,我手動添加的Inbound的handler
同樣是調用本類方法invokeChannelActive()
,源碼如下:
// todo 使Channel活躍
private void invokeChannelActive() {
// todo 繼續進去
((ChannelInboundHandler) handler()).channelActive(this);
再次看到,回調, 我添加的handler.channelActive(this);
,進入查看
public class MyServerHandlerA extends ChannelInboundHandlerAdapter {
// todo 當服務端的channel綁定上端口之后,就是 傳播 channelActive 事件
// todo 事件傳播到下面后,我們手動傳播一個 channelRead事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().pipeline().fireChannelRead("hello MyServerHandlerA");
}
在我處理器中,繼續往下傳播手動添加的數據"hello MyServerHandlerA"
同樣她會按找上面的順序依次傳播下去
最終她會來到tail , 在tail做了如下的工作, 源碼如下
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// todo channelRead
onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundException(Throwable cause) {
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);
}
}
為什么Tail節點是 Inbound 類型的處理器?
上一步就說明了為什么Tail為什么設計成Inbound, channel中的數據,無論服務端有沒有使用,最終都要被釋放掉,tail可以做到收尾的工作, 清理內存
outbound事件的傳播
什么是outBound事件
創建的outbound事件如: connect,disconnect,bind,write,flush,read,close,register,deregister, outbound類型事件更多的是服務端主動發起的事件,如給主動channel綁定上端口,主動往channel寫數據,主動關閉用戶的的連接
開始讀源碼
最典型的outbound事件,就是服務端往客戶端寫數據,准備測試用例如下:
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println( "hello OutBoundHandlerB");
ctx.write(ctx, promise);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(()->{
// todo 模擬給 客戶端一個響應
ctx.channel().write("Hello World");
// 寫法二 : ctx.write("Hello World");
},3, TimeUnit.SECONDS);
}
}
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
// todo 當服務端的channel綁定上端口之后,就是 傳播 channelActive 事件
// todo 事件傳播到下面后,我們手動傳播一個 channelRead事件
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println( "hello OutBoundHandlerA");
ctx.write(ctx, promise);
}
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println( "hello OutBoundHandlerC");
ctx.write(ctx, promise);
}
}
下面我們把斷點調試,把斷點打在OutBoundHandlerB
的handlerAdded
上, 模擬向客戶端發送數據, 啟動程序,大概的流程如下
- 等待服務端的啟動
- 服務端Selector輪詢服務端channel可能發生的感興趣的事件
- 使用telnet向服務端發送請求
- 服務端創建客戶端的channel,在給客戶端的原生的chanenl注冊到 Selector上
- 通過
invokeChannelAddedIfNeeded()
將我們添加在Initializer中的handler添加到pipeline中- 挨個回調這些handler中的
channelAdded()
方法- 和我們添加進去的順序相反
- C --> B --->A
- 這些childHandler,會添加在每一條客戶端的channel的pipeline
- 挨個回調這些handler中的
- 傳播channel注冊完成事件
- 傳播channelActive事件
- readIfAutoRead() 完成二次注冊netty可以處理的感興趣的事件
此外,我們上面的write以定時任務的形式提交,當用ctx中的唯一的線程執行器三秒后去執行任務,所以程序會繼續下去綁定端口, 過了三秒后把定時任務聚合到普通任務隊列中,那時才會執行我們OutBoundHandlerB
中的 ctx.channel().write("Hello World");
outBound類型的handler添加順序和執行順序有什么關系
因為Outbound類型的事件是從鏈表的tail開始傳播的,所以執行的順序和我們的添加進去的順序相反
篇幅太長了,重寫補一張圖
從ctx.channel().write("Hello World");
開始跟源碼, 鼠標直接跟進去,進入的是ChannelOutboundInvoker
, 往channel中寫,我們進入DefaultChannelPipeline
的實現,源碼如下
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
再一次的驗證了,出站的事件是從尾部往前傳遞的, 我們知道,tail節點是DefaultChannelHandlerContext
類型的,所以我們看它的write()
方法是如何實現的
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
其中msg-->我們要寫會客戶端的內容, newPromise()默認的promise()
,繼續跟進本類方法write(msg, newPromise())
,源碼如下:
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}
上面做了很多判斷,其中我們只關心write(msg, false, promise);
源碼如下:
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
我們可以看到,重要的邏輯findContextOutbound();
它的源碼如下, 從尾節點開始遍歷鏈表,找到前一個outbound類型的handler
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
找到后,因為我們使用函數是write
而不是writeAndFlush
所以進入上面的else代碼塊invokeWrite
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
繼續跟進invokeWrite0(msg, promise);
終於看到了handler的write邏輯
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
其中:
- (ChannelOutboundHandler) handler() -- 是tail前面的節點
- 調用當前節點的write函數
實際上就是回調我們自己的添加的handler的write函數,我們跟進去,源碼如下:
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println( "hello OutBoundHandlerC");
ctx.write(msg, promise);
}
}
我們繼續調用write, 按照相同的邏輯,msg會繼續往前傳遞
一直傳遞到HeadContext節點, 因為這個節點也是Outbound類型的, 這就是Outbound事件的傳播,我們直接看HeaderContext是如何收尾的, 源碼如下:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
Header使用了unsafe類,這沒毛病,和數據讀寫有關的操作,最終都離不開unsafe
為什么Header節點是outBound類型的處理器?
拿上面的write事件來說,msg經過這么多handler的加工,最終的目的是傳遞到客戶端,所以netty把header設計為outBound類型的節點,由他完成往客戶端的寫
context.write()與context.channel().write()的區別
context.write()
,會從當前的節點開始往前傳播context.channel().write()
從尾節點開始依次往前傳播
異常的傳播
netty中如果發生了異常的話,異常事件的傳播和當前的節點是 入站和出站處理器是沒關系的,一直往下一個節點傳播,如果一直沒有handler處理異常,最終由tail節點處理
最佳的異常處理解決方法
既然異常的傳播和入站和出站類型的處理器沒關系,那么我們就在pipeline的最后,也就是tail之前,添加我們的統一異常處理器就好了, 就像下面:
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// todo 異常處理的最佳實踐, 最pipeline的最后添加異常處理handler
channelPipeline.addLast(new myExceptionCaughtHandler());
}
}
public class myExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
// 最終全部的異常都會來到這里
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof 自定義異常1){
}else if(cause instanceof 自定義異常2){
}
// todo 下面不要往下傳播了
// super.exceptionCaught(ctx, cause);
}
}
SimpleChannelInboundHandler
的特點
通過前面的分析,我們知道如果客戶端的msg一味的往后傳播,最終會傳播到tail節點,由tail節點處理釋放,從而避免了內存的泄露
如果我們的handler使用了msg之后沒有往后傳遞就要倒霉了,時間久了就會出現內存泄露的問題
netty人性化的為我們提供的指定泛型的 SimpleChannelInboundHandler<T>
,可以為我們自動的釋放內存,我們看他是如何做到的
/ todo 直接繼承於ChanelInboundHandlerAdapter的實現 抽象類
// todo 我們自己的處理器, 同樣可以繼承SimpleChannelInboundHandler適配器,達到相同的效果
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean autoRelease;
protected SimpleChannelInboundHandler() {
this(true);
}
protected SimpleChannelInboundHandler(boolean autoRelease) {
matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
this.autoRelease = autoRelease;
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
this(inboundMessageType, true);
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
matcher = TypeParameterMatcher.get(inboundMessageType);
this.autoRelease = autoRelease;
}
public boolean acceptInboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
// todo channelRead 完全被改寫了
// todo 這其實又是一種設計模式 , 模板方法設計模式
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
// todo 把消息進行了強轉
I imsg = (I) msg;
// todo channelRead0()在他的父類中是抽象的,因此我們自己寫handler時,需要重寫它的這個抽象的 方法 , 在下面
// todo 這其實又是一種設計模式 , 模板方法設計模式
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {// todo 對msg的計數減一, 表示對消息的引用減一. 也就意味着我們不要在任何
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
}
- 它本身是抽象類,抽象方法是
channelRead0
,意味着我們需要重寫這個方法 - 他繼承了
ChannelInboundHandlerAdapter
這是個適配器類,使他可以僅實現部分自己需要的方法就ok
我們看它實現的channelRead
, 模板方法設計模式 主要做了如下三件事
- 將msg 強轉成特定的泛型類型的數據
- 將ctx和msg傳遞給自己的chanenlRead0使用msg和ctx(ctx,msg)
- chanenlRead0使用msg和ctx
- 在finally代碼塊中,將msg釋放