使用Netty如何解決拆包粘包的問題


首先,我們通過一個DEMO來模擬TCP的拆包粘包的情況:客戶端連續向服務端發送100個相同消息。服務端的代碼如下:

AtomicLong count = new AtomicLong(0);
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap()
        .group(boss, worker)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        ByteBuf byteBuf = (ByteBuf) msg;
                        long l = count.incrementAndGet();
                        System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
                    }
                });
            }
        });
serverBootstrap.bind(8080);

客戶端代碼如下:

NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
        .group(nioEventLoopGroup)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(
                        new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                for (int i = 0; i < 1000; i++) {
                                    byte[] bytes = "歡迎關注我,微信公眾號:元本一兀!".getBytes(StandardCharsets.UTF_8);
                                    ByteBuf buffer = ctx.alloc().buffer();
                                    buffer.writeBytes(bytes);

                                    ctx.channel().writeAndFlush(buffer);
                                }
                            }
                        }
                );
            }
        });
bootstrap.connect("localhost", 8080);

運行結果如下:

首先,我們發了1000個消息,但是在服務端有49行輸出,同時,有些消息是合並在一起的,有些消息解析出了亂碼。上面的輸出中,包含三種情況:

  1. 正確的結果輸出
  2. 多個消息拼在一起,也就是粘包情況;
  3. 另一種情況是半包,導致包不完成,解析出來的數據是亂碼

為什么會粘包、拆包?

這個是因為Netty底層是走的TCP協議,說白了傳輸的是就是字節流,消息與消息之間是沒有邊界的。發生TCP粘包拆包的原因主要有:

  1. 當連續發送數據時,由於TCP協議的nagle算法,會將較小的內容拼接成較大的包一次性發送到服務器端,因而導致粘包;
  2. 當發送的內容較大時,由於服務器端的recv(buffer_size)方法中buffer_size較小,不能一次性讀完所有數據,從而導致一個消息分拆成多次讀取,產生非拆包的情況。

本質上來講,TCP協議的包並不是按照業務消息來拆分的,TCP層並不感知發送的消息的大小。

解決粘包拆包的方法

解決粘包拆包的思路,其實就是在接收數據的時候,將字節流拆分成完整的包:

  1. 如果當前讀到的數據不是一個完整的業務數據包,則繼續從TCP緩沖區中讀取數據,知道讀到的數據中包含完整的數據包;
  2. 如果檔次讀取到的數據加上內存中已有的數據,包含一個完整的業務數據包,則將完整的業務包拼成一個包,並返回應用層處理;對於多余的數據,仍保留在內存中,待和后續加載的數據合並處理。

Netty中提供了一些拆包器,能夠滿足大部分的使用場景:

  1. FixedLengthFrameDecoder
  2. LineBasedFrameDecoder
  3. DelimiterBasedFrameDecoder
  4. LengthFieldBasedFrameDecoder

Netty中常用的拆包器

定長拆包器-FixedLengthFrameDecoder

如果你的業務消息格式很簡單,是固定長度的,則使用該拆包器很方便。
比如上面的代碼,發送的數據是固定的51個字節,我們在服務端的pipeline中加上定長拆包器:

AtomicLong count = new AtomicLong(0);
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap()
        .group(boss, worker)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new FixedLengthFrameDecoder(51)).addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        ByteBuf byteBuf = (ByteBuf) msg;
                        long l = count.incrementAndGet();
                        System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
                    }
                });
            }
        });
serverBootstrap.bind(8080);

結果如下:

可以看到,服務端收到了1000個完整的獨立的包。

行拆包器-LineBasedFrameDecoder

這個拆包器拆包的邏輯就是按行拆分,發送端每個數據之間用換行符作為分隔符,接收端通過也會按照換行符將字節流拆分成業務消息。
修改一下的上面的客戶端,消息后追加一個\r\n

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    for (int i = 0; i < 1000; i++) {
        byte[] bytes = ("歡迎關注我,微信公眾號:元本一兀!" + i+"\r\n").getBytes(StandardCharsets.UTF_8);
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(bytes);

        ctx.channel().writeAndFlush(buffer);
        
    }
}

接收數據段,添加行拆包器:

@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE))
            .addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ByteBuf byteBuf = (ByteBuf) msg;
                    long l = count.incrementAndGet();
                    System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
                }
            });
}

運行后,我們可以看到接收端能夠接收到1000個完整的包。

基於分隔符的拆包器-DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder允許指定一個分隔符,在收到消息的時候按照指定的分隔符進行拆包。其實上面說的LineBasedFrameDecoder是一個特定的分隔符拆包器,它指定的是使用換行符作為分隔符,下面使用DelimiterBasedFrameDecoder來實現行拆包器:

ch.pipeline().addLast(
        new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, delimiterLine, delimiterSharp))
        .addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ByteBuf byteBuf = (ByteBuf) msg;
                long l = count.incrementAndGet();
                System.out.println(l + ": " + byteBuf.toString(StandardCharsets.UTF_8));
            }
        });

基於分隔符的拆包器允許設置多個分隔符,在設置多個分隔符的情況下,會將包拆分成最小的滿足分隔符的包。

基於長度域的拆包器-LengthFieldBasedFrameDecoder

最后一種拆包器是通用性最強的一種拆包器,只要我們協議的中有一個固定的區域來表示數據長度,就可以方便的使用該拆包器。LengthFieldBaesdFrameDecoder有很多可配置的參數,用來應對各種情況的長度域。

長度域的offset為0

假設消息中長度域就在開頭,這種情況下不需要考慮長度域之前是否有其他內容,配置LengthFieldBasedFrameDecoder很簡單,設置offset為0,以及長度域的字節數。這里以長度域占2字節為例:

new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 2)

一個解析的例子:

  BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
  +--------+----------------+      +--------+----------------+
  | Length | Actual Content |----->| Length | Actual Content |
  | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
  +--------+----------------+      +--------+----------------+

長度域offset為0,去掉協議頭

上面的例子中,我們保留了協議頭,雖然這里的協議頭就有長度域。如果我們只想保留數據域,這里需要設置跳過的字節數為2字節:


new LengthFieldBasedFrameDecoder(
    Integer.MAX_VALE,  // maxFrameLength
    0,     // lengthFieldOffset
    2,     // lengthFieldLength
    0,     // lengthAdjustment
    2)     // initalBytesToStrip 跳過2字節,也就是跳過長度域

拆包示例:

 * BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
 * +--------+----------------+      +----------------+
 * | Length | Actual Content |----->| Actual Content |
 * | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
 * +--------+----------------+      +----------------+

長度域占2字節,offset為0,不跳過header,長度域表示所有消息的大小

在前面兩個例子,長度的長度表示的是長度之后的數據長度。這里我們考慮長度里面設置的長度表示的是整個消息的大小,包括頭部和數據部分。這種情況下我們需要制定lengthAdjustment,數據部分的長度為 長度域里的長度 - lengthAdjustment


new LengthFieldBasedFrameDecoder(
    Integer.MAX_VALE,  // maxFrameLength
    0,     // lengthFieldOffset
    2,     // lengthFieldLength
    2,     // lengthAdjustment 長度 - lengthAdjustment為數據部分的長度
    0)     // initalBytesToStrip 

拆包示例:

 * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 * +--------+----------------+      +--------+----------------+
 * | Length | Actual Content |----->| Length | Actual Content |
 * | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
 * +--------+----------------+      +--------+----------------+

HEADER中不只有長度域的情況

前面的case中,協議頭部只有長度域,但是更多的情況中,HEADER中不只有長度域。比如下面這個例子,HEADER部分有三部分,HDR1,Length,HDR2三部分,分別占1個字節,2個字節,1個字節。
在長度域之前還有HDR1,要定位到長度域,需要指定長度域的offset(lengthFieldOffset=1)。
這里長度域存的是所有的數據長度,如果我們希望拆包的結果中包含HDR2+Data兩部分,可以通過設置lengthAdjustment=-3,長度域之后的內容長度是HDR2+DATA。拆包的結果中,只想包含HDR2+DATA,所以整個消息跳過錢3個字節(HDR1+Lenght部分)。

 * +------+--------+------+----------------+      
 * | HDR1 | Length | HDR2 | Data           |
 * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+ 

代碼如下:


new LengthFieldBasedFrameDecoder(
    Integer.MAX_VALE,  // maxFrameLength
    3,     // lengthFieldOffset
    2,     // lengthFieldLength
    -3,     // lengthAdjustment 長度 - lengthAdjustment為數據部分的長度
    3)     // initalBytesToStrip 

解析結果:

 * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
 * +------+--------+------+----------------+      +------+----------------+
 * | HDR1 | Length | HDR2 | Data           |----->| HDR2 | Data           |
 * | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
 * +------+--------+------+----------------+      +------+----------------+


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM