通過大量實戰案例分解Netty中是如何解決拆包黏包問題的?


圖怪獸_d3c9b823ca0faaff4827def1bc596a37_83299

TCP傳輸協議是基於數據流傳輸的,而基於流化的數據是沒有界限的,當客戶端向服務端發送數據時,可能會把一個完整的數據報文拆分成多個小報文進行發送,也可能將多個報文合並成一個大報文進行發送。

在這樣的情況下,有可能會出現圖3-1所示的情況。

  • 服務端恰巧讀到了兩個完整的數據包 A 和 B,沒有出現拆包/粘包問題;
  • 服務端接收到 A 和 B 粘在一起的數據包,服務端需要解析出 A 和 B;
  • 服務端收到完整的 A 和 B 的一部分數據包 B-1,服務端需要解析出完整的 A,並等待讀取完整的 B 數據包;
  • 服務端接收到 A 的一部分數據包 A-1,此時需要等待接收到完整的 A 數據包;
  • 數據包 A 較大,服務端需要多次才可以接收完數據包 A。

image-20210816220231161

圖3-1 粘包和拆包問題

由於存在拆包/粘包問題,接收方很難界定數據包的邊界在哪里,所以可能會讀取到不完整的數據導致數據解析出現問題。

拆包粘包問題實戰

下面演示一個拆包粘包問題

PackageNettyServer

public class PackageNettyServer {

    public static void main(String[] args) {
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap=new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new SimpleServerHandler());
                        }
                    });
            ChannelFuture channelFuture=serverBootstrap.bind(8080).sync(); //綁定端口
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

SimpleServerHandler

public class SimpleServerHandler extends ChannelInboundHandlerAdapter {
    private int count;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in=(ByteBuf) msg;
        byte[] buffer=new byte[in.readableBytes()]; //長度為可讀的字節數
        in.readBytes(buffer); //讀取到字節數組中
        String message=new String (buffer,"UTF-8");
        System.out.println("服務端收到的消息內容:"+message+"\n服務端收到的消息數量"+(++count));
        ByteBuf resBB= Unpooled.copiedBuffer(UUID.randomUUID().toString(), Charset.forName("utf-8"));
        ctx.writeAndFlush(resBB);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();//關閉連接
    }
}

PackageNettyClient

public class PackageNettyClient {

    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new SimpleClientHandler());
                    }
                });
            ChannelFuture channelFuture=bootstrap.connect("localhost",8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

SimpleClientHandler

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {

    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客戶端和服務端成功建立連接");
        //客戶端和服務端建立連接后,發送十次消息給服務端
        for (int i = 0; i < 10; i++) {
            ByteBuf buf= Unpooled.copiedBuffer("客戶端消息"+i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buf);
        }
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服務端發過來的消息
        System.out.println("接收到服務端返回的信息");
        ByteBuf buf=(ByteBuf)msg;
        byte[] buffer=new byte[buf.readableBytes()];
        buf.readBytes(buffer);
        String message=new String(buffer,Charset.forName("utf-8"));
        System.out.println("客戶端收到的消息內容為:"+message);
        System.out.println("客戶端收到的消息數量為:"+(++count));
        super.channelRead(ctx, msg);
    }
}

運行上述案例后,會出現粘包和拆包問題。

應用層定義通信協議

如何解決拆包和粘包問題呢?

一般我們會在應用層定義通信協議。其實思想也很簡單,就是通信雙方約定一個通信報文協議,服務端收到報文之后,按照約定的協議進行解碼,從而避免出現粘包和拆包問題。

其實大家把這個問題往深度思考一下就不難發現,之所以在拆包粘包之后導致收到消息端的內容解析出現錯誤,是因為程序無法識別一個完整消息,也就是不知道如何把拆包之后的消息組合成一個完整消息,以及將粘包的數據按照某個規則拆分形成多個完整消息。所以基於這個角度思考,我們只需要針對消息做一個通信雙方約定的識別規則即可。

消息長度固定

每個數據報文都需要一個固定的長度,當接收方累計讀取到固定長度的報文后,就認為已經獲得了一個完整的消息,當發送方的數據小於固定長度時,則需要空位補齊.

如圖3-2所示,假設我們固定消息長度是4,那么沒有達到長度的報文,需要通過一個空位來補齊,從而使得消息能夠形成一個整體。

image-20210817141908905

圖3-2

這種方式很簡單,但是缺點也很明顯,對於沒有固定長度的消息,不清楚如何設置長度,而且如果長度設置過大會造成字節浪費,長度太小又會影響消息傳輸,所以一般情況下不會采用這種方式。

特定分隔符

既然沒辦法通過固定長度來分割消息,那能不能在消息報文中增加一個分割符呢?然后接收方根據特定的分隔符來進行消息拆分。比如我們采用\r\n來進行分割,如圖3-3所示。

image-20210817142341684

圖3-3

對於特定分隔符的使用場景中,需要注意分隔符和消息體中的字符不要存在沖突,否則會出現消息拆分錯誤的問題。

消息長度加消息內容加分隔符

基於消息長度+消息內容+分隔符的方式進行數據通信,這個之前大家在Redis中學習過,redis的報文協議定義如下。

*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$3\r\nmic

可以發現消息報文包含三個維度

  • 消息長度
  • 消息分隔符
  • 消息內容

這種方式在項目中是非常常見的協議,首先通過消息頭中的總長度來判斷當前一個完整消息所攜帶的參數個數。然后在消息體中,再通過消息內容長度以及消息體作為一個組合,最后通過\r\n進行分割。服務端收到這個消息后,就可以按照該規則進行解析得到一個完整的命令進行執行。

Zookeeper中的消息協議

在Zookeeper中使用了Jute協議,這是zookeeper自定義消息協議,請求協議定義如圖3-4所示。

xid用於記錄客戶端請求發起的先后序號,用來確保單個客戶端請求的響應順序。type代表請求的操作類型,常見的包括創建節點、刪除節點和獲取節點數據等。
協議的請求體部分是指請求的主體內容部分,包含了請求的所有操作內容。不同的請求類型,其請求體部分的結構是不同的。

img
圖3-4

響應協議定義如圖3-5所示。

協議的響應頭中的xid和上文中提到的請求頭中的xid是一致的,響應中只是將請求中的xid原值返回。zxid代表ZooKeeper服務器上當前最新的事務ID。err則是一個錯誤碼,當請求處理過程中出現異常情況時,會在這個錯誤碼中標識出來。協議的響應體部分是指響應的主體內容部分,包含了響應的所有返回數據。不同的響應類型,其響應體部分的結構是不同的。

img
圖3-5

Netty中的編解碼器

在Netty中,默認幫我們提供了一些常用的編解碼器用來解決拆包粘包的問題。下面簡單演示幾種解碼器的使用。

FixedLengthFrameDecoder解碼器

固定長度解碼器FixedLengthFrameDecoder的原理很簡單,就是通過構造方法設置一個固定消息大小frameLength,無論接收方一次收到多大的數據,都會嚴格按照frameLength進行解碼。

如果累計讀取的長度大小為frameLength的消息,那么解碼器會認為已經獲取到了一個完整的消息,如果消息長度小於frameLength,那么該解碼器會一直等待后續數據包的達到,知道獲得指定長度后返回。

使用方法如下,在3.3節中演示的代碼的Server端,增加一個FixedLengthFrameDecoder,長度為10。

ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                .addLast(new FixedLengthFrameDecoder(10)) //增加解碼器
                .addLast(new SimpleServerHandler());
        }
    });

DelimiterBasedFrameDecoder解碼器

特殊分隔符解碼器: DelimiterBasedFrameDecoder,它有以下幾個屬性

  • delimiters,delimiters指定特殊分隔符,參數類型是ByteBuf,ByteBuf可以傳遞一個數組,意味着我們可以同時指定多個分隔符,但最終會選擇長度最短的分隔符進行拆分。

    比如接收方收到的消息體為

    hello\nworld\r\n

    此時指定多個分隔符\n\r\n,那么最終會選擇最短的分隔符解碼,得到如下數據

    hello | world |

  • maxLength,表示報文的最大長度限制,如果超過maxLength還沒檢測到指定分隔符,將會拋出TooLongFrameException。

  • failFast,表示容錯機制,它與maxLength配合使用。如果failFast=true,當超過maxLength后會立刻拋出TooLongFrameException,不再進行解碼。如果failFast=false,那么會等到解碼出一個完整的消息后才會拋出TooLongFrameException

  • stripDelimiter,它的作用是判斷解碼后的消息是否去除分隔符,如果stripDelimiter=false,而制定的特定分隔符是\n,那么數據解碼的方式如下。

    hello\nworld\r\n

    當stripDelimiter=false時,解碼后得到

    hello\n | world\r\n

DecoderNettyServer

下面演示一下DelimiterBasedFrameDecoder的用法。

public class DecoderNettyServer {

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ByteBuf delimiter= Unpooled.copiedBuffer("&".getBytes());
                            ch.pipeline()
                                    .addLast(new DelimiterBasedFrameDecoder(10,true,true,delimiter))
                                    .addLast(new PrintServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); //綁定端口
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {

        }
    }
}

PrintServerHandler

定義一個普通的Inbound,打印接收到的數據。

public class PrintServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf=(ByteBuf)msg;
        System.out.println("Receive client Msg:"+buf.toString(CharsetUtil.UTF_8));
    }
}

演示方法

  • 進入到cmd的命令窗口,執行telnet localhost 8080 回車
  • 在telnet窗口按下 Ctrl+]組合鍵,進入到一個telnet界面
  • 在該界面繼續按回車,進入到一個新的窗口,這個時候可以開始輸入字符,此時的命令窗口會帶有數據回寫。
  • 開始輸入字符 hello&world ,就可以看到演示效果

LengthFieldBasedFrameDecoder解碼器

LengthFieldBasedFrameDecoder是長度域解碼器,它是解決拆包粘包最常用的解碼器,基本上能覆蓋大部分基於長度拆包的場景。其中開源的消息中間件RocketMQ就是使用該解碼器進行解碼的。

首先來說明一下該解碼器的核心參數

  • lengthFieldOffset,長度字段的偏移量,也就是存放長度數據的起始位置
  • lengthFieldLength,長度字段鎖占用的字節數
  • lengthAdjustment,在一些較為復雜的協議設計中,長度域不僅僅包含消息的長度,還包含其他數據比如版本號、數據類型、數據狀態等,這個時候我們可以使用lengthAdjustment進行修正,它的值=包體的長度值-長度域的值
  • initialBytesToStrip,解碼后需要跳過的初始字節數,也就是消息內容字段的起始位置
  • lengthFieldEndOffset,長度字段結束的偏移量, 該屬性的值=lengthFieldOffset+lengthFieldLength

上面這些參數理解起來比較難,我們通過幾個案例來說明一下。

消息長度+消息內容的解碼

假設存在圖3-6所示的由長度和消息內容組成的數據包,其中length表示報文長度,用16進制表示,共占用2個字節,那么該協議對應的編解碼器參數設置如下。

  • lengthFieldOffset=0, 因為Length字段就在報文的開始位置
  • lengthFieldLength=2,協議設計的固定長度為2個字節
  • lengthAdjustment=0,Length字段質保函消息長度,不需要做修正
  • initialBytesToStrip=0,解碼內容是Length+content,不需要跳過任何初始字節。

image-20210817161855726

圖3-6

截斷解碼結果

如果我們希望解碼后的結果中只包含消息內容,其他部分不變,如圖3-7所示。對應解碼器參數組合如下

  • lengthFieldOffset=0,因為Length字段就在報文開始位置
  • lengthFieldLength=2 , 協議設計的固定長度
  • lengthAdjustment=0, Length字段只包含消息長度,不需要做任何修正
  • initialBytesToStrip=2, 跳過length字段的字節長度,解碼后ByteBuf只包含Content字段。

image-20210817163346231

圖3-7

長度字段包含消息內容

如圖3-8所示,如果Length字段中包含Length字段自身的長度以及Content字段所占用的字節數,那么Length的值為0x00d(2+11=13字節),在這種情況下解碼器的參數組合如下

  • lengthFieldOffset=0,因為Length字段就在報文開始的位置
  • lengthFieldLength=2,協議設計的固定長度
  • lengthAdjustment=-2,長度字段為13字節,需要減2才是拆包所需要的長度。
  • initialBytesToStrip=0,解碼后內容依然是Length+Content,不需要跳過任何初始字節

image-20210817185158510

圖3-8

基於長度字段偏移的解碼

如圖3-9所示,Length字段已經不再是報文的起始位置,Length字段的值是0x000b,表示content字段占11個字節,那么此時解碼器的參數配置如下:

  • lengthFieldOffset=2,需要跳過Header所占用的2個字節,才是Length的起始位置
  • lengthFieldLength=2,協議設計的固定長度
  • lengthAdjustment=0,Length字段只包含消息長度,不需要做任何修正
  • initialBytesToStrip=0,解碼后內容依然是Length+Content,不需要跳過任何初始字節

image-20210817190301211

圖3-9

基於長度偏移和長度修正解碼

如圖3-10所示,Length字段前后分別有hdr1和hdr2字段,各占據1個字節,所以需要做長度字段的便宜,還需要做lengthAdjustment的修正,相關參數配置如下。

  • lengthFieldOffset=1,需要跳過hdr1所占用的1個字節,才是Length的起始位置
  • lengthFieldLength=2,協議設計的固定長度
  • lengthAdjustment=1,由於hdr2+content一共占了1+11=12字節,所以Length字段值(11字節)加上lengthAdjustment(1)才能得到hdr2+Content的內容(12字節)
  • initialBytesToStrip=3,解碼后跳過hdr1和length字段,共3個字節

image-20210817191318391

圖3-10

解碼器實戰

比如我們定義如下消息頭,客戶端通過該消息協議發送數據,服務端收到該消息后需要進行解碼

image-20210817201545060

先定義客戶端,其中Length部分,可以使用Netty自帶的LengthFieldPrepender來實現,它可以計算當前發送消息的二進制字節長度,然后把該長度添加到ByteBuf的緩沖區頭中。

public class LengthFieldBasedFrameDecoderClient {

    public static void main(String[] args) {
        EventLoopGroup workGroup=new NioEventLoopGroup();
        Bootstrap b=new Bootstrap();
        b.group(workGroup)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        //如果協議中的第一個字段為長度字段,
                        // netty提供了LengthFieldPrepender編碼器,
                        // 它可以計算當前待發送消息的二進制字節長度,將該長度添加到ByteBuf的緩沖區頭中
                        .addLast(new LengthFieldPrepender(2,0,false))
                        //使用StringEncoder,在通過writeAndFlush時,不需要自己轉化成ByteBuf
                        //StringEncoder會自動做這個事情
                        .addLast(new StringEncoder())
                        .addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                ctx.writeAndFlush("i am request!");
                                ctx.writeAndFlush("i am a another request!");
                            }
                        });
                }
            });
        try {
            ChannelFuture channelFuture=b.connect("localhost",8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            workGroup.shutdownGracefully();
        }
    }
}

上述代碼運行時,會得到兩個報文。

image-20210817202039956

下面是Server端的代碼,增加了LengthFieldBasedFrameDecoder解碼器,其中有兩個參數的值如下

  • lengthFieldLength:2 , 表示length所占用的字節數為2

  • initialBytesToStrip: 2 , 表示解碼后跳過length的2個字節,得到content內容

public class LengthFieldBasedFrameDecoderServer {

    public static void main(String[] args) {
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap=new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,2,0,2))
                                    .addLast(new StringDecoder())
                                    .addLast(new ChannelInboundHandlerAdapter(){
                                        @Override
                                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            System.out.println("receive message:"+msg);
                                        }
                                    });
                        }
                    });
            ChannelFuture channelFuture=serverBootstrap.bind(8080).sync(); //綁定端口
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

總結

前面我們分析的幾個常用解碼器,只是幫我們解決了半包和粘包的問題,最終會讓接受者收到一個完整有效的請求報文並且封裝到ByteBuf中, 而這個報文內容是否有其他的編碼方式,比如序列化等,還需要單獨進行解析處理。

另外,很多的中間件,都會定義自己的報文協議,這些報文協議除了本身解決粘包半包問題以外,還會傳遞一些其他有意義的數據,比如zookeeper的jute、dubbo框架的dubbo協議等。

版權聲明:本博客所有文章除特別聲明外,均采用 CC BY-NC-SA 4.0 許可協議。轉載請注明來自 Mic帶你學架構
如果本篇文章對您有幫助,還請幫忙點個關注和贊,您的堅持是我不斷創作的動力。歡迎關注「跟着Mic學架構」公眾號公眾號獲取更多技術干貨!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM