使用Dotnetty解決粘包問題


一,為什么TCP會有粘包和拆包的問題

粘包:TCP發送方發送多個數據包,接收方收到數據時這幾個數據包粘成了一個包,從接收緩沖區看,后一包數據的頭緊接着前一包數據的尾,接收方必需根據協議將這幾個數據包分離出來才能得到正確的數據。

 為什么會發生粘包,從幾個方面來看:

1,TCP是基於字節流的,TCP的報文沒有規划數據長度,發送端和接收端從緩存中取數據,應用程序對於消息的長度是不可見的,不知道數據流應該從什么地方開始,從什么地方結束。

2,發送方:TCP默認啟用了Negal算法,優化數據流的發送效率,Nagle算法主要做兩件事:1)只有上一個分組得到確認,才會發送下一個分組;2)收集多個小分組,在一個確認到來時一起發送。

3,接收方:TCP將收到的分組保存至接收緩存里,然后應用程序主動從緩存里讀收。這樣一來,如果TCP接收的速度大於應用程序讀的速度,多個包就會被存至緩存,應用程序讀時,就會讀到多個首尾相接粘到一起的包。

二,Dotnetty項目

Dotnetty監聽端口,啟用管道處理器

public class NettyServer
    {
        public async System.Threading.Tasks.Task RunAsync(int[] port)
        {
            IEventLoopGroup bossEventLoop = new MultithreadEventLoopGroup(port.Length);
            IEventLoopGroup workerLoopGroup = new MultithreadEventLoopGroup();
            try
            {
                ServerBootstrap boot = new ServerBootstrap();
                boot.Group(bossEventLoop, workerLoopGroup)
                    .Channel<TcpServerSocketChannel>()
                    .Option(ChannelOption.SoBacklog, 100)
                    .ChildOption(ChannelOption.SoKeepalive, true)
                    .Handler(new LoggingHandler("netty server"))
                    .ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
                        IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
                        Console.WriteLine(ip.Port);
                        channel.Pipeline.AddLast(new NettyServerHandler());
                    }));
                List<IChannel> list = new List<IChannel>();
                foreach(var item in port)
                {
                    IChannel boundChannel = await boot.BindAsync(item);
                    list.Add(boundChannel);
                }
                Console.WriteLine("按任意鍵退出");
                Console.ReadLine();
                list.ForEach(r =>
                {
                    r.CloseAsync();
                });
                //await boundChannel.CloseAsync();
            }
            catch(Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            finally
            {
                await bossEventLoop.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
                await workerLoopGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
            }
        }
    }

 管理處理器,簡單打印一下收到的內容

 public class NettyServerHandler: ChannelHandlerAdapter
    {
        public override void ChannelRead(IChannelHandlerContext context, object message)
        {
            if (message is IByteBuffer buffer)
            {
                StringBuilder sb = new StringBuilder();
                while (buffer.IsReadable())
                {
                    sb.Append(buffer.ReadByte().ToString("X2"));
                }
                Console.WriteLine($"RECIVED FROM CLIENT : {sb.ToString()}");
            }
        }
        public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
        {
            Console.WriteLine("Exception: " + exception);
            context.CloseAsync();
        }
    }

  入口方法啟動服務,監聽四個端口

static void Main(string[] args)
        {
            new NettyServer().RunAsync(new int[] { 9001 ,9002,9003,9004}).Wait();
        }

二,在應用層解決TCP的粘包問題

要在應用層解決粘包問題,只需要讓程序知道數據流什么時候開始,什么時候結束。按常用協議規划,可以有以下四種方案:

1,定長協議解碼。每次發送的數據包長度為固定的,這種協議最簡單,但沒有靈活性。

粘包處理:根據固定的長度處理。如協議長度為4,發送方發送數據包 FF 00 00 FF 00 FF 00 AA會被解析成二包:FF 00 00 FF及 FF 00 00 AA

拆包處理:根據固定的長度處理。如協議長度為4,發送方發送二個數據包 FF 00, 00 FF 00 FF 00 AA會被解析成二包:FF 00 00 FF及 FF 00 00 AA

Dotnetty實現:監聽9001端口,處理室長協議數據包

新建一個管理處理器,當緩存中的可讀長度達到4,處理數據包,管道向下執行,否則不處理。

  public class KsLengthfixedHandler : ByteToMessageDecoder
    {
        protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
        {
            if (input.ReadableBytes < 12) {
                return; // (3)
            }
            output.Add(input.ReadBytes(12)); // (4)
        }
    }

在啟動服務器是根據端口號加入不同的管道處理器

.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
                        IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
                        Console.WriteLine(ip.Port);
                        if (ip.Port == 9001)
                        {
                            channel.Pipeline.AddLast(new KsLengthfixedHandler());
                        }
                        channel.Pipeline.AddLast(new NettyServerHandler());
}

2,行解碼,即每個數據包的結尾都是/r/n或者/n(換成16進制為0d0a)。  

粘包處理:根據換行符做分隔處理。如發送方發送一包數據 FF 00 00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA

拆包處理:根據換行符做分隔處理。如發送方發送二包數據 FF 00以及00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA

Dotnetty實現:監聽9002端口,處理行結尾協議數據包

 

.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
                        IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
                        Console.WriteLine(ip.Port);
                        if (ip.Port == 9001)
                        {
                            channel.Pipeline.AddLast(new KsLengthfixedHandler());
                        } 
                        else if (ip.Port == 9002)
                        {
                            channel.Pipeline.AddLast(new LineBasedFrameDecoder(
                                maxLength: 1024, //可接收數據包最大長度
                                stripDelimiter: true, //解碼后的數據包是否去掉分隔符
                                failFast: false //是否讀取超過最大長度的數據包內容
                                ));
                        }
                        channel.Pipeline.AddLast(new NettyServerHandler());
}

3,特定的分隔符解碼。與行解碼規定了以換行符做分隔符不同,這個解碼方案可以自己定義分隔符。

粘包處理:根據自定義的分隔符做分隔處理。如發送方發送一包數據 FF 00 00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA

拆包處理:根據自定義的分隔符做分隔處理。如發送方發送二包數據 FF 00以及00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA

下面的實例是自定義了一個以“}”為分隔符的解碼器

Dotnetty實現:監聽9003端口,處理自定義分隔符協議數據包

.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
                        IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
                        Console.WriteLine(ip.Port);
                        if (ip.Port == 9001)
                        {
                            channel.Pipeline.AddLast(new KsLengthfixedHandler());
                        } 
                        else if (ip.Port == 9002)
                        {
                            channel.Pipeline.AddLast(new LineBasedFrameDecoder(
                                maxLength: 1024, //可接收數據包最大長度
                                stripDelimiter: true, //解碼后的數據包是否去掉分隔符
                                failFast: false //是否讀取超過最大長度的數據包內容
                                ));
                        }
                        else if (ip.Port == 9003)
                        {
                            IByteBuffer delimiter = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("}"));
                            channel.Pipeline.AddLast(new DotNetty.Codecs.DelimiterBasedFrameDecoder(
                               maxFrameLength: 1024,
                               stripDelimiter: true,
                               failFast: false, 
                              delimiter:  delimiter));
                        }
                        channel.Pipeline.AddLast(new NettyServerHandler());
}

4,指定長度標識。與第一種方案的固定長度不同,這種方案可以指定一個位置存放該數據包的長度,以便程序推算出開始及結束位置。Modbus協議是典型的變長協議。

Dotnetty中使用LengthFieldBasedFrameDecoder解碼器對變長協議解析,了解下以下幾個參數:

  * +------+--------+------+----------------+
  * | HDR1 | Length | HDR2 | Actual Content |
  * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |
  * +------+--------+------+----------------+   

1,maxFrameLength:數據包最大長度

2,lengthFieldOffset:長度標識的偏移量。如上面的協議,lengthFieldOffset為1(HDR1的長度)

3,lengthFieldLength:長度標識位的長度。如上面的協議,lengthFieldLength為2(Length的長度)

4,lengthAdjustment:調整長度。如上面的協議,0x000c轉為10進制為12,只標識了Content的長度,並不包括HDR2的長度,在解析時就要設置該默值為HDR2的長度。

5,initialBytesToStrip:從何處開始剝離。如上面的協議,如果想要的解析結果為:0xFE  "HELLO, WORLD" 則將initialBytesToStrip設置為3(HDR1的長度+Length的長度)。

Dotnetty實現:監聽9004端口處理變長協議

  .ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
                        IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
                        Console.WriteLine(ip.Port);
                        if (ip.Port == 9001)
                        {
                            channel.Pipeline.AddLast(new KsLengthfixedHandler());
                        } 
                        else if (ip.Port == 9002)
                        {
                            channel.Pipeline.AddLast(new LineBasedFrameDecoder(
                                maxLength: 1024, //可接收數據包最大長度
                                stripDelimiter: true, //解碼后的數據包是否去掉分隔符
                                failFast: false //是否讀取超過最大長度的數據包內容
                                ));
                        }
                        else if (ip.Port == 9003)
                        {
                            IByteBuffer delimiter = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("}"));
                            channel.Pipeline.AddLast(new DotNetty.Codecs.DelimiterBasedFrameDecoder(
                               maxFrameLength: 1024,
                               stripDelimiter: true,
                               failFast: false, 
                              delimiter:  delimiter));
                        }
                        else if (ip.Port == 9004)
                        {
                            channel.Pipeline.AddLast(new LengthFieldBasedFrameDecoder(
                               maxFrameLength: 1024,
                               lengthFieldOffset: 1, 
                               lengthFieldLength: 2));
                        }
                        channel.Pipeline.AddLast(new NettyServerHandler());
                    }));

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM