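Between the moment a TCP connection is opened and the moment it is closed, data may be transferred many times; that is, the server and the client may exchange many messages over the same connection. Ideally, every message one side sends would be received by the other side as exactly one message, so that one write corresponds to one read. Reality, however, does not always follow the script.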
Excerpt from the MINA official documentation:
TCP guarantees delivery of all packets in the correct order. But there is no guarantee that one write operation on the sender-side will result in one read event on the receiving side. One call of IoSession.write(Object message) by the sender can result in multiple messageReceived(IoSession session, Object message) events on the receiver; and multiple calls of IoSession.write(Object message) can lead to a single messageReceived event.
Excerpt from the Netty official documentation:
In a stream-based transport such as TCP/IP, received data is stored into a socket receive buffer. Unfortunately, the buffer of a stream-based transport is not a queue of packets but a queue of bytes. It means, even if you sent two messages as two independent packets, an operating system will not treat them as two messages but as just a bunch of bytes. Therefore, there is no guarantee that what you read is exactly what your remote peer wrote.
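Both passages say the same thing: TCP is a byte-stream protocol. It only guarantees that the receiver gets the bytes in the same order the sender sent them; it does not guarantee that each message one side sends arrives at the other side as exactly one complete message. Two messages sent separately may be read as one, and one message may be read in two pieces. So the demo in the previous post is, strictly speaking, an example of what not to do. That said, when the server and client run on the same machine, or on a fast LAN, the problem is hard to reproduce in testing.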
A simple example (taken from the Netty official documentation):
The sender sends three strings: ABC, DEF and GHI.
But the receiver may read them like this: AB, CDEFG, H, I.
That is a serious problem: the receiver has no way to separate the three messages, so it cannot parse them.
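The sketch below makes this concrete without opening any sockets. It is a simulation, not real networking: the sender's writes ("ABC", "DEF", "GHI") are concatenated into one byte stream, which is all TCP really delivers, and the receiver gets that stream back in reads of arbitrary sizes. The read sizes 2, 5, 1, 1 are chosen by hand here to reproduce the split above; in practice the OS and the network decide them. The class and method names are illustrative only.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simulation only: models what TCP promises (bytes arrive in order)
// and what it does not promise (read boundaries matching write boundaries).
class StreamSimulation {

    // Everything the sender writes ends up as one continuous byte stream.
    static byte[] concat(List<String> writes) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        for (String w : writes) {
            byte[] b = w.getBytes(StandardCharsets.US_ASCII);
            stream.write(b, 0, b.length);
        }
        return stream.toByteArray();
    }

    // The receiver gets that stream back in reads of arbitrary sizes.
    // The sizes are parameters here; in reality the OS and network choose them.
    static List<String> reads(byte[] stream, int... sizes) {
        List<String> result = new ArrayList<>();
        int pos = 0;
        for (int size : sizes) {
            result.add(new String(stream, pos, size, StandardCharsets.US_ASCII));
            pos += size;
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] stream = concat(List.of("ABC", "DEF", "GHI"));
        List<String> received = reads(stream, 2, 5, 1, 1);
        System.out.println(received);                  // [AB, CDEFG, H, I]
        System.out.println(String.join("", received)); // ABCDEFGHI
    }
}
```

The bytes and their order survive intact; only the boundaries are lost, and that is exactly the information the receiver needs to tell messages apart.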
For this problem, the MINA documentation suggests the following solutions:
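1. use fixed length messages
Use fixed-length messages. For example, if every message is 4 bytes long, the receiver simply cuts the incoming bytes into 4-byte pieces.
2. use a fixed length header that indicates the length of the body
Use a fixed-length header that specifies the length of the body in bytes, and put the message content in the body. For example, if the header says the body is 100 bytes long, then the 100 bytes after the header are the body (the message content), and the header of the next message starts right after those 100 bytes.
3. using a delimiter; for example many text-based protocols append a newline (or CR LF pair) after every message
Use a delimiter. For example, many text-based protocols append a line break (CR LF, i.e. "\r\n") after every message, so each line is one message. Other special characters, such as commas or semicolons, can also serve as the delimiter. Whatever the delimiter is, it must not appear inside a message (or must be escaped), otherwise the receiver will cut the message in the wrong place.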
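There are other approaches besides these three, and some protocols combine several of them. HTTP, for example, separates header fields with CR LF line breaks, and the Content-Length header gives the length of the body in bytes.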
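To show how HTTP combines a delimiter with a length field, here is a deliberately simplified sketch: it looks for the empty line (CR LF CR LF) that ends the header section, reads Content-Length from the CR LF separated header lines, and then waits until that many body bytes have arrived. It is not a real HTTP parser (no chunked transfer encoding, no validation, and bytes after the body are ignored); the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified HTTP-style framing: CR LF separates header lines, an empty line
// ends the headers, and Content-Length gives the body size in bytes.
class HeaderThenBodyParser {

    // Returns the body if buf holds a complete message, or null if more bytes are needed.
    static byte[] tryParseBody(byte[] buf) {
        int headerEnd = -1;
        for (int i = 0; i + 3 < buf.length; i++) {
            if (buf[i] == '\r' && buf[i + 1] == '\n' && buf[i + 2] == '\r' && buf[i + 3] == '\n') {
                headerEnd = i;
                break;
            }
        }
        if (headerEnd < 0) return null; // header section still incomplete

        int contentLength = 0; // treat a missing Content-Length as an empty body
        String headers = new String(buf, 0, headerEnd, StandardCharsets.US_ASCII);
        for (String line : headers.split("\r\n")) { // delimiter: one header per line
            int colon = line.indexOf(':');
            if (colon > 0 && line.substring(0, colon).trim().equalsIgnoreCase("Content-Length")) {
                contentLength = Integer.parseInt(line.substring(colon + 1).trim());
            }
        }

        int bodyStart = headerEnd + 4; // skip the empty line
        if (buf.length - bodyStart < contentLength) return null; // body still incomplete
        return Arrays.copyOfRange(buf, bodyStart, bodyStart + contentLength);
    }

    public static void main(String[] args) {
        byte[] msg = "POST /poem HTTP/1.1\r\nHost: localhost\r\nContent-Length: 5\r\n\r\nhello"
                .getBytes(StandardCharsets.US_ASCII);
        System.out.println(tryParseBody(Arrays.copyOf(msg, msg.length - 2)) == null); // true
        System.out.println(new String(tryParseBody(msg), StandardCharsets.US_ASCII));  // hello
    }
}
```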
Below, the built-in APIs of MINA, Netty and Twisted are each used to split messages on CR LF line breaks.
MINA:
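MINA can use a ProtocolCodecFilter to process the binary data that is sent and received. How the data is processed is determined by a ProtocolCodecFactory, or by a ProtocolEncoder and ProtocolDecoder. After processing, the message passed to the IoHandler's messageReceived event method is no longer an IoBuffer but whatever type you want, such as a String or another Java object. Here TextLineCodecFactory (an implementation of ProtocolCodecFactory) is used to split messages on CR LF.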
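```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class TcpServer {

    public static void main(String[] args) throws IOException {
        IoAcceptor acceptor = new NioSocketAcceptor();

        // Add a codec filter: received data is split on "\r\n", and "\r\n" is appended to sent data
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), "\r\n", "\r\n")));

        acceptor.setHandler(new TcpServerHandle());
        acceptor.bind(new InetSocketAddress(8080));
    }
}

class TcpServerHandle extends IoHandlerAdapter {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    // New data has been received
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        // Thanks to the codec filter, the message is no longer an IoBuffer but a String
        String line = (String) message;
        System.out.println("messageReceived:" + line);
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("sessionCreated");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println("sessionClosed");
    }
}
```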
Netty:
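Netty's design is similar to MINA's: you add ChannelHandlers to the ChannelPipeline to process the raw data. Here LineBasedFrameDecoder splits the received data into lines, and StringDecoder then converts each line from bytes into a string. Likewise, after this processing the msg parameter of the channelRead event method is no longer a ByteBuf but a String.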
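```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;

public class TcpServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Split messages on "\n" or "\r\n"; 80 is the maximum line length in bytes,
                            // and a longer line causes a TooLongFrameException
                            pipeline.addLast(new LineBasedFrameDecoder(80));
                            // Then decode each line from UTF-8 bytes into a String
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new TcpServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

    // New data has been received
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // After StringDecoder, msg is no longer a ByteBuf but a String
        String line = (String) msg;
        System.out.println("channelRead:" + line);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("channelActive");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("channelInactive");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```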
Twisted:
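Twisted's design differs from the other two, so message splitting is done differently as well. The event-handling class TcpServerHandle no longer extends Protocol but LineOnlyReceiver, a subclass of Protocol. The event method for new data is no longer dataReceived but lineReceived, provided by LineOnlyReceiver. The Twisted source shows that LineOnlyReceiver implements dataReceived itself, splits the incoming data into lines, and calls lineReceived once for each complete line.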
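The dataReceived to lineReceived pattern described above can be sketched in Java as an abstract class: dataReceived accepts whatever bytes one read produced, buffers them, and calls lineReceived for every complete CR LF terminated line, keeping any partial line for later. This is a hypothetical analogue written for illustration (the method names are borrowed from Twisted for readability); it is not Twisted's actual implementation, and unlike a production receiver it sets no maximum line length.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical Java analogue of a "line receiver": raw chunks in, whole lines out.
abstract class LineReceiverSketch {
    private static final byte CR = '\r';
    private static final byte LF = '\n';
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    // Called with whatever bytes a single read returned.
    final void dataReceived(byte[] data) {
        buffer.write(data, 0, data.length);
        byte[] buf = buffer.toByteArray();
        int start = 0;
        for (int i = 0; i + 1 < buf.length; i++) {
            if (buf[i] == CR && buf[i + 1] == LF) {
                // Decode only complete lines, never half of a chunk
                lineReceived(new String(buf, start, i - start, StandardCharsets.UTF_8));
                start = i + 2;
                i++; // skip the LF
            }
        }
        // Keep the unfinished line (possibly ending in a lone CR) for the next call
        buffer.reset();
        buffer.write(buf, start, buf.length - start);
    }

    // Called once per complete line, without the trailing CR LF.
    abstract void lineReceived(String line);

    public static void main(String[] args) {
        LineReceiverSketch receiver = new LineReceiverSketch() {
            @Override
            void lineReceived(String line) {
                System.out.println("lineReceived:" + line);
            }
        };
        receiver.dataReceived("ABC\r\nDE".getBytes(StandardCharsets.UTF_8)); // prints lineReceived:ABC
        receiver.dataReceived("F\r\n".getBytes(StandardCharsets.UTF_8));     // prints lineReceived:DEF
    }
}
```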
```python
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineOnlyReceiver


class TcpServerHandle(LineOnlyReceiver):

    # A new connection has been established
    def connectionMade(self):
        print('connectionMade')

    # The connection has been closed
    def connectionLost(self, reason):
        print('connectionLost')

    # A new complete line has been received (without the trailing "\r\n").
    # On Python 3 the line arrives as bytes, so decode it before printing.
    def lineReceived(self, data):
        print('lineReceived:' + data.decode('utf-8'))


factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()
```
Below, a Java client is used to test all three servers:
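```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class TcpClient {

    public static void main(String[] args) throws IOException {
        // try-with-resources closes the stream and the socket even if connecting or writing fails
        try (Socket socket = new Socket("localhost", 8080);
             OutputStream out = socket.getOutputStream()) {
            // Send four lines, each terminated by "\r\n", in a single write
            String lines = "床前明月光\r\n疑是地上霜\r\n舉頭望明月\r\n低頭思故鄉\r\n";
            byte[] outputBytes = lines.getBytes(StandardCharsets.UTF_8);
            out.write(outputBytes);
            out.flush();
        }
    }
}
```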
MINA server output:
sessionCreated
messageReceived:床前明月光
messageReceived:疑是地上霜
messageReceived:舉頭望明月
messageReceived:低頭思故鄉
sessionClosed
Netty server output:
channelActive
channelRead:床前明月光
channelRead:疑是地上霜
channelRead:舉頭望明月
channelRead:低頭思故鄉
channelInactive
Twisted server output:
connectionMade
lineReceived:床前明月光
lineReceived:疑是地上霜
lineReceived:舉頭望明月
lineReceived:低頭思故鄉
connectionLost
Of course, a test can also send data that is deliberately not cut along the message boundaries. Here is a nastier client:
```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class TcpClient {

    public static void main(String[] args) throws IOException, InterruptedException {
        // try-with-resources closes the stream and the socket even if connecting or writing fails
        try (Socket socket = new Socket("localhost", 8080);
             OutputStream out = socket.getOutputStream()) {
            // Cut the text at points unrelated to the "\r\n" boundaries, pausing one second
            // between writes so that each piece is likely to arrive in a separate read
            String[] pieces = {"床前", "明月", "光\r\n疑是地上霜\r\n舉頭", "望明月\r\n低頭思故鄉\r\n"};
            for (int i = 0; i < pieces.length; i++) {
                out.write(pieces[i].getBytes(StandardCharsets.UTF_8));
                out.flush();
                if (i < pieces.length - 1) {
                    Thread.sleep(1000);
                }
            }
        }
    }
}
```
Running this client against each of the three servers again produces exactly the same output as above; all three handle it correctly.
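The client above always cuts between characters, but TCP makes no such promise: a read can end in the middle of a multi-byte UTF-8 character, and each of these Chinese characters takes 3 bytes in UTF-8. That is why it matters to frame on bytes first and decode afterwards, which is the order used in the Netty pipeline above (LineBasedFrameDecoder before StringDecoder). The sketch below contrasts the two orders on a hand-picked split; the class and helper names are hypothetical, and for brevity the framing step assembles all chunks up front, whereas a real decoder does it incrementally and keeps the unfinished tail between reads.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DecodeAfterFraming {

    // Wrong order: decode each received chunk on its own.
    static String decodePerChunk(List<byte[]> chunks) {
        StringBuilder sb = new StringBuilder();
        for (byte[] c : chunks) sb.append(new String(c, StandardCharsets.UTF_8));
        return sb.toString();
    }

    // Right order: reassemble the bytes, cut at CR LF, then decode whole lines.
    // An unfinished last line is simply dropped in this sketch.
    static List<String> frameThenDecode(List<byte[]> chunks) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (byte[] c : chunks) all.write(c, 0, c.length);
        byte[] buf = all.toByteArray();
        List<String> lines = new ArrayList<>();
        int start = 0;
        for (int i = 0; i + 1 < buf.length; i++) {
            if (buf[i] == '\r' && buf[i + 1] == '\n') {
                lines.add(new String(buf, start, i - start, StandardCharsets.UTF_8));
                start = i + 2;
                i++; // skip the LF
            }
        }
        return lines;
    }

    // Simulate arbitrary reads by cutting the bytes at the given offsets.
    static List<byte[]> cut(byte[] data, int... offsets) {
        List<byte[]> chunks = new ArrayList<>();
        int prev = 0;
        for (int off : offsets) {
            chunks.add(Arrays.copyOfRange(data, prev, off));
            prev = off;
        }
        chunks.add(Arrays.copyOfRange(data, prev, data.length));
        return chunks;
    }

    public static void main(String[] args) {
        String text = "床前明月光\r\n疑是地上霜\r\n";
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        List<byte[]> chunks = cut(data, 4, 20); // offset 4 falls inside the 2nd character
        System.out.println(decodePerChunk(chunks).equals(text)); // false: split bytes become U+FFFD
        System.out.println(frameThenDecode(chunks).equals(List.of("床前明月光", "疑是地上霜"))); // true
    }
}
```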
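The "Learning MINA, Netty and Twisted Together" series
Learning MINA, Netty and Twisted Together (1): A Simple TCP Server
Learning MINA, Netty and Twisted Together (2): TCP Message Boundaries and Splitting Messages by Line
Learning MINA, Netty and Twisted Together (3): A Fixed-Size Prefix (Header) for TCP Messages
Learning MINA, Netty and Twisted Together (4): Defining Your Own Protocol
Learning MINA, Netty and Twisted Together (5): Integrating protobuf
Learning MINA, Netty and Twisted Together (6): session
Learning MINA, Netty and Twisted Together (7): Publish/Subscribe
Learning MINA, Netty and Twisted Together (8): HTTP Server
Learning MINA, Netty and Twisted Together (9): Asynchronous IO and Callbacks
Learning MINA, Netty and Twisted Together (11): SSL/TLS
Learning MINA, Netty and Twisted Together (12): HTTPS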