1. 簡介
1. TCP 是面向連接的,面向流的,提供可靠性服務,收發兩端(客戶端和服務器端) 都要有一一成對的Socket, 因此,發送端為了將多個發送給接收端的包更有效的發給對方,使用了優化算法(Nagle 算法),將多次間隔較小且數據量小的數據,合並成一個大的數據塊,然后進行封包,這樣雖然提高了效率,但是接收端就難於分辨出完整的數據包了。 因為面向流的通信是無消息保護邊界的。
2. 由於TCP 無消息保護邊界,需要在接收端處理消息邊界問題, 也就是我們所說的粘包拆包問題。
2. 粘包問題演示
TcpServer:
package netty.tcp; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class TcpServer { private static final Integer PORT = 6666; public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyNettyServerInitializer()); ChannelFuture sync = serverBootstrap.bind(PORT).sync(); sync.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("服務端啟動成功,監聽地址: " + PORT); } } }); } }
MyNettyServerInitializer
package netty.tcp; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 向管道加入處理器 ChannelPipeline pipeline = ch.pipeline(); // 1. 增加一個自定義的handler pipeline.addLast(new MyServerHandler()); System.out.println("server is ok~~~~"); } }
MyServerHandler
package netty.tcp; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { int i = msg.readableBytes(); System.out.println("讀取到的字節數: " + i); // 回顯一個消息給客戶端 String msgStr = msg.toString(CharsetUtil.UTF_8); String printMsg = "count: " + (++count) + "; msgStr:" + msgStr; System.out.println(printMsg); ctx.channel().writeAndFlush(Unpooled.copiedBuffer(printMsg, CharsetUtil.UTF_8)); } }
TcpClient
package netty.tcp; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TcpClient { private static final Integer PORT = 6666; public static void main(String[] args) throws InterruptedException { // 創建一個事件循環組 EventLoopGroup eventExecutors = new NioEventLoopGroup(); try { // 創建一個啟動Bootstrap(注意是Netty包下的) Bootstrap bootstrap = new Bootstrap(); // 鏈式設置參數 bootstrap.group(eventExecutors) // 設置線程組 .channel(NioSocketChannel.class) // 設置通道class .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 1. 加入一個自定義的處理器 pipeline.addLast(new MyClientHandler()); } }); System.out.println("客戶端is ok..."); // 啟動客戶端連接服務器(ChannelFuture 是netty的異步模型) ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", PORT).sync(); // 監聽關閉通道 channelFuture.channel().closeFuture().sync(); } finally { // 關閉 eventExecutors.shutdownGracefully(); } } }
MyClientHandler
package netty.tcp; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * 自定義服務器端處理handler,需要繼承netty定義的 ChannelInboundHandlerAdapter 類 */ public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 通道就緒事件 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 循環發送十條消息 int count = 10; String msg = "client msg "; for (int i = 0; i < count; i++) { ctx.writeAndFlush(Unpooled.copiedBuffer(msg + i, CharsetUtil.UTF_8)); } } /** * 發生異常事件 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("從客戶端 " + ctx.channel().remoteAddress() + " 讀取到的消息, long: " + msg); // 回顯一個消息給客戶端 String msgStr = msg.toString(CharsetUtil.UTF_8); System.out.println(msgStr); } }
啟動服務器端,然后啟動兩個客戶端,查看日志:
(1) 服務器端:
服務端啟動成功,監聽地址: 6666 server is ok~~~~ 讀取到的字節數: 120 count: 1; msgStr:client msg 0client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8client msg 9 server is ok~~~~ 讀取到的字節數: 12 count: 1; msgStr:client msg 0 讀取到的字節數: 96 count: 2; msgStr:client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8 讀取到的字節數: 12 count: 3; msgStr:client msg 9
(2) 客戶端1
客戶端is ok... 從客戶端 /127.0.0.1:6666 讀取到的消息, long: PooledUnsafeDirectByteBuf(ridx: 0, widx: 137, cap: 1024) count: 1; msgStr:client msg 0client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8client msg 9
(3) 客戶端2
客戶端is ok... 從客戶端 /127.0.0.1:6666 讀取到的消息, long: PooledUnsafeDirectByteBuf(ridx: 0, widx: 171, cap: 1024) count: 1; msgStr:client msg 0count: 2; msgStr:client msg 1client msg 2client msg 3client msg 4client msg 5client msg 6client msg 7client msg 8count: 3; msgStr:client msg 9
可以看到發生了消息錯亂。
3. 解決辦法
1. 加入一個消息包裝類 TransferMsg
package netty.tcp; import lombok.Data; @Data public class TransferMsg { private Integer length; private byte[] msg; }
2. TransferMsgEncoder 編碼器
package netty.tcp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class TransferMsgEncoder extends MessageToByteEncoder<TransferMsg> { @Override protected void encode(ChannelHandlerContext ctx, TransferMsg msg, ByteBuf out) throws Exception { System.out.println("netty.tcp.TransferMsgEncoder.encode 被調用"); out.writeInt(msg.getLength()); out.writeBytes(msg.getMsg()); } }
3. TransferMsgDecoder 解碼器
package netty.tcp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ReplayingDecoder; import java.util.List; public class TransferMsgDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("netty.tcp.TransferMsgDecoder.decode 被調用"); int count = in.readInt(); byte[] bytes = new byte[count]; in.readBytes(bytes); TransferMsg transferMsg = new TransferMsg(); transferMsg.setMsg(bytes); transferMsg.setLength(count); out.add(transferMsg); } }
4. 服務器端修改
修改MyNettyServerInitializer 加入自己的解碼器
package netty.tcp; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; public class MyNettyServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 向管道加入處理器 ChannelPipeline pipeline = ch.pipeline(); // 1. 增加一個自定義的handler pipeline.addLast(new TransferMsgDecoder()); pipeline.addLast(new MyServerHandler()); System.out.println("server is ok~~~~"); } }
修改MyServerHandler
package netty.tcp; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class MyServerHandler extends SimpleChannelInboundHandler<TransferMsg> { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, TransferMsg msg) throws Exception { Integer length = msg.getLength(); byte[] msg1 = msg.getMsg(); String s = new String(msg1); System.out.println("讀到消息,length: " + length + "\tmsg:" + s); } }
2. 客戶端修改
TcpClient 加入自己的編碼器
package netty.tcp; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TcpClient { private static final Integer PORT = 6666; public static void main(String[] args) throws InterruptedException { // 創建一個事件循環組 EventLoopGroup eventExecutors = new NioEventLoopGroup(); try { // 創建一個啟動Bootstrap(注意是Netty包下的) Bootstrap bootstrap = new Bootstrap(); // 鏈式設置參數 bootstrap.group(eventExecutors) // 設置線程組 .channel(NioSocketChannel.class) // 設置通道class .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 1. 加入一個自定義的處理器 pipeline.addLast(new TransferMsgEncoder()); pipeline.addLast(new MyClientHandler()); } }); System.out.println("客戶端is ok..."); // 啟動客戶端連接服務器(ChannelFuture 是netty的異步模型) ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", PORT).sync(); // 監聽關閉通道 channelFuture.channel().closeFuture().sync(); } finally { // 關閉 eventExecutors.shutdownGracefully(); } } }
MyClientHandler 修改發送的消息格式
package netty.tcp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * 自定義服務器端處理handler,需要繼承netty定義的 ChannelInboundHandlerAdapter 類 */ public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 通道就緒事件 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 循環發送十條消息 int count = 10; String msg = "client msg "; String sendMsg = null; TransferMsg transferMsg = null; for (int i = 0; i < count; i++) { transferMsg = new TransferMsg(); sendMsg = msg + i; transferMsg.setMsg(sendMsg.getBytes()); transferMsg.setLength(sendMsg.getBytes().length); ctx.writeAndFlush(transferMsg); } } /** * 發生異常事件 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("從客戶端 " + ctx.channel().remoteAddress() + " 讀取到的消息, long: " + msg); // 回顯一個消息給客戶端 String msgStr = msg.toString(CharsetUtil.UTF_8); System.out.println(msgStr); } }
測試: 啟動一個服務器端,然后啟動一個客戶端,查看服務器日志如下:
服務端啟動成功,監聽地址: 6666 server is ok~~~~ netty.tcp.TransferMsgDecoder.decode 被調用 讀到消息,length: 12 msg:client msg 0 netty.tcp.TransferMsgDecoder.decode 被調用 讀到消息,length: 12 msg:client msg 1 netty.tcp.TransferMsgDecoder.decode 被調用 讀到消息,length: 12 msg:client msg 2 netty.tcp.TransferMsgDecoder.decode 被調用 讀到消息,length: 12 msg:client msg 3 netty.tcp.TransferMsgDecoder.decode 被調用 讀到消息,length: 12 msg:client msg 4 netty.tcp.TransferMsgDecoder.decode 被調用 讀到消息,length: 12 msg:client msg 5 netty.tcp.TransferMsgDecoder.decode 被調用 讀到消息,length: 12 msg:client msg 6 netty.tcp.TransferMsgDecoder.decode 被調用 讀到消息,length: 12 msg:client msg 7 netty.tcp.TransferMsgDecoder.decode 被調用 讀到消息,length: 12 msg:client msg 8 netty.tcp.TransferMsgDecoder.decode 被調用 讀到消息,length: 12 msg:client msg 9
如果服務器向客戶端返回相同的消息,在服務器端也需要加入自己的編碼器;客戶端加入自己的解碼器。實測解決了粘包問題。
總結: Netty 解決方法:
1》 使用自定義協議 + 編解碼器來解決
2》 關鍵就是解決服務器每次讀取數據長度的問題,這個問題解決,就不會出現服務器多讀或者少讀的問題,從而避免TCP的粘包、拆包
還有其他的解決辦法又自定義消息的結束符,比如我們約定以"XXXXXX" 結尾,則收到消息可以判斷是否以這個結尾。
參考: https://juejin.cn/post/6975109908106575903