一、概述
Netty是一個Java的開源框架。提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
Netty是一個NIO客戶端,服務端框架。允許快速簡單的開發網絡應用程序。例如:服務端和客戶端之間的協議,它簡化了網絡編程規范。
二、NIO開發的問題
1、NIO類庫和API復雜,使用麻煩。
2、需要具備Java多線程編程能力(涉及到Reactor模式)。
3、客戶端斷線重連、網絡不穩定、半包讀寫、失敗緩存、網絡阻塞和異常碼流等問題處理難度非常大
4、存在部分BUG
NIO進行服務器開發的步驟很復雜有以下步驟:
1、創建ServerSocketChannel,配置為非阻塞模式;
2、綁定監聽,配置TCP參數;
3、創建一個獨立的IO線程,用於輪詢多路復用器Selector;
4、創建Selector,將之前創建的ServerSocketChannel注冊到Selector上,監聽Accept事件;
5、啟動IO線程,在循環中執行Select.select()方法,輪詢就緒的Channel;
6、當輪詢到處於就緒狀態的Channel時,需要對其進行判斷,如果是OP_ACCEPT狀態,說明有新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端;
7、設置新接入的客戶端鏈路SocketChannel為非阻塞模式,配置TCP參數;
8、將SocketChannel注冊到Selector上,監聽READ事件;
9、如果輪詢的Channel為OP_READ,則說明SocketChannel中有新的准備就緒的數據包需要讀取,則構造ByteBuffer對象,讀取數據包;
10、如果輪詢的Channel為OP_WRITE,則說明還有數據沒有發送完成,需要繼續發送。
三、Netty的優點
1、API使用簡單,開發門檻低;
2、功能強大,預置了多種編解碼功能,支持多種主流協議;
3、定制功能強,可以通過ChannelHandler對通信框架進行靈活的擴展;
4、性能高,通過與其他業界主流的NIO框架對比,Netty綜合性能最優;
5、成熟、穩定,Netty修復了已經發現的NIO所有BUG;
6、社區活躍;
7、經歷了很多商用項目的考驗。
四、Netty使用demo示例
服務端TimeServer.java
1 package com.studyio.netty; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 12 /** 13 * 14 * @author lgs 15 * 服務端 16 */ 17 public class TimeServer { 18 19 public static void main(String[] args) throws Exception { 20 int port=8080; //服務端默認端口 21 new TimeServer().bind(port); 22 } 23 24 public void bind(int port) throws Exception{ 25 //1用於服務端接受客戶端的連接 線程池里面只有一個線程 26 EventLoopGroup acceptorGroup = new NioEventLoopGroup(1); 27 //2用於進行SocketChannel的網絡讀寫 線程池有多個線程 28 EventLoopGroup workerGroup = new NioEventLoopGroup(); 29 try { 30 //Netty用於啟動NIO服務器的輔助啟動類 31 ServerBootstrap sb = new ServerBootstrap(); 32 //將兩個NIO線程組傳入輔助啟動類中 33 sb.group(acceptorGroup, workerGroup) 34 //設置創建的Channel為NioServerSocketChannel類型 35 .channel(NioServerSocketChannel.class) 36 //配置NioServerSocketChannel的TCP參數 37 .option(ChannelOption.SO_BACKLOG, 1024) 38 //設置綁定IO事件的處理類 39 .childHandler(new ChannelInitializer<SocketChannel>() { 40 //創建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件 41 @Override 42 protected void initChannel(SocketChannel arg0) throws Exception { 43 44 arg0.pipeline().addLast(new TimeServerHandler()); 45 } 46 }); 47 //綁定端口,同步等待成功(sync():同步阻塞方法,等待bind操作完成才繼續) 48 //ChannelFuture主要用於異步操作的通知回調 49 ChannelFuture cf = sb.bind(port).sync(); 50 System.out.println("服務端啟動在8080端口。"); 51 //等待服務端監聽端口關閉 52 cf.channel().closeFuture().sync(); 53 } finally { 54 //優雅退出,釋放線程池資源 55 acceptorGroup.shutdownGracefully(); 56 workerGroup.shutdownGracefully(); 57 } 58 } 59 }
服務端用於對網絡資源進行讀寫操作TimeServerHandler.java
1 package com.studyio.netty; 2 3 import java.util.Date; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.Unpooled; 7 import io.netty.channel.ChannelHandlerAdapter; 8 import io.netty.channel.ChannelHandlerContext; 9 10 /** 11 * 12 * @author lgs 13 * @readme 服務端用於對網絡資源進行讀寫操作,通常我們只需要關注channelRead和exceptionCaught方法。 14 * 15 */ 16 public class TimeServerHandler extends ChannelHandlerAdapter { 17 18 @Override 19 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 20 ByteBuf buf = (ByteBuf) msg; 21 //buf.readableBytes():獲取緩沖區中可讀的字節數; 22 //根據可讀字節數創建數組 23 byte[] req = new byte[buf.readableBytes()]; 24 buf.readBytes(req); 25 String body = new String(req, "UTF-8"); 26 System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body); 27 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 28 29 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 30 //將待發送的消息放到發送緩存數組中 31 ctx.writeAndFlush(resp); 32 } 33 }
客戶端TimeClient.java
1 package com.studyio.netty; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 12 /** 13 * 14 * @author lgs 15 * 客戶端 16 */ 17 public class TimeClient { 18 public static void main(String[] args) throws Exception { 19 int port=8080; //服務端默認端口 20 new TimeClient().connect(port, "127.0.0.1"); 21 } 22 public void connect(int port, String host) throws Exception{ 23 //配置客戶端NIO線程組 24 EventLoopGroup group = new NioEventLoopGroup(); 25 try { 26 Bootstrap bs = new Bootstrap(); 27 bs.group(group) 28 .channel(NioSocketChannel.class) 29 .option(ChannelOption.TCP_NODELAY, true) 30 .handler(new ChannelInitializer<SocketChannel>() { 31 @Override 32 //創建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件 33 protected void initChannel(SocketChannel arg0) throws Exception { 34 arg0.pipeline().addLast(new TimeClientHandler()); 35 } 36 }); 37 //發起異步連接操作 38 ChannelFuture cf = bs.connect(host, port).sync(); 39 //等待客戶端鏈路關閉 40 cf.channel().closeFuture().sync(); 41 } finally { 42 //優雅退出,釋放NIO線程組 43 group.shutdownGracefully(); 44 } 45 } 46 }
客戶端向服務端發送數據和接收服務端數據TimeClientHandler.java
1 package com.studyio.netty; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 /** 9 * 10 * @author lgs 11 * 客戶端向服務端發送數據和接收服務端數據 12 */ 13 public class TimeClientHandler extends ChannelHandlerAdapter { 14 15 @Override 16 //向服務器發送指令 17 public void channelActive(ChannelHandlerContext ctx) throws Exception { 18 for (int i = 0; i < 5; i++) { 19 byte[] req = "QUERY TIME ORDER".getBytes(); 20 ByteBuf firstMessage = Unpooled.buffer(req.length); 21 firstMessage.writeBytes(req); 22 ctx.writeAndFlush(firstMessage); 23 } 24 } 25 26 @Override 27 //接收服務器的響應 28 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 29 ByteBuf buf = (ByteBuf) msg; 30 //buf.readableBytes():獲取緩沖區中可讀的字節數; 31 //根據可讀字節數創建數組 32 byte[] req = new byte[buf.readableBytes()]; 33 buf.readBytes(req); 34 String body = new String(req, "UTF-8"); 35 System.out.println("Now is : "+body); 36 } 37 38 @Override 39 //異常處理 40 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 41 //釋放資源 42 ctx.close(); 43 } 44 45 }
五、 粘包/拆包問題
TCP粘包拆包問題示例圖:
說明:
假設客戶端分別發送了兩個數據包D1和D2給服務端,由於服務端一次讀取到的字節數是不確定的,可能存在以下4種情況。
1、服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包;
2、服務端一次接收到了兩個數據包,D1和D2粘合在一起,被稱為TCP粘包;
3、服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩余部分內容,這被稱為TCP拆包;
4、服務端分兩次讀取到了兩個數據包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩余內容D1_1和D2包的完整內容;
如果此時服務器TCP接收滑窗非常小,而數據包D1和D2比較大,很有可能發生第五種情況,既服務端分多次才能將D1和D2包接收完全,期間發生多次拆包;
總結:
粘包:客戶端發送的數據D2,D1可能被合並成一個D2D1發送到服務端
拆包:客戶端發送的數據D2,D1可能被拆分成D2_1,D2_2D1或者D2D1_1,D1_2發送到服務端
粘包拆包問題的解決策略
由於底層的TCP無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案可歸納如下:
1、消息定長,例如每個報文的大小為固定長度200字節,如果不夠,空位補空格;
FixedLengthFrameDecoder
是固定長度解碼器,能夠按照指定的長度對消息進行自動解碼,開發者不需要考慮TCP的粘包/拆包問題。
服務端:
TimeServer.java
1 package com.studyio.nettyFixedLength; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.codec.FixedLengthFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 /** 15 * 16 * @author lgs 17 * 處理粘包/拆包問題-消息定長,固定長度處理 18 */ 19 public class TimeServer { 20 21 public static void main(String[] args) throws Exception { 22 int port=8080; //服務端默認端口 23 new TimeServer().bind(port); 24 } 25 public void bind(int port) throws Exception{ 26 //Reactor線程組 27 //1用於服務端接受客戶端的連接 28 EventLoopGroup acceptorGroup = new NioEventLoopGroup(); 29 //2用於進行SocketChannel的網絡讀寫 30 EventLoopGroup workerGroup = new NioEventLoopGroup(); 31 try { 32 //Netty用於啟動NIO服務器的輔助啟動類 33 ServerBootstrap sb = new ServerBootstrap(); 34 //將兩個NIO線程組傳入輔助啟動類中 35 sb.group(acceptorGroup, workerGroup) 36 //設置創建的Channel為NioServerSocketChannel類型 37 .channel(NioServerSocketChannel.class) 38 //配置NioServerSocketChannel的TCP參數 39 .option(ChannelOption.SO_BACKLOG, 1024) 40 //設置綁定IO事件的處理類 41 .childHandler(new ChannelInitializer<SocketChannel>() { 42 @Override 43 protected void initChannel(SocketChannel arg0) throws Exception { 44 //處理粘包/拆包問題-固定長度處理 45 arg0.pipeline().addLast(new FixedLengthFrameDecoder(16)); 46 arg0.pipeline().addLast(new StringDecoder()); 47 arg0.pipeline().addLast(new TimeServerHandler()); 48 } 49 }); 50 //綁定端口,同步等待成功(sync():同步阻塞方法) 51 //ChannelFuture主要用於異步操作的通知回調 52 ChannelFuture cf = sb.bind(port).sync(); 53 54 //等待服務端監聽端口關閉 55 cf.channel().closeFuture().sync(); 56 } finally { 57 //優雅退出,釋放線程池資源 58 acceptorGroup.shutdownGracefully(); 59 workerGroup.shutdownGracefully(); 60 } 61 } 62 }
TimeServerHandler.java
1 package com.studyio.nettyFixedLength; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 /** 9 * 10 * @author lgs 11 * @readme 用於對網絡時間進行讀寫操作,通常我們只需要關注channelRead和exceptionCaught方法。 12 * 13 */ 14 public class TimeServerHandler extends ChannelHandlerAdapter { 15 16 private int counter; 17 @Override 18 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 19 String body = (String) msg; 20 System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter); 21 String currentTime = body; 22 23 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 24 25 //將待發送的消息放到發送緩存數組中 26 ctx.writeAndFlush(resp); 27 } 28 }
客戶端:
TimeClient.java
1 package com.studyio.nettyFixedLength; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 import io.netty.handler.codec.FixedLengthFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 /** 15 * 16 * @author lgs 17 * 處理粘包/拆包問題-消息定長,固定長度處理 18 */ 19 public class TimeClient { 20 public static void main(String[] args) throws Exception { 21 int port=8080; //服務端默認端口 22 new TimeClient().connect(port, "127.0.0.1"); 23 } 24 public void connect(int port, String host) throws Exception{ 25 //配置客戶端NIO線程組 26 EventLoopGroup group = new NioEventLoopGroup(); 27 try { 28 Bootstrap bs = new Bootstrap(); 29 bs.group(group) 30 .channel(NioSocketChannel.class) 31 .option(ChannelOption.TCP_NODELAY, true) 32 .handler(new ChannelInitializer<SocketChannel>() { 33 @Override 34 //創建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件 35 protected void initChannel(SocketChannel arg0) throws Exception { 36 //處理粘包/拆包問題-固定長度處理 37 arg0.pipeline().addLast(new FixedLengthFrameDecoder(16)); 38 arg0.pipeline().addLast(new StringDecoder()); 39 40 arg0.pipeline().addLast(new TimeClientHandler()); 41 } 42 }); 43 //發起異步連接操作 44 ChannelFuture cf = bs.connect(host, port).sync(); 45 //等待客戶端鏈路關閉 46 cf.channel().closeFuture().sync(); 47 } finally { 48 //優雅退出,釋放NIO線程組 49 group.shutdownGracefully(); 50 } 51 } 52 }
TimeClientHandler.java
1 package com.studyio.nettyFixedLength; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 /** 9 * 10 * @author lgs 11 * 12 */ 13 public class TimeClientHandler extends ChannelHandlerAdapter { 14 15 private int counter; 16 private byte[] req; 17 18 @Override 19 //向服務器發送指令 20 public void channelActive(ChannelHandlerContext ctx) throws Exception { 21 ByteBuf message=null; 22 //模擬一百次請求,發送重復內容 23 for (int i = 0; i < 100; i++) { 24 req = ("QUERY TIME ORDER").getBytes(); 25 message=Unpooled.buffer(req.length); 26 message.writeBytes(req); 27 ctx.writeAndFlush(message); 28 } 29 } 30 31 @Override 32 //接收服務器的響應 33 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 34 String body = (String) msg; 35 System.out.println("Now is : "+body+". the counter is : "+ ++counter); 36 } 37 38 @Override 39 //異常處理 40 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 41 //釋放資源 42 ctx.close(); 43 } 44 45 }
2、在包尾增加回車換行符進行分割,例如FTP協議;
2.1 處理粘包/拆包問題-回車換行符進行分割
LineBasedFrameDecoder
服務端:
TimeServer.java
1 package com.studyio.nettyLine; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.codec.LineBasedFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 /** 15 * 16 * @author lgs 17 * 處理粘包/拆包問題-回車換行符進行分割 18 * 19 */ 20 public class TimeServer { 21 22 public static void main(String[] args) throws Exception { 23 int port=8080; //服務端默認端口 24 new TimeServer().bind(port); 25 } 26 public void bind(int port) throws Exception{ 27 //Reactor線程組 28 //1用於服務端接受客戶端的連接 29 EventLoopGroup acceptorGroup = new NioEventLoopGroup(); 30 //2用於進行SocketChannel的網絡讀寫 31 EventLoopGroup workerGroup = new NioEventLoopGroup(); 32 try { 33 //Netty用於啟動NIO服務器的輔助啟動類 34 ServerBootstrap sb = new ServerBootstrap(); 35 //將兩個NIO線程組傳入輔助啟動類中 36 sb.group(acceptorGroup, workerGroup) 37 //設置創建的Channel為NioServerSocketChannel類型 38 .channel(NioServerSocketChannel.class) 39 //配置NioServerSocketChannel的TCP參數 40 .option(ChannelOption.SO_BACKLOG, 1024) 41 //設置綁定IO事件的處理類 42 .childHandler(new ChannelInitializer<SocketChannel>() { 43 @Override 44 protected void initChannel(SocketChannel arg0) throws Exception { 45 //處理粘包/拆包問題-換行符處理 46 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); 47 arg0.pipeline().addLast(new StringDecoder()); 48 49 arg0.pipeline().addLast(new TimeServerHandler()); 50 } 51 }); 52 //綁定端口,同步等待成功(sync():同步阻塞方法) 53 //ChannelFuture主要用於異步操作的通知回調 54 ChannelFuture cf = sb.bind(port).sync(); 55 56 //等待服務端監聽端口關閉 57 cf.channel().closeFuture().sync(); 58 } finally { 59 //優雅退出,釋放線程池資源 60 acceptorGroup.shutdownGracefully(); 61 workerGroup.shutdownGracefully(); 62 } 63 } 64 }
TimeServerHandler.java
1 package com.studyio.nettyLine; 2 3 import java.util.Date; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.Unpooled; 7 import io.netty.channel.ChannelHandlerAdapter; 8 import io.netty.channel.ChannelHandlerContext; 9 10 /** 11 * 12 * @author lgs 13 * @readme 用於對網絡時間進行讀寫操作,通常我們只需要關注channelRead和exceptionCaught方法。 14 * 處理粘包/拆包問題-回車換行符進行分割 15 */ 16 public class TimeServerHandler extends ChannelHandlerAdapter { 17 18 private int counter; 19 @Override 20 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 21 // ByteBuf buf = (ByteBuf) msg; 22 // //buf.readableBytes():獲取緩沖區中可讀的字節數; 23 // //根據可讀字節數創建數組 24 // byte[] req = new byte[buf.readableBytes()]; 25 // buf.readBytes(req); 26 // String body = new String(req, "UTF-8"); 27 String body = (String) msg; 28 System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter); 29 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 30 currentTime = currentTime + System.getProperty("line.separator"); 31 32 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 33 //將待發送的消息放到發送緩存數組中 34 ctx.writeAndFlush(resp); 35 } 36 37 }
客戶端:
TimeClient.java
1 package com.studyio.nettyLine; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 import io.netty.handler.codec.LineBasedFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 /** 15 * 16 * @author lgs 17 * 處理粘包/拆包問題-回車換行符進行分割 18 * 19 */ 20 public class TimeClient { 21 public static void main(String[] args) throws Exception { 22 int port=8080; //服務端默認端口 23 new TimeClient().connect(port, "127.0.0.1"); 24 } 25 public void connect(int port, String host) throws Exception{ 26 //配置客戶端NIO線程組 27 EventLoopGroup group = new NioEventLoopGroup(); 28 try { 29 Bootstrap bs = new Bootstrap(); 30 bs.group(group) 31 .channel(NioSocketChannel.class) 32 .option(ChannelOption.TCP_NODELAY, true) 33 .handler(new ChannelInitializer<SocketChannel>() { 34 @Override 35 //創建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件 36 protected void initChannel(SocketChannel arg0) throws Exception { 37 //處理粘包/拆包問題-換行符處理 38 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); 39 arg0.pipeline().addLast(new StringDecoder()); 40 41 arg0.pipeline().addLast(new TimeClientHandler()); 42 } 43 }); 44 //發起異步連接操作 45 ChannelFuture cf = bs.connect(host, port).sync(); 46 //等待客戶端鏈路關閉 47 cf.channel().closeFuture().sync(); 48 } finally { 49 //優雅退出,釋放NIO線程組 50 group.shutdownGracefully(); 51 } 52 } 53 }
TimeClientHandler.java
1 package com.studyio.nettyLine; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 /** 9 * 10 * @author lgs 11 * 處理粘包/拆包問題-回車換行符進行分割 12 */ 13 public class TimeClientHandler extends ChannelHandlerAdapter { 14 15 private int counter; 16 private byte[] req; 17 18 @Override 19 //向服務器發送指令 20 public void channelActive(ChannelHandlerContext ctx) throws Exception { 21 ByteBuf message=null; 22 //模擬一百次請求,發送重復內容 23 for (int i = 0; i < 200; i++) { 24 //回車換行符System.getProperty("line.separator") 25 req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes(); 26 message=Unpooled.buffer(req.length); 27 message.writeBytes(req); 28 ctx.writeAndFlush(message); 29 } 30 31 } 32 33 @Override 34 //接收服務器的響應 35 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 36 // ByteBuf buf = (ByteBuf) msg; 37 // //buf.readableBytes():獲取緩沖區中可讀的字節數; 38 // //根據可讀字節數創建數組 39 // byte[] req = new byte[buf.readableBytes()]; 40 // buf.readBytes(req); 41 // String body = new String(req, "UTF-8"); 42 String body = (String) msg; 43 System.out.println("Now is : "+body+". the counter is : "+ ++counter); 44 } 45 46 @Override 47 //異常處理 48 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 49 //釋放資源 50 ctx.close(); 51 } 52 53 }
2.2 處理粘包/拆包問題自定義分隔符進行分割
DelimiterBasedFrameDecoder
實現自定義分隔符作為消息的結束標志,完成解碼。
服務端:
TimeServer.java
1 package com.studyio.nettyDelimiter; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.buffer.ByteBuf; 5 import io.netty.buffer.Unpooled; 6 import io.netty.channel.ChannelFuture; 7 import io.netty.channel.ChannelInitializer; 8 import io.netty.channel.ChannelOption; 9 import io.netty.channel.EventLoopGroup; 10 import io.netty.channel.nio.NioEventLoopGroup; 11 import io.netty.channel.socket.SocketChannel; 12 import io.netty.channel.socket.nio.NioServerSocketChannel; 13 import io.netty.handler.codec.DelimiterBasedFrameDecoder; 14 import io.netty.handler.codec.string.StringDecoder; 15 16 /** 17 * 18 * @author lgs 19 * 處理粘包/拆包問題-定義分隔符 20 * 21 */ 22 public class TimeServer { 23 public static void main(String[] args) throws Exception { 24 int port=8080; //服務端默認端口 25 new TimeServer().bind(port); 26 } 27 28 public void bind(int port) throws Exception{ 29 //Reactor線程組 30 //1用於服務端接受客戶端的連接 31 EventLoopGroup acceptorGroup = new NioEventLoopGroup(); 32 //2用於進行SocketChannel的網絡讀寫 33 EventLoopGroup workerGroup = new NioEventLoopGroup(); 34 try { 35 //Netty用於啟動NIO服務器的輔助啟動類 36 ServerBootstrap sb = new ServerBootstrap(); 37 //將兩個NIO線程組傳入輔助啟動類中 38 sb.group(acceptorGroup, workerGroup) 39 //設置創建的Channel為NioServerSocketChannel類型 40 .channel(NioServerSocketChannel.class) 41 //配置NioServerSocketChannel的TCP參數 42 .option(ChannelOption.SO_BACKLOG, 1024) 43 //設置綁定IO事件的處理類 44 .childHandler(new ChannelInitializer<SocketChannel>() { 45 @Override 46 protected void initChannel(SocketChannel arg0) throws Exception { 47 //處理粘包/拆包問題-自定義分隔符處理 48 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); 49 arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); 50 arg0.pipeline().addLast(new StringDecoder()); 51 arg0.pipeline().addLast(new TimeServerHandler()); 52 } 53 }); 54 //綁定端口,同步等待成功(sync():同步阻塞方法) 55 //ChannelFuture主要用於異步操作的通知回調 56 ChannelFuture cf = sb.bind(port).sync(); 57 58 //等待服務端監聽端口關閉 59 cf.channel().closeFuture().sync(); 60 } finally { 61 //優雅退出,釋放線程池資源 62 acceptorGroup.shutdownGracefully(); 63 workerGroup.shutdownGracefully(); 64 } 65 } 66 }
TimeServerHandler.java
1 package com.studyio.nettyDelimiter; 2 3 import java.util.Date; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.Unpooled; 7 import io.netty.channel.ChannelHandlerAdapter; 8 import io.netty.channel.ChannelHandlerContext; 9 10 /** 11 * 12 * @author lgs 13 * 處理粘包/拆包問題-定義分隔符 14 * @readme 用於對網絡時間進行讀寫操作,通常我們只需要關注channelRead和exceptionCaught方法。 15 * 16 */ 17 public class TimeServerHandler extends ChannelHandlerAdapter { 18 19 private int counter; 20 @Override 21 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 22 String body = (String) msg; 23 System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter); 24 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 25 //處理粘包/拆包問題-定義分隔符 26 currentTime += "$_"; 27 28 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 29 //將待發送的消息放到發送緩存數組中 30 ctx.writeAndFlush(resp); 31 } 32 }
客戶端:
TimeClient.java
1 package com.studyio.nettyDelimiter; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.buffer.ByteBuf; 5 import io.netty.buffer.Unpooled; 6 import io.netty.channel.ChannelFuture; 7 import io.netty.channel.ChannelInitializer; 8 import io.netty.channel.ChannelOption; 9 import io.netty.channel.EventLoopGroup; 10 import io.netty.channel.nio.NioEventLoopGroup; 11 import io.netty.channel.socket.SocketChannel; 12 import io.netty.channel.socket.nio.NioSocketChannel; 13 import io.netty.handler.codec.DelimiterBasedFrameDecoder; 14 import io.netty.handler.codec.string.StringDecoder; 15 16 /** 17 * 18 * @author lgs 19 * 處理粘包/拆包問題-定義分隔符 20 * 21 */ 22 public class TimeClient { 23 public static void main(String[] args) throws Exception { 24 int port=8080; //服務端默認端口 25 new TimeClient().connect(port, "127.0.0.1"); 26 } 27 public void connect(int port, String host) throws Exception{ 28 //配置客戶端NIO線程組 29 EventLoopGroup group = new NioEventLoopGroup(); 30 try { 31 Bootstrap bs = new Bootstrap(); 32 bs.group(group) 33 .channel(NioSocketChannel.class) 34 .option(ChannelOption.TCP_NODELAY, true) 35 .handler(new ChannelInitializer<SocketChannel>() { 36 @Override 37 //創建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件 38 protected void initChannel(SocketChannel arg0) throws Exception { 39 //處理粘包/拆包問題-自定義分隔符處理 40 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); 41 arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); 42 arg0.pipeline().addLast(new StringDecoder()); 43 44 arg0.pipeline().addLast(new TimeClientHandler()); 45 } 46 }); 47 //發起異步連接操作 48 ChannelFuture cf = bs.connect(host, port).sync(); 49 //等待客戶端鏈路關閉 50 cf.channel().closeFuture().sync(); 51 } finally { 52 //優雅退出,釋放NIO線程組 53 group.shutdownGracefully(); 54 } 55 } 56 }
TimeClientHandler.java
1 package com.studyio.nettyDelimiter; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 8 /** 9 * 10 * @author lgs 11 * 處理粘包/拆包問題-定義分隔符 12 * 13 */ 14 public class TimeClientHandler extends ChannelHandlerAdapter { 15 16 private int counter; 17 private byte[] req; 18 19 @Override 20 //向服務器發送指令 21 public void channelActive(ChannelHandlerContext ctx) throws Exception { 22 ByteBuf message=null; 23 //模擬一百次請求,發送重復內容 24 for (int i = 0; i < 200; i++) { 25 //處理粘包/拆包問題-定義分隔符 26 req = ("QUERY TIME ORDER"+"$_").getBytes(); 27 message=Unpooled.buffer(req.length); 28 message.writeBytes(req); 29 ctx.writeAndFlush(message); 30 } 31 32 } 33 34 @Override 35 //接收服務器的響應 36 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 37 String body = (String) msg; 38 System.out.println("Now is : "+body+". the counter is : "+ ++counter); 39 } 40 41 @Override 42 //異常處理 43 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 44 //釋放資源 45 ctx.close(); 46 } 47 48 }
3、將消息分為消息頭和消息體,消息頭中包含消息總長度(或消息體總長度)的字段,通常設計思路為消息頭的第一個字段使用int32來表示消息的總程度;
4、更復雜的應用層協議;
六、Netty高性能的原因
1、異步非阻塞通信
在IO編程過程中,當需要同時處理多個客戶端接入請求時,可以利用多線程或者IO多路復用技術進行處理。IO多路復用技術通過把多個IO的阻塞復用到同一個Selector的阻塞上,從而使得系統在單線程的情況下可以同時處理多個客戶端請求。與傳統的多線程/多進程模型相比,IO多路復用的最大優勢是系統開銷小,系統不需要創建新的額外進程或者線程,也不需要維護這些進程和線程的運行,降低了系統的維護工作量,節省了系統資源。
Netty的IO線程NioEventLoop由於聚合了多路復用器Selector,可以同時並發處理成百上千個客戶端SocketChannel。由於讀寫操作都是非阻塞的,這就可以充分提升IO線程的運行效率,避免由頻繁的IO阻塞導致的線程掛起。另外,由於Netty采用了異步通信模式,一個IO線程可以並發處理N個客戶端連接和讀寫操作,這從根本上解決了傳統同步阻塞IO中 一連接一線程模型,架構的性能、彈性伸縮能力和可靠性都得到了極大的提升。
2、高效的Reactor線程模型
常用的Reactor線程模型有三種,分別如下:
Reactor單線程模型:
Reactor單線程模型,指的是所有的IO操作都在同一個NIO線程上面完成,NIO線程職責如下:
1、作為NIO服務端,接收客戶端的TCP連接;
2、作為NIO客戶端,向服務端發起TCP連接;
3、讀取通信對端的請求或者應答消息;
4、向通信對端發送請求消息或者應答消息;
由於Reactor模式使用的是異步非阻塞IO,所有的IO操作都不會導致阻塞,理論上一個線程可以獨立處理所有IO相關操作。從架構層面看,一個NIO線程確實可以完成其承擔的職責。例如,通過Acceptor接收客戶端的TCP連接請求消息,鏈路建立成功之后,通過Dispatch將對應的ByteBuffer派發到指定的Handler上進行消息編碼。用戶Handler可以通過NIO線程將消息發送給客戶端。
對於一些小容量應用場景,可以使用單線程模型,但是對於高負載、大並發的應用卻不合適,主要原因如下:
1、一個NIO線程同時處理成百上千的鏈路,性能上無法支撐。即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的編碼、解碼、讀取和發送;
2、當NIO線程負載過重后,處理速度將變慢,這會導致大量客戶端連接超時,超時之后往往會進行重發,這更加重了NIO線程的負載,最終會導致大量消息積壓和處理超時,NIO線程會成為系統的性能瓶頸;
3、可靠性問題。一旦NIO線程意外進入死循環,會導致整個系統通信模塊不可用,不能接收和處理外部消息,造成節點故障。
為了解決這些問題,從而演進出了Reactor多線程模型。
Reactor多線程模型:
Reactor多線程模型與單線程模型最大的區別就是有一組NIO線程處理IO操作,特點如下:
1、有一個專門的NIO線程——Acceptor線程用於監聽服務端,接收客戶端TCP連接請求;
2、網絡IO操作——讀、寫等由一個NIO線程池負責,線程池可以采用標准的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、編碼、解碼和發送;
3、1個NIO線程可以同時處理N條鏈路,但是1個鏈路只對應1個NIO線程,防止發生並發操作問題。
在絕大多數場景下,Reactor多線程模型都可以滿足性能需求;但是,在極特殊應用場景中,一個NIO線程負責監聽和處理所有的客戶端連接可能會存在性能問題。例如百萬客戶端並發連接,或者服務端需要對客戶端的握手消息進行安全認證,認證本身非常損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,為了解決性能問題,產生了第三種Reactor線程模型——主從Reactor多線程模型。
主從Reactor多線程模型:
主從Reactor線程模型的特點是:服務端用於接收客戶端連接的不再是一個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP連接請求處理完成后(可能包含接入認證等),將新創建的SocketChannel注冊到IO線程池(subReactor線程池)的某個IO線程上,由它負責SocketChannel的讀寫和編解碼工作。Acceptor線程池只用於客戶端的登錄、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端subReactor線程池的IO線程上,由IO線程負責后續的IO操作。
利用主從NIO線程模型,可以解決1個服務端監聽線程無法有效處理所有客戶端連接的性能不足問題。Netty官方推薦使用該線程模型。它的工作流程總結如下:
1、從主線程池中隨機選擇一個Reactor線程作為Acceptor線程,用於綁定監聽端口,接收客戶端連接;
2、Acceptor線程接收客戶端連接請求之后,創建新的SocketChannel,將其注冊到主線程池的其他Reactor線程上,由其負責接入認證、IP黑白名單過濾、握手等操作;
3、然后也業務層的鏈路正式建立成功,將SocketChannel從主線程池的Reactor線程的多路復用器上摘除,重新注冊到Sub線程池的線程上,用於處理IO的讀寫操作。
3、無鎖化的串行設計
在大多數場景下,並行多線程處理可以提升系統的並發性能。但是,如果對於共享資源的並發訪問處理不當,會帶來嚴重的鎖競爭,這最終會導致性能的下降。為了盡可能地避免鎖競爭帶來的性能損耗,可以通過串行化設計,既消息的處理盡可能在同一個線程內完成,期間不進行線程切換,這樣就避免了多線程競爭和同步鎖。
為了盡可能提升性能,Netty采用了串行無鎖化設計,在IO線程內部進行串行操作,避免多線程競爭導致的性能下降。表面上看,串行化設計似乎CPU利用率不高,並發程度不夠。但是,通過調整NIO線程池的線程參數,可以同時啟動多個串行化的線程並行運行,這種局部無鎖化的串行線程設計相比一個隊列——多個工作線程模型性能更優。
Netty串行化設計工作原理圖如下:
Netty的NioEventLoop讀取到消息后,直接調用ChannelPipeline的fireChannelRead(Object msg),只要用戶不主動切換線程,一直會由NioEventLoop調用到用戶的Handler,期間不進行線程切換。這種串行化處理方式避免了多線程導致的鎖競爭,從性能角度看是最優的。
4、高效的並發編程
Netty中高效並發編程主要體現:
1、volatile的大量、正確使用;
2、CAS和原子類的廣泛使用;
3、線程安全容器的使用;
4、通過讀寫鎖提升並發性能。
5、高性能的序列化框架
影響序列化性能的關鍵因素總結如下:
1、序列化后的碼流大小(網絡寬帶的占用);
2、序列化與反序列化的性能(CPU資源占用);
3、是否支持跨語言(異構系統的對接和開發語言切換)。
Netty默認提供了對GoogleProtobuf的支持,通過擴展Netty的編解碼接口,用戶可以實現其他的高性能序列化框架
6、零拷貝
Netty的“零拷貝”主要體現在三個方面:
1)、Netty的接收和發送ByteBuffer采用DIRECT BUFFERS,使用堆外直接內存進行Socket讀寫,不需要進行字節緩沖區的二次拷貝。如果使用傳統的堆內存(HEAP BUFFERS)進行Socket讀寫,JVM會將堆內存Buffer拷貝一份到直接內存中,然后才寫入Socket中。相比於堆外直接內存,消息在發送過程中多了一次緩沖區的內存拷貝。
2)、第二種“零拷貝 ”的實現CompositeByteBuf,它對外將多個ByteBuf封裝成一個ByteBuf,對外提供統一封裝后的ByteBuf接口。
3)、第三種“零拷貝”就是文件傳輸,Netty文件傳輸類DefaultFileRegion通過transferTo方法將文件發送到目標Channel中。很多操作系統直接將文件緩沖區的內容發送到目標Channel中,而不需要通過循環拷貝的方式,這是一種更加高效的傳輸方式,提升了傳輸性能,降低了CPU和內存占用,實現了文件傳輸的“零拷貝”。
7、內存池
隨着JVM虛擬機和JIT即時編譯技術的發展,對象的分配和回收是個非常輕量級的工作。但是對於緩沖區Buffer,情況卻稍有不同,特別是對於堆外直接內存的分配和回收,是一件耗時的操作。為了盡量重用緩沖區,Netty提供了基於內存池的緩沖區重用機制。
1 package com.studyio.netty; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.PooledByteBufAllocator; 5 import io.netty.buffer.Unpooled; 6 7 /** 8 * 9 * @author lgs 10 * 通過內存池的方式構建直接緩沖區 11 */ 12 public class PooledByteBufDemo { 13 14 public static void main(String[] args) { 15 byte[] content = new byte[1024]; 16 int loop = 3000000; 17 long startTime = System.currentTimeMillis(); 18 19 ByteBuf poolBuffer = null; 20 for (int i = 0; i < loop; i++) { 21 //通過內存池的方式構建直接緩沖區 22 poolBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(1024); 23 poolBuffer.writeBytes(content); 24 //釋放buffer 25 poolBuffer.release(); 26 } 27 long startTime2 = System.currentTimeMillis(); 28 ByteBuf buffer = null; 29 for (int i = 0; i < loop; i++) { 30 //通過非內存池的方式構建直接緩沖區 31 buffer = Unpooled.directBuffer(1024); 32 buffer.writeBytes(content); 33 buffer.release(); 34 } 35 long endTime = System.currentTimeMillis(); 36 System.out.println("The PooledByteBuf use time :"+(startTime2-startTime)); 37 System.out.println("The UnpooledByteBuf use time :"+(endTime-startTime2)); 38 } 39 }
運行結果:內存池的方式構建直接緩沖區效率更高
The PooledByteBuf use time :740
The UnpooledByteBuf use time :1025
8、靈活的TCP參數配置能力
Netty在啟動輔助類中可以靈活的配置TCP參數,滿足不同的用戶場景。合理設置TCP參數在某些場景下對於性能的提升可以起到的顯著的效果,總結一下對性能影響比較大的幾個配置項:
1)、SO_RCVBUF和SO_SNDBUF:通常建議值為128KB或者256KB;
2)、TCP_NODELAY:NAGLE算法通過將緩沖區內的小封包自動相連,組成較大的封包,阻止大量小封包的發送阻塞網絡,從而提高網絡應用效率。但是對於時延敏感的應用場景需要關閉該優化算法;
3)、軟中斷:如果Linux內核版本支持RPS(2.6.35以上版本),開啟RPS后可以實現軟中斷,提升網絡吞吐量。RPS根據數據包的源地址,目的地址以及目的和源端口,計算出一個hash值,然后根據這個hash值來選擇軟中斷運行的CPU。從上層來看,也就是說將每個連接和CPU綁定,並通過這個hash值,來均衡軟中斷在多個CPU上,提升網絡並行處理性能。