TCP以流的方式進行數據傳輸,上層的應用協議為了對消息進行區分,往往采用如下4種方式。
(1)消息長度固定,累計讀取到長度總和為定長LEN的報文后,就認為讀取到了一個完整的消息;將計數器置位,重新開始讀取下一個數據報;
(2)將回車換行符作為消息結束符,例如FTP協議,這種方式在文本協議中應用比較廣泛;
(3)將特殊的分隔符作為消息的結束標志,回車換行符就是一種特殊的結束分隔符;
(4)通過在消息頭中定義長度字段來標識消息的總長度。
Netty對上面四種應用做了統一的抽象,提供了4種解碼器來解決對應的問題,使用起來非常方便。有了這些解碼器,用戶不需要自己對讀取的報文進行人工解碼,也不需要考慮TCP的粘包和拆包。
兩種實用的解碼器——DelimiterBasedFrameDecoder和FixedLengthFrameDecoder,前者可以自動完成以分隔符做結束標志的消息的解碼,后者可以自動完成對定長消息的解碼,它們都能解決TCP粘包/拆包導致的讀半包問題。
DelimiterBasedFrameDecoder應用開發
演示程序以經典的Echo服務為例。EchoServer接收到EchoClient的請求消息后,將其打印出來,然后將原始消息返回給客戶端,消息以“$_”作為分隔符。
服務端示例:
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class EchoServer { public void bind(int port) throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChildChannelHandler()); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer { @Override protected void initChannel(Channel arg0) throws Exception { //首先創建分隔符緩沖對象ByteBuf,本例程中使用“$_”作為分隔符。 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); //創建DelimiterBasedFrameDecoder對象,將其加入到ChannelPipeline中。 //DelimiterBasedFrameDecoder有多個構造方法,這里我們傳遞兩個參數, //第一個1024表示單條消息的最大長度,當達到該長度后仍然沒有查找到分隔符, //就拋出TooLongFrame Exception異常,防止由於異常碼流缺失分隔符導致的內存溢出, //這是Netty解碼器的可靠性保護;第二個參數就是分隔符緩沖對象。 arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); arg0.pipeline().addLast(new StringDecoder()); arg0.pipeline().addLast(new EchoServerHandler()); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new EchoServer().bind(port); } } import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; @ChannelHandler.Sharable public class EchoServerHandler extends ChannelHandlerAdapter { int counter = 0; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //直接將接收的消息打印出來,由於DelimiterBasedFrameDecoder自動對請求消息進行了解碼 //后續的ChannelHandler接收到的msg對象就是個完整的消息包; //第二個ChannelHandler是StringDecoder,它將ByteBuf解碼成字符串對象 //第三個EchoServerHandler接收到的msg消息就是解碼后的字符串對象。 String body = (String) msg; System.out.println("This is " + ++counter + " times receive client : ["+ body + "]"); body += "$_"; //由於我們設置DelimiterBasedFrameDecoder過濾掉了分隔符, //所以,返回給客戶端時需要在請求消息尾部拼接分隔符“$_”, //最后創建ByteBuf,將原始消息重新返回給客戶端。 ByteBuf echo = Unpooled.copiedBuffer(body.getBytes()); ctx.writeAndFlush(echo); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close();// 發生異常,關閉鏈路 } }
客戶端示例:
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class EchoClient { public void connect(int port, String host) throws Exception { // 配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new EchoClientHandler()); } }); // 發起異步連接操作 ChannelFuture f = b.connect(host, port).sync(); // 等待客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new EchoClient().connect(port, "127.0.0.1"); } } import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class EchoClientHandler extends ChannelHandlerAdapter { private int counter; static final String ECHO_REQ = "Hi, Netty. Welcome to Netty.$_"; public EchoClientHandler() { } @Override public void channelActive(ChannelHandlerContext ctx) { for (int i = 0; i < 10; i++) { ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("This is " + ++counter + " times receive server : [" + msg + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
運行結果:
服務端
This is 1 times receive client : [Hi, Netty. Welcome to Netty.]
...............................
This is 10 times receive client : [Hi, Netty. Welcome to Netty.]
客戶端
This is 1 times receive client : [Hi, Netty. Welcome to Netty.]
...............................
This is 10 times receive client : [Hi, Netty. Welcome to Netty.]
FixedLengthFrameDecoder應用開發
FixedLengthFrameDecoder是固定長度解碼器,它能夠按照指定的長度對消息進行自動解碼,開發者不需要考慮TCP的粘包/拆包問題,非常實用。
利用FixedLengthFrameDecoder解碼器,無論一次接收到多少數據報,它都會按照構造函數中設置的固定長度進行解碼,如果是半包消息,FixedLengthFrameDecoder會緩存半包消息並等待下個包到達后進行拼包,直到讀取到一個完整的包。
服務端示例:
在服務端的ChannelPipeline中新增FixedLengthFrameDecoder,長度設置為20,然后再依次增加字符串解碼器和EchoServerHandler
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class EchoServer { public void bind(int port) throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(Channel ch)throws Exception { ch.pipeline().addLast( new FixedLengthFrameDecoder(20)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new EchoServerHandler()); } }); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new EchoServer().bind(port); } } import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; @ChannelHandler.Sharable public class EchoServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Receive client : [" + msg + "]"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close();// 發生異常,關閉鏈路 } }
客戶端示例:
- telnet localhost 8080
- 通過set localecho命令打開本地回顯功能
➜ zcy-fixed git:(feature/transaction) ✗ telnet localhost 8080 Trying ::1... Connected to localhost. Escape character is '^]'. 1234567890123456789012 123456789012345678
//連接日志 10:10:59.755 [nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x979fc7a5, /0:0:0:0:0:0:0:0:8080] RECEIVED: [id: 0x79dc7a78, /0:0:0:0:0:0:0:1:50749 => /0:0:0:0:0:0:0:1:8080] Receive client : [12345678901234567890] Receive client : [12 1234567890123456]
