無論是服務端還是客戶端,當我們讀取或者發送消息的時候,都需要考慮TCP底層的粘包/拆包機制。
TCP粘包/拆包
TCP是個“流”協議,所謂流,就是沒有界限的一串數據。大家可以想想河里的流水,是連成一片的,其間並沒有分界線。TCP底層並不了解上層業務數據的具體含義,它會根據TCP緩沖區的實際情況進行包的划分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題。
TCP粘包/拆包問題說明
假設客戶端分別發送了兩個數據包D1和D2給服務端,由於服務端一次讀取到的字節數是不確定的,故可能存在以下4種情況。
(1)服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包;
(2)服務端一次接收到了兩個數據包,D1和D2粘合在一起,被稱為TCP粘包;
(3)服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩余內容,這被稱為TCP拆包;
(4)服務端分兩次讀取到了兩個數據包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩余內容D1_2和D2包的整包。
如果此時服務端TCP接收滑窗非常小,而數據包D1和D2比較大,很有可能會發生第五種可能,即服務端分多次才能將D1和D2包接收完全,期間發生多次拆包。
TCP粘包/拆包發生的原因
問題產生的原因有三個,分別如下。
(1)應用程序write寫入的字節大小大於套接口發送緩沖區大小;
(2)進行MSS大小的TCP分段;
(3)以太網幀的payload大於MTU進行IP分片。
粘包問題的解決策略
由於底層的TCP無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下。
(1)消息定長,例如每個報文的大小為固定長度200字節,如果不夠,空位補空格;
(2)在包尾增加回車換行符進行分割,例如FTP協議;
(3)將消息分為消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段,通常設計思路為消息頭的第一個字段使用int32來表示消息的總長度;
(4)更復雜的應用層協議。
未考慮TCP粘包導致功能異常案例
在前面的時間服務器例程中,我們多次強調並沒有考慮讀半包問題,這在功能測試時往往沒有問題,但是一旦壓力上來,或者發送大報文之后,就會存在粘包/拆包問題。如果代碼沒有考慮,往往就會出現解碼錯位或者錯誤,導致程序不能正常工作。以Netty 入門示例為例。
TimeServer的改造
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeServerHandler extends ChannelHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length()); System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }
每讀到一條消息后,就計一次數,然后發送應答消息給客戶端。按照設計,服務端接收到的消息總數應該跟客戶端發送的消息總數相同,而且請求消息刪除回車換行符后應該為"QUERY TIME ORDER"。
TimeClient的改造
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeClientHandler extends ChannelHandlerAdapter { private int counter; private byte[] req; public TimeClientHandler() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) { ByteBuf message = null; for (int i = 0; i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("Now is : " + body + " ; the counter is : " + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 釋放資源 ctx.close(); } }
客戶端跟服務端鏈路建立成功之后,循環發送100條消息,每發送一條就刷新一次,保證每條消息都會被寫入Channel中。按照我們的設計,服務端應該接收到100條查詢時間指令的請求消息。客戶端每接收到服務端一條應答消息之后,就打印一次計數器。按照設計初衷,客戶端應該打印100次服務端的系統時間。
運行結果:
服務端運行結果如下。
The time server receive order : QUERY TIME ORDER
QUERY TIME ORDER
......................
QUERY TIME ORDER ; the counter is : 1
The time server receive order :
QUERY TIME ORDER
............
QUERY TIME ORDER ; the counter is : 2
服務端運行結果表明它只接收到了兩條消息,第一條包含57條“QUERY TIME ORDER”指令,第二條包含了43條“QUERY TIME ORDER”指令,總數正好是100條。我們期待的是收到100條消息,每條包含一條“QUERY TIME ORDER”指令。這說明發生了TCP粘包。
客戶端運行結果如下。
Now is : BAD ORDER
BAD ORDER
; the counter is : 1
按照設計初衷,客戶端應該收到100條當前系統時間的消息,但實際上只收到了一條。這不難理解,因為服務端只收到了2條請求消息,所以實際服務端只發送了2條應答,由於請求消息不滿足查詢條件,所以返回了2條“BAD ORDER”應答消息。但是實際上客戶端只收到了一條包含2條“BAD ORDER”指令的消息,說明服務端返回的應答消息也發生了粘包。由於上面的例程沒有考慮TCP的粘包/拆包,所以當發生TCP粘包時,我們的程序就不能正常工作。
利用LineBasedFrameDecoder解決TCP粘包問題
為了解決TCP粘包/拆包導致的半包讀寫問題,Netty默認提供了多種編解碼器用於處理半包,只要能熟練掌握這些類庫的使用,TCP粘包問題從此會變得非常容易,你甚至不需要關心它們,這也是其他NIO框架和JDK原生的NIO API所無法匹敵的。
服務端代碼:
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class TimeServer { public void bind(int port) throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChildChannelHandler()); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer { @Override protected void initChannel(Channel arg0) throws Exception { arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); arg0.pipeline().addLast(new StringDecoder()); arg0.pipeline().addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new TimeServer().bind(port); } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeServerHandler extends ChannelHandlerAdapter { private int counter; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }
客戶端代碼:
import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class TimeClient { public void connect(int port, String host) throws Exception { // 配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeClientHandler()); } }); // 發起異步連接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new TimeClient().connect(port, "127.0.0.1"); } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class TimeClientHandler extends ChannelHandlerAdapter { private int counter; private byte[] req; public TimeClientHandler() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")) .getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) { ByteBuf message = null; for (int i = 0; i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("Now is : " + body + " ; the counter is : " + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 釋放資源 ctx.close(); } }
兩個變化:
- 拿到的msg已經是解碼成字符串之后的應答消息
- 新增了兩個解碼器:第一個是LineBasedFrameDecoder,第二個是StringDecoder。
運行結果:
服務端執行結果如下。
The time server receive order : QUERY TIME ORDER ; the counter is : 1
.....................................
The time server receive order : QUERY TIME ORDER ; the counter is : 100
客戶端運行結果如下。
Now is : Thu Feb 20 00:00:14 CST 2014 ; the counter is : 1
......................................
Now is : Thu Feb 20 00:00:14 CST 2014 ; the counter is : 100
程序的運行結果完全符合預期,說明通過使用LineBasedFrameDecoder和StringDecoder成功解決了TCP粘包導致的讀半包問題。對於使用者來說,只要將支持半包解碼的handler添加到ChannelPipeline中即可,不需要寫額外的代碼,用戶使用起來非常簡單。
LineBasedFrameDecoder和StringDecoder的原理分析
LineBasedFrameDecoder的工作原理是它依次遍歷ByteBuf中的可讀字節,判斷看是否有“\n”或者“\r\n”,如果有,就以此位置為結束位置,從可讀索引到結束位置區間的字節就組成了一行。它是以換行符為結束標志的解碼器,支持攜帶結束符或者不攜帶結束符兩種解碼方式,同時支持配置單行的最大長度。如果連續讀取到最大長度后仍然沒有發現換行符,就會拋出異常,同時忽略掉之前讀到的異常碼流。
StringDecoder的功能非常簡單,就是將接收到的對象轉換成字符串,然后繼續調用后面的handler。LineBasedFrameDecoder + StringDecoder組合就是按行切換的文本解碼器,它被設計用來支持TCP的粘包和拆包。
如果發送的消息不是以換行符結束的該怎么辦呢?或者沒有回車換行符,靠消息頭中的長度字段來分包怎么辦?是不是需要自己寫半包解碼器?答案是否定的,Netty提供了多種支持TCP粘包/拆包的解碼器,用來滿足用戶的不同訴求。