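Netty is one of the most popular NIO frameworks in the industry, and its robustness, performance, customizability, and extensibility are among the best of comparable frameworks. It has been proven in hundreds of commercial projects. For example, Avro, the RPC framework from the Hadoop ecosystem, uses Netty as its underlying communication framework. Other mainstream RPC frameworks, such as Dubbo, Google's open-source gRPC, Sina Weibo's open-source Motan, and Twitter's open-source Finagle, also use Netty to build high-performance asynchronous communication. In addition, Alibaba's open-source message middleware RocketMQ uses Netty as its underlying communication framework.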
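TCP sticky packets and packet splitting
TCP is a "stream" protocol: a stream is one long run of binary data with no boundaries. As a transport-layer protocol, TCP knows nothing about the meaning of the upper-layer business data. It segments data according to the actual state of the TCP buffers, so something the application regards as one complete packet may be split by TCP into several packets for sending, and several small packets may be merged into one larger packet. This is the so-called TCP sticky-packet and packet-splitting problem.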
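The effect can be reproduced without a network. The following is a minimal plain-Java sketch, not Netty code: the class name `StreamBoundaryDemo` and the chunk sizes are assumptions made for illustration. It treats two logical messages as one byte stream and reads it back in chunks, the way successive socket reads might deliver it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo: a byte stream does not remember where messages begin and end. */
final class StreamBoundaryDemo {

    /** Cuts the stream into chunks of at most chunkSize bytes, like successive socket reads. */
    static List<String> readInChunks(byte[] stream, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        for (int off = 0; off < stream.length; off += chunkSize) {
            int end = Math.min(stream.length, off + chunkSize);
            chunks.add(new String(stream, off, end - off, StandardCharsets.US_ASCII));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Two logical messages, "HELLO" and "WORLD", written back to back.
        byte[] stream = "HELLOWORLD".getBytes(StandardCharsets.US_ASCII);
        // A large read returns both messages glued together (sticky packet).
        System.out.println(readInChunks(stream, 16));
        // Small reads cut each message into pieces (packet splitting).
        System.out.println(readInChunks(stream, 3));
    }
}
```

In both cases the receiver sees only bytes; nothing in the stream says that the sender meant two five-byte messages.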
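Strategies for solving the sticky-packet problem
Because the underlying TCP layer cannot understand the upper-layer business data, it cannot guarantee that packets will not be split or recombined. The problem can only be solved in the design of the application-layer protocol stack. The solutions used by mainstream protocols can be summarized as follows:
1. Fixed-length messages: every message has the same size, for example 200 bytes, and shorter messages are padded with spaces;
2. A special delimiter at the end of each message, for example a carriage return and line feed (as in the FTP protocol) or another designated character sequence; the receiver splits the stream on the delimiter;
3. A message header plus a message body, where the header contains a field giving the total message length (or the body length);
4. More complex custom application-layer protocols.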
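The first three strategies differ only in how the sender marks where a message ends. The sketch below shows the sender side of each one in plain Java. The class and method names are hypothetical, and the 4-byte big-endian length header is an assumption chosen for illustration rather than something a particular protocol mandates.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper showing how a sender could frame one message under strategies 1 to 3. */
final class FramingStrategies {

    /** Strategy 1: pad the message with spaces up to a fixed frame length. */
    static byte[] fixedLength(String msg, int frameLength) {
        byte[] data = msg.getBytes(StandardCharsets.UTF_8);
        if (data.length > frameLength) {
            throw new IllegalArgumentException("message longer than frame");
        }
        byte[] frame = new byte[frameLength];
        Arrays.fill(frame, (byte) ' ');
        System.arraycopy(data, 0, frame, 0, data.length);
        return frame;
    }

    /** Strategy 2: append a delimiter that never occurs inside the message itself. */
    static byte[] delimited(String msg, String delimiter) {
        return (msg + delimiter).getBytes(StandardCharsets.UTF_8);
    }

    /** Strategy 3: write a 4-byte big-endian body length, then the body. */
    static byte[] lengthPrefixed(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

For example, `FramingStrategies.lengthPrefixed("hi")` produces the six bytes `00 00 00 02 68 69`: the receiver reads the first four bytes, learns that the body is two bytes long, and knows exactly where the message ends.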
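Netty's solutions for sticky packets and packet splitting
Netty provides several decoders that split the stream into frames:
* LineBasedFrameDecoder (splits frames on line endings)
* DelimiterBasedFrameDecoder (splits frames on a special delimiter)
* FixedLengthFrameDecoder (splits frames of a fixed length)
* LengthFieldBasedFrameDecoder (splits frames using a length field in the message header)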
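The LineBasedFrameDecoder decoder
LineBasedFrameDecoder is the line-ending decoder. If the messages a user sends are terminated by a line ending ("\n" or "\r\n"), Netty's LineBasedFrameDecoder can decode them directly. You only need to add LineBasedFrameDecoder to the ChannelPipeline correctly when initializing the Netty server or client; there is no need to implement your own line decoder.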
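To make the idea concrete, here is a simplified plain-Java sketch of line-based framing. It is not Netty's implementation: the class `LineFramer` is hypothetical, and it omits the maximum frame length check that LineBasedFrameDecoder enforces through its constructor argument. It buffers incoming bytes and emits a frame only once a line ending has been seen, so a line that arrives in two reads is still delivered as one message.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of line-based framing over a chunked byte stream. */
final class LineFramer {

    // Bytes received so far that do not yet form a complete line.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Feeds one chunk and returns every line completed by it, without the line ending. */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] line = pending.toByteArray();
                pending.reset();
                int len = line.length;
                if (len > 0 && line[len - 1] == '\r') {
                    len--; // accept "\r\n" as well as "\n"
                }
                lines.add(new String(line, 0, len, StandardCharsets.UTF_8));
            } else {
                pending.write(b);
            }
        }
        return lines;
    }
}
```

Feeding `"hello\r\nwor"` returns only `hello`; the trailing `wor` stays buffered until a later chunk such as `"ld\n"` completes the line.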
Netty dependency
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.9.Final</version>
</dependency>
1.1 Server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky
 */
public class LineBasedServer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // Split the inbound stream on line endings; 1024 is the maximum frame length.
                        p.addLast(new LineBasedFrameDecoder(1024));
                        p.addLast(new StringDecoder());
                        p.addLast(new StringEncoder());
                        p.addLast(new LineServerHandler());
                    }
                });

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync();
            logger.info("server bind port:{}", port);

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new LineBasedServer().bind(Constants.PORT);
    }
}

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LineServerHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    // Number of frames received on this connection.
    private int count = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        count++;
        // StringDecoder has already turned the frame into a String.
        String body = (String) msg;
        logger.info("server read msg:{}, count:{}", body, count);

        // The reply must end with a line separator so the client's decoder can frame it.
        String response = "hello from server" + System.getProperty("line.separator");
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("server caught exception", cause);
        ctx.close();
    }
}
1.2 Client
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class LineBasedClient {

    public void connect(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new LineBasedFrameDecoder(1024));
                        p.addLast(new StringDecoder());
                        p.addLast(new StringEncoder());
                        p.addLast(new LineClientHandler());
                    }
                });

            // Use the method arguments rather than the constants directly.
            ChannelFuture future = b.connect(host, port).sync();

            // Wait until the connection is closed.
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new LineBasedClient().connect(Constants.HOST, Constants.PORT);
    }
}

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LineClientHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    // Number of frames received on this connection.
    private int count = 0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send 100 messages to the server, each terminated by a line separator.
        for (int i = 0; i < 100; i++) {
            String msg = "hello from client " + i;
            logger.info("client send message:{}", msg);
            ctx.writeAndFlush(msg + System.getProperty("line.separator"));
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        count++;
        logger.info("client read msg:{}, count:{}", body, count);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("client caught exception", cause);
        ctx.close();
    }
}
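The listings in this article reference a Constants class (Constants.HOST, Constants.PORT, and, in the delimiter examples, Constants.DELIMITER) whose source is not shown. A minimal stand-in might look like the following. All three values are assumptions chosen for illustration, and the class is assumed to live in the same package as the examples.

```java
/** Hypothetical stand-in for the Constants class referenced by the listings; values are assumed. */
final class Constants {

    // Assumed: server and client run on the same machine.
    static final String HOST = "127.0.0.1";

    // Assumed: any free TCP port will do.
    static final int PORT = 8080;

    // Assumed: a byte sequence that never appears inside a message body.
    static final String DELIMITER = "$_";

    private Constants() {
        // Constants holder, not meant to be instantiated.
    }
}
```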
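The DelimiterBasedFrameDecoder decoder
DelimiterBasedFrameDecoder is the delimiter decoder. The user specifies the delimiter that marks the end of a message, and the decoder automatically decodes messages whose byte stream ends with that delimiter. Conceptually, line-ending decoding is a special case of delimiter-based decoding.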
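The mechanism can be sketched in a few lines of plain Java. This is a simplification, not Netty's implementation: the class `DelimiterFramer` is hypothetical, it works on strings rather than raw bytes, and it has no maximum frame length. Like DelimiterBasedFrameDecoder in its default configuration, it strips the delimiter from the frames it returns.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of delimiter-based framing over chunked text. */
final class DelimiterFramer {

    private final String delimiter;

    // Text received so far that has not yet been terminated by a delimiter.
    private final StringBuilder pending = new StringBuilder();

    DelimiterFramer(String delimiter) {
        if (delimiter == null || delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        this.delimiter = delimiter;
    }

    /** Feeds one chunk and returns every frame completed by it, without the delimiter. */
    List<String> feed(String chunk) {
        // Append first, then search: this also finds a delimiter split across two chunks.
        pending.append(chunk);
        List<String> frames = new ArrayList<>();
        int idx;
        while ((idx = pending.indexOf(delimiter)) >= 0) {
            frames.add(pending.substring(0, idx));
            pending.delete(0, idx + delimiter.length());
        }
        return frames;
    }
}
```

With the delimiter `$_`, feeding `"a$"` returns nothing, and a following `"_b$_c"` returns `a` and `b` while `c` stays buffered.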
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky
 */
public class DelimiterServer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // Split the inbound stream on Constants.DELIMITER; 1024 is the maximum frame length.
                        p.addLast(new DelimiterBasedFrameDecoder(1024,
                                Unpooled.copiedBuffer(Constants.DELIMITER.getBytes())));
                        p.addLast(new StringDecoder());
                        p.addLast(new StringEncoder());
                        p.addLast(new DelimiterServerHandler());
                    }
                });

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync();
            logger.info("server bind port:{}", port);

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new DelimiterServer().bind(Constants.PORT);
    }
}
Client:
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class DelimiterClient {

    public void connect(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // Must use the same delimiter as the server.
                        p.addLast(new DelimiterBasedFrameDecoder(1024,
                                Unpooled.copiedBuffer(Constants.DELIMITER.getBytes())));
                        p.addLast(new StringDecoder());
                        p.addLast(new StringEncoder());
                        p.addLast(new DelimiterClientHandler());
                    }
                });

            // Use the method arguments rather than the constants directly.
            ChannelFuture future = b.connect(host, port).sync();

            // Wait until the connection is closed.
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new DelimiterClient().connect(Constants.HOST, Constants.PORT);
    }
}
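The FixedLengthFrameDecoder decoder
FixedLengthFrameDecoder is the fixed-length decoder. It automatically decodes messages of a specified length, so the developer does not have to deal with TCP sticky packets or packet splitting, which makes it very practical.
With fixed-length messages, a message that is shorter than the fixed length is usually padded, which wastes some space and resources. The advantage is just as clear: encoding and decoding are simple, so the approach still has its uses in real projects.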
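The following plain-Java sketch shows the receiving side of fixed-length framing. It is not Netty's implementation; the class `FixedLengthFramer` is hypothetical, and frames are decoded as US-ASCII text purely to keep the example readable. Bytes are buffered until a full frame is available, whatever chunk boundaries the network produced.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of fixed-length framing over a chunked byte stream. */
final class FixedLengthFramer {

    private final int frameLength;

    // Bytes received so far that do not yet fill a whole frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    FixedLengthFramer(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Feeds one chunk and returns every complete frame, padding included. */
    List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] all = pending.toByteArray();
        List<String> frames = new ArrayList<>();
        int off = 0;
        while (all.length - off >= frameLength) {
            frames.add(new String(all, off, frameLength, StandardCharsets.US_ASCII));
            off += frameLength;
        }
        // Keep the incomplete tail for the next call.
        pending.reset();
        pending.write(all, off, all.length - off);
        return frames;
    }
}
```

If the sender pads with spaces, the receiver can call `trim()` on each frame to recover the original message. The sender has to pad every message to exactly the frame length; otherwise the frames no longer line up with the logical messages.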
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class NettyServer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    public void bind(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Configure the server bootstrap.
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO)) // log server channel events
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Every inbound frame is exactly 1 << 5 = 32 bytes.
                        ch.pipeline().addLast(new FixedLengthFrameDecoder(1 << 5));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

            ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Release the thread resources.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ServerHandler extends ChannelInboundHandlerAdapter {

        private int counter = 0;

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("received msg from client:{}", msg);

            // The original format string had no placeholder and counter was never incremented.
            // Note: the reply is not padded to 32 bytes, so the client's FixedLengthFrameDecoder
            // slices the reply stream into 32-byte frames that do not line up with single replies.
            ByteBuf echo = Unpooled.copiedBuffer(
                    String.format("Hello from server:%d", ++counter).getBytes());
            ctx.writeAndFlush(echo);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyServer().bind(Constants.PORT);
    }
}

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class NettyClient {

    private Logger logger = LoggerFactory.getLogger(getClass());

    public void connect(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Every inbound frame is exactly 1 << 5 = 32 bytes.
                        ch.pipeline().addLast(new FixedLengthFrameDecoder(1 << 5));
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

            ChannelFuture future = b.connect(host, port).sync();

            // Wait until the connection is closed.
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    private class ClientHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Note: these messages are not padded to 32 bytes, so each 32-byte frame the
            // server decodes is a slice of the stream rather than one logical message.
            for (int i = 0; i < 100; i++) {
                String msg = "hello from client " + i;
                logger.info("client send message:{}", msg);
                ctx.writeAndFlush(msg);
            }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("received msg from server:{}", msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.error("client caught exception", cause);
            ctx.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyClient().connect(Constants.HOST, Constants.PORT);
    }
}
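The LengthFieldBasedFrameDecoder decoder
Most protocols, private or public, carry a length field in the protocol header that gives the length of the message body or of the whole message; SMPP and HTTP are examples. Because length-based decoding is such a common need, and to make protocol development easier for users, Netty provides LengthFieldBasedFrameDecoder. It hides TCP's packet splitting and sticky packets: pass in the correct parameters and the "half-packet read" problem is solved.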
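To show what the parameters mean, here is a simplified plain-Java sketch of length-field framing. It is not Netty's implementation: the class `LengthFieldFramer` is hypothetical, the length field is assumed to be a 4-byte big-endian integer holding the body length, and the sketch omits options such as `lengthAdjustment` and `initialBytesToStrip` that the real decoder also supports. The `lengthFieldOffset` parameter is the number of header bytes that precede the length field.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of length-field framing: [header ... | 4-byte body length | body]. */
final class LengthFieldFramer {

    private final int lengthFieldOffset; // header bytes that come before the length field
    private final int maxFrameLength;    // upper bound that guards against corrupt length fields

    // Unread bytes left over from earlier chunks (position..limit).
    private ByteBuffer pending = ByteBuffer.allocate(0);

    LengthFieldFramer(int lengthFieldOffset, int maxFrameLength) {
        this.lengthFieldOffset = lengthFieldOffset;
        this.maxFrameLength = maxFrameLength;
    }

    /** Feeds one chunk and returns every complete frame, header included. */
    List<byte[]> feed(byte[] chunk) {
        // Merge the leftover bytes with the new chunk.
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk).flip();

        List<byte[]> frames = new ArrayList<>();
        int headerSize = lengthFieldOffset + 4;
        while (merged.remaining() >= headerSize) {
            // Peek at the length field without consuming anything.
            int bodyLength = merged.getInt(merged.position() + lengthFieldOffset);
            if (bodyLength < 0 || bodyLength > maxFrameLength - headerSize) {
                throw new IllegalStateException("bad length field: " + bodyLength);
            }
            int frameLength = headerSize + bodyLength;
            if (merged.remaining() < frameLength) {
                break; // half packet: wait for more bytes
            }
            byte[] frame = new byte[frameLength];
            merged.get(frame);
            frames.add(frame);
        }
        pending = merged; // keeps only the unread remainder
        return frames;
    }
}
```

A frame is emitted only when both the header and the full body have arrived, so a message delivered in several reads comes out whole, and several messages delivered in one read come out separately.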
Message.java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * @author Ricky Fung
 */
public class Message {

    private static final Charset CHARSET = StandardCharsets.UTF_8;

    private byte magicType;
    private byte type;      // message type: 0xAF heartbeat, 0xBF timeout, 0xCF business data
    private long requestId; // request id
    private int length;     // body length in bytes
    private String body;

    public Message() {
    }

    public Message(byte magicType, byte type, long requestId, byte[] data) {
        this.magicType = magicType;
        this.type = type;
        this.requestId = requestId;
        this.length = data.length;
        this.body = new String(data, CHARSET);
    }

    public Message(byte magicType, byte type, long requestId, String body) {
        this.magicType = magicType;
        this.type = type;
        this.requestId = requestId;
        this.length = body.getBytes(CHARSET).length;
        this.body = body;
    }

    // ...setter/getter
}
MessageDecoder.java
import com.mindflow.netty4.unpack.model.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

/**
 * @author Ricky Fung
 */
public class MessageDecoder extends LengthFieldBasedFrameDecoder {

    private Logger logger = LoggerFactory.getLogger(getClass());

    // Header size: magic (byte) + type (byte) + requestId (long) + length (int) = 1 + 1 + 8 + 4 = 14
    private static final int HEADER_SIZE = 14;

    public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }

    // Note: this override parses the frame by hand and never calls super.decode(),
    // so the constructor arguments are not actually used for framing here.
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in == null) {
            return null;
        }
        // Wait until the whole header has arrived. (The original used <=, which would
        // stall on a message with an empty body.)
        if (in.readableBytes() < HEADER_SIZE) {
            return null;
        }

        // Remember the start of the frame so a half packet can be retried later.
        in.markReaderIndex();

        byte magic = in.readByte();
        byte type = in.readByte();
        long requestId = in.readLong();
        int dataLength = in.readInt();

        // FIXME: an excessively large dataLength may cause problems
        if (in.readableBytes() < dataLength) {
            // Half packet: rewind and wait for the rest of the body.
            in.resetReaderIndex();
            return null;
        }

        byte[] data = new byte[dataLength];
        in.readBytes(data);

        String body = new String(data, StandardCharsets.UTF_8);
        Message msg = new Message(magic, type, requestId, body);
        return msg;
    }
}
MessageEncoder.java
import com.mindflow.netty4.unpack.model.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.Charset;

/**
 * @author Ricky Fung
 */
public class MessageEncoder extends MessageToByteEncoder<Message> {

    private final Charset charset = Charset.forName("utf-8");

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // Write the header fields in the same order the decoder reads them.
        out.writeByte(msg.getMagicType());
        out.writeByte(msg.getType());
        out.writeLong(msg.getRequestId());

        // Length field first, then the body bytes.
        byte[] data = msg.getBody().getBytes(charset);
        out.writeInt(data.length);
        out.writeBytes(data);
    }
}
Server:
import com.mindflow.netty4.unpack.model.Message;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class NettyServer {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public void bind(int port) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // Max frame length 1 MiB; the 4-byte length field sits at offset 10,
                        // after magic (1), type (1) and requestId (8).
                        p.addLast(new MessageDecoder(1 << 20, 10, 4));
                        p.addLast(new MessageEncoder());
                        p.addLast(new ServerHandler());
                    }
                });

            // Bind and start to accept incoming connections.
            ChannelFuture future = b.bind(port).sync();
            logger.info("server bind port:{}", port);

            // Wait until the server socket is closed.
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ServerHandler extends SimpleChannelInboundHandler<Message> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
            logger.info("server read msg:{}", msg);

            // Reply with the same magic, type and request id as the request.
            Message resp = new Message(msg.getMagicType(), msg.getType(), msg.getRequestId(),
                    "Hello world from server");
            ctx.writeAndFlush(resp);
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyServer().bind(Constants.PORT);
    }
}
Client:
import com.mindflow.netty4.unpack.model.Message;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class NettyClient {

    public void connect(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // Same framing parameters as the server.
                        p.addLast(new MessageDecoder(1 << 20, 10, 4));
                        p.addLast(new MessageEncoder());
                        p.addLast(new ClientHandler());
                    }
                });

            // sync() already blocks until the connection attempt completes,
            // so the extra timed wait from the original is not needed.
            ChannelFuture future = b.connect(host, port).sync();

            if (future.channel().isActive()) {
                for (int i = 0; i < 100; i++) {
                    String body = "Hello world from client:" + i;
                    Message msg = new Message((byte) 0xAF, (byte) 0xBF, i, body);
                    future.channel().writeAndFlush(msg);
                }
            }

            // Wait until the connection is closed.
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    private class ClientHandler extends ChannelInboundHandlerAdapter {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("client read msg:{}", msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.error("client caught exception", cause);
            ctx.close();
        }
    }

    public static void main(String[] args) throws Exception {
        new NettyClient().connect(Constants.HOST, Constants.PORT);
    }
}
