1 TCP 粘包和拆包基本介紹
1) TCP 是面向連接的, 面向流的, 提供高可靠性服務。 收發兩端(客戶端和服務器端) 都要有一一成對的 socket,因此, 發送端為了將多個發給接收端的包, 更有效的發給對方, 使用了優化方法(Nagle 算法) , 將多次間隔較小且數據量小的數據, 合並成一個大的數據塊, 然后進行封包。 這樣做雖然提高了效率, 但是接收端就難於分辨出完整的數據包了, 因為面向流的通信是無消息保護邊界的。
2) 由於 TCP 無消息保護邊界, 需要在接收端處理消息邊界問題, 也就是我們所說的粘包、 拆包問題, 看一張圖
3) 示意圖 TCP 粘包、 拆包圖解
對圖的說明:
假設客戶端分別發送了兩個數據包 D1 和 D2 給服務端, 由於服務端一次讀取到字節數是不確定的, 故可能存在以
下四種情況:
1) 服務端分兩次讀取到了兩個獨立的數據包, 分別是 D1 和 D2, 沒有粘包和拆包
2) 服務端一次接受到了兩個數據包, D1 和 D2 粘合在一起, 稱之為 TCP 粘包
3) 服務端分兩次讀取到了數據包, 第一次讀取到了完整的 D1 包和 D2 包的部分內容, 第二次讀取到了 D2 包 的剩余內容, 這稱之為 TCP 拆包
4) 服務端分兩次讀取到了數據包, 第一次讀取到了 D1 包的部分內容 D1_1, 第二次讀取到了 D1 包的剩余部
分內容 D1_2 和完整的 D2 包。
2 TCP 粘包和拆包現象實例 (現象)
在編寫 Netty 程序時, 如果沒有做處理, 就會發生粘包和拆包的問題
看一個具體的實例:
MyClientHandler

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.nio.charset.Charset; public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //使用客戶端發送 10 條數據 hello,server 編號 for(int i= 0; i< 10; ++i) { ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8")); ctx.writeAndFlush(buffer); } } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] buffer = new byte[msg.readableBytes()]; msg.readBytes(buffer); String message = new String(buffer, Charset.forName("utf-8")); System.out.println("客戶端接收到消息=" + message); System.out.println("客戶端接收消息數量=" + (++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
MyServerHandler

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.nio.charset.Charset; import java.util.UUID; public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf>{ private int count; @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { byte[] buffer = new byte[msg.readableBytes()]; msg.readBytes(buffer); //將 buffer 轉成字符串 String message = new String(buffer, Charset.forName("utf-8")); System.out.println("服務器接收到數據 " + message); System.out.println("服務器接收到消息量=" + (++this.count)); //服務器回送數據給客戶端, 回送一個隨機 id , ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", Charset.forName("utf-8")); ctx.writeAndFlush(responseByteBuf); } }
3 TCP 粘包和拆包解決方案
1) 使用自定義協議 + 編解碼器 來解決
2) 關鍵就是要解決 服務器端每次讀取數據長度的問題, 這個問題解決, 就不會出現服務器多讀或少讀數據的問題, 從而避免的 TCP 粘包、 拆包 。
實列:
1) 要求客戶端發送 5 個 Message 對象, 客戶端每次發送一個 Message 對象
2) 服務器端每次接收一個 Message, 分 5 次進行解碼, 每讀取到 一個 Message , 會回復一個 Message 對象 給客
戶端.
MessageProtocol //協議包

public class MessageProtocol { private int len; //關鍵 private byte[] content; public int getLen() { return len; } public void setLen(int len) { this.len = len; } p ublic byte[] getContent() { return content; } p ublic void setContent(byte[] content) { this.content = content; } }
MyClientHandler

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.nio.charset.Charset; public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //使用客戶端發送10條數據 "今天天氣冷,吃火鍋" 編號 for(int i = 0; i< 5; i++) { String mes = "今天天氣冷,吃火鍋"; byte[] content = mes.getBytes(Charset.forName("utf-8")); int length = mes.getBytes(Charset.forName("utf-8")).length; //創建協議包對象 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLen(length); messageProtocol.setContent(content); ctx.writeAndFlush(messageProtocol); } } // @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { int len = msg.getLen(); byte[] content = msg.getContent(); System.out.println("客戶端接收到消息如下"); System.out.println("長度=" + len); System.out.println("內容=" + new String(content, Charset.forName("utf-8"))); System.out.println("客戶端接收消息數量=" + (++this.count)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("異常消息=" + cause.getMessage()); ctx.close(); } }
MyClient

import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class MyClient { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new MyClientInitializer()); //自定義一個初始化類 ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }
MyClientInitializer

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; public class MyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyMessageEncoder()); //加入編碼器 pipeline.addLast(new MyMessageDecoder()); //加入解碼器 pipeline.addLast(new MyClientHandler()); } }
MyServer

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class MyServer { public static void main(String[] args) throws Exception{ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer()); //自定義一個初始化類 ChannelFuture channelFuture = serverBootstrap.bind(9994).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyServerInitializer

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; public class MyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new MyMessageDecoder());//解碼器 pipeline.addLast(new MyMessageEncoder());//編碼器 pipeline.addLast(new MyServerHandler()); } }
MyServerHandler

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.nio.charset.Charset; import java.util.UUID; //處理業務的handler public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol>{ private int count; @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { //接收到數據,並處理 int len = msg.getLen(); byte[] content = msg.getContent(); System.out.println(); System.out.println(); System.out.println(); System.out.println("服務器接收到信息如下"); System.out.println("長度=" + len); System.out.println("內容=" + new String(content, Charset.forName("utf-8"))); System.out.println("服務器接收到消息包數量=" + (++this.count)); //回復消息 // String responseContent = UUID.randomUUID().toString(); // int responseLen = responseContent.getBytes("utf-8").length; // byte[] responseContent2 = responseContent.getBytes("utf-8"); //構建一個協議包 // MessageProtocol messageProtocol = new MessageProtocol(); // messageProtocol.setLen(responseLen); // messageProtocol.setContent(responseContent2); // // ctx.writeAndFlush(messageProtocol); } }
MyMessageDecoder

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import java.util.List; public class MyMessageDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyMessageDecoder decode 被調用"); //需要將得到二進制字節碼-> MessageProtocol 數據包(對象) int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); //封裝成 MessageProtocol 對象,放入 out, 傳遞下一個handler業務處理 MessageProtocol messageProtocol = new MessageProtocol(); messageProtocol.setLen(length); messageProtocol.setContent(content); out.add(messageProtocol); } }
MyMessageEncoder

import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception { System.out.println("MyMessageEncoder encode 方法被調用"); out.writeInt(msg.getLen()); out.writeBytes(msg.getContent()); } }