一,為什么TCP會有粘包和拆包的問題
粘包:TCP發送方發送多個數據包,接收方收到數據時這幾個數據包粘成了一個包,從接收緩沖區看,后一包數據的頭緊接着前一包數據的尾,接收方必需根據協議將這幾個數據包分離出來才能得到正確的數據。
為什么會發生粘包,從幾個方面來看:
1,TCP是基於字節流的,TCP的報文沒有規划數據長度,發送端和接收端從緩存中取數據,應用程序對於消息的長度是不可見的,不知道數據流應該從什么地方開始,從什么地方結束。
2,發送方:TCP默認啟用了Negal算法,優化數據流的發送效率,Nagle算法主要做兩件事:1)只有上一個分組得到確認,才會發送下一個分組;2)收集多個小分組,在一個確認到來時一起發送。
3,接收方:TCP將收到的分組保存至接收緩存里,然后應用程序主動從緩存里讀收。這樣一來,如果TCP接收的速度大於應用程序讀的速度,多個包就會被存至緩存,應用程序讀時,就會讀到多個首尾相接粘到一起的包。
二,Dotnetty項目
Dotnetty監聽端口,啟用管道處理器
public class NettyServer
{
public async System.Threading.Tasks.Task RunAsync(int[] port)
{
IEventLoopGroup bossEventLoop = new MultithreadEventLoopGroup(port.Length);
IEventLoopGroup workerLoopGroup = new MultithreadEventLoopGroup();
try
{
ServerBootstrap boot = new ServerBootstrap();
boot.Group(bossEventLoop, workerLoopGroup)
.Channel<TcpServerSocketChannel>()
.Option(ChannelOption.SoBacklog, 100)
.ChildOption(ChannelOption.SoKeepalive, true)
.Handler(new LoggingHandler("netty server"))
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
Console.WriteLine(ip.Port);
channel.Pipeline.AddLast(new NettyServerHandler());
}));
List<IChannel> list = new List<IChannel>();
foreach(var item in port)
{
IChannel boundChannel = await boot.BindAsync(item);
list.Add(boundChannel);
}
Console.WriteLine("按任意鍵退出");
Console.ReadLine();
list.ForEach(r =>
{
r.CloseAsync();
});
//await boundChannel.CloseAsync();
}
catch(Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
await bossEventLoop.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
await workerLoopGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
}
}
}
管理處理器,簡單打印一下收到的內容
public class NettyServerHandler: ChannelHandlerAdapter
{
public override void ChannelRead(IChannelHandlerContext context, object message)
{
if (message is IByteBuffer buffer)
{
StringBuilder sb = new StringBuilder();
while (buffer.IsReadable())
{
sb.Append(buffer.ReadByte().ToString("X2"));
}
Console.WriteLine($"RECIVED FROM CLIENT : {sb.ToString()}");
}
}
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
Console.WriteLine("Exception: " + exception);
context.CloseAsync();
}
}
入口方法啟動服務,監聽四個端口
static void Main(string[] args)
{
new NettyServer().RunAsync(new int[] { 9001 ,9002,9003,9004}).Wait();
}
二,在應用層解決TCP的粘包問題
要在應用層解決粘包問題,只需要讓程序知道數據流什么時候開始,什么時候結束。按常用協議規划,可以有以下四種方案:
1,定長協議解碼。每次發送的數據包長度為固定的,這種協議最簡單,但沒有靈活性。
粘包處理:根據固定的長度處理。如協議長度為4,發送方發送數據包 FF 00 00 FF 00 FF 00 AA會被解析成二包:FF 00 00 FF及 FF 00 00 AA
拆包處理:根據固定的長度處理。如協議長度為4,發送方發送二個數據包 FF 00, 00 FF 00 FF 00 AA會被解析成二包:FF 00 00 FF及 FF 00 00 AA
Dotnetty實現:監聽9001端口,處理室長協議數據包
新建一個管理處理器,當緩存中的可讀長度達到4,處理數據包,管道向下執行,否則不處理。
public class KsLengthfixedHandler : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
if (input.ReadableBytes < 12) {
return; // (3)
}
output.Add(input.ReadBytes(12)); // (4)
}
}
在啟動服務器是根據端口號加入不同的管道處理器
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
Console.WriteLine(ip.Port);
if (ip.Port == 9001)
{
channel.Pipeline.AddLast(new KsLengthfixedHandler());
}
channel.Pipeline.AddLast(new NettyServerHandler());
}
2,行解碼,即每個數據包的結尾都是/r/n或者/n(換成16進制為0d0a)。
粘包處理:根據換行符做分隔處理。如發送方發送一包數據 FF 00 00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA
拆包處理:根據換行符做分隔處理。如發送方發送二包數據 FF 00以及00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA
Dotnetty實現:監聽9002端口,處理行結尾協議數據包
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
Console.WriteLine(ip.Port);
if (ip.Port == 9001)
{
channel.Pipeline.AddLast(new KsLengthfixedHandler());
}
else if (ip.Port == 9002)
{
channel.Pipeline.AddLast(new LineBasedFrameDecoder(
maxLength: 1024, //可接收數據包最大長度
stripDelimiter: true, //解碼后的數據包是否去掉分隔符
failFast: false //是否讀取超過最大長度的數據包內容
));
}
channel.Pipeline.AddLast(new NettyServerHandler());
}
3,特定的分隔符解碼。與行解碼規定了以換行符做分隔符不同,這個解碼方案可以自己定義分隔符。
粘包處理:根據自定義的分隔符做分隔處理。如發送方發送一包數據 FF 00 00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA
拆包處理:根據自定義的分隔符做分隔處理。如發送方發送二包數據 FF 00以及00 FF 0D 0A FF 00 00 AA 0D 0A則將會解析成兩包:FF 00 00 FF以及FF 00 00 AA
下面的實例是自定義了一個以“}”為分隔符的解碼器
Dotnetty實現:監聽9003端口,處理自定義分隔符協議數據包
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
Console.WriteLine(ip.Port);
if (ip.Port == 9001)
{
channel.Pipeline.AddLast(new KsLengthfixedHandler());
}
else if (ip.Port == 9002)
{
channel.Pipeline.AddLast(new LineBasedFrameDecoder(
maxLength: 1024, //可接收數據包最大長度
stripDelimiter: true, //解碼后的數據包是否去掉分隔符
failFast: false //是否讀取超過最大長度的數據包內容
));
}
else if (ip.Port == 9003)
{
IByteBuffer delimiter = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("}"));
channel.Pipeline.AddLast(new DotNetty.Codecs.DelimiterBasedFrameDecoder(
maxFrameLength: 1024,
stripDelimiter: true,
failFast: false,
delimiter: delimiter));
}
channel.Pipeline.AddLast(new NettyServerHandler());
}
4,指定長度標識。與第一種方案的固定長度不同,這種方案可以指定一個位置存放該數據包的長度,以便程序推算出開始及結束位置。Modbus協議是典型的變長協議。
Dotnetty中使用LengthFieldBasedFrameDecoder解碼器對變長協議解析,了解下以下幾個參數:
* +------+--------+------+----------------+ * | HDR1 | Length | HDR2 | Actual Content | * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | * +------+--------+------+----------------+
1,maxFrameLength:數據包最大長度
2,lengthFieldOffset:長度標識的偏移量。如上面的協議,lengthFieldOffset為1(HDR1的長度)
3,lengthFieldLength:長度標識位的長度。如上面的協議,lengthFieldLength為2(Length的長度)
4,lengthAdjustment:調整長度。如上面的協議,0x000c轉為10進制為12,只標識了Content的長度,並不包括HDR2的長度,在解析時就要設置該默值為HDR2的長度。
5,initialBytesToStrip:從何處開始剝離。如上面的協議,如果想要的解析結果為:0xFE "HELLO, WORLD" 則將initialBytesToStrip設置為3(HDR1的長度+Length的長度)。
Dotnetty實現:監聽9004端口處理變長協議
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => {
IPEndPoint ip = (IPEndPoint)channel.LocalAddress;
Console.WriteLine(ip.Port);
if (ip.Port == 9001)
{
channel.Pipeline.AddLast(new KsLengthfixedHandler());
}
else if (ip.Port == 9002)
{
channel.Pipeline.AddLast(new LineBasedFrameDecoder(
maxLength: 1024, //可接收數據包最大長度
stripDelimiter: true, //解碼后的數據包是否去掉分隔符
failFast: false //是否讀取超過最大長度的數據包內容
));
}
else if (ip.Port == 9003)
{
IByteBuffer delimiter = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("}"));
channel.Pipeline.AddLast(new DotNetty.Codecs.DelimiterBasedFrameDecoder(
maxFrameLength: 1024,
stripDelimiter: true,
failFast: false,
delimiter: delimiter));
}
else if (ip.Port == 9004)
{
channel.Pipeline.AddLast(new LengthFieldBasedFrameDecoder(
maxFrameLength: 1024,
lengthFieldOffset: 1,
lengthFieldLength: 2));
}
channel.Pipeline.AddLast(new NettyServerHandler());
}));
