Netty中數據包的拆分粘包處理方案,以及對protobuf協議中的拆包粘包方案自定義重寫


1、netty中的拆分粘包處理方案

TCP粘包和拆包

TCP是個“流”協議,所謂流,就是沒有界限的一串數據。TCP底層並不了解上層業務數據的具體含義,它會根據TCP緩沖區的實際情況進行包的划分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題。

img

如圖所示,假設客戶端分別發送了兩個數據包D1和D2給服務端,由於服務端一次讀取到的字節數是不確定的,故可能存在以下4種情況。

  1. 服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包;
  2. 服務端一次接收到了兩個數據包,D1和D2粘合在一起,被稱為TCP粘包;
  3. 服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩余內容,這被稱為TCP拆包
  4. 服務端分兩次讀取到了兩個數據包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩余內容D1_2和D2包的整包。

如果此時服務端TCP接收滑窗非常小,而數據包D1和D2比較大,很有可能會發生第五種可能,即服務端分多次才能將D1和D2包接收完全,期間發生多次拆包。

TCP粘包和拆包產生的原因

數據從發送方到接收方需要經過操作系統的緩沖區,而造成粘包和拆包的主要原因就在這個緩沖區上。粘包可以理解為緩沖區數據堆積,導致多個請求數據粘在一起,而拆包可以理解為發送的數據大於緩沖區,進行拆分處理。

img

詳細來說,造成粘包和拆包的原因主要有以下三個:

  1. 應用程序write寫入的字節大小大於套接口發送緩沖區大小
  2. 進行MSS大小的TCP分段
  3. 以太網幀的payload大於MTU進行IP分片。

img

粘包和拆包的解決方法

由於底層的TCP無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下。

  1. 消息長度固定,累計讀取到長度和為定長LEN的報文后,就認為讀取到了一個完整的信息
  2. 將回車換行符作為消息結束符
  3. 將特殊的分隔符作為消息的結束標志,回車換行符就是一種特殊的結束分隔符
  4. 通過在消息頭中定義長度字段來標識消息的總長度

因為前3個在實際中用的非常少,所以這里主要對4進行說明。

使用LengthFieldBasedFrameDecoder作為decoder實現,LengthFieldBasedFrameDecoder構造函數,第一個參數為信息最大長度,超過這個長度回報異常,第二參數為長度屬性的起始(偏移)位,我們的協議中長度是0到第3個字節,所以這里寫0,第三個參數為“長度屬性”的長度,我們是4個字節,所以寫4,第四個參數為長度調節值,在總長被定義為包含包頭長度時,修正信息長度,第五個參數為跳過的字節數,根據需要我們跳過前4個字節,以便接收端直接接受到不含“長度屬性”的內容。

public class EchoServer {

  public void bind(int port) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 這里將LengthFieldBasedFrameDecoder添加到pipeline的首位,因為其需要對接收到的數據
            // 進行長度字段解碼,這里也會對數據進行粘包和拆包處理
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
            // LengthFieldPrepender是一個編碼器,主要是在響應字節數據前面添加字節長度字段
            ch.pipeline().addLast(new LengthFieldPrepender(2));
            // 對經過粘包和拆包處理之后的數據進行json反序列化,從而得到User對象
            ch.pipeline().addLast(new JsonDecoder());
            // 對響應數據進行編碼,主要是將User對象序列化為json
            ch.pipeline().addLast(new JsonEncoder());
            // 處理客戶端的請求的數據,並且進行響應
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });

      ChannelFuture future = bootstrap.bind(port).sync();
      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new EchoServer().bind(8080);
  }
}
這里EchoServer主要是在pipeline中添加了兩個編碼器和兩個解碼一器,編碼器主要是負責將響應的User對象序列化為json對象,然后在其字節數組前面添加一個長度字段的字節數組;解碼一器主要是對接收到的數據進行長度字段的解碼,然后將其反序列化為一個User對象

  

2、Protobuf協議傳輸中對粘包和拆包自定義處理

之所以進行自定義處理是因為項目中的客戶端不是使用netty來寫的,使用基於c++的原生socket實現,所以為了和客戶端一致,對

protobuf協議進行了修改:

   private static void nettyProcessService(final Properties prop, final KafkaStringProducerService kafkaProducerService1,
                                            final KafkaStringProducerService kafkaProducerService2, final KafkaStringProducerService kafkaProducerService3,
                                            final ExecutorService executor1, final ExecutorService executor2, final ConcurrentHashMap<String, Channel> mapChannels, final RedisPool redisPool)
    {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap b = new ServerBootstrap();
        try {
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 半包處理
                            //socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                            socketChannel.pipeline().addLast(new ProtobufFixed32FrameDecoderRedefine());
                            socketChannel.pipeline().addLast(new ProtobufDecoder(protobuf.MsgProto.MsgProtoInfo.getDefaultInstance()));
                            //socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                            socketChannel.pipeline().addLast(new ProtobufFixed32LengthFieldPrependerRedefine());
                            socketChannel.pipeline().addLast(new ProtobufEncoder());
                            socketChannel.pipeline().addLast(new SamplingReqServerHandler(prop, kafkaProducerService1, kafkaProducerService2,
                                    kafkaProducerService3, executor1, executor2, mapChannels, redisPool));
                        }
                    });

            ChannelFuture future = b.bind(Integer.parseInt(prop.getProperty("PORT"))).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            logger.info("**************** Netty Serve 已關閉 ****************");
        }
    }

 

這里主要說明對ProtobufFixed32FrameDecoder進行復寫,修改其編解碼函數。

ProtobufFixed32FrameDecoderRedefine

public class ProtobufFixed32FrameDecoderRedefine extends ByteToMessageDecoder {

    public ProtobufFixed32FrameDecoderRedefine()
    {
    }

    @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
    {
            in.markReaderIndex();
            int preIndex = in.readerIndex();
            in.markReaderIndex();
            byte[] frontBytes = new byte[4];
            if (in.readableBytes() < 4){
                throw new CorruptedFrameException("less min length[4]: " + in.readableBytes());
            }
            in.readBytes(frontBytes); //讀取前4個字節 int length = bytesToInt(frontBytes); //自定義字節序獲取前四個字節表示的長度
            if (preIndex != in.readerIndex()) {
                if (length < 0) {
                    throw new CorruptedFrameException("negative length: " + length);
                } else {
                    if (in.readableBytes() < length) {
                        in.resetReaderIndex();
                    } else {
                        out.add(in.readRetainedSlice(length)); //讀取相應長度的數據
                    }

                }
            }
        }

    public static int bytesToInt(byte b[]) {
        return  b[3] & 0xff
                | (b[2] & 0xff) << 8
                | (b[1] & 0xff) << 16
                | (b[0] & 0xff) << 24;
    }
}

 

ProtobufFixed32LengthFieldPrependerRedefine復寫改動:
繼承MessageToByteEncoder方案
public class ProtobufFixed32LengthFieldPrependerRedefine extends MessageToByteEncoder<ByteBuf> {

    public ProtobufFixed32LengthFieldPrependerRedefine() {
    }

    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        int bodyLen = msg.readableBytes();
        int headerLen = 4;
        out.ensureWritable(headerLen + bodyLen);  //前4個字節+數據長度
        writeRawVarint32(out, bodyLen); //把body的長度寫到前四個字節,int轉為網絡需
        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }

    static void writeRawVarint32(ByteBuf out, int value) {

        byte[] frontBytes = intToBytes(value); //int轉為網絡序

        out.writeBytes(frontBytes);
    }

    //寫入的時候,把 int 轉化為網絡序
    public static byte[] intToBytes(int n) {
        byte[] b = new byte[4];
        b[3] = (byte) (n & 0xff);
        b[2] = (byte) (n >> 8 & 0xff);
        b[1] = (byte) (n >> 16 & 0xff);
        b[0] = (byte) (n >> 24 & 0xff);
        return b;
    }

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM