關於Netty的入門使用


Netty介紹:

Netty是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。

換句話說,Netty是一個NIO框架,使用它可以簡單快速地開發網絡應用程序,比如客戶端和服務端的協議。Netty大大簡化了網絡程序的開發過程比如TCP和UDP的 Socket的開發。

“快速和簡單”並不意味着應用程序會有難維護和性能低的問題,Netty是一個精心設計的框架,它從許多協議的實現中吸收了很多的經驗比如FTP、SMTP、HTTP、許多二進制和基於文本的傳統協議,Netty在不降低開發效率、性能、穩定性、靈活性情況下,成功地找到了解決方案。

有一些用戶可能已經發現其他的一些網絡框架也聲稱自己有同樣的優勢,所以你可能會問是Netty和它們的不同之處。答案就是Netty的哲學設計理念。Netty從第一天開始就為用戶提供了用戶體驗最好的API以及實現設計。正是因為Netty的設計理念,才讓我們得以輕松地閱讀本指南並使用Netty。

接下來,我們看下Client端的代碼實現:

 

 1 package ruizhan.hjf.netty;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelFuture;
 6 import io.netty.channel.ChannelInitializer;
 7 import io.netty.channel.ChannelOption;
 8 import io.netty.channel.EventLoopGroup;
 9 import io.netty.channel.nio.NioEventLoopGroup;
10 import io.netty.channel.socket.SocketChannel;
11 import io.netty.channel.socket.nio.NioSocketChannel;
12 /**
13  * Netty客戶端的程序
14  * @author huangjianfei
15  */
16 public class Client {
17     /*IP地址*/
18     static final String HOST = System.getProperty("host", "127.0.0.1");
19     /*端口號*/
20     static final int PORT1 = Integer.parseInt(System.getProperty("port", "8765"));
21     
22     static final int PORT2 = Integer.parseInt(System.getProperty("port", "8764"));
23     
24     public static void main(String[] args) throws Exception {
25         EventLoopGroup workgroup = new NioEventLoopGroup();
26         Bootstrap b = new Bootstrap();//客戶端
27         b.group(workgroup)
28         .channel(NioSocketChannel.class)//客戶端 -->NioSocketChannel
29         .option(ChannelOption.SO_KEEPALIVE, true)
30         .handler(new ChannelInitializer<SocketChannel>() {//handler
31             @Override
32             protected void initChannel(SocketChannel sc) throws Exception {
33                 sc.pipeline().addLast(new ClientHandler());  
34             }
35         });  
36         //創建異步連接 可添加多個端口
37         ChannelFuture cf1 = b.connect(HOST, PORT1).sync();
38         ChannelFuture cf2 = b.connect(HOST, PORT2).sync();   
39         
40         //buf
41         //client向server端發送數據  Buffer形式
42         cf1.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty".getBytes()));
43         cf2.channel().writeAndFlush(Unpooled.copiedBuffer("hello world".getBytes()));
44         
45         
46         cf1.channel().closeFuture().sync();
47         cf2.channel().closeFuture().sync();
48         
49         workgroup.shutdownGracefully();
50     }
51 }

Servler端代碼實現:

 1 package ruizhan.hjf.netty;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelFuture;
 6 import io.netty.channel.ChannelInitializer;
 7 import io.netty.channel.ChannelOption;
 8 import io.netty.channel.EventLoopGroup;
 9 import io.netty.channel.nio.NioEventLoopGroup;
10 import io.netty.channel.socket.SocketChannel;
11 import io.netty.channel.socket.nio.NioServerSocketChannel;
12 /**
13  * Netty實現的服務端程序
14  * @author huangjianfei
15  */
16 public class Server
17 {
18     /*端口號*/
19     static final int PORT1 = Integer.parseInt(System.getProperty("port", "8765"));
20     
21     static final int PORT2 = Integer.parseInt(System.getProperty("port", "8764"));
22     
23     public static void main(String[] args)
24     {
25         EventLoopGroup bossGroup = null;
26         EventLoopGroup workerGroup = null;
27         ServerBootstrap b = null;
28         try{
29             //1:第一個線程組是用於接收Client連接的
30             bossGroup = new NioEventLoopGroup(); //(1)
31             //2:第二個線程組是用於實際的業務處理操作的
32             workerGroup = new NioEventLoopGroup();
33             //3:創建一個啟動NIO服務的輔助啟動類ServerBootstrap 就是對我們的Server進行一系列的配置
34             b = new ServerBootstrap();//(2)
35             //4:綁定兩個線程組
36             b.group(bossGroup, workerGroup)
37             //5:需要指定使用NioServerSocketChannel這種類型的通道
38             .channel(NioServerSocketChannel.class)//(3) 服務端 -->NioServerSocketChannel
39             //6:一定要使用childHandler 去綁定具體的事件處理器
40             .childHandler(new ChannelInitializer<SocketChannel>() //(4)   childHandler
41             {
42                 @Override
43                 protected void initChannel(SocketChannel sc) throws Exception
44                 {
45                     //7:將自定義的serverHandler加入到管道中去(多個)
46                     sc.pipeline().addLast(new ServerHandler());//handler中實現真正的業務邏輯
47 //                    sc.pipeline().addLast(new ServerHandler());
48 //                    sc.pipeline().addLast(new ServerHandler());
49                 }
50             })
51             /**
52              * 服務器端TCP內核模塊維護兩個隊列,我們稱之為A,B吧
53              * 客戶端向服務端connect的時候,會發送帶有SYN標志的包(第一次握手)
54              * 服務端收到客戶端發來的SYN時,向客戶端發送SYN ACK確認(第二次握手)
55              * 此時TCP內核模塊把客戶端連接加入到A隊列中,最后服務端收到客戶端發來的ACK時(第三次握手)
56              * TCP內核模塊把客戶端連接從A隊列移到B隊列,連接成功,應用程序的accept會返回
57              * 也就是說accept從B隊列中取出完成三次握手的連接
58              * A隊列和B隊列的長度之和是backLog,當A,B隊列的長度之和大於backLog時,新連接將會被TCP內核拒絕
59              * 所以,如果backLog過小,可能會出現accept速度跟不上,A,B隊列滿了,導致新的客戶端無法連接,
60              * 要注意的是,backLog對程序支持的連接數並無影響,backLog影響的只是還沒有被accept取出的連接
61              */
62             //8:設置TCP連接的緩沖區
63             .option(ChannelOption.SO_BACKLOG, 128)//(5)
64 //            .option(ChannelOption.SO_SNDBUF, 32*1024) //設置發送緩沖大小
65 //            .option(ChannelOption.SO_RCVBUF, 32*1024) //設置接收緩沖大小
66             //9:保持連接
67             .childOption(ChannelOption.SO_KEEPALIVE, true);//(6)
68             //10:綁定指定的端口 進行監聽
69             //此處端口號先寫死  也可以綁定多個端口  
70             ChannelFuture cf2= b.bind(PORT1).sync(); // (7)  
71             
72             ChannelFuture cf3= b.bind(PORT2).sync(); // (7)   綁定多個端口 
73 
74             //Thread.sleep(10000);
75             cf2.channel().closeFuture().sync(); //異步等待關閉 
76             cf3.channel().closeFuture().sync(); //異步等待關閉
77             
78         }catch(Exception e){
79             e.printStackTrace();
80         }finally{
81             workerGroup.shutdownGracefully();
82             bossGroup.shutdownGracefully();
83         }
84     }
85 }

接下來,就是真正去實現數據傳輸的業務邏輯層代碼的實現,在這里也就是ClientHanlder和ServlerHandler:

 1 package ruizhan.hjf.netty;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelHandlerAdapter;
 6 import io.netty.channel.ChannelHandlerContext;
 7 import io.netty.util.ReferenceCountUtil;
 8 
 9 /**
10  * 客戶端業務處理類
11  * (編寫主要的業務邏輯)
12  * @author huangjianfei
13  */
14 public class ClientHandler extends ChannelHandlerAdapter
15 {
16     // ByteBuf是一個引用計數對象,這個對象必須顯示地調用release()方法來釋放。
17     // 請記住處理器的職責是釋放所有傳遞到處理器的引用計數對象。
18     @Override
19     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
20     {
21         try{
22             //do something
23             //接收服務端發來的數據 ByteBuf
24             ByteBuf  buf = (ByteBuf)msg;
25             //創建一個和buf一樣長度的字節空數組
26             byte[] data = new byte[buf.readableBytes()];
27             //將buf中的數據讀取到data數組中
28             buf.readBytes(data);
29             //將data數組驚醒包裝 以String格式輸出
30             String response = new String(data,"utf-8");
31             System.out.println("client :"+response);
32             
33             //以上代碼是接收服務端發來的反饋數據//
34             
35             ctx.close();
36         }finally{
37             // Discard the received data silently.
38             ReferenceCountUtil.release(msg);
39         }
40     }
41 
42     @Override
43     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
44     {
45         // Close the connection when an exception is raised.
46         cause.printStackTrace();
47         ctx.close();
48     }
49 }
 1 package ruizhan.hjf.netty;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.buffer.Unpooled;
 5 import io.netty.channel.ChannelFutureListener;
 6 import io.netty.channel.ChannelHandlerAdapter;
 7 import io.netty.channel.ChannelHandlerContext;
 8 import io.netty.util.ReferenceCountUtil;
 9 
10 /**
11  * 服務端業務處理類
12  * (編寫主要的業務邏輯)
13  * @author huangjianfei
14  */
15 public class ServerHandler extends ChannelHandlerAdapter
16 {
17 
18     /**
19      * 每當從客戶端收到新的數據時,這個方法會在收到消息時被調用
20      * ByteBuf是一個引用計數對象,這個對象必須顯示地調用release()方法來釋放。
21      * 請記住處理器的職責是釋放所有傳遞到處理器的引用計數對象。
22      */
23     @Override
24     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
25     {
26         try{
27             //do something
28             //接收客戶端發送的數據 ByteBuf
29             ByteBuf buf = (ByteBuf)msg;
30             //創建一個和buf長度一樣的空字節數組
31             byte[] data = new byte[buf.readableBytes()];
32             //將buf中的數據讀取到data數組中
33             buf.readBytes(data);
34             //將data數據包裝成string輸出
35             String request = new String(data,"utf-8");
36             System.out.println("server :"+request);
37     
38             //以上代碼是接收客戶端信息//
39             
40             //server端向client發送反饋數據
41             //如果是綁定了多個端口 那么都會進行發送
42             ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()))
43             .addListener(ChannelFutureListener.CLOSE);//添加監聽 當服務端向客戶端發送完數據后,關閉connect連接
44             /**
45              * ChannelFutureListener,當一個寫請求完成時通知並且關閉Channel
46              * 加上監聽 意味着服務端回送數據到客戶端時 連接關閉(短連接)
47              * 不加監聽 意味着客戶端與服務端一直保持連接狀態(長連接)
48              */
49             
50             
51             ctx.close();
52         }finally{
53             // Discard the received data silently.
54             ReferenceCountUtil.release(msg);
55         }
56     }
57 
58     /**
59      * exceptionCaught()事件處理方法是當出現Throwable對象才會被調用
60      * 即當Netty由於IO錯誤或者處理器在處理事件時拋出的異常時
61      */
62     @Override
63     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
64     {
65         // Close the connection when an exception is raised.
66         cause.printStackTrace();
67         ctx.close();
68     }
69  
70 }

以上是Netty的基礎入門實現,詳見並發編程網,http://ifeve.com/netty5-user-guide/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM