上一篇 nio簡介 下一篇 netty中級篇(2)
一、為什么選擇Netty
Netty是最流行的框架之一、健壯性、功能、性能、可定制性和可擴展性在同類框架中首屈一指,因此被大規模使用,例如ROCKETMQ的NameSRV,例如Hadoop的Avro,例如Dubbo中的RPC通信等等。。
為什么選擇Netty?
- API簡單;
- 功能強大,預置了選多的編碼功能,支持多種主流協議;
- 定制能力強,通過ChannelHandler對通信框架進行靈活的擴展;
- 性能強;
- 成熟,修改已發現的JDK nio BUG
- 社區活躍
- 經過大規模的商業應用考驗,質量得到驗證。
二、使用Netty開發TimeServer
環境准備: pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>demo</groupId> <artifactId>netty</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.5.Final</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
1. Netty TimeServer
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class TimeServer { public void bind(int port) throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Netty啟動Nio服務端的輔助類 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設置服務端tcp參數 .childHandler(new ChildChannelHandler()); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 進行阻塞,等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 System.out.println("服務器關閉..."); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new TimeServerHandler()); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new TimeServer().bind(port); } }
- EventLoopGroup Reactor線程組 2個
- ServerBootstrap:Server端輔助工具
- 設置channel: NioServerSocketChannel
- option: 服務端tcp option設置,這里以backlog 1024為例..
- 增加childHandler
- f.channel().closeFuture().sync()表示進行阻塞,等待服務器端鏈路關閉之后main函數才退出
2. TimeServerHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelInboundHandlerAdapter; 6 7 /** 8 * @author lilinfeng 9 * @version 1.0 10 * @date 2014年2月14日 11 */ 12 public class TimeServerHandler extends ChannelInboundHandlerAdapter { 13 14 15 @Override 16 public void channelRead(ChannelHandlerContext ctx, Object msg) 17 throws Exception { 18 ByteBuf buf = (ByteBuf) msg; 19 byte[] req = new byte[buf.readableBytes()]; 20 buf.readBytes(req); 21 String body = new String(req, "UTF-8"); 22 System.out.println("The time server receive order : " + body); 23 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date( 24 System.currentTimeMillis()).toString() : "BAD ORDER"; 25 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 26 ctx.write(resp); 27 } 28 29 @Override 30 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 31 ctx.flush(); 32 } 33 34 @Override 35 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 36 ctx.close(); 37 } 38 }
(1) 18行做類型轉換,將msg轉換為Netty的ByteBuf對象,這個對象比ByteBuffer更加強大和靈活。
(2) 19行到20行通過ByteBuf的readableBytes獲取緩沖區可讀的字節數,根據可讀的字節數創建byte數組。將緩沖區的內容讀取到byte數組中。
(3) 31行發現了flush方法,其作用是將消息發送隊列中的消息寫入到SocketChannel中發送給對方。從性能上考慮,為了防止頻繁喚醒Selector進行消息發送,Netty的write方法不直接寫入到SocketChannel中,調用write方法只會寫入到緩沖數組中,調用flush方法,才會寫入到SocketChannel中。
(4) 36行的close()是在發生異常后釋放資源
總結: 就是比NIO舒服太多了.
3. Time Client
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 10 /** 11 * @author lilinfeng 12 * @version 1.0 13 * @date 2014年2月14日 14 */ 15 public class TimeClient { 16 17 public void connect(int port, String host) throws Exception { 18 // 配置客戶端NIO線程組 19 EventLoopGroup group = new NioEventLoopGroup(); 20 try { 21 Bootstrap b = new Bootstrap(); 22 b.group(group).channel(NioSocketChannel.class) 23 .option(ChannelOption.TCP_NODELAY, true) 24 .handler(new ChannelInitializer<SocketChannel>() { 25 @Override 26 public void initChannel(SocketChannel ch) 27 throws Exception { 28 ch.pipeline().addLast(new TimeClientHandler()); 29 } 30 }); 31 32 // 發起異步連接操作 33 ChannelFuture f = b.connect(host, port).sync(); 34 35 // 當代客戶端鏈路關閉 36 f.channel().closeFuture().sync(); 37 } finally { 38 // 優雅退出,釋放NIO線程組 39 group.shutdownGracefully(); 40 } 41 } 42 43 /** 44 * @param args 45 * @throws Exception 46 */ 47 public static void main(String[] args) throws Exception { 48 int port = 8080; 49 if (args != null && args.length > 0) { 50 try { 51 port = Integer.valueOf(args[0]); 52 } catch (NumberFormatException e) { 53 // 采用默認值 54 } 55 } 56 new TimeClient().connect(port, "127.0.0.1"); 57 } 58 }
(1) 19行創建客戶端處理I/O讀寫的NioEventLoopGroup線程組,然后繼續創建輔助類Bootstrap,並且對其配置,此處配置為 NioSocketChannel,然后為其添加Handler。
(2) 這里Handler直接使用匿名內部類
(3) 33行的connect發送異步連接請求,然后阻塞直到關閉。
4. TimeClientHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerAdapter; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelInboundHandlerAdapter; 6 7 import java.util.logging.Logger; 8 9 /** 10 * @author lilinfeng 11 * @version 1.0 12 * @date 2014年2月14日 13 */ 14 public class TimeClientHandler extends ChannelInboundHandlerAdapter { 15 16 private static final Logger logger = Logger 17 .getLogger(TimeClientHandler.class.getName()); 18 19 private final ByteBuf firstMessage; 20 21 /** 22 * Creates a client-side handler. 23 */ 24 public TimeClientHandler() { 25 byte[] req = "QUERY TIME ORDER".getBytes(); 26 firstMessage = Unpooled.buffer(req.length); 27 firstMessage.writeBytes(req); 28 29 } 30 31 @Override 32 public void channelActive(ChannelHandlerContext ctx) { 33 ctx.writeAndFlush(firstMessage); 34 } 35 36 @Override 37 public void channelRead(ChannelHandlerContext ctx, Object msg) 38 throws Exception { 39 ByteBuf buf = (ByteBuf) msg; 40 byte[] req = new byte[buf.readableBytes()]; 41 buf.readBytes(req); 42 String body = new String(req, "UTF-8"); 43 System.out.println("Now is : " + body); 44 } 45 46 @Override 47 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 48 // 釋放資源 49 logger.warning("Unexpected exception from downstream : " 50 + cause.getMessage()); 51 ctx.close(); 52 } 53 }
這里重點關注3個方法: channelActive channelRead和exceptionCaught。
(1) 當客戶端和服務器端成功創建鏈路,調用channelActive方法,發送查詢時間的指令給服務端,調用writeAndFlush方法發送數據。
(2) 39行開始調用channelRead,讀取數據,49行處理異常時釋放資源即可。
三、TCP 粘包/拆包問題的解決之道
1、TCP得粘包和拆包問題
- TCP是一個流協議
- TCP底層不了解業務數據含義,即不知道多少個字節算是業務上的一整體數據
- 因此業務上認為,一個完整的包會被TCP拆分為多個包進行發送,也有可能將多個小的包封裝成一個大包進行發送、
用下圖進行描述,假設client發送了2個包,D1和D2,服務器端讀到的數據是不確定的
存在4種可能:
server 分2次,分別讀到D1,D2,完美巧合,沒有粘包和拆包
server一次讀到D1,D2,D1和D2粘在一起,稱為粘包
server分2次,第一次讀到D1和D2的部分內容,第二次讀到了D2的剩余內容,拆包
server分2次,第一次讀到D1的部分內容D1_1,第二次讀到D1剩下的內容D1_2和完整的D2。
如果此時服務器端TCP接收的滑窗非常的小、而且數據包D1和D2都比較大,很有可能發生第5種可能性,服務器端多次才能將D1和D2接收完全,期間發生多次拆包...即上4種情況的多次組合...
下面我們來分析一下原因:
3個原因:
(1) 應用程序write寫入的字節大於套接口(scoket)發送緩沖的大小;
(2) 進行MSS大小的TCP分段;
(3) 以太網幀的payload大於MTU進行IP分片
總結就是:不可避免...
解決思路:
(1) 定長數據,例如每個報文200bytes,不夠空格補充...
(2) 在包圍增加回車換行符或者其他的特殊字符進行分割,例如FTP協議
(3) 將消息分為消息頭和消息體,消息頭中包含消息總長度(或者消息體長度)content-length,通常的設計思路為消息頭的第一個字段用int32來表示消息的總長度;
(4) 更復雜的應用層協議
2. 下面我們來模擬未考慮TCP粘包問題導致功能異常
修改上面的代碼:
修改TimeServerHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5 6 /** 7 * @author lilinfeng 8 * @version 1.0 9 * @date 2014年2月14日 10 */ 11 public class TimeServerHandler extends ChannelInboundHandlerAdapter { 12 13 private int counter; 14 15 @Override 16 public void channelRead(ChannelHandlerContext ctx, Object msg) 17 throws Exception { 18 ByteBuf buf = (ByteBuf) msg; 19 byte[] req = new byte[buf.readableBytes()]; 20 buf.readBytes(req); 21 String body = new String(req, "UTF-8").substring(0, req.length 22 - System.getProperty("line.separator").length()); 23 System.out.println("The time server receive order : " + body 24 + " ; the counter is : " + ++counter); 25 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date( 26 System.currentTimeMillis()).toString() : "BAD ORDER"; 27 currentTime = currentTime + System.getProperty("line.separator"); 28 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 29 ctx.writeAndFlush(resp); 30 } 31 32 @Override 33 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 34 ctx.close(); 35 } 36 }
主要是增加了一個counter進行計數..
修改TimeClientHandler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.logging.Logger; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class TimeClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger .getLogger(TimeClientHandler.class.getName()); private final ByteBuf firstMessage; /** * Creates a client-side handler. */ public TimeClientHandler() { byte[] req = "QUERY TIME ORDER".getBytes(); firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("Now is : " + body); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 釋放資源 logger.warning("Unexpected exception from downstream : " + cause.getMessage()); ctx.close(); } }
主要是進行100次連續的發送數據...
由於tcp粘包拆包有一定的隨機性,所以每次的結果可能不同,其中一次結果大致上是:
Server端打印:
QUERY TIME ORDER
....
the counter is : 2
Client端打印:
Now is : Thu Dec 15 15:11:22 CST 2016
BAD ORDER
BAD ORDER
; the counter is : 1
結果表明:client發送了100條消息,但是server是按照2次接收,只返回2條應答,但是client上的counter為1表明只client也接收了一次,說明這2條也進行了粘包。
3. 解決TCP粘包的TimeServer
TimeServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioServerSocketChannel; 9 import io.netty.handler.codec.LineBasedFrameDecoder; 10 import io.netty.handler.codec.string.StringDecoder; 11 12 /** 13 * @author lilinfeng 14 * @version 1.0 15 * @date 2014年2月14日 16 */ 17 public class TimeServer { 18 19 public void bind(int port) throws Exception { 20 // 配置服務端的NIO線程組 21 EventLoopGroup bossGroup = new NioEventLoopGroup(); 22 EventLoopGroup workerGroup = new NioEventLoopGroup(); 23 try { 24 ServerBootstrap b = new ServerBootstrap(); 25 b.group(bossGroup, workerGroup) 26 .channel(NioServerSocketChannel.class) 27 .option(ChannelOption.SO_BACKLOG, 1024) 28 .childHandler(new ChildChannelHandler()); 29 // 綁定端口,同步等待成功 30 ChannelFuture f = b.bind(port).sync(); 31 32 // 等待服務端監聽端口關閉 33 f.channel().closeFuture().sync(); 34 } finally { 35 // 優雅退出,釋放線程池資源 36 bossGroup.shutdownGracefully(); 37 workerGroup.shutdownGracefully(); 38 } 39 } 40 41 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 42 @Override 43 protected void initChannel(SocketChannel arg0) throws Exception { 44 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); 45 arg0.pipeline().addLast(new StringDecoder()); 46 arg0.pipeline().addLast(new TimeServerHandler()); 47 } 48 } 49 50 /** 51 * @param args 52 * @throws Exception 53 */ 54 public static void main(String[] args) throws Exception { 55 int port = 8080; 56 if (args != null && args.length > 0) { 57 try { 58 port = Integer.valueOf(args[0]); 59 } catch (NumberFormatException e) { 60 // 采用默認值 61 } 62 } 63 new TimeServer().bind(port); 64 } 65 }
重點看44行,增加了2個解碼器: LineBasedFrameDecoder和StringDecoder。
繼續看TimeServerHandler
1 import io.netty.buffer.ByteBuf; 2 import io.netty.buffer.Unpooled; 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5 6 /** 7 * @author lilinfeng 8 * @version 1.0 9 * @date 2014年2月14日 10 */ 11 public class TimeServerHandler extends ChannelInboundHandlerAdapter { 12 13 private int counter; 14 15 @Override 16 public void channelRead(ChannelHandlerContext ctx, Object msg) 17 throws Exception { 18 String body = (String) msg; 19 System.out.println("The time server receive order : " + body 20 + " ; the counter is : " + ++counter); 21 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date( 22 System.currentTimeMillis()).toString() : "BAD ORDER"; 23 currentTime = currentTime + System.getProperty("line.separator"); 24 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 25 ctx.writeAndFlush(resp); 26 } 27 28 @Override 29 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 30 ctx.close(); 31 } 32 }
看18行,直接獲取之后不是ByteBuf,而直接是一個String對象,代碼非常簡潔。
TimeClient
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 import io.netty.handler.codec.LineBasedFrameDecoder; 10 import io.netty.handler.codec.string.StringDecoder; 11 12 /** 13 * @author lilinfeng 14 * @version 1.0 15 * @date 2014年2月14日 16 */ 17 public class TimeClient { 18 19 public void connect(int port, String host) throws Exception { 20 // 配置客戶端NIO線程組 21 EventLoopGroup group = new NioEventLoopGroup(); 22 try { 23 Bootstrap b = new Bootstrap(); 24 b.group(group).channel(NioSocketChannel.class) 25 .option(ChannelOption.TCP_NODELAY, true) 26 .handler(new ChannelInitializer<SocketChannel>() { 27 @Override 28 public void initChannel(SocketChannel ch) 29 throws Exception { 30 ch.pipeline().addLast( 31 new LineBasedFrameDecoder(1024)); 32 ch.pipeline().addLast(new StringDecoder()); 33 ch.pipeline().addLast(new TimeClientHandler()); 34 } 35 }); 36 37 // 發起異步連接操作 38 ChannelFuture f = b.connect(host, port).sync(); 39 40 // 當代客戶端鏈路關閉 41 f.channel().closeFuture().sync(); 42 } finally { 43 // 優雅退出,釋放NIO線程組 44 group.shutdownGracefully(); 45 } 46 } 47 48 /** 49 * @param args 50 * @throws Exception 51 */ 52 public static void main(String[] args) throws Exception { 53 int port = 8080; 54 if (args != null && args.length > 0) { 55 try { 56 port = Integer.valueOf(args[0]); 57 } catch (NumberFormatException e) { 58 // 采用默認值 59 } 60 } 61 new TimeClient().connect(port, "127.0.0.1"); 62 } 63 }
類似TimeServer增加了2個解碼器
再看TimeClientHandler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.logging.Logger; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class TimeClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger .getLogger(TimeClientHandler.class.getName()); private int counter; private byte[] req; /** * Creates a client-side handler. */ public TimeClientHandler() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")) .getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) { ByteBuf message = null; for (int i = 0; i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("Now is : " + body + " ; the counter is : " + ++counter); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 釋放資源 logger.warning("Unexpected exception from downstream : " + cause.getMessage()); ctx.close(); } }
直接運行發現完全符合我們需求
4. 分析LineBaseFrameDecoder和StringDecoder
LineBasedFrameDecoder的工作原理非常簡單:
(1) 遍歷ByteBuf中的可讀字節,判斷看是否有"\n"或者"\r\n",如果有,就以此為結束位置,從可讀索引到結束位置區間的字節就組成了一行
(2) 是一個以換行符為結束標志的解碼器,支持攜帶結束符或者不攜帶結束符2種方式,同時支持配置單行的最大長度。
(3) 超過單行最大長度直接拋異常
StringDecoder的非常簡單:
(1) 將接收的對象轉換為字符串
(2) 繼續調用后面的Handler
因此:
LineBasedFrameDecoder和StringDecoder組合在一起就是行切換文件解碼器。
四、分割符解碼器的應用
使用DelimiterBasedFrameDecoder即可...
1. EohoServer
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.buffer.ByteBuf; 3 import io.netty.buffer.Unpooled; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.codec.DelimiterBasedFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 import io.netty.handler.logging.LogLevel; 14 import io.netty.handler.logging.LoggingHandler; 15 16 /** 17 * @author lilinfeng 18 * @version 1.0 19 * @date 2014年2月14日 20 */ 21 public class EchoServer { 22 public void bind(int port) throws Exception { 23 // 配置服務端的NIO線程組 24 EventLoopGroup bossGroup = new NioEventLoopGroup(); 25 EventLoopGroup workerGroup = new NioEventLoopGroup(); 26 try { 27 ServerBootstrap b = new ServerBootstrap(); 28 b.group(bossGroup, workerGroup) 29 .channel(NioServerSocketChannel.class) 30 .option(ChannelOption.SO_BACKLOG, 100) 31 .handler(new LoggingHandler(LogLevel.INFO)) 32 .childHandler(new ChannelInitializer<SocketChannel>() { 33 @Override 34 public void initChannel(SocketChannel ch) 35 throws Exception { 36 ByteBuf delimiter = Unpooled.copiedBuffer("$_" 37 .getBytes()); 38 ch.pipeline().addLast( 39 new DelimiterBasedFrameDecoder(1024, 40 delimiter)); 41 ch.pipeline().addLast(new StringDecoder()); 42 ch.pipeline().addLast(new EchoServerHandler()); 43 } 44 }); 45 46 // 綁定端口,同步等待成功 47 ChannelFuture f = b.bind(port).sync(); 48 49 // 等待服務端監聽端口關閉 50 f.channel().closeFuture().sync(); 51 } finally { 52 // 優雅退出,釋放線程池資源 53 bossGroup.shutdownGracefully(); 54 workerGroup.shutdownGracefully(); 55 } 56 } 57 58 public static void main(String[] args) throws Exception { 59 int port = 8080; 60 if (args != null && args.length > 0) { 61 try { 62 port = Integer.valueOf(args[0]); 63 } catch (NumberFormatException e) { 64 // 采用默認值 65 } 66 } 67 new EchoServer().bind(port); 68 } 69 }
(1) 重點在於38行的DelimiterBasedFrameDecoder, 與上面的換行分割符類似,只是可以自定義特殊符號
(2) DelimiterBasedFrameDecoder有2個參數,一個為單行最大長度,一個為自定義符號對象
(3) 如果到達長度仍然沒有查詢到,就拋出TooLongFrameException異常
2. EchoServerHandler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { int counter = 0; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("This is " + ++counter + " times receive client : [" + body + "]"); body += "$_"; ByteBuf echo = Unpooled.copiedBuffer(body.getBytes()); ctx.writeAndFlush(echo); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close();// 發生異常,關閉鏈路 } }
非常簡單直接打印再write即可... 由此也可以看出netty框架比較干凈的分離出來了業務邏輯代碼。
3. Client端和ClientHandler基本類似
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class EchoClient { public void connect(int port, String host) throws Exception { // 配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_" .getBytes()); ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new EchoClientHandler()); } }); // 發起異步連接操作 ChannelFuture f = b.connect(host, port).sync(); // 當代客戶端鏈路關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new EchoClient().connect(port, "127.0.0.1"); } }
import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class EchoClientHandler extends ChannelInboundHandlerAdapter { private int counter; static final String ECHO_REQ = "Hi, Lilinfeng. Welcome to Netty.$_"; /** * Creates a client-side handler. */ public EchoClientHandler() { } @Override public void channelActive(ChannelHandlerContext ctx) { // ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.buffer(ECHO_REQ // .getBytes().length); // buf.writeBytes(ECHO_REQ.getBytes()); for (int i = 0; i < 10; i++) { ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes())); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("This is " + ++counter + " times receive server : [" + msg + "]"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
運行代碼,符合預期..
五、定長解碼器
1. 開發服務端
非常簡單,直接上代碼:
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ public class EchoServer { public void bind(int port) throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new FixedLengthFrameDecoder(20)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new EchoServerHandler()); } }); // 綁定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { // 優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } new EchoServer().bind(port); } }
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * @author lilinfeng * @version 1.0 * @date 2014年2月14日 */ @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Receive client : [" + msg + "]"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close();// 發生異常,關閉鏈路 } }
2. 使用telnet進行訪問
(1) 我使用的是Xshell,直接命令
(2) telnet 127.0.0.1 8080
(3) 再隨便輸入字符,發現每20個字符,服務端顯示一次,符合預期