BIO、NIO、AIO系列二:Netty


一、概述

Netty是一個Java的開源框架。提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。

Netty是一個NIO客戶端,服務端框架。允許快速簡單的開發網絡應用程序。例如:服務端和客戶端之間的協議,它簡化了網絡編程規范。

二、NIO開發的問題

1、NIO類庫和API復雜,使用麻煩。

2、需要具備Java多線程編程能力(涉及到Reactor模式)。

3、客戶端斷線重連、網絡不穩定、半包讀寫、失敗緩存、網絡阻塞和異常碼流等問題處理難度非常大

4、存在部分BUG

NIO進行服務器開發的步驟很復雜有以下步驟:

1、創建ServerSocketChannel,配置為非阻塞模式;

2、綁定監聽,配置TCP參數;

3、創建一個獨立的IO線程,用於輪詢多路復用器Selector;

4、創建Selector,將之前創建的ServerSocketChannel注冊到Selector上,監聽Accept事件;

5、啟動IO線程,在循環中執行Select.select()方法,輪詢就緒的Channel;

6、當輪詢到處於就緒狀態的Channel時,需要對其進行判斷,如果是OP_ACCEPT狀態,說明有新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端;

7、設置新接入的客戶端鏈路SocketChannel為非阻塞模式,配置TCP參數;

8、將SocketChannel注冊到Selector上,監聽READ事件;

9、如果輪詢的Channel為OP_READ,則說明SocketChannel中有新的准備就緒的數據包需要讀取,則構造ByteBuffer對象,讀取數據包;

10、如果輪詢的Channel為OP_WRITE,則說明還有數據沒有發送完成,需要繼續發送。

三、Netty的優點

1、API使用簡單,開發門檻低;

2、功能強大,預置了多種編解碼功能,支持多種主流協議;

3、定制功能強,可以通過ChannelHandler對通信框架進行靈活的擴展;

4、性能高,通過與其他業界主流的NIO框架對比,Netty綜合性能最優;

5、成熟、穩定,Netty修復了已經發現的NIO所有BUG;

6、社區活躍;

7、經歷了很多商用項目的考驗。

四、Netty使用demo示例

服務端TimeServer.java

 1 package com.studyio.netty;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 
12 /**
13  * 
14  * @author lgs
15  * 服務端
16  */
17 public class TimeServer {
18 
19     public static void main(String[] args) throws Exception {
20         int port=8080; //服務端默認端口
21         new TimeServer().bind(port);
22     }
23     
24     public void bind(int port) throws Exception{
25         //1用於服務端接受客戶端的連接 線程池里面只有一個線程
26         EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
27         //2用於進行SocketChannel的網絡讀寫 線程池有多個線程
28         EventLoopGroup workerGroup = new NioEventLoopGroup();
29         try {
30             //Netty用於啟動NIO服務器的輔助啟動類
31             ServerBootstrap sb = new ServerBootstrap();
32             //將兩個NIO線程組傳入輔助啟動類中
33             sb.group(acceptorGroup, workerGroup)
34                 //設置創建的Channel為NioServerSocketChannel類型
35                 .channel(NioServerSocketChannel.class)
36                 //配置NioServerSocketChannel的TCP參數
37                 .option(ChannelOption.SO_BACKLOG, 1024)
38                 //設置綁定IO事件的處理類
39                 .childHandler(new ChannelInitializer<SocketChannel>() {
40                     //創建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件
41                     @Override
42                     protected void initChannel(SocketChannel arg0) throws Exception {
43                         
44                         arg0.pipeline().addLast(new TimeServerHandler());
45                     }
46                 });
47             //綁定端口,同步等待成功(sync():同步阻塞方法,等待bind操作完成才繼續)
48             //ChannelFuture主要用於異步操作的通知回調
49             ChannelFuture cf = sb.bind(port).sync();
50             System.out.println("服務端啟動在8080端口。");
51             //等待服務端監聽端口關閉
52             cf.channel().closeFuture().sync();
53         } finally {
54             //優雅退出,釋放線程池資源
55             acceptorGroup.shutdownGracefully();
56             workerGroup.shutdownGracefully();
57         }
58     }
59 }

服務端用於對網絡資源進行讀寫操作TimeServerHandler.java

 1 package com.studyio.netty;
 2 
 3 import java.util.Date;
 4 
 5 import io.netty.buffer.ByteBuf;
 6 import io.netty.buffer.Unpooled;
 7 import io.netty.channel.ChannelHandlerAdapter;
 8 import io.netty.channel.ChannelHandlerContext;
 9 
10 /**
11  * 
12  * @author lgs
13  * @readme 服務端用於對網絡資源進行讀寫操作,通常我們只需要關注channelRead和exceptionCaught方法。
14  * 
15  */
16 public class TimeServerHandler extends ChannelHandlerAdapter {
17 
18     @Override
19     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
20         ByteBuf buf = (ByteBuf) msg;
21         //buf.readableBytes():獲取緩沖區中可讀的字節數;
22         //根據可讀字節數創建數組
23         byte[] req = new byte[buf.readableBytes()];
24         buf.readBytes(req);
25         String body = new String(req, "UTF-8");
26         System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body);
27         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
28         
29         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
30         //將待發送的消息放到發送緩存數組中
31         ctx.writeAndFlush(resp);
32     }
33 }

 

客戶端TimeClient.java

 1 package com.studyio.netty;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioSocketChannel;
11 
12 /**
13  * 
14  * @author lgs
15  * 客戶端
16  */
17 public class TimeClient {
18     public static void main(String[] args) throws Exception {
19         int port=8080; //服務端默認端口
20         new TimeClient().connect(port, "127.0.0.1");
21     }
22     public void connect(int port, String host) throws Exception{
23         //配置客戶端NIO線程組
24         EventLoopGroup group = new NioEventLoopGroup();
25         try {
26             Bootstrap bs = new Bootstrap();
27             bs.group(group)
28                 .channel(NioSocketChannel.class)
29                 .option(ChannelOption.TCP_NODELAY, true)
30                 .handler(new ChannelInitializer<SocketChannel>() {
31                     @Override
32                     //創建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件
33                     protected void initChannel(SocketChannel arg0) throws Exception {
34                         arg0.pipeline().addLast(new TimeClientHandler());
35                     }
36                 });
37             //發起異步連接操作
38             ChannelFuture cf = bs.connect(host, port).sync();
39             //等待客戶端鏈路關閉
40             cf.channel().closeFuture().sync();
41         } finally {
42             //優雅退出,釋放NIO線程組
43             group.shutdownGracefully();
44         }
45     }
46 }

客戶端向服務端發送數據和接收服務端數據TimeClientHandler.java

 1 package com.studyio.netty;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 /**
 9  * 
10  * @author lgs
11  * 客戶端向服務端發送數據和接收服務端數據
12  */
13 public class TimeClientHandler extends ChannelHandlerAdapter {
14 
15     @Override
16     //向服務器發送指令
17     public void channelActive(ChannelHandlerContext ctx) throws Exception {
18         for (int i = 0; i < 5; i++) {
19             byte[] req = "QUERY TIME ORDER".getBytes();
20             ByteBuf firstMessage = Unpooled.buffer(req.length);
21             firstMessage.writeBytes(req);
22             ctx.writeAndFlush(firstMessage);
23         }
24     }
25 
26     @Override
27     //接收服務器的響應
28     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
29         ByteBuf buf = (ByteBuf) msg;
30         //buf.readableBytes():獲取緩沖區中可讀的字節數;
31         //根據可讀字節數創建數組
32         byte[] req = new byte[buf.readableBytes()];
33         buf.readBytes(req);
34         String body = new String(req, "UTF-8");
35         System.out.println("Now is : "+body);
36     }
37 
38     @Override
39     //異常處理
40     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
41         //釋放資源
42         ctx.close();
43     }
44     
45 }

五、 粘包/拆包問題

TCP粘包拆包問題示例圖:

說明:

假設客戶端分別發送了兩個數據包D1和D2給服務端,由於服務端一次讀取到的字節數是不確定的,可能存在以下4種情況。

1、服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包;

2、服務端一次接收到了兩個數據包,D1和D2粘合在一起,被稱為TCP粘包;

3、服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩余部分內容,這被稱為TCP拆包;

4、服務端分兩次讀取到了兩個數據包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩余內容D1_1和D2包的完整內容;

如果此時服務器TCP接收滑窗非常小,而數據包D1和D2比較大,很有可能發生第五種情況,既服務端分多次才能將D1和D2包接收完全,期間發生多次拆包;

總結:

粘包:客戶端發送的數據D2,D1可能被合並成一個D2D1發送到服務端

拆包:客戶端發送的數據D2,D1可能被拆分成D2_1,D2_2D1或者D2D1_1,D1_2發送到服務端

粘包拆包問題的解決策略

由於底層的TCP無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案可歸納如下:

1、消息定長,例如每個報文的大小為固定長度200字節,如果不夠,空位補空格;

FixedLengthFrameDecoder

是固定長度解碼器,能夠按照指定的長度對消息進行自動解碼,開發者不需要考慮TCP的粘包/拆包問題。

服務端:

TimeServer.java

 1 package com.studyio.nettyFixedLength;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.FixedLengthFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 
14 /**
15  * 
16  * @author lgs
17  * 處理粘包/拆包問題-消息定長,固定長度處理
18  */
19 public class TimeServer {
20 
21     public static void main(String[] args) throws Exception {
22         int port=8080; //服務端默認端口
23         new TimeServer().bind(port);
24     }
25     public void bind(int port) throws Exception{
26         //Reactor線程組
27         //1用於服務端接受客戶端的連接
28         EventLoopGroup acceptorGroup = new NioEventLoopGroup();
29         //2用於進行SocketChannel的網絡讀寫
30         EventLoopGroup workerGroup = new NioEventLoopGroup();
31         try {
32             //Netty用於啟動NIO服務器的輔助啟動類
33             ServerBootstrap sb = new ServerBootstrap();
34             //將兩個NIO線程組傳入輔助啟動類中
35             sb.group(acceptorGroup, workerGroup)
36                 //設置創建的Channel為NioServerSocketChannel類型
37                 .channel(NioServerSocketChannel.class)
38                 //配置NioServerSocketChannel的TCP參數
39                 .option(ChannelOption.SO_BACKLOG, 1024)
40                 //設置綁定IO事件的處理類
41                 .childHandler(new ChannelInitializer<SocketChannel>() {
42                     @Override
43                     protected void initChannel(SocketChannel arg0) throws Exception {
44                         //處理粘包/拆包問題-固定長度處理
45                         arg0.pipeline().addLast(new FixedLengthFrameDecoder(16));
46                         arg0.pipeline().addLast(new StringDecoder());
47                         arg0.pipeline().addLast(new TimeServerHandler());
48                     }
49                 });
50             //綁定端口,同步等待成功(sync():同步阻塞方法)
51             //ChannelFuture主要用於異步操作的通知回調
52             ChannelFuture cf = sb.bind(port).sync();
53                 
54             //等待服務端監聽端口關閉
55             cf.channel().closeFuture().sync();
56         } finally {
57             //優雅退出,釋放線程池資源
58             acceptorGroup.shutdownGracefully();
59             workerGroup.shutdownGracefully();
60         }
61     }
62 }

TimeServerHandler.java

 1 package com.studyio.nettyFixedLength;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 /**
 9  * 
10  * @author lgs
11  * @readme 用於對網絡時間進行讀寫操作,通常我們只需要關注channelRead和exceptionCaught方法。
12  * 
13  */
14 public class TimeServerHandler extends ChannelHandlerAdapter {
15 
16     private int counter;
17     @Override
18     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
19         String body = (String) msg;
20         System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
21         String currentTime = body;
22         
23         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
24         
25         //將待發送的消息放到發送緩存數組中
26         ctx.writeAndFlush(resp);
27     }
28 }

 

客戶端:

TimeClient.java

 1 package com.studyio.nettyFixedLength;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioSocketChannel;
11 import io.netty.handler.codec.FixedLengthFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 
14 /**
15  * 
16  * @author lgs
17  * 處理粘包/拆包問題-消息定長,固定長度處理
18  */
19 public class TimeClient {
20     public static void main(String[] args) throws Exception {
21         int port=8080; //服務端默認端口
22         new TimeClient().connect(port, "127.0.0.1");
23     }
24     public void connect(int port, String host) throws Exception{
25         //配置客戶端NIO線程組
26         EventLoopGroup group = new NioEventLoopGroup();
27         try {
28             Bootstrap bs = new Bootstrap();
29             bs.group(group)
30                 .channel(NioSocketChannel.class)
31                 .option(ChannelOption.TCP_NODELAY, true)
32                 .handler(new ChannelInitializer<SocketChannel>() {
33                     @Override
34                     //創建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件
35                     protected void initChannel(SocketChannel arg0) throws Exception {
36                         //處理粘包/拆包問題-固定長度處理
37                         arg0.pipeline().addLast(new FixedLengthFrameDecoder(16));
38                         arg0.pipeline().addLast(new StringDecoder());
39                         
40                         arg0.pipeline().addLast(new TimeClientHandler());
41                     }
42                 });
43             //發起異步連接操作
44             ChannelFuture cf = bs.connect(host, port).sync();
45             //等待客戶端鏈路關閉
46             cf.channel().closeFuture().sync();
47         } finally {
48             //優雅退出,釋放NIO線程組
49             group.shutdownGracefully();
50         }
51     }
52 }

TimeClientHandler.java

 1 package com.studyio.nettyFixedLength;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 /**
 9  * 
10  * @author lgs
11  * 
12  */
13 public class TimeClientHandler extends ChannelHandlerAdapter {
14 
15     private int counter;
16     private byte[] req;
17     
18     @Override
19     //向服務器發送指令
20     public void channelActive(ChannelHandlerContext ctx) throws Exception {
21         ByteBuf message=null;
22         //模擬一百次請求,發送重復內容
23         for (int i = 0; i < 100; i++) {
24             req = ("QUERY TIME ORDER").getBytes();
25             message=Unpooled.buffer(req.length);
26             message.writeBytes(req);
27             ctx.writeAndFlush(message);
28         }
29     }
30 
31     @Override
32     //接收服務器的響應
33     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
34         String body = (String) msg;
35         System.out.println("Now is : "+body+". the counter is : "+ ++counter);
36     }
37 
38     @Override
39     //異常處理
40     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
41         //釋放資源
42         ctx.close();
43     }
44     
45 }

2、在包尾增加回車換行符進行分割,例如FTP協議;

2.1 處理粘包/拆包問題-回車換行符進行分割

LineBasedFrameDecoder

服務端:

TimeServer.java

 1 package com.studyio.nettyLine;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.LineBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 
14 /**
15  * 
16  * @author lgs
17  * 處理粘包/拆包問題-回車換行符進行分割
18  * 
19  */
20 public class TimeServer {
21 
22     public static void main(String[] args) throws Exception {
23         int port=8080; //服務端默認端口
24         new TimeServer().bind(port);
25     }
26     public void bind(int port) throws Exception{
27         //Reactor線程組
28         //1用於服務端接受客戶端的連接
29         EventLoopGroup acceptorGroup = new NioEventLoopGroup();
30         //2用於進行SocketChannel的網絡讀寫
31         EventLoopGroup workerGroup = new NioEventLoopGroup();
32         try {
33             //Netty用於啟動NIO服務器的輔助啟動類
34             ServerBootstrap sb = new ServerBootstrap();
35             //將兩個NIO線程組傳入輔助啟動類中
36             sb.group(acceptorGroup, workerGroup)
37                 //設置創建的Channel為NioServerSocketChannel類型
38                 .channel(NioServerSocketChannel.class)
39                 //配置NioServerSocketChannel的TCP參數
40                 .option(ChannelOption.SO_BACKLOG, 1024)
41                 //設置綁定IO事件的處理類
42                 .childHandler(new ChannelInitializer<SocketChannel>() {
43                     @Override
44                     protected void initChannel(SocketChannel arg0) throws Exception {
45                         //處理粘包/拆包問題-換行符處理
46                         arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
47                         arg0.pipeline().addLast(new StringDecoder());
48                         
49                         arg0.pipeline().addLast(new TimeServerHandler());
50                     }
51                 });
52             //綁定端口,同步等待成功(sync():同步阻塞方法)
53             //ChannelFuture主要用於異步操作的通知回調
54             ChannelFuture cf = sb.bind(port).sync();
55                 
56             //等待服務端監聽端口關閉
57             cf.channel().closeFuture().sync();
58         } finally {
59             //優雅退出,釋放線程池資源
60             acceptorGroup.shutdownGracefully();
61             workerGroup.shutdownGracefully();
62         }
63     }
64 }

TimeServerHandler.java

 1 package com.studyio.nettyLine;
 2 
 3 import java.util.Date;
 4 
 5 import io.netty.buffer.ByteBuf;
 6 import io.netty.buffer.Unpooled;
 7 import io.netty.channel.ChannelHandlerAdapter;
 8 import io.netty.channel.ChannelHandlerContext;
 9 
10 /**
11  * 
12  * @author lgs
13  * @readme 用於對網絡時間進行讀寫操作,通常我們只需要關注channelRead和exceptionCaught方法。
14  * 處理粘包/拆包問題-回車換行符進行分割
15  */
16 public class TimeServerHandler extends ChannelHandlerAdapter {
17 
18     private int counter;
19     @Override
20     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
21 //        ByteBuf buf = (ByteBuf) msg;
22 //        //buf.readableBytes():獲取緩沖區中可讀的字節數;
23 //        //根據可讀字節數創建數組
24 //        byte[] req = new byte[buf.readableBytes()];
25 //        buf.readBytes(req);
26 //        String body = new String(req, "UTF-8");
27         String body = (String) msg;
28         System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
29         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
30         currentTime = currentTime + System.getProperty("line.separator");
31         
32         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
33         //將待發送的消息放到發送緩存數組中
34         ctx.writeAndFlush(resp);
35     }
36 
37 }

 

客戶端:

TimeClient.java

 1 package com.studyio.nettyLine;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioSocketChannel;
11 import io.netty.handler.codec.LineBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 
14 /**
15  * 
16  * @author lgs
17  * 處理粘包/拆包問題-回車換行符進行分割
18  * 
19  */
20 public class TimeClient {
21     public static void main(String[] args) throws Exception {
22         int port=8080; //服務端默認端口
23         new TimeClient().connect(port, "127.0.0.1");
24     }
25     public void connect(int port, String host) throws Exception{
26         //配置客戶端NIO線程組
27         EventLoopGroup group = new NioEventLoopGroup();
28         try {
29             Bootstrap bs = new Bootstrap();
30             bs.group(group)
31                 .channel(NioSocketChannel.class)
32                 .option(ChannelOption.TCP_NODELAY, true)
33                 .handler(new ChannelInitializer<SocketChannel>() {
34                     @Override
35                     //創建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件
36                     protected void initChannel(SocketChannel arg0) throws Exception {
37                         //處理粘包/拆包問題-換行符處理
38                         arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
39                         arg0.pipeline().addLast(new StringDecoder());
40                         
41                         arg0.pipeline().addLast(new TimeClientHandler());
42                     }
43                 });
44             //發起異步連接操作
45             ChannelFuture cf = bs.connect(host, port).sync();
46             //等待客戶端鏈路關閉
47             cf.channel().closeFuture().sync();
48         } finally {
49             //優雅退出,釋放NIO線程組
50             group.shutdownGracefully();
51         }
52     }
53 }

TimeClientHandler.java

 1 package com.studyio.nettyLine;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 /**
 9  * 
10  * @author lgs
11  * 處理粘包/拆包問題-回車換行符進行分割
12  */
13 public class TimeClientHandler extends ChannelHandlerAdapter {
14 
15     private int counter;
16     private byte[] req;
17     
18     @Override
19     //向服務器發送指令
20     public void channelActive(ChannelHandlerContext ctx) throws Exception {
21         ByteBuf message=null;
22         //模擬一百次請求,發送重復內容
23         for (int i = 0; i < 200; i++) {
24             //回車換行符System.getProperty("line.separator")
25             req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();
26             message=Unpooled.buffer(req.length);
27             message.writeBytes(req);
28             ctx.writeAndFlush(message);
29         }
30         
31     }
32 
33     @Override
34     //接收服務器的響應
35     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
36 //        ByteBuf buf = (ByteBuf) msg;
37 //        //buf.readableBytes():獲取緩沖區中可讀的字節數;
38 //        //根據可讀字節數創建數組
39 //        byte[] req = new byte[buf.readableBytes()];
40 //        buf.readBytes(req);
41 //        String body = new String(req, "UTF-8");
42         String body = (String) msg;
43         System.out.println("Now is : "+body+". the counter is : "+ ++counter);
44     }
45 
46     @Override
47     //異常處理
48     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
49         //釋放資源
50         ctx.close();
51     }
52     
53 }

2.2 處理粘包/拆包問題自定義分隔符進行分割

DelimiterBasedFrameDecoder

實現自定義分隔符作為消息的結束標志,完成解碼。

服務端:

TimeServer.java

 1 package com.studyio.nettyDelimiter;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.buffer.ByteBuf;
 5 import io.netty.buffer.Unpooled;
 6 import io.netty.channel.ChannelFuture;
 7 import io.netty.channel.ChannelInitializer;
 8 import io.netty.channel.ChannelOption;
 9 import io.netty.channel.EventLoopGroup;
10 import io.netty.channel.nio.NioEventLoopGroup;
11 import io.netty.channel.socket.SocketChannel;
12 import io.netty.channel.socket.nio.NioServerSocketChannel;
13 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
14 import io.netty.handler.codec.string.StringDecoder;
15 
16 /**
17  * 
18  * @author lgs
19  * 處理粘包/拆包問題-定義分隔符
20  * 
21  */
22 public class TimeServer {
23     public static void main(String[] args) throws Exception {
24         int port=8080; //服務端默認端口
25         new TimeServer().bind(port);
26     }
27 
28     public void bind(int port) throws Exception{
29         //Reactor線程組
30         //1用於服務端接受客戶端的連接
31         EventLoopGroup acceptorGroup = new NioEventLoopGroup();
32         //2用於進行SocketChannel的網絡讀寫
33         EventLoopGroup workerGroup = new NioEventLoopGroup();
34         try {
35             //Netty用於啟動NIO服務器的輔助啟動類
36             ServerBootstrap sb = new ServerBootstrap();
37             //將兩個NIO線程組傳入輔助啟動類中
38             sb.group(acceptorGroup, workerGroup)
39                 //設置創建的Channel為NioServerSocketChannel類型
40                 .channel(NioServerSocketChannel.class)
41                 //配置NioServerSocketChannel的TCP參數
42                 .option(ChannelOption.SO_BACKLOG, 1024)
43                 //設置綁定IO事件的處理類
44                 .childHandler(new ChannelInitializer<SocketChannel>() {
45                     @Override
46                     protected void initChannel(SocketChannel arg0) throws Exception {
47                         //處理粘包/拆包問題-自定義分隔符處理
48                         ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
49                         arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
50                         arg0.pipeline().addLast(new StringDecoder());
51                         arg0.pipeline().addLast(new TimeServerHandler());
52                     }
53                 });
54             //綁定端口,同步等待成功(sync():同步阻塞方法)
55             //ChannelFuture主要用於異步操作的通知回調
56             ChannelFuture cf = sb.bind(port).sync();
57                 
58             //等待服務端監聽端口關閉
59             cf.channel().closeFuture().sync();
60         } finally {
61             //優雅退出,釋放線程池資源
62             acceptorGroup.shutdownGracefully();
63             workerGroup.shutdownGracefully();
64         }
65     }
66 }

TimeServerHandler.java

 1 package com.studyio.nettyDelimiter;
 2 
 3 import java.util.Date;
 4 
 5 import io.netty.buffer.ByteBuf;
 6 import io.netty.buffer.Unpooled;
 7 import io.netty.channel.ChannelHandlerAdapter;
 8 import io.netty.channel.ChannelHandlerContext;
 9 
10 /**
11  * 
12  * @author lgs
13  * 處理粘包/拆包問題-定義分隔符
14  * @readme 用於對網絡時間進行讀寫操作,通常我們只需要關注channelRead和exceptionCaught方法。
15  * 
16  */
17 public class TimeServerHandler extends ChannelHandlerAdapter {
18 
19     private int counter;
20     @Override
21     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
22         String body = (String) msg;
23         System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
24         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
25         //處理粘包/拆包問題-定義分隔符
26         currentTime += "$_";
27         
28         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
29         //將待發送的消息放到發送緩存數組中
30         ctx.writeAndFlush(resp);
31     }
32 }

 

客戶端:

TimeClient.java

 1 package com.studyio.nettyDelimiter;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.buffer.ByteBuf;
 5 import io.netty.buffer.Unpooled;
 6 import io.netty.channel.ChannelFuture;
 7 import io.netty.channel.ChannelInitializer;
 8 import io.netty.channel.ChannelOption;
 9 import io.netty.channel.EventLoopGroup;
10 import io.netty.channel.nio.NioEventLoopGroup;
11 import io.netty.channel.socket.SocketChannel;
12 import io.netty.channel.socket.nio.NioSocketChannel;
13 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
14 import io.netty.handler.codec.string.StringDecoder;
15 
16 /**
17  * 
18  * @author lgs
19  * 處理粘包/拆包問題-定義分隔符
20  * 
21  */
22 public class TimeClient {
23     public static void main(String[] args) throws Exception {
24         int port=8080; //服務端默認端口
25         new TimeClient().connect(port, "127.0.0.1");
26     }
27     public void connect(int port, String host) throws Exception{
28         //配置客戶端NIO線程組
29         EventLoopGroup group = new NioEventLoopGroup();
30         try {
31             Bootstrap bs = new Bootstrap();
32             bs.group(group)
33                 .channel(NioSocketChannel.class)
34                 .option(ChannelOption.TCP_NODELAY, true)
35                 .handler(new ChannelInitializer<SocketChannel>() {
36                     @Override
37                     //創建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡IO事件
38                     protected void initChannel(SocketChannel arg0) throws Exception {
39                         //處理粘包/拆包問題-自定義分隔符處理
40                         ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
41                         arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
42                         arg0.pipeline().addLast(new StringDecoder());
43                         
44                         arg0.pipeline().addLast(new TimeClientHandler());
45                     }
46                 });
47             //發起異步連接操作
48             ChannelFuture cf = bs.connect(host, port).sync();
49             //等待客戶端鏈路關閉
50             cf.channel().closeFuture().sync();
51         } finally {
52             //優雅退出,釋放NIO線程組
53             group.shutdownGracefully();
54         }
55     }
56 }

TimeClientHandler.java

 1 package com.studyio.nettyDelimiter;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 
 8 /**
 9  * 
10  * @author lgs
11  * 處理粘包/拆包問題-定義分隔符
12  * 
13  */
14 public class TimeClientHandler extends ChannelHandlerAdapter {
15     
16     private int counter;
17     private byte[] req;
18     
19     @Override
20     //向服務器發送指令
21     public void channelActive(ChannelHandlerContext ctx) throws Exception {
22         ByteBuf message=null;
23         //模擬一百次請求,發送重復內容
24         for (int i = 0; i < 200; i++) {
25             //處理粘包/拆包問題-定義分隔符
26             req = ("QUERY TIME ORDER"+"$_").getBytes();
27             message=Unpooled.buffer(req.length);
28             message.writeBytes(req);
29             ctx.writeAndFlush(message);
30         }
31         
32     }
33 
34     @Override
35     //接收服務器的響應
36     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
37         String body = (String) msg;
38         System.out.println("Now is : "+body+". the counter is : "+ ++counter);
39     }
40 
41     @Override
42     //異常處理
43     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
44         //釋放資源
45         ctx.close();
46     }
47     
48 }

3、將消息分為消息頭和消息體,消息頭中包含消息總長度(或消息體總長度)的字段,通常設計思路為消息頭的第一個字段使用int32來表示消息的總程度;

4、更復雜的應用層協議;

 六、Netty高性能的原因

1、異步非阻塞通信

IO編程過程中,當需要同時處理多個客戶端接入請求時,可以利用多線程或者IO多路復用技術進行處理。IO多路復用技術通過把多個IO的阻塞復用到同一個Selector的阻塞上,從而使得系統在單線程的情況下可以同時處理多個客戶端請求。與傳統的多線程/多進程模型相比,IO多路復用的最大優勢是系統開銷小,系統不需要創建新的額外進程或者線程,也不需要維護這些進程和線程的運行,降低了系統的維護工作量,節省了系統資源。

Netty的IO線程NioEventLoop由於聚合了多路復用器Selector,可以同時並發處理成百上千個客戶端SocketChannel。由於讀寫操作都是非阻塞的,這就可以充分提升IO線程的運行效率,避免由頻繁的IO阻塞導致的線程掛起。另外,由於Netty采用了異步通信模式,一個IO線程可以並發處理N個客戶端連接和讀寫操作,這從根本上解決了傳統同步阻塞IO 一連接一線程模型,架構的性能、彈性伸縮能力和可靠性都得到了極大的提升。

2、高效的Reactor線程模型

常用的Reactor線程模型有三種,分別如下:

Reactor單線程模型:

 

Reactor單線程模型,指的是所有的IO操作都在同一個NIO線程上面完成,NIO線程職責如下:

1、作為NIO服務端,接收客戶端的TCP連接;

2、作為NIO客戶端,向服務端發起TCP連接;

3、讀取通信對端的請求或者應答消息;

4、向通信對端發送請求消息或者應答消息;

由於Reactor模式使用的是異步非阻塞IO,所有的IO操作都不會導致阻塞,理論上一個線程可以獨立處理所有IO相關操作。從架構層面看,一個NIO線程確實可以完成其承擔的職責。例如,通過Acceptor接收客戶端的TCP連接請求消息,鏈路建立成功之后,通過Dispatch將對應的ByteBuffer派發到指定的Handler上進行消息編碼。用戶Handler可以通過NIO線程將消息發送給客戶端。

對於一些小容量應用場景,可以使用單線程模型,但是對於高負載、大並發的應用卻不合適,主要原因如下:

1、一個NIO線程同時處理成百上千的鏈路,性能上無法支撐。即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的編碼、解碼、讀取和發送;

2、當NIO線程負載過重后,處理速度將變慢,這會導致大量客戶端連接超時,超時之后往往會進行重發,這更加重了NIO線程的負載,最終會導致大量消息積壓和處理超時,NIO線程會成為系統的性能瓶頸;

3、可靠性問題。一旦NIO線程意外進入死循環,會導致整個系統通信模塊不可用,不能接收和處理外部消息,造成節點故障。

為了解決這些問題,從而演進出了Reactor多線程模型。

Reactor多線程模型:

 

Reactor多線程模型與單線程模型最大的區別就是有一組NIO線程處理IO操作,特點如下:

1、有一個專門的NIO線程——Acceptor線程用於監聽服務端,接收客戶端TCP連接請求;

2、網絡IO操作——讀、寫等由一個NIO線程池負責,線程池可以采用標准的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、編碼、解碼和發送;

3、1個NIO線程可以同時處理N條鏈路,但是1個鏈路只對應1個NIO線程,防止發生並發操作問題。

在絕大多數場景下,Reactor多線程模型都可以滿足性能需求;但是,在極特殊應用場景中,一個NIO線程負責監聽和處理所有的客戶端連接可能會存在性能問題。例如百萬客戶端並發連接,或者服務端需要對客戶端的握手消息進行安全認證,認證本身非常損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,為了解決性能問題,產生了第三種Reactor線程模型——主從Reactor多線程模型。

主從Reactor多線程模型:

 

主從Reactor線程模型的特點是:服務端用於接收客戶端連接的不再是一個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP連接請求處理完成后(可能包含接入認證等),將新創建的SocketChannel注冊到IO線程池(subReactor線程池)的某個IO線程上,由它負責SocketChannel的讀寫和編解碼工作。Acceptor線程池只用於客戶端的登錄、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端subReactor線程池的IO線程上,由IO線程負責后續的IO操作。

利用主從NIO線程模型,可以解決1個服務端監聽線程無法有效處理所有客戶端連接的性能不足問題。Netty官方推薦使用該線程模型。它的工作流程總結如下:

1、從主線程池中隨機選擇一個Reactor線程作為Acceptor線程,用於綁定監聽端口,接收客戶端連接;

2、Acceptor線程接收客戶端連接請求之后,創建新的SocketChannel,將其注冊到主線程池的其他Reactor線程上,由其負責接入認證、IP黑白名單過濾、握手等操作;

3、然后也業務層的鏈路正式建立成功,將SocketChannel從主線程池的Reactor線程的多路復用器上摘除,重新注冊到Sub線程池的線程上,用於處理IO的讀寫操作。

3、無鎖化的串行設計

在大多數場景下,並行多線程處理可以提升系統的並發性能。但是,如果對於共享資源的並發訪問處理不當,會帶來嚴重的鎖競爭,這最終會導致性能的下降。為了盡可能地避免鎖競爭帶來的性能損耗,可以通過串行化設計,既消息的處理盡可能在同一個線程內完成,期間不進行線程切換,這樣就避免了多線程競爭和同步鎖。

為了盡可能提升性能,Netty采用了串行無鎖化設計,在IO線程內部進行串行操作,避免多線程競爭導致的性能下降。表面上看,串行化設計似乎CPU利用率不高,並發程度不夠。但是,通過調整NIO線程池的線程參數,可以同時啟動多個串行化的線程並行運行,這種局部無鎖化的串行線程設計相比一個隊列——多個工作線程模型性能更優。

Netty串行化設計工作原理圖如下:

Netty的NioEventLoop讀取到消息后,直接調用ChannelPipeline的fireChannelRead(Object msg),只要用戶不主動切換線程,一直會由NioEventLoop調用到用戶的Handler,期間不進行線程切換。這種串行化處理方式避免了多線程導致的鎖競爭,從性能角度看是最優的。

4、高效的並發編程

Netty高效並發編程主要體現

1、volatile的大量、正確使用;

2、CAS和原子類的廣泛使用;

3、線程安全容器的使用;

4、通過讀寫鎖提升並發性能。

5、高性能的序列化框架

    影響序列化性能的關鍵因素總結如下:

    1、序列化后的碼流大小(網絡寬帶的占用);

     2、序列化與反序列化的性能(CPU資源占用);

    3、是否支持跨語言(異構系統的對接和開發語言切換)。

    Netty默認提供了對GoogleProtobuf的支持,通過擴展Netty的編解碼接口,用戶可以實現其他的高性能序列化框架

6、零拷貝

    Netty的“零拷貝”主要體現在三個方面:

    1)、Netty的接收和發送ByteBuffer采用DIRECT BUFFERS,使用堆外直接內存進行Socket讀寫,不需要進行字節緩沖區的二次拷貝。如果使用傳統的堆內存(HEAP BUFFERS)進行Socket讀寫,JVM會將堆內存Buffer拷貝一份到直接內存中,然后才寫入Socket中。相比於堆外直接內存,消息在發送過程中多了一次緩沖區的內存拷貝。

    2)、第二種“零拷貝 ”的實現CompositeByteBuf,它對外將多個ByteBuf封裝成一個ByteBuf,對外提供統一封裝后的ByteBuf接口。

    3)、第三種“零拷貝”就是文件傳輸,Netty文件傳輸類DefaultFileRegion通過transferTo方法將文件發送到目標Channel中。很多操作系統直接將文件緩沖區的內容發送到目標Channel中,而不需要通過循環拷貝的方式,這是一種更加高效的傳輸方式,提升了傳輸性能,降低了CPU和內存占用,實現了文件傳輸的“零拷貝”。

7、內存池

    隨着JVM虛擬機和JIT即時編譯技術的發展,對象的分配和回收是個非常輕量級的工作。但是對於緩沖區Buffer,情況卻稍有不同,特別是對於堆外直接內存的分配和回收,是一件耗時的操作。為了盡量重用緩沖區,Netty提供了基於內存池的緩沖區重用機制。

 1 package com.studyio.netty;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.PooledByteBufAllocator;
 5 import io.netty.buffer.Unpooled;
 6 
 7 /**
 8  * 
 9  * @author lgs
10  * 通過內存池的方式構建直接緩沖區
11  */
12 public class PooledByteBufDemo {
13 
14     public static void main(String[] args) {
15         byte[] content = new byte[1024];
16         int loop = 3000000;
17         long startTime = System.currentTimeMillis();
18         
19         ByteBuf poolBuffer = null;
20         for (int i = 0; i < loop; i++) {
21             //通過內存池的方式構建直接緩沖區
22             poolBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
23             poolBuffer.writeBytes(content);
24             //釋放buffer
25             poolBuffer.release();
26         }
27         long startTime2 = System.currentTimeMillis();
28         ByteBuf buffer = null;
29         for (int i = 0; i < loop; i++) {
30             //通過非內存池的方式構建直接緩沖區
31             buffer = Unpooled.directBuffer(1024);
32             buffer.writeBytes(content);
33             buffer.release();
34         }
35         long endTime = System.currentTimeMillis();
36         System.out.println("The PooledByteBuf use time :"+(startTime2-startTime));
37         System.out.println("The UnpooledByteBuf use time :"+(endTime-startTime2));
38     }
39 }

 運行結果:內存池的方式構建直接緩沖區效率更高

The PooledByteBuf use time :740
The UnpooledByteBuf use time :1025

8、靈活的TCP參數配置能力

    Netty在啟動輔助類中可以靈活的配置TCP參數,滿足不同的用戶場景。合理設置TCP參數在某些場景下對於性能的提升可以起到的顯著的效果,總結一下對性能影響比較大的幾個配置項:

    1)、SO_RCVBUF和SO_SNDBUF:通常建議值為128KB或者256KB;

    2)、TCP_NODELAY:NAGLE算法通過將緩沖區內的小封包自動相連,組成較大的封包,阻止大量小封包的發送阻塞網絡,從而提高網絡應用效率。但是對於時延敏感的應用場景需要關閉該優化算法;

    3)、軟中斷:如果Linux內核版本支持RPS(2.6.35以上版本),開啟RPS后可以實現軟中斷,提升網絡吞吐量。RPS根據數據包的源地址,目的地址以及目的和源端口,計算出一個hash值,然后根據這個hash值來選擇軟中斷運行的CPU。從上層來看,也就是說將每個連接和CPU綁定,並通過這個hash值,來均衡軟中斷在多個CPU上,提升網絡並行處理性能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM