Netty筆記(6) - 粘包拆包問題及解決方案


Netty 中 TCP 粘包拆包問題

信息通過tcp傳輸過程中出現的狀況 .

TCP是個“流”協議,所謂流,就是沒有界限的一串數據。TCP底層並不了解上層業務數據的具體含義,它會根據TCP緩沖區的實際情況進行包的划分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送

產生粘包和拆包問題的主要原因是,操作系統在發送TCP數據的時候,底層會有一個緩沖區,例如1024個字節大小,如果一次請求發送的數據量比較小,沒達到緩沖區大小,TCP則會將多個請求合並為同一個請求進行發送,這就形成了粘包問題;如果一次請求發送的數據量比較大,超過了緩沖區大小,TCP就會將其拆分為多次發送,這就是拆包,也就是將一個大的包拆分為多個小包進行發送。

入圖所示:

上圖中演示了粘包和拆包的三種情況:

  • D1和D2兩個包都剛好滿足TCP緩沖區的大小,或者說其等待時間已經達到TCP等待時長,從而還是使用兩個獨立的包進行發送;
  • D1和D2兩次請求間隔時間內較短,並且數據包較小,因而合並為同一個包發送給服務端;
  • 某一個包比較大,因而將其拆分為兩個包D*_1和D*_2進行發送,而這里由於拆分后的某一個包比較小,其又與另一個包合並在一起發送。

發生這種情況的代碼:

客戶端發送數據 快速的發送 10條數據 :

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客戶端發送10條數據 hello,server 編號
        for(int i= 0; i< 10; ++i) {
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server " +i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);
        }
    }

}

服務端接受打印:

服務器接收到數據 hello,server 0
服務器接收到數據 hello,server 1
服務器接收到數據 hello,server 2hello,server 3
服務器接收到數據 hello,server 4hello,server 5
服務器接收到數據 hello,server 6
服務器接收到數據 hello,server 7hello,server 8
服務器接收到數據 hello,server 9

很明顯 其中有三條記錄被粘在其他數據上,這就是TCP的粘包拆包現象

怎么解決:

  1. Netty自帶的 解決方案:

    • 固定長度的拆包器 FixedLengthFrameDecoder,每個應用層數據包的都拆分成都是固定長度的大小

    • 行拆包器 LineBasedFrameDecoder,每個應用層數據包,都以換行符作為分隔符,進行分割拆分

    • 分隔符拆包器 DelimiterBasedFrameDecoder,每個應用層數據包,都通過自定義的分隔符,進行分割拆分

    • 基於數據包長度的拆包器 LengthFieldBasedFrameDecoder,將應用層數據包的長度,作為接收端應用層數據包的拆分依據。按照應用層數據包的大小,拆包。這個拆包器,有一個要求,就是應用層協議中包含數據包的長度

FixedLengthFrameDecoder 解碼器

服務端 添加 FixedLengthFrameDecoder 解碼器 並指定長度

public class EchoServer {



  public static void main(String[] args) throws InterruptedException {

      EventLoopGroup bossGroup = new NioEventLoopGroup();
      EventLoopGroup workerGroup = new NioEventLoopGroup();
      try {
          ServerBootstrap bootstrap = new ServerBootstrap();
          bootstrap.group(bossGroup, workerGroup)
                  .channel(NioServerSocketChannel.class)
                  .option(ChannelOption.SO_BACKLOG, 1024)
                  .childHandler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel ch) throws Exception {
                          //指定長度為9 則每次截取長度為9的字節 
                          ch.pipeline().addLast(new FixedLengthFrameDecoder(9));
                         // 將 每次截取的字節編碼為字符串
                          ch.pipeline().addLast(new StringDecoder());
						//自定義處理類打印
                          ch.pipeline().addLast(new EchoServerHandler());
                      }
                  });

          ChannelFuture future = bootstrap.bind(8000).sync();
          future.channel().closeFuture().sync();
      } finally {
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
      }
  }
}

自定義服務端Handler 打印字符串:

public class EchoServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println("message: " + msg.trim());
  }
}

客戶端發送信息 並添加字符串編碼器 將信息已字符串的形式編碼:

public class EchoClient {



  public static void main(String[] args) throws InterruptedException {
      EventLoopGroup group = new NioEventLoopGroup();
      try {
          Bootstrap bootstrap = new Bootstrap();
          bootstrap.group(group)
                  .channel(NioSocketChannel.class)
                  .option(ChannelOption.TCP_NODELAY, true)
                  .handler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel ch) throws Exception {
                          
                          ch.pipeline().addLast(new StringEncoder());
                          ch.pipeline().addLast(new EchoClientHandler());
                      }
                  });

          ChannelFuture future = bootstrap.connect("127.0.0.1", 8000).sync();
          future.channel().closeFuture().sync();
      } finally {
          group.shutdownGracefully();
      }
  }
}

客戶端Handler 發送信息 剛好長度為9 :

public class EchoClientHandler extends SimpleChannelInboundHandler<String> {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush("123456789");
  }
}

總結: FixedLengthFrameDecoder 解碼器 將按照指定長度截取字節 並添加到List中向后傳遞 , 以本案例為例,如果字節數剛好為9,則全部打印,如果 字節數為18, 則拆分打印兩次,如果為19 則最后一個字節不打印,如果不足9 則什么都不打印.

LineBasedFrameDecoder 行拆分器

通過行換行符 \n 或者 \r\n 進行分割,

將上面案例的FixedLengthFrameDecoder 解碼器 換成 LineBasedFrameDecoder

並指定 截取每段的最大長度 (超過報錯 不往后傳遞)

...
    
.childHandler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel ch) throws Exception {

//                          ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                          ch.pipeline().addLast(new                       LineBasedFrameDecoder(5));
                          // 將前一步解碼得到的數據轉碼為字符串
                          ch.pipeline().addLast(new StringDecoder());
//                           最終的數據處理
                          ch.pipeline().addLast(new EchoServerHandler());
                      }
                  });

...

客戶端Handler 發送字符串, 最后的"1234" 不會打印,,

@Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush("1\n123456\r\n1234");
  }

服務端接收並打印結果 分別打印了 "1" 和 "1234" 而超過字節長度5 的 "123456"則報出TooLongFrameException錯誤

server receives message: 1

An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.TooLongFrameException: frame length (6) exceeds the allowed maximum (5)

server receives message: 1234

DelimiterBasedFrameDecoder 自定義分割符

和行分割符類似, 此解碼器可以自定義分割符,常用構造方法:

 public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)

接收一個最大長度,和 任意個數的 分隔符(用ByteBuf的形式傳入),解碼器識別到任意一個 分割符 都會進行拆分

注冊解碼器:

傳入 "$" 和 "*" 作為分割符,並指定最大長度為 5個字節

.childHandler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel ch) throws Exception {

                          ch.pipeline().addLast(new DelimiterBasedFrameDecoder(5,
                              Unpooled.wrappedBuffer("$".getBytes()),Unpooled.wrappedBuffer("*".getBytes())));
                          // 將前一步解碼得到的數據轉碼為字符串
                          ch.pipeline().addLast(new StringDecoder());
                          
//                           最終的數據處理
                          ch.pipeline().addLast(new EchoServerHandler());
                      }
                  });

客戶端 發送數據:

@Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
          ctx.writeAndFlush("1$123456*1234$789$");
  }

服務端只打印了 "1" 當解析到 "123456" 時 就報錯了 后面就沒有再解析了,會緩存着 等到該通道關閉 或者有后續數據發送過來時 才繼續解析

LengthFieldBasedFrameDecoder

自定義數據長度,發送的 字節數組中 包含 描述 數據長度的字段 和 數據本身,

解碼過程

常用字段:

  • maxFrameLength:指定了每個包所能傳遞的最大數據包大小,(上圖中的最大長度為11)
  • lengthFieldOffset:指定了長度字段在字節碼中的偏移量;(11這個描述長度的數據是在數組的第幾位開始)
  • lengthFieldLength:指定了長度字段所占用的字節長度;(11 占 1個字節)
  • lengthAdjustment: 長度域的偏移量矯正。 如果長度域的值,除了包含有效數據域的長度外,還包含了其他域(如長度域自身)長度,那么,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長。 ( 11 這個域 不光光描述 Hello,world, 一般設置為0,)
  • initialBytesToStrip : 丟棄的起始字節數。丟棄處於有效數據前面的字節數量。比如前面有1個節點的長度域,則它的值為1. ( 如果為0代表不丟棄,則將長度域也向后傳遞)

服務端添加 解碼器:

  • 最大長度 為 長度描述域 的值11 + 長度描述域本身占用的長度 1 = 12
  • 長度描述域放在數據包的第一位, 沒有偏移 為0
  • 長度描述域 長度為1
  • 無需矯正
  • 一個字節也不丟棄
.childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 這里將FixedLengthFrameDecoder添加到pipeline中,指定長度為20
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(12,0,1,0,0));
            // 將前一步解碼得到的數據轉碼為字符串

            ch.pipeline().addLast(new StringDecoder());
            // 最終的數據處理
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });

客戶端發送數據 發送最Netty 底層操作 的ByteBuf對象 發送時 無需任何編碼:

 @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ByteBuf buffer = Unpooled.buffer();
    buffer.writeByte(11);
    buffer.writeBytes("Hello,World".getBytes());
    ctx.writeAndFlush(buffer);
  }

服務端接收數據為 (11代表的制表符)Hello,World

這樣發送 每次都要計算 數據長度,並手動添加到 數據的前面,很不方便 配合LengthFieldPrepender 使用,這個編碼碼器可以計算 長度,並自動添加到 數據的前面

改造客戶端 先攔截數據按字符串編碼,再計算字節長度 添加 長度描述字段 並占用一個字節 (這個長度要與客戶端的解碼器 lengthFieldLength參數 值保持一致) :

 .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {

            ch.pipeline().addLast(new LengthFieldPrepender(1));
            ch.pipeline().addLast(new StringEncoder());
            // 客戶端發送消息給服務端,並且處理服務端響應的消息
            ch.pipeline().addLast(new EchoClientHandler());
          }
        });

客戶端發送 有字符串編碼器 可以直接發送字符串:

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush("Hello,World");
  }

自定義協議

上面介紹的 各種解碼器 已經可以應付絕大多數場景, 如果遇到 特殊的狀況 我們也可以自定義協議

定義 協議對象:

//協議包
public class MessageProtocol {
    private int len; //關鍵
    private byte[] content;

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

客戶端發送:

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
       
        for(int i = 0; i< 5; i++) {
            String mes = "Hello,World";
            byte[] content = mes.getBytes(Charset.forName("utf-8"));
            int length = mes.getBytes(Charset.forName("utf-8")).length;

            //創建協議包對象
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);

        }
    }

該協議的 自定義 編碼器 將協議包發送出去:

public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被調用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}

將客戶端 發送數據的Handler 和 編碼器 注冊 這里就不寫了

服務端解碼器 讀取長度 並 判斷可讀數據的長度是否足夠 :

public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        
        in.markReaderIndex();
        
        //讀取長度
        int length = in.readInt();
        //如果可讀長度大於 數據長度 說明數據完整
        if (in.readableBytes()>length){
            byte[] content = new byte[length];
            in.readBytes(content);
            //封裝成 MessageProtocol 對象,放入 out, 傳遞下一個handler業務處理
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            out.add(messageProtocol);
        }else{
            //如果數據不夠長 將已經讀過的的int 數據還原回去 留下次讀取
            in.resetReaderIndex();
        }
    }
}

服務端成功讀取:

本例中存在很多問題, 明白這個意思就行, 感興趣的話 可以 自己動手優化


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM