channel介紹:
netty中channel分為NioServerScoketChannel和NioSocketChannel,分別對應java nio中的ServerScoketChannel和SocketChannel

channel、pipeline、context、handler關系
ScoketChannel都會注冊到EventLoop上的selector中,每個channel內部都會有一個pipeline,pipeline管道 里面由多個handler連接成一個雙向鏈表結構,而handler又由ChannelHandlerContext包裹着,context相當於一個handler上下文,做到了承上啟下的作用,
從context可以得到handler,自然也能得到channel和pipeline,context內部有兩個指針分別指向前 后context,handler在pipeline向前或向后進行傳遞,當然順序只能由一個方向傳遞

head和tail表示頭尾,每一次有io事件進入,netty稱為入站 它始終從head入站像后進行傳遞,反之io事件出去稱為出站,也是從head出去,圖中入站出站看似形成了一個完整的雙向鏈表,但實際可能還沒到tail就結束了,
context分為inbound和outbound,入站和出站會判斷當前context是否符合 也是判斷context中handler是inboundhandler還是outboundhandler,入站只執行入站的context,出站也是如此,在初始化pipeline會默認創建head、tail,
它們分別表示頭 尾 位置固定不許修改,head和tail同時為inbound、outbound
先來看看handler是如何先加入pipeline中的,handler添加順序無論怎么添加,頭都是head 尾都是tail pipeline初始化就固定了
bossGroup.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline();
// 像pipeline添加一個handler p.addLast(new EchoServerHandler()); } });
// 將context添加到pipeline的"最后"
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
// 重新指向 依然保持着 tail》context》tail
tail.prev = newCtx;
}
在創建ServerBootGroup時會給workerGroup分配一個handler,后面每一個NioSocketChannel都會添加一個該handler, ChannelInitializer可以理解為幫助我們創建channel,它是一個inboundhandler,在第一次注冊時會調用initChannel來執行我們自定義的實現
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {// 這里會調用我們實現的initChannel方法,並把ctx中的channel傳過去 if (initChannel(ctx)) { // 初始化成功,這里就是我們入站的入口了,它是從pipeline發起的 ctx.pipeline().fireChannelRegistered(); } else { // 如果已經初始化過了,說明已經進入pipeline了,直接向后傳遞 ctx.fireChannelRegistered(); } }
下面分別列舉ChannelInboundHandler和ChannelOutboundHandler的方法讓大家知道哪些是入站執行的,哪些是出站執行的
public interface ChannelInboundHandler extends ChannelHandler { // 注冊 void channelRegistered(ChannelHandlerContext ctx) throws Exception; // 取消注冊 void channelUnregistered(ChannelHandlerContext ctx) throws Exception; // 處於活動狀態 void channelActive(ChannelHandlerContext ctx) throws Exception; // 處於非活動狀態 void channelInactive(ChannelHandlerContext ctx) throws Exception; // 讀取數據 void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception; // 讀取數據完畢 void channelReadComplete(ChannelHandlerContext ctx) throws Exception; // 用戶自定義事件觸發,如發生心跳檢測時,會調用該方法把當前心跳事件傳播進來 void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception; // channel可寫狀態改變 void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception; // 發生異常 void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; } public interface ChannelOutboundHandler extends ChannelHandler { // 綁定 void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception; // 連接 void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception; // 斷開連接 void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; // 關閉 void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; // 注銷 void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception; // 暫時不知道這一步干啥的,按理說read操作是入站操作 void read(ChannelHandlerContext ctx) throws Exception; // 寫 void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception; // 刷新剛執行的操作 void flush(ChannelHandlerContext ctx) throws Exception; }
還有一個特殊的handler ChannelDuplexHandler,它同時繼承ChannelInboundHandler和ChannelOutboundHandler,但不推薦用它,容易混淆,建議我們自己寫的時候把入站和出站的handler分開
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler
ChannelHandlerContext傳播行為
前面我們講入站是像后進行事件傳遞,出站是向前進行事件傳遞,那么事件入口是如何進來的、怎么出去的,怎么保證執行的順序
如讀事件,發生read事件時,會交給NioUnsafe方法,Unsafe隨后會有介紹 它是netty中用來操作jdk中的nio,因為事件操作還是都交給jdk中的nio來,讀取數據時會調用pipeline.fireChannelRead(readBuf.get(i)),這樣就進入pipeline了 來開始事件傳播
private final class NioMessageUnsafe extends AbstractNioUnsafe { @Override public void read() { ~~~~~~~~~~~~~ int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // >>>>>>>>>進入入站讀操作 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); // 完成操作 pipeline.fireChannelReadComplete(); ~~~~~~~~ } }
pipeline開啟事件傳播
// 調用pipeline該方法開始事件傳播 public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } // >>>AbstractChannelHandlerContext.invokeChannelRead(head, msg) 由pipeline調用 然后進入context事件傳遞 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); // 判斷是否在當前線程,如果在當前線程直接調用,否則當成任務交給executor執行 if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { // 判斷handler是否已經被調用 if (invokeHandler()) { try { /** * 調用handler的讀操作,用戶通過實現該方法完成自定義讀事件邏輯,如果讀取完后需要向后傳遞,需要在channelRead自定義方法中繼續調用context.fireChannelRead(msg) * **/ ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { // 已經被調用 調過當前handler向后傳遞 fireChannelRead(msg); } } public ChannelHandlerContext fireChannelRead(final Object msg) { // 找出下一個入站handler繼續向后傳遞 invokeChannelRead(findContextInbound(), msg); return this; } private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; // 不停地獲取下一個handler直到是inboundhandler返回 do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }

這就是讀數據入站的大致傳播流程,從head入站直至tail或中途停止事件傳播, 出站流程類似 我就不貼了
在講解服務端和客戶端啟動流程前我們還需要再熟悉幾個重要類
ChannelFuture、Promise、Unsafe
我們先來channelfuture和promise的繼承結構

- ChannelFuture:nio既然是異步執行的,那么必定有異步執行結果,跟線程池一樣,netty也對應有一個future
- Promise:Promise也繼承於future,聽起來是不是和channelfuture功能重復了是不是,它兩主要區別是,channelfuture可以得到對應的channel,promise可以主動設置異步執行狀態,實際使用的ChannelPromise的實現類DefaultChannelPromise,channelPromise又同時繼承channelfuture和實現promise,我個人不太理解為啥有channelfuture還多設計一個promise,直接在channelfuture加一個設置異步狀態的接口不就好了,我猜想可能是promise可以當做一個脫離channel普通的異步實現,優秀的框架內部“果然都是可高度自定義重寫的”
- Unsafe:unsafe讀過jdk源碼的人應該很熟悉了,它是一個可直接進行原子化操作的工具,netty中的unsafe是用來操作jdk中的nio操作,我們前面說過netty就是一個在jdk原生nio上進行封裝優化,所以內部網絡通信肯定還是依靠jdk的nio實現的
public interface Future<V> extends java.util.concurrent.Future<V> { // 是否成功 boolean isSuccess(); // 是否取消 boolean isCancellable(); // 執行時候發生的異常 Throwable cause(); // 添加監聽器 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); // 批量添加監聽器 Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 移除監聽器 Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); // 移除多個監聽器 Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 同步阻塞,內部先執行await() 如果執行任務有發生異常會重新拋出 Future<V> sync() throws InterruptedException; // 與sync區別是,如果等待過程中發生中斷,會將當前線程也一起中斷,不響應中斷 Future<V> syncUninterruptibly(); // 同步阻塞,它與sync區別是,任務執行失敗不會拋出異常 Future<V> await() throws InterruptedException; // 同理 Future<V> awaitUninterruptibly(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); // 獲得執行結果,但不阻塞 V getNow(); // 取消任務,並調用通知喚醒阻塞,然后調用監聽器,mayInterruptIfRunning值好像沒啥作用 boolean cancel(boolean mayInterruptIfRunning); } public interface Promise<V> extends Future<V> { // 設置任務結果為成功 如果任務已經完成則拋出異常 Promise<V> setSuccess(V result); // 設置任務結果為成功,返回設置結果 不拋出異常 boolean trySuccess(V result); // 設置任務結果為失敗 如果任務已經完成則拋出異常 Promise<V> setFailure(Throwable cause); // 設置任務結果為失敗,返回設置結果 不拋出異常 boolean tryFailure(Throwable cause); } public interface ChannelFuture extends Future<Void> { // 返回當前channel Channel channel(); }
我們這節先大致介紹channel、handler、ChannelHandlerContext、pipeline 作用和方法,具體如何創建並使用,它們之前是怎么相互配合工作,我們下節通過對服務端和客戶端啟動過程進行分析過就會清晰許多
