netty核心組件之channel、handler、ChannelHandlerContext、pipeline


channel介紹:

  netty中channel分為NioServerScoketChannel和NioSocketChannel,分別對應java nio中的ServerScoketChannel和SocketChannel

 channel、pipeline、context、handler關系

  ScoketChannel都會注冊到EventLoop上的selector中,每個channel內部都會有一個pipeline,pipeline管道 里面由多個handler連接成一個雙向鏈表結構,而handler又由ChannelHandlerContext包裹着,context相當於一個handler上下文,做到了承上啟下的作用,

從context可以得到handler,自然也能得到channel和pipeline,context內部有兩個指針分別指向前 后context,handler在pipeline向前或向后進行傳遞,當然順序只能由一個方向傳遞

head和tail表示頭尾,每一次有io事件進入,netty稱為入站 它始終從head入站像后進行傳遞,反之io事件出去稱為出站,也是從head出去,圖中入站出站看似形成了一個完整的雙向鏈表,但實際可能還沒到tail就結束了,

context分為inbound和outbound,入站和出站會判斷當前context是否符合 也是判斷context中handler是inboundhandler還是outboundhandler,入站只執行入站的context,出站也是如此,在初始化pipeline會默認創建head、tail,

它們分別表示頭 尾 位置固定不許修改,head和tail同時為inbound、outbound

先來看看handler是如何先加入pipeline中的,handler添加順序無論怎么添加,頭都是head 尾都是tail pipeline初始化就固定了

bossGroup.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
     // 像pipeline添加一個handler p.addLast(
new EchoServerHandler()); } });

// 將context添加到pipeline的"最后"
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
  // 重新指向 依然保持着 tail》context》tail
tail.prev = newCtx;
}
 

在創建ServerBootGroup時會給workerGroup分配一個handler,后面每一個NioSocketChannel都會添加一個該handler, ChannelInitializer可以理解為幫助我們創建channel,它是一個inboundhandler,在第一次注冊時會調用initChannel來執行我們自定義的實現

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {// 這里會調用我們實現的initChannel方法,並把ctx中的channel傳過去
        if (initChannel(ctx)) {
            // 初始化成功,這里就是我們入站的入口了,它是從pipeline發起的
            ctx.pipeline().fireChannelRegistered();
        } else {
            // 如果已經初始化過了,說明已經進入pipeline了,直接向后傳遞
            ctx.fireChannelRegistered();
        }
    }

 

下面分別列舉ChannelInboundHandler和ChannelOutboundHandler的方法讓大家知道哪些是入站執行的,哪些是出站執行的

 

public interface ChannelInboundHandler extends ChannelHandler {

    // 注冊
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    // 取消注冊
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    // 處於活動狀態
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    // 處於非活動狀態
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    // 讀取數據
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    // 讀取數據完畢
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    // 用戶自定義事件觸發,如發生心跳檢測時,會調用該方法把當前心跳事件傳播進來
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    // channel可寫狀態改變
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    // 發生異常
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

public interface ChannelOutboundHandler extends ChannelHandler {
   
    // 綁定
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    // 連接
    void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    // 斷開連接
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    // 關閉
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    // 注銷
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    // 暫時不知道這一步干啥的,按理說read操作是入站操作
    void read(ChannelHandlerContext ctx) throws Exception;

    //
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    // 刷新剛執行的操作
    void flush(ChannelHandlerContext ctx) throws Exception;
}

還有一個特殊的handler ChannelDuplexHandler,它同時繼承ChannelInboundHandler和ChannelOutboundHandler,但不推薦用它,容易混淆,建議我們自己寫的時候把入站和出站的handler分開

public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler

ChannelHandlerContext傳播行為

  前面我們講入站是像后進行事件傳遞,出站是向前進行事件傳遞,那么事件入口是如何進來的、怎么出去的,怎么保證執行的順序

如讀事件,發生read事件時,會交給NioUnsafe方法,Unsafe隨后會有介紹 它是netty中用來操作jdk中的nio,因為事件操作還是都交給jdk中的nio來,讀取數據時會調用pipeline.fireChannelRead(readBuf.get(i)),這樣就進入pipeline了 來開始事件傳播

private final class NioMessageUnsafe extends AbstractNioUnsafe {
    @Override
    public void read() {
        ~~~~~~~~~~~~~
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // >>>>>>>>>進入入站讀操作
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        // 完成操作
        pipeline.fireChannelReadComplete();
        ~~~~~~~~
    }
}

pipeline開啟事件傳播

// 調用pipeline該方法開始事件傳播
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

// >>>AbstractChannelHandlerContext.invokeChannelRead(head, msg) 由pipeline調用 然后進入context事件傳遞
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    // 判斷是否在當前線程,如果在當前線程直接調用,否則當成任務交給executor執行
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
    
private void invokeChannelRead(Object msg) {
    // 判斷handler是否已經被調用
    if (invokeHandler()) {
        try {
             /**
             * 調用handler的讀操作,用戶通過實現該方法完成自定義讀事件邏輯,如果讀取完后需要向后傳遞,需要在channelRead自定義方法中繼續調用context.fireChannelRead(msg)
             * 
             **/
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        // 已經被調用 調過當前handler向后傳遞
        fireChannelRead(msg);
    }
}

public ChannelHandlerContext fireChannelRead(final Object msg) {
    // 找出下一個入站handler繼續向后傳遞
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    // 不停地獲取下一個handler直到是inboundhandler返回 
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

這就是讀數據入站的大致傳播流程,從head入站直至tail或中途停止事件傳播, 出站流程類似 我就不貼了

在講解服務端和客戶端啟動流程前我們還需要再熟悉幾個重要類

ChannelFuture、Promise、Unsafe

我們先來channelfuture和promise的繼承結構

  • ChannelFuture:nio既然是異步執行的,那么必定有異步執行結果,跟線程池一樣,netty也對應有一個future
  • Promise:Promise也繼承於future,聽起來是不是和channelfuture功能重復了是不是,它兩主要區別是,channelfuture可以得到對應的channel,promise可以主動設置異步執行狀態,實際使用的ChannelPromise的實現類DefaultChannelPromise,channelPromise又同時繼承channelfuture和實現promise,我個人不太理解為啥有channelfuture還多設計一個promise,直接在channelfuture加一個設置異步狀態的接口不就好了,我猜想可能是promise可以當做一個脫離channel普通的異步實現,優秀的框架內部“果然都是可高度自定義重寫的
  • Unsafe:unsafe讀過jdk源碼的人應該很熟悉了,它是一個可直接進行原子化操作的工具,netty中的unsafe是用來操作jdk中的nio操作,我們前面說過netty就是一個在jdk原生nio上進行封裝優化,所以內部網絡通信肯定還是依靠jdk的nio實現的
public interface Future<V> extends java.util.concurrent.Future<V> {

    // 是否成功
    boolean isSuccess();

    // 是否取消
    boolean isCancellable();

    // 執行時候發生的異常
    Throwable cause();

    // 添加監聽器
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    // 批量添加監聽器
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // 移除監聽器
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    // 移除多個監聽器
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    // 同步阻塞,內部先執行await() 如果執行任務有發生異常會重新拋出
    Future<V> sync() throws InterruptedException;

    // 與sync區別是,如果等待過程中發生中斷,會將當前線程也一起中斷,不響應中斷
    Future<V> syncUninterruptibly();

    // 同步阻塞,它與sync區別是,任務執行失敗不會拋出異常
    Future<V> await() throws InterruptedException;

    // 同理
    Future<V> awaitUninterruptibly();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    boolean await(long timeoutMillis) throws InterruptedException;

    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    boolean awaitUninterruptibly(long timeoutMillis);

    // 獲得執行結果,但不阻塞
    V getNow();

    // 取消任務,並調用通知喚醒阻塞,然后調用監聽器,mayInterruptIfRunning值好像沒啥作用
    boolean cancel(boolean mayInterruptIfRunning);
}

public interface Promise<V> extends Future<V> {

    // 設置任務結果為成功 如果任務已經完成則拋出異常
    Promise<V> setSuccess(V result);

    // 設置任務結果為成功,返回設置結果 不拋出異常
    boolean trySuccess(V result);

    // 設置任務結果為失敗 如果任務已經完成則拋出異常
    Promise<V> setFailure(Throwable cause);

    // 設置任務結果為失敗,返回設置結果 不拋出異常
    boolean tryFailure(Throwable cause);
}

public interface ChannelFuture extends Future<Void> {
    
    // 返回當前channel
    Channel channel();
}

我們這節先大致介紹channel、handler、ChannelHandlerContext、pipeline 作用和方法,具體如何創建並使用,它們之前是怎么相互配合工作,我們下節通過對服務端和客戶端啟動過程進行分析過就會清晰許多

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM