Netty 編解碼器和 handler 的調用機制


1.基本說明
1) netty 的組件設計: Netty 的主要組件有 ChannelEventLoopChannelFutureChannelHandlerChannelPipe
2) ChannelHandler 充當了處理入站和出站數據的應用程序邏輯的容器。 例如, 實現 ChannelInboundHandler 接口(或
   ChannelInboundHandlerAdapter) , 你就可以接收入站事件和數據, 這些數據會被業務邏輯處理。 當要給客戶端
   發 送 響 應 時 , 也 可 以 從 ChannelInboundHandler 沖 刷 數 據 。 業 務 邏 輯 通 常 寫 在 一 個 或 者 多 個
   ChannelInboundHandler 中。 ChannelOutboundHandler 原理一樣, 只不過它是用來處理出站數據的
3) ChannelPipeline 提供了 ChannelHandler 鏈的容器。 以客戶端應用程序為例, 如果事件的運動方向是從客戶端到
  服務端的, 那么我們稱這些事件為出站的, 即客戶端發送給服務端的數據會通過 pipeline 中的一系列ChannelOutboundHandler, 並被這些 Handler 處理, 反之則稱為入站的 


  2.編碼解碼器 

1) Netty 發送或者接受一個消息的時候, 就將會發生一次數據轉換。 入站消息會被解碼: 從字節轉換為另一種格式(比如 java 對象) ; 如果是出站消息, 它會被編碼成字節。

2) Netty 提供一系列實用的編解碼器, 他們都實現了 ChannelInboundHadnler 或者 ChannelOutboundHandler 接口。在這些類中, channelRead 方法已經被重寫了。 以入站為例, 對於每個從入站 Channel 讀取的消息, 這個方法會
被調用。 隨后, 它將調用由解碼器所提供的 decode()方法進行解碼, 並將已經解碼的字節轉發給 ChannelPipeline中的下一個 ChannelInboundHandler。 

注意:進站和出站都是想對而言


 3 解碼器-ByteToMessageDecoder

1) 關系繼承圖

 2) 由於不可能知道遠程節點是否會一次性發送一個完整的信息, tcp 有可能出現粘包拆包的問題, 這個類會對入站數據進行緩沖, 直到它准備好被處理.
 3) 一個關於 ByteToMessageDecoder 實例分析

 


4 Netty handler 鏈的調用機制

實例要求:
1) 使用自定義的編碼器和解碼器來說明 Netty handler 調用機制
  客戶端發送 long -> 服務器
  服務端發送 long -> 客戶端
2) 案例演示

 3) 結論

  不論解碼器 handler 還是 編碼器 handler 即接收的消息類型必須與待處理的消息類型一致, 否則該 handler 不會被執行
  在解碼器 進行數據解碼時, 需要判斷 緩存區(ByteBuf)的數據是否足夠 , 否則接收到的結果會期望結果可能不一致 

代碼:

MyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MyServer {
    public static void main(String[] args) throws Exception{

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定義一個初始化類


            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}
View Code
MyServerInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;


public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();//一會下斷點

        //入站的handler進行解碼 MyByteToLongDecoder
        //pipeline.addLast(new MyByteToLongDecoder());
        pipeline.addLast(new MyByteToLongDecoder2());
        //出站的handler進行編碼
        pipeline.addLast(new MyLongToByteEncoder());
        //自定義的handler 處理業務邏輯
        pipeline.addLast(new MyServerHandler());
        System.out.println("xx");
    }
}
View Code
MyServerHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {

        System.out.println("從客戶端" + ctx.channel().remoteAddress() + " 讀取到long " + msg);

        //給客戶端發送一個long
        ctx.writeAndFlush(98765L);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
View Code
MyClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {
    public static void main(String[] args)  throws  Exception{

        EventLoopGroup group = new NioEventLoopGroup();

        try {

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer()); //自定義一個初始化類

            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();

            channelFuture.channel().closeFuture().sync();

        }finally {
            group.shutdownGracefully();
        }
    }
}
View Code
MyClientInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;


public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();

        //加入一個出站的handler 對數據進行一個編碼
        pipeline.addLast(new MyLongToByteEncoder());

        //這時一個入站的解碼器(入站handler )
        //pipeline.addLast(new MyByteToLongDecoder());
        pipeline.addLast(new MyByteToLongDecoder2());
        //加入一個自定義的handler , 處理業務
        pipeline.addLast(new MyClientHandler());


    }
}
View Code
MyClientHandler
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

import java.nio.charset.Charset;

public class MyClientHandler  extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {

        System.out.println("服務器的ip=" + ctx.channel().remoteAddress());
        System.out.println("收到服務器消息=" + msg);

    }

    //重寫channelActive 發送數據

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("MyClientHandler 發送數據");
        //ctx.writeAndFlush(Unpooled.copiedBuffer(""))
        ctx.writeAndFlush(123456L); //發送的是一個long

        //分析
        //1. "abcdabcdabcdabcd" 是 16個字節
        //2. 該處理器的前一個handler 是  MyLongToByteEncoder
        //3. MyLongToByteEncoder 父類  MessageToByteEncoder
        //4. 父類  MessageToByteEncoder
        /*

         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) { //判斷當前msg 是不是應該處理的類型,如果是就處理,不是就跳過encode
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        }
        4. 因此我們編寫 Encoder 是要注意傳入的數據類型和處理的數據類型一致
        */
       // ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd",CharsetUtil.UTF_8));

    }
}
View Code
MyByteToLongDecoder
解碼器
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class MyByteToLongDecoder extends ByteToMessageDecoder {
    /**
     *
     * decode 會根據接收的數據,被調用多次, 直到確定沒有新的元素被添加到list
     * , 或者是ByteBuf 沒有更多的可讀字節為止
     * 如果list out 不為空,就會將list的內容傳遞給下一個 channelinboundhandler處理, 該處理器的方法也會被調用多次
     *
     * @param ctx 上下文對象
     * @param in 入站的 ByteBuf
     * @param out List 集合,將解碼后的數據傳給下一個handler
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        System.out.println("MyByteToLongDecoder 被調用");
        //因為 long 8個字節, 需要判斷有8個字節,才能讀取一個long
        if(in.readableBytes() >= 8) {
            out.add(in.readLong());
        }
    }
}
View Code

解碼器-ReplayingDecoder

 1)  public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
 2)  ReplayingDecoder 擴展了 ByteToMessageDecoder 類, 使用這個類, 我們不必調用 readableBytes()方法。 參數 S指定了用戶狀態管理的類型, 其中 Void 代表不需要狀態管理
 3)  應用實例: 使用 ReplayingDecoder 編寫解碼器, 對前面的案例進行簡化 [案例演示 

MyByteToLongDecoder2
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        System.out.println("MyByteToLongDecoder2 被調用");
        //在 ReplayingDecoder 不需要判斷數據是否足夠讀取,內部會進行處理判斷
        out.add(in.readLong());


    }
}
View Code

4)  ReplayingDecoder 使用方便, 但它也有一些局限性:
  1. 並 不 是 所 有 的 ByteBuf 操 作 都 被 支 持 , 如 果 調 用 了 一 個 不 被 支 持 的 方 法 , 將 會 拋 出 一 個UnsupportedOperationException
  2. ReplayingDecoder 在某些情況下可能稍慢於 ByteToMessageDecoder, 例如網絡緩慢並且消息格式復雜時,
      消息會被拆成了多個碎片, 速度變慢 。


6 其它編解碼器

1) LineBasedFrameDecoder: 這個類在 Netty 內部也有使用, 它使用行尾控制字符(\n 或者\r\n) 作為分隔符來解析數據。
2) DelimiterBasedFrameDecoder: 使用自定義的特殊字符作為消息的分隔符。
3) HttpObjectDecoder: 一個 HTTP 數據的解碼器
4) LengthFieldBasedFrameDecoder: 通過指定長度來標識整包消息, 這樣就可以自動的處理黏包和半包消息。


7 其它編碼器


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM