TCP粘包/拆包
TCP是個”流”協議,所謂流,就是沒有界限的一串數據。TCP底層並不了解上層業務數據的具體含義,它會根據TCP緩沖區的實際情況進行包的划分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題
TCP粘包/拆包發生的原因
1. 應用程序write寫入的字節大小大於套接口發送緩沖區大小
2. 進行MSS大小的TCP分段
3. 以太網幀的payload大於MTU進行IP分片
粘包問題的解決策略
由於底層的TCP無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下
先來看一個粘包的例子
新建maven工程,添加依賴包
<!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha1</version> </dependency>
TimeServer
package com.zhen.netty1129_TCP_HALF_PACKAGE; import java.awt.Event; import java.net.Socket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class TimeServer { public void bind(int port) throws Exception{ //配置服務端的NIO線程組 //NioEventLoopGroup是個線程組,它包含了一組NIO線程,專門用於網絡事件的處理,實際上它們就是Reactor線程組 //bossGroup用於服務端接受客戶端的連接 EventLoopGroup bossGroup = new NioEventLoopGroup(); //workerGroup進行SocketChannel的網絡讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Netty用於啟動NIO服務端的輔助啟動類,目的是降低服務端的開發復雜度 ServerBootstrap bootstrap = new ServerBootstrap(); //將兩個NIO線程組當作入參傳遞到ServerBootstrap bootstrap.group(bossGroup, workerGroup) //設置創建的Channel為NioServerSocketChannel,它的功能對應於JDK NIO類庫中的ServerSocketChannel類。 .channel(NioServerSocketChannel.class) //配置NioServerSocketChannel的TCP參數,此處將它的backlog設置為1024 .option(ChannelOption.SO_BACKLOG, 1024) //綁定I/O事件的處理類ChildChannelHandler,它的作用類似於Reactor模式中的Handler類,主要用於處理網絡I/O事件,例如記錄日志、對消息進行編解碼等 .childHandler(new ChildChannelHandler()); //調用bind方法綁定監聽端口,隨后,調用它的同步阻塞方法sync等待綁定操作完成。 //完成之后Netty會返回一個ChannelFuture,它的功能類似於JDK的java.util.concurrent.Future,主要用於異步操作的通知回調 ChannelFuture future = bootstrap.bind(port).sync(); //等待服務端監聽端口關閉,等待服務端鏈路關閉之后main函數才退出 future.channel().closeFuture().sync(); } finally { //優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port = 9090; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (Exception e) { e.printStackTrace(); } } new TimeServer().bind(port); } }
TimeServerHandler
package com.zhen.netty1129_TCP_HALF_PACKAGE; import java.util.Date; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; //TimeServerHandler 繼承自ChannelHandlerAdapter,它用於對網絡事件進行讀寫操作 public class TimeServerHandler extends ChannelHandlerAdapter{ private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //將msg轉換成Netty的ByteBuf對象。ByteBuf類似於jdk中的java.nio.ByteBuffer對象,不過它提供了更加強大和靈活的功能 ByteBuf buf = (ByteBuf) msg; //通過ByteBuf的readableBytes方法可以獲取緩沖區可讀的字節數,根據可讀的字節數創建byte數組 byte[] req = new byte[buf.readableBytes()]; //通過ByteBuf的readBytes方法將緩沖區中的字節數據復制到新建的byte數組中 buf.readBytes(req); //通過new String構造函數獲取請求消息 String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length()); System.out.println("The time server receive order : " + body + "; the counter is : "+ ++counter); //對請求消息進行判斷,如果是QUERY TIME ORDER則創建應答消息 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); //通過ChannelHandlerContext的write方法異步發送應答消息給客戶端 ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //當發生異常時,關閉ChannelHandlerContext,釋放和ChannelHandlerContext相關聯的句柄等資源 ctx.close(); } }
TimeClient
package com.zhen.netty1129_TCP_HALF_PACKAGE; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public void connect(int port,String host) throws Exception{ //配置客戶端NIO線程組,客戶端處理I/O讀寫的NioEventLoopGroup線程組 EventLoopGroup group = new NioEventLoopGroup(); try { //客戶端輔助啟動類Bootstrap Bootstrap bootstrap = new Bootstrap(); //設置線程組 bootstrap.group(group) //與服務端不同的是,它的channel需要設置為NioSocketChannel .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) //然后為其添加Handler,此處為了簡單直接創建匿名內部類,實現initChannel方法 //作用是當創建NioSocketChannel成功之后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡I/O事件 .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); //調用connect發起異步連接操作,然后調用sync同步方法等待連接成功。 ChannelFuture future = bootstrap.connect(host, port).sync(); //等待客戶端鏈路關閉,當客戶端連接關閉之后,客戶端主函數退出,退出之前釋放NIO線程組的資源 future.channel().closeFuture().sync(); } finally { //優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception{ int port = 9090; String host = "127.0.0.1"; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (Exception e) { e.printStackTrace(); } } new TimeClient().connect(port, host); } }
TimeClientHandler
package com.zhen.netty1129_TCP_HALF_PACKAGE; import java.util.logging.Logger; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeClientHandler extends ChannelHandlerAdapter{ private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName()); private int counter; private byte[] req; public TimeClientHandler(){ req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes(); } //當客戶端和服務端TCP鏈路建立成功之后,Netty的NIO線程會調用channelActive方法,發送查詢時間的指令給服務端 //調用ChannelHandlerContext的writeAndFlush方法將請求消息發送給客戶端 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf message = null; for (int i = 0; i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } //當客戶端返回應答消息,channelRead方法被調用 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("Now is :" + body + " ; the counter is : " + ++counter); } //發生異常時,釋放客戶端資源 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.warning("Unexpected exception from downstream : " + cause.getMessage()); ctx.close(); } }
此時啟動server,再啟動client,可看到以下結果
server端
client端
可以發現,server只受到了兩條消息,說明發生了粘包,但是我們期望的是收到100條消息,每條包含一條”QUERY TIME ORDER”指令,這說明發生了TCP粘包
客戶端應該收到100條當前系統時間,但實際上只收到了一條,因為服務端只收到了2條請求消息,所以實際服務端只發送了2條應答,由於請求消息不滿足查詢條件,所以返回了2條”BAD ORDER”應答消息。但是實際上客戶端只收到了一條包含兩條”BAD ORDER”指令的消息,說明服務端返回的應答消息也發生了粘包
解決TCP粘包問題
利用LineBasedFrameDecoder解決TCP粘包問題
為了解決TCP粘包/拆包導致的半包讀寫問題,Netty默認提供了多種編解碼器用於處理半包,只要能熟練掌握這些類庫的使用,TCP粘包問題從此會變得非常容易
來看代碼
TimeServer
package com.zhen.netty1129_TCP_HALF_PACKAGE_SOLVE; import java.awt.Event; import java.net.Socket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class TimeServer { public void bind(int port) throws Exception{ //配置服務端的NIO線程組 //NioEventLoopGroup是個線程組,它包含了一組NIO線程,專門用於網絡事件的處理,實際上它們就是Reactor線程組 //bossGroup用於服務端接受客戶端的連接 EventLoopGroup bossGroup = new NioEventLoopGroup(); //workerGroup進行SocketChannel的網絡讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Netty用於啟動NIO服務端的輔助啟動類,目的是降低服務端的開發復雜度 ServerBootstrap bootstrap = new ServerBootstrap(); //將兩個NIO線程組當作入參傳遞到ServerBootstrap bootstrap.group(bossGroup, workerGroup) //設置創建的Channel為NioServerSocketChannel,它的功能對應於JDK NIO類庫中的ServerSocketChannel類。 .channel(NioServerSocketChannel.class) //配置NioServerSocketChannel的TCP參數,此處將它的backlog設置為1024 .option(ChannelOption.SO_BACKLOG, 1024) //綁定I/O事件的處理類ChildChannelHandler,它的作用類似於Reactor模式中的Handler類,主要用於處理網絡I/O事件,例如記錄日志、對消息進行編解碼等 .childHandler(new ChildChannelHandler()); //調用bind方法綁定監聽端口,隨后,調用它的同步阻塞方法sync等待綁定操作完成。 //完成之后Netty會返回一個ChannelFuture,它的功能類似於JDK的java.util.concurrent.Future,主要用於異步操作的通知回調 ChannelFuture future = bootstrap.bind(port).sync(); //等待服務端監聽端口關閉,等待服務端鏈路關閉之后main函數才退出 future.channel().closeFuture().sync(); } finally { //優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel ch) throws Exception { //在原來的TimeServerHandler之前新增了兩個解碼器LineBasedFrameDecoder、StringDecoder ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port = 9090; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (Exception e) { e.printStackTrace(); } } new TimeServer().bind(port); } }
TimeServerHandler
package com.zhen.netty1129_TCP_HALF_PACKAGE_SOLVE; import java.util.Date; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; //TimeServerHandler 繼承自ChannelHandlerAdapter,它用於對網絡事件進行讀寫操作 public class TimeServerHandler extends ChannelHandlerAdapter{ private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String)msg; System.out.println("The time server receive order : " + body + "; the counter is : "+ ++counter); //對請求消息進行判斷,如果是QUERY TIME ORDER則創建應答消息 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); //通過ChannelHandlerContext的write方法異步發送應答消息給客戶端 ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //當發生異常時,關閉ChannelHandlerContext,釋放和ChannelHandlerContext相關聯的句柄等資源 ctx.close(); } }
TimeClient
package com.zhen.netty1129_TCP_HALF_PACKAGE_SOLVE; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class TimeClient { public void connect(int port,String host) throws Exception{ //配置客戶端NIO線程組,客戶端處理I/O讀寫的NioEventLoopGroup線程組 EventLoopGroup group = new NioEventLoopGroup(); try { //客戶端輔助啟動類Bootstrap Bootstrap bootstrap = new Bootstrap(); //設置線程組 bootstrap.group(group) //與服務端不同的是,它的channel需要設置為NioSocketChannel .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) //然后為其添加Handler,此處為了簡單直接創建匿名內部類,實現initChannel方法 //作用是當創建NioSocketChannel成功之后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡I/O事件 .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { //在原來的TimeClientHandler之前新增了兩個解碼器LineBasedFrameDecoder、StringDecoder ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeClientHandler()); } }); //調用connect發起異步連接操作,然后調用sync同步方法等待連接成功。 ChannelFuture future = bootstrap.connect(host, port).sync(); //等待客戶端鏈路關閉,當客戶端連接關閉之后,客戶端主函數退出,退出之前釋放NIO線程組的資源 future.channel().closeFuture().sync(); } finally { //優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception{ int port = 9090; String host = "127.0.0.1"; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (Exception e) { e.printStackTrace(); } } new TimeClient().connect(port, host); } }
TimeClientHandler
package com.zhen.netty1129_TCP_HALF_PACKAGE_SOLVE; import java.util.logging.Logger; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeClientHandler extends ChannelHandlerAdapter{ private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName()); private int counter; private byte[] req; public TimeClientHandler(){ req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes(); } //當客戶端和服務端TCP鏈路建立成功之后,Netty的NIO線程會調用channelActive方法,發送查詢時間的指令給服務端 //調用ChannelHandlerContext的writeAndFlush方法將請求消息發送給客戶端 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf message = null; for (int i = 0; i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } //當客戶端返回應答消息,channelRead方法被調用 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //拿到的msg已經是解碼成字符串之后的應答消息了。 String body = (String)msg; System.out.println("Now is :" + body + " ; the counter is : " + ++counter); } //發生異常時,釋放客戶端資源 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.warning("Unexpected exception from downstream : " + cause.getMessage()); ctx.close(); } }
此時再次運行,查看結果
Server端
client端
此時TCP粘包問題已經解決
LineBasedFrameDecoder和StringDecoder的原理分析
LineBasedFrameDecoder的工作原理是依次便利ByteBuf中的刻度子節,判斷看是否有”\n” 或者“\r”,如果有,就以此為止為結束位置,從可讀索引到結束位置區間的字節久組成了一行。它是以換行符為結束標志的解碼器,支持攜帶結束符或者不攜帶結束符兩種編碼方式,同時支持配置單行的最大長度后仍然沒有發現換行符,就會拋出異常,同時忽略掉之前讀到的異常碼流。
StringDecoder的功能非常簡單,就是將接收到的對象轉換成字符串,然后繼續調用后面的handler。LineBasedFrameDecoder+StringDecoder組合就是按行切換的文本解碼器,它被設計用來支持TCP的粘包和拆包.