TCP粘包和拆包的基本介紹
- TCP是面向連接的, 面向流的, 提供可靠性服務, 收發兩端(客戶端和服務器端) 都有一一成對的Socket,因此發送端為了將多個發給接收端的包, 更有效的發給對方, 使用了優化算法(Nagle算法),將多次間隔較小且數據量小的數據,合並成一個大的數據塊,然后進行封包, 這樣做雖然提高了效率,但是接收端就難於分辨出完整的數據包了,因為面向流的通信是無消息保護邊界的
- 由於TCP無消息保護邊界, 需要在接收端處理消息邊界問題, 也就是我們所說的粘包,拆包問題,看一張圖
- 示意圖TCP粘包,拆包圖解
對圖的說明
假設客戶端分別發送了兩個數據包D1和D2給服務端, 由於服務端一次讀取到字節數是不確定的,故有可能存在以下四種情況
- 服務端分別兩次讀取到了兩個獨立的數據包, 分別是D1 和 D2, 沒有粘包和拆包
- 服務端一次接收到了兩個數據包D1和D2粘在了一起,稱之為TCP粘包
- 服務端分兩次讀取到了數據包, 第一次讀取到了完整的D1包和D2包的部分內容, 第二次讀取到了D2包的剩余部分, 稱之為TCP拆包
- 服務器分兩次讀取到了數據包, 第一次讀取到了D1包的部分內容D1_1, 第二次讀取到了D1包的剩余部分D1_2, 和完整的D2包
TCP粘包和拆包現象實例
在編寫Netty程序時, 如果沒有做處理,就會發生粘包和拆包問題
看一個具體的實例
NettyServer
package com.dance.netty.netty.tcp; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.nio.charset.StandardCharsets; import java.util.UUID; public class NettyServer { public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new NettyServerHandler()); } }); ChannelFuture sync = serverBootstrap.bind("127.0.0.1", 7000).sync(); System.out.println("server is ready ......"); sync.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } static class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count = 0; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] bytes = new byte[msg.readableBytes()]; msg.readBytes(bytes); count++; System.out.println("服務器第"+count+"次接收到來自客戶端的數據:" + new String(bytes, StandardCharsets.UTF_8)); // 服務器回送數據給客戶端 回送隨機的UUID給客戶端 ctx.writeAndFlush(Unpooled.copiedBuffer(UUID.randomUUID().toString(),StandardCharsets.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } } }
NettyClient
package com.dance.netty.netty.tcp; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.nio.charset.StandardCharsets; public class NettyClient { public static void main(String[] args) { NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new NettyClientHandler()); } }); ChannelFuture sync = bootstrap.connect("127.0.0.1", 7000).sync(); sync.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } static class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count = 0; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 連續發送10條數據 for (int i = 0; i < 10; i++) { ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server!" + i, StandardCharsets.UTF_8)); } } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] bytes = new byte[msg.readableBytes()]; msg.readBytes(bytes); // 接收服務器的返回 count++; System.out.println("客戶端第"+count+"次接收服務端的回送:" + new String(bytes, StandardCharsets.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } } }
執行結果
Server
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. server is ready ...... 服務器第1次接收到來自客戶端的數據:hello,server!0hello,server!1hello,server!2hello,server!3hello,server!4hello,server!5hello,server!6hello,server!7hello,server!8hello,server!9 服務器第1次接收到來自客戶端的數據:hello,server!0 服務器第2次接收到來自客戶端的數據:hello,server!1 服務器第3次接收到來自客戶端的數據:hello,server!2hello,server!3hello,server!4 服務器第4次接收到來自客戶端的數據:hello,server!5hello,server!6 服務器第5次接收到來自客戶端的數據:hello,server!7hello,server!8hello,server!9
Client1
客戶端第1次接收服務端的回送:84653e99-0e7f-431d-a897-c215af959a3b
Client2
客戶端第1次接收服務端的回送:6f3b0e79-2f40-4066-bb6b-80f988ecec116b6bbd94-b345-46d6-8d36-a114534331a850628e04-ece1-4f58-b684-d30189f6cf26b2139027-6bda-4d40-9238-9fc0e59bc7a64b568ffe-f616-4f48-8f1c-05ecf3e817ee
分析:
服務器啟動后到server is ready ......
第一個客戶端啟動后 TCP將10次發送直接封包成一次直接發送,所以導致了服務器一次就收到了所有的數據,產生了TCP粘包,拆包的問題
第二客戶端啟動后 TCP將10次發送分別封裝成了5次請求,產生粘包,拆包問題
TCP粘包和拆包解決方案
- 使用自定義協議 + 編解碼器來解決
- 關鍵就是要解決 服務器每次讀取數據長度的問題, 這個問題解決, 就不會出現服務器多讀或少讀數據的問題,從而避免TCP粘包和拆包
TCP粘包, 拆包解決方案實現
- 要求客戶端發送5個Message對象, 客戶端每次發送一個Message對象
- 服務器端每次接收一個Message, 分5次進行解碼, 每讀到一個Message, 會回復一個Message對象給客戶端
新建協議MessageProtocol
package com.dance.netty.netty.protocoltcp; /** * 消息協議 */ public class MessageProtocol { private int length; private byte[] content; public MessageProtocol() { } public MessageProtocol(int length, byte[] content) { this.length = length; this.content = content; } public int getLength() { return length; } public void setLength(int length) { this.length = length; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } }
新建編碼器
package com.dance.netty.netty.protocoltcp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * 自定義協議編碼器 */ public class MyMessageProtocolEncoder extends MessageToByteEncoder<MessageProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception { // System.out.println("自定義協議---->開始編碼"); // 開始發送數據 out.writeInt(msg.getLength()); // 優先發送長度,定義邊界 out.writeBytes(msg.getContent()); // System.out.println("自定義協議---->編碼完成"); } }
新建解碼器
package com.dance.netty.netty.protocoltcp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class MyMessageProtocolDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // System.out.println("自定義協議---->開始解碼"); // 獲取定義的邊界長度 int length = in.readInt(); if(in.readableBytes() >= length){ // 根據長度讀取數據 byte[] bytes = new byte[length]; in.readBytes(bytes); // 反構造成MessageProtocol MessageProtocol messageProtocol = new MessageProtocol(length, bytes); out.add(messageProtocol); // System.out.println("自定義協議---->解碼完成"); }else{ // 內容長度不夠 } } }
新建服務器端
package com.dance.netty.netty.protocoltcp; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.nio.charset.StandardCharsets; import java.util.UUID; public class NettyServer { public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 加入自定義協議編解碼器 pipeline.addLast(new MyMessageProtocolDecoder()); pipeline.addLast(new MyMessageProtocolEncoder()); pipeline.addLast(new NettyServerHandler()); } }); ChannelFuture sync = serverBootstrap.bind("127.0.0.1", 7000).sync(); System.out.println("server is ready ......"); sync.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } static class NettyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count = 0; @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { byte[] bytes = msg.getContent(); count++; System.out.println("服務器第"+count+"次接收到來自客戶端的數據:" + new String(bytes, StandardCharsets.UTF_8)); // 服務器回送數據給客戶端 回送隨機的UUID給客戶端 byte[] s = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); ctx.writeAndFlush(new MessageProtocol(s.length,s)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } } }
新建客戶端
package com.dance.netty.netty.protocoltcp; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.nio.charset.StandardCharsets; public class NettyClient { public static void main(String[] args) { NioEventLoopGroup eventExecutors = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 加入自定義分割符號 // ByteBuf delimiter = Unpooled.copiedBuffer("\r\n".getBytes()); // pipeline.addFirst(new DelimiterBasedFrameDecoder(8192, delimiter)); // 添加自定義協議編解碼器 pipeline.addLast(new MyMessageProtocolDecoder()); pipeline.addLast(new MyMessageProtocolEncoder()); pipeline.addLast(new NettyClientHandler()); } }); ChannelFuture sync = bootstrap.connect("127.0.0.1", 7000).sync(); sync.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } static class NettyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count = 0; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 連續發送10條數據 for (int i = 0; i < 10; i++) { String msg = "今天天氣冷, 打火鍋" + i; byte[] bytes = msg.getBytes(StandardCharsets.UTF_8); // 使用自定義協議 MessageProtocol messageProtocol = new MessageProtocol(bytes.length, bytes); ctx.writeAndFlush(messageProtocol); } } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { byte[] bytes = msg.getContent(); // 接收服務器的返回 count++; System.out.println("客戶端第"+count+"次接收服務端的回送:" + new String(bytes, StandardCharsets.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } } }
測試
發送10次
服務器端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. server is ready ...... 服務器第1次接收到來自客戶端的數據:今天天氣冷, 打火鍋0 ...... 服務器第10次接收到來自客戶端的數據:今天天氣冷, 打火鍋9
客戶端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 客戶端第1次接收服務端的回送:a6b69f1c-daba-435a-802a-c19a6350ca94 ...... 客戶端第10次接收服務端的回送:5af5c297-8668-48aa-b8c4-35656142f591
ok,沒有問題, 但是真的沒有問題嗎?答案是有問題
FAQ
發送1000次
修改客戶端發送消息數量
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 連續發送10條數據 for (int i = 0; i < 1000; i++) { ...... } }
重新測試
服務器端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. server is ready ...... 服務器第1次接收到來自客戶端的數據:今天天氣冷, 打火鍋0 ...... 服務器第31次接收到來自客戶端的數據:今天天氣冷, 打火鍋30 服務器第32次接收到來自客戶端的數據:今天天氣冷, 打火鍋31 io.netty.handler.codec.DecoderException: java.lang.NegativeArraySizeException at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1412) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:943) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NegativeArraySizeException at com.dance.netty.netty.protocoltcp.MyMessageProtocolDecoder.decode(MyMessageProtocolDecoder.java:17) at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ... 16 more io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(1022) + length(4) exceeds writerIndex(1024): PooledUnsafeDirectByteBuf(ridx: 1022, widx: 1024, cap: 1024) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:459) at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392) at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:359) at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231) at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1407) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231) at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:925) at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:822) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IndexOutOfBoundsException: readerIndex(1022) + length(4) exceeds writerIndex(1024): PooledUnsafeDirectByteBuf(ridx: 1022, widx: 1024, cap: 1024) at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1403) at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:786) at com.dance.netty.netty.protocoltcp.MyMessageProtocolDecoder.decode(MyMessageProtocolDecoder.java:14) at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ... 17 more
what ? 直接報錯了, 數組下標越界, 讀索引1022 + 長度4 > 寫縮影1024了
這個是什么問題呢 ? 我看網上關於這個BUG的解決方案很少,基本沒有, 好多都是貼問題的, 我翻了將近1個小時,才找到一個大佬寫的一篇文章解決了, 感謝大佬
博客地址:
https://blog.csdn.net/u011035407/article/details/80454511
問題描述:
這樣在剛開始的工作中數據包傳輸沒有問題,不過數據包的大小超過512b的時候就會拋出異常了。
解決方案
配合解碼器DelimiterBasedFrameDecoder一起使用,在數據包的末尾使用換行符\n表示本次數據包已經結束,當DelimiterBasedFrameDecoder把數據切割之后,再使用ByteToMessageDecoder實現decode方法把數據流轉換為Message對象。
我們在ChannelPipeline加入DelimiterBasedFrameDecoder解碼器
客戶端和服務器端都加
//使用\n作為分隔符 pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
在MessageToByteEncoder的實現方法encode()增加out.writeBytes(new byte[]{'\n'});
//在寫出字節流的末尾增加\n表示數據結束 out.writeBytes(new byte[]{'\n'});
這時候就可以愉快的繼續處理數據了。
等我還沒有高興半天的時候,問題又來了。還是一樣的問題
等等等,,,怎么又報錯了,不是已經加了黏包處理了嗎??,解決問題把,首先看解析的數據包結構
+-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 01 01 00 00 00 06 00 00 01 0a 7b 22 69 64 22 |...........{"id"| |00000010| 3a 33 2c 22 75 73 65 72 6e 61 6d 65 22 3a 22 31 |:3,"username":"1| |00000020| 38 35 30 30 33 34 30 31 36 39 22 2c 22 6e 69 63 |8500340169","nic| |00000030| 6b 6e 61 6d 65 22 3a 22 e4 bb 96 e5 9b 9b e5 a4 |kname":"........| |00000040| a7 e7 88 b7 22 2c 22 72 6f 6f 6d 49 64 22 3a 31 |....","roomId":1| |00000050| 35 32 37 32 33 38 35 36 39 34 37 34 2c 22 74 65 |527238569474,"te| |00000060| 61 6d 4e 61 6d 65 22 3a 22 e4 bf 84 e7 bd 97 e6 |amName":".......| |00000070| 96 af 22 2c 22 75 6e 69 74 73 22 3a 7b 22 75 6e |..","units":{"un| |00000080| 69 74 31 22 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 |it1":{"x":10.0,"| |00000090| 79 22 3a 31 30 2e 30 7d 2c 22 75 6e 69 74 32 22 |y":10.0},"unit2"| |000000a0| 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 79 22 3a 31 |:{"x":10.0,"y":1| |000000b0| 30 2e 30 7d 2c 22 75 6e 69 74 33 22 3a 7b 22 78 |0.0},"unit3":{"x| |000000c0| 22 3a 31 30 2e 30 2c 22 79 22 3a 31 30 2e 30 7d |":10.0,"y":10.0}| |000000d0| 2c 22 75 6e 69 74 34 22 3a 7b 22 78 22 3a 31 30 |,"unit4":{"x":10| |000000e0| 2e 30 2c 22 79 22 3a 31 30 2e 30 7d 2c 22 75 6e |.0,"y":10.0},"un| |000000f0| 69 74 35 22 3a 7b 22 78 22 3a 31 30 2e 30 2c 22 |it5":{"x":10.0,"| |00000100| 79 22 3a 31 30 2e 30 7d 7d 2c 22 73 74 61 74 75 |y":10.0}},"statu| |00000110| 73 22 3a 31 7d 0a |s":1}. | +--------+-------------------------------------------------+----------------+
接收到的數據是完整的沒錯,但是還是報錯了,而且數據結尾的字節的確是0a,轉化成字符就是\n沒有問題啊。
在ByteToMessageDecoder的decode方法里打印ByteBuf buf的長度之后,問題找到了 長度 : 10
這就是說在進入到ByteToMessageDecoder這個解碼器的時候,數據包已經只剩下10個長度了,那么長的數據被上個解碼器DelimiterBasedFrameDecoder隔空劈開了- -。問題出現在哪呢,看上面那塊字節流的字節,找到第11個字節,是0a。。。。因為不是標准的json格式,最前面使用了3個字節 加上2個int長度的屬性,所以 數據包頭應該是11個字節長。
而DelimiterBasedFrameDecoder在讀到第11個字節的時候讀成了\n,自然而然的就認為這個數據包已經結束了,而數據進入到ByteToMessageDecoder的時候就會因為規定的body長度不等於length長度而出現問題。
思來想去 不實用\n 這樣的單字節作為換行符,很容易在數據流中遇到,轉而使用\r\n倆字節來處理,而這倆字節出現在前面兩個int長度中的幾率應該很小。
最終解決
在客戶端和服務器端的pipeline中添加 以 "\r\n" 定義為邊界的符號來標識數據包結束
//這里使用自定義分隔符 ByteBuf delimiter = Unpooled.copiedBuffer("\r\n".getBytes()); pipeline.addFirst(new DelimiterBasedFrameDecoder(8192, delimiter));
Server端
Client端
編碼器中發送結束位置增加
//這里最后修改使用\r\n out.writeBytes(new byte[]{'\r','\n'});
再次運行程序 數據包可以正常接收了。
最終測試
服務器端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. server is ready ...... 服務器第1次接收到來自客戶端的數據:今天天氣冷, 打火鍋0 ...... 服務器第999次接收到來自客戶端的數據:今天天氣冷, 打火鍋998 服務器第1000次接收到來自客戶端的數據:今天天氣冷, 打火鍋999
客戶端
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 客戶端第1次接收服務端的回送:48fa6d78-8079-4700-b488-ca2af9eb3f8c ...... 客戶端第999次接收服務端的回送:581da47b-d77b-4972-af11-6d33057f6610 客戶端第1000次接收服務端的回送:0014e906-69cb-4900-9409-f4d1af9148dd
總結
以前使用netty的時候也僅限於和硬件交互,而當時的硬件受限於成本問題是一條一條處理數據包的,所以基本上不會考慮黏包問題
然后就是ByteToMessageDecoder和MessageToByteEncoder兩個類是比較底層實現數據流處理的,並沒有帶有拆包黏包的處理機制,需要自己在數據包頭規定包的長度,而且無法處理過大的數據包,因為我一開始首先使用了這種方式處理數據,所以后來就沒有再換成DelimiterBasedFrameDecoder加 StringDecoder來解析數據包,最后使用json直接轉化為對象。
FAQ 參考粘貼於大佬的博客,加自己的改動
若有收獲,就點個贊吧