Netty之Channel*
本文內容主要參考**<<Netty In Action>> ** 和Netty的文檔和源碼,偏筆記向.
先簡略了解一下ChannelPipeline和ChannelHandler.
想象一個流水線車間.當組件從流水線頭部進入,穿越流水線,流水線上的工人按順序對組件進行加工,到達流水線尾部時商品組裝完成.
可以將ChannelPipeline當做流水線,ChannelHandler當做流水線工人.源頭的組件當做event,如read,write等等.
1.1 Channel
Channel連接了網絡套接字或能夠進行I/O操作的組件,如 read, write, connect, bind.
我們可以通過Channel獲取一些信息.
Channel的當前狀態(如,是否連接,是否打開)Channel的配置參數,如buffer的size- 支持的I/O操作
- 處理所有I/O事件的
ChannelPipeline和與通道相關的請求
Channel接口定義了一組和ChannelInboundHandler API密切相關的狀態模型.

當
Channel的狀態改變,會生成對應的event.這些event會轉發給ChannelPipeline中的ChannelHandler,handler會對其進行響應.

1.2 ChannelHandler生命周期
下面列出了 interface ChannelHandler 定義的生命周期操作, 在 ChannelHandler被添加到 ChannelPipeline 中或者被從 ChannelPipeline 中移除時會調用這些操作。這些方法中的每一個都接受一個 ChannelHandlerContext 參數

1.3 ChannelInboundHandler 接口
ChannelInboundHandler處理入站數據以及各種狀態變化,當Channel狀態發生改變會調用ChannelInboundHandler中的一些生命周期方法.這些方法與Channel的生命密切相關.
入站數據,就是進入socket的數據.下面展示一些該接口的生命周期API

當某個
ChannelInboundHandler的實現重寫channelRead()方法時,它將負責顯式地
釋放與池化的 ByteBuf 實例相關的內存。 Netty 為此提供了一個實用方法ReferenceCountUtil.release().
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
這種方式還挺繁瑣的,Netty提供了一個SimpleChannelInboundHandler ,重寫channelRead0()方法,就可以在調用過程中會自動釋放資源.
public class SimpleDiscardHandler
extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// 不用調用ReferenceCountUtil.release(msg)也會釋放資源
}
}
原理就是這樣,channelRead方法包裝了channelRead0方法.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
1.4 ChannelOutboundHandler
出站操作和數據將由 ChannelOutboundHandler 處理。它的方法將被 Channel、 ChannelPipeline 以及 ChannelHandlerContext 調用。
ChannelOutboundHandler 的一個強大的功能是可以按需推遲操作或者事件,這使得可以通過一些復雜的方法來處理請求。例如, 如果到遠程節點的寫入被暫停了, 那么你可以推遲沖刷操作並在稍后繼續。

ChannelPromise與ChannelFuture: ChannelOutboundHandler中的大部分方法都需要一個ChannelPromise參數, 以便在操作完成時得到通知。 ChannelPromiseChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(), 從而使ChannelFuture不可變.
1.5 ChannelHandler適配器
ChannelHandlerAdapter顧名思義,就是handler的適配器.你需要知道什么是適配器模式,假設有一個A接口,我們需要A的subclass實現功能,但是B類中正好有我們需要的功能,不想復制粘貼B中的方法和屬性了,那么可以寫一個適配器類Adpter繼承B實現A,這樣一來Adpter是A的子類並且能直接使用B中的方法,這種模式就是適配器模式.
就比如Netty中的SslHandler類,想使用ByteToMessageDecoder中的方法進行解碼,但是必須是ChannelHandler子類對象才能加入到ChannelPipeline中,通過如下簽名和其實現細節(SslHandler實現細節就不貼了)就能夠作為一個Handler去處理消息了.
public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler
下圖是ChannelHandler和Adpter的UML圖示.

ChannelHandlerAdapter提供了一些實用方法
isSharable()如果其對應的實現被標注為 Sharable, 那么這個方法將返回 true, 表示它可以被添加到多個 ChannelPipeline中 .如果想在自己的ChannelHandler中使用這些適配器類,只需要擴展他們,重寫那些想要自定義的方法即可.
1.6 資源管理
在使用ChannelInboundHandler.channelRead() 或ChannelOutboundHandler.write() 方法處理數據時要避免資源泄露,ByteBuf那篇文章提到過引用計數,當使用完某個ByteBuf之后記得調整引用計數.
Netty提供了一個class ResourceLeakDetector 來幫助診斷資源泄露,這能夠幫助你判斷應用的運行情況,但是如果希望提高吞吐量(比如搞一些競賽),關閉內存診斷可以提高吞吐量.

泄露檢測級別可以通過將下面的 Java 系統屬性設置為表中的一個值來定義:
java -Dio.netty.leakDetectionLevel=ADVANCED如果帶着該 JVM 選項重新啟動你的應用程序,你將看到自己的應用程序最近被泄漏的緩沖
區被訪問的位置。下面是一個典型的由單元測試產生的泄漏報告:
Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(
XmlFrameDecoderTest.java:133)
...
應用程序處理消息釋放資源
消費入站消息釋放資源
@Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);// 用於釋放資源的工具類
}
}
SimpleChannelInboundHandler 中的channelRead0()會消費消息之后自動釋放資源.
出站釋放資源
@Sharable
public class DiscardOutboundHandler
extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise) {
// 還是通過util工具類釋放資源
ReferenceCountUtil.release(msg);
// 通知ChannelPromise,消息已經處理
promise.setSuccess();
}
}
重要的是, 不僅要釋放資源,還要通知 ChannelPromise。否則可能會出現 ChannelFutureListener 收不到某個消息已經被處理了的通知的情況。總之,如果一個消息被消費或者丟棄了, 並且沒有傳遞給 ChannelPipeline 中的下一個ChannelOutboundHandler, 那么用戶就有責任調用ReferenceCountUtil.release()。如果消息到達了實際的傳輸層, 那么當它被寫入時或者 Channel 關閉時,都將被自動釋放。
2 ChannelPipelin接口
Channel和ChannelPipeline
每一個新創建的 Channel 都將會被分配一個新的 ChannelPipeline。這項關聯是永久性的; Channel 既不能附加另外一個 ChannelPipeline,也不能分離其當前的。在 Netty 組件的生命周期中,這是一項固定的操作,不需要開發人員的任何干預。
ChannelHandler和ChannelHandlerContext
根據事件的起源,事件將會被 ChannelInboundHandler 或者 ChannelOutboundHandler 處理。隨后, 通過調用 ChannelHandlerContext 實現,它將被轉發給同一超類型的下一個ChannelHandler。
ChannelHandlerContext使得ChannelHandler能夠和它的ChannelPipeline以及其他的ChannelHandler 交 互 。 ChannelHandler 可 以 通 知 其 所 屬 的 ChannelPipeline 中 的 下 一 個ChannelHandler,甚至可以動態修改它所屬的ChannelPipeline.
ChannelPipelin和ChannelHandler

這是一個同時具有入站和出站 ChannelHandler 的 ChannelPipeline 的布局,並且印證了我們之前的關於 ChannelPipeline 主要由一系列的 ChannelHandler 所組成的說法。 ChannelPipeline 還提供了通過 ChannelPipeline 本身傳播事件的方法。如果一個入站事件被觸發,它將被從 ChannelPipeline 的頭部開始一直被傳播到 Channel Pipeline 的尾端。
你可能會說, 從事件途經 ChannelPipeline 的角度來看, ChannelPipeline 的頭部和尾端取決於該事件是入站的還是出站的。然而 Netty 總是將 ChannelPipeline 的入站口(圖 的左側)作為頭部,而將出站口(該圖的右側)作為尾端。
當你完成了通過調用 ChannelPipeline.add*()方法將入站處理器( ChannelInboundHandler)和 出 站 處 理 器 ( ChannelOutboundHandler ) 混 合 添 加 到 ChannelPipeline 之 后 , 每 一 個ChannelHandler 從頭部到尾端的順序位置正如同我們方才所定義它們的一樣。因此,如果你將圖 6-3 中的處理器( ChannelHandler)從左到右進行編號,那么第一個被入站事件看到的 ChannelHandler 將是1,而第一個被出站事件看到的 ChannelHandler 將是 5。在 ChannelPipeline 傳播事件時,它會測試 ChannelPipeline 中的下一個 ChannelHandler 的類型是否和事件的運動方向相匹配。如果不匹配, ChannelPipeline 將跳過該ChannelHandler 並前進到下一個,直到它找到和該事件所期望的方向相匹配的為止。 (當然, ChannelHandler 也可以同時實現ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)
2.1 修改ChannelPipeline
修改指的是添加或刪除ChannelHandler

代碼示例
ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler();
// 先添加一個Handler到ChannelPipeline中
pipeline.addLast("handler1", firstHandler);
// 這個Handler放在了first,意味着放在了handler1之前
pipeline.addFirst("handler2", new SecondHandler());
// 這個Handler被放到了last,意味着在handler1之后
pipeline.addLast("handler3", new ThirdHandler());
...
// 通過名稱刪除
pipeline.remove("handler3");
// 通過對象刪除
pipeline.remove(firstHandler);
// 名稱"handler2"替換成名稱"handler4",並切handler2的實例替換成了handler4的實例
pipeline.replace("handler2", "handler4", new ForthHandler());
這種方式非常靈活,按照需要更換或插入handler達到我們想要的效果.
ChannelHandler的執行和阻塞
通常 ChannelPipeline 中的每一個 ChannelHandler 都是通過它的 EventLoop( I/O 線程)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個線程,因為這會對整體的 I/O 處理產生負面的影響。
但有時可能需要與那些使用阻塞 API 的遺留代碼進行交互。對於這種情況, ChannelPipeline 有一些接受一個 EventExecutorGroup 的 add()方法。如果一個事件被傳遞給一個自定義的 EventExecutorGroup ,它將被包含在這個 EventExecutorGroup 中的某個 EventExecutor 所處理,從而被從該Channel 本身的 EventLoop 中移除。對於這種用例, Netty 提供了一個叫 DefaultEventExecutorGroup 的默認實現。
pipeline對handler的操作

2.2 ChannelPipeline的出入站api
入站

出站

- ChannelPipeline 保存了與 Channel 相關聯的 ChannelHandler
- ChannelPipeline 可以根據需要,通過添加或者刪除 ChannelHandler 來動態地修改
- ChannelPipeline 有着豐富的 API 用以被調用,以響應入站和出站事件
3 ChannelHandlerContext接口
每當有ChannelHandler添加到ChannelPipeline中,都會創建ChannelHandlerContext.如果調用Channel或ChannelPipeline上的方法,會沿着整個ChannelPipeline傳播,如果調用ChannelHandlerContext上的相同方法,則會從對應的當前ChannelHandler進行傳播.
API


ChannelHandlerContext和ChannelHandler之間的關聯(綁定)是永遠不會改變的,所以緩存對它的引用是安全的;- 如同我們在本節開頭所解釋的一樣,相對於其他類的同名方法,
ChannelHandlerContext的方法將產生更短的事件流, 應該盡可能地利用這個特性來獲得最大的性能。
3.1 使用CHannelHandlerContext

從ChannelHandlerContext訪問channel
ChannelHandlerContext ctx = ..;
// 獲取channel引用
Channel channel = ctx.channel();
// 通過channel寫入緩沖區
channel.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));
從ChannelHandlerContext訪問ChannelPipeline
ChannelHandlerContext ctx = ..;
// 獲取ChannelHandlerContext
ChannelPipeline pipeline = ctx.pipeline();
// 通過ChannelPipeline寫入緩沖區
pipeline.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));

有時候我們不想從頭傳遞數據,想跳過幾個handler,從某個handler開始傳遞數據.我們必須獲取目標handler之前的handler關聯的ChannelHandlerContext.
ChannelHandlerContext ctx = ..;
// 直接通過ChannelHandlerContext寫數據,發送到下一個handler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

好了,ChannelHandlerContext的基本使用應該掌握了,但是你真的理解ChannelHandlerContext,ChannelPipeline和Channelhandler之間的關系了嗎.我們老看一下Netty的源碼.
先看一下AbstractChannelHandlerContext類,這個類像不像雙向鏈表中的一個Node,
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
...
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
...
}
再來看一看DefaultChannelPipeline,ChannelPipeline中擁有ChannelHandlerContext這個節點的head和tail,
而且DefaultChannelPipeline類中並沒有ChannelHandler成員或handler數組.
public class DefaultChannelPipeline implements ChannelPipeline {
...
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
...
所以addFirst向pipeline中添加了handler到底添加到哪了呢.看一下pipeline中的addFirst方法
@Override
public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
return addFirst(null, name, handler);
}
@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 檢查handler是否具有復用能力,不重要
checkMultiplicity(handler);
// 名稱,不重要.
name = filterName(name, handler);
// 這個方法創建了DefaultChannelHandlerContext,handler是其一個成員屬性
// 你現在應該明白了上面說的添加handler會創建handlerContext了吧
newCtx = newContext(group, name, handler);
// 這個方法
addFirst0(newCtx);
// 這個方法是調整pipeline中HandlerContext的指針,
// 就是更新HandlerContext鏈表節點之間的位置
private void addFirst0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
nextCtx.prev = newCtx;
}
簡單總結一下,pipeline擁有context(本身像一個鏈表的節點)組成的節點的雙向鏈表首尾,可以看做pipeline擁有一個context鏈表,context擁有成員handler,這便是三者之間的關系.實際上,handler作為消息處理的主要組件,實現了和pipeline的解耦,我們可以只有一個handler,但是被封裝進不同的context能夠被不同的pipeline使用.
3.2 handler和context高級用法
緩存ChannelHandlerContext引用
@Sharable
public class WriteHandler extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public void send(String msg) {
ctx.writeAndFlush(msg);
}
}
因為一個 ChannelHandler 可以從屬於多個 ChannelPipeline,所以它也可以綁定到多個 ChannelHandlerContext 實例。 對於這種用法指在多個ChannelPipeline 中共享同一個 ChannelHandler, 對應的 ChannelHandler 必須要使用@Sharable 注解標注; 否則,試圖將它添加到多個 ChannelPipeline 時將會觸發異常。
@Sharable錯誤用法
@Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
count++;
System.out.println("channelRead(...) called the "
+ count + " time");
ctx.fireChannelRead(msg);
}
}
這段代碼的問題在於它擁有狀態 , 即用於跟蹤方法調用次數的實例變量count。將這個類的一個實例添加到ChannelPipeline將極有可能在它被多個並發的Channel訪問時導致問題。(當然,這個簡單的問題可以通過使channelRead()方法變為同步方法來修正。)
總之,只應該在確定了你的 ChannelHandler 是線程安全的時才使用@Sharable 注解。
4.1 入站異常處理
處理入站事件的過程中有異常被拋出,那么它將從它在ChannelInboundHandler里被觸發的那一點開始流經 ChannelPipeline。要想處理這種類型的入站異常,你需要在你的 ChannelInboundHandler 實現中重寫下面的方法。
public void exceptionCaught(
ChannelHandlerContext ctx, Throwable cause) throws Exception
// 基本處理方式
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
因為異常將會繼續按照入站方向流動(就像所有的入站事件一樣), 所以實現了前面所示邏輯的 ChannelInboundHandler 通常位於 ChannelPipeline 的最后。這確保了所有的入站異常都總是會被處理,無論它們可能會發生在ChannelPipeline 中的什么位置。
-
ChannelHandler.exceptionCaught()的默認實現是簡單地將當前異常轉發給ChannelPipeline 中的下一個 ChannelHandler;
-
如果異常到達了 ChannelPipeline 的尾端,它將會被記錄為未被處理;
-
要想定義自定義的處理邏輯,你需要重寫 exceptionCaught()方法。然后你需要決定是否需要將該異常傳播出去。
4.2 出站異常處理
- 每個出站操作都將返回一個 ChannelFuture。 注冊到 ChannelFuture 的 ChannelFutureListener 將在操作完成時被通知該操作是成功了還是出錯了。
- 幾乎所有的 ChannelOutboundHandler 上的方法都會傳入一個 ChannelPromise
的實例。作為 ChannelFuture 的子類, ChannelPromise 也可以被分配用於異步通
知的監聽器。但是, ChannelPromise 還具有提供立即通知的可寫方法:
ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);
1.添加ChannelFutureListener到ChannelFuture
ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
});
2.添加ChannelFutureListener到ChannelPromise
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
});
}
}
