Netty入門(二)時間服務器及客戶端


 

  在這個例子中,我在服務器和客戶端連接被創立時發送一個消息,然后在客戶端解析收到的消息並輸出。並且,在這個項目中我使用 POJO 代替 ByteBuf 來作為傳輸對象。

一、服務器實現

1.  首先我們自定義傳輸數據對象

 1 package com.coder.client;
 2 
 3 import java.util.Date;
 4 
 5 /**
 6  * 自定義時間數據類
 7  * @author Coder
 8  *
 9  */
10 public class Time {
11     private final long value;
12 
13     public Time() {
14         // 除以1000是為了使時間精確到秒
15         this(System.currentTimeMillis() / 1000L);
16     }
17 
18     public Time(long value) {
19         this.value = value;
20     }
21 
22     public long value() {
23         return value;
24     }
25 
26     @Override
27     public String toString() {
28         return new Date((value()) * 1000L).toString();
29     }
30 }

 

 

2.  然后我們需要自定義服務器數據編碼類

 1 package com.coder.server;
 2 
 3 import com.coder.client.Time;
 4 
 5 import io.netty.buffer.ByteBuf;
 6 import io.netty.channel.ChannelHandlerContext;
 7 import io.netty.handler.codec.MessageToByteEncoder;
 8 
 9 /**
10  * 服務器數據編碼類
11  * @author Coder
12  *
13  */
14 public class TimeEncoderPOJO extends MessageToByteEncoder<Time> {
15 
16     // 發送數據時調用
17     @Override
18     protected void encode(ChannelHandlerContext ctx, Time msg, ByteBuf out) throws Exception {
19         // 只傳輸當前時間,精確到秒
20         out.writeInt((int)msg.value());
21     }
22 
23 }

 

 

3. 也需要自定義服務器的業務邏輯類,如下:

 1 package com.coder.server;
 2 
 3 import com.coder.client.Time;
 4 
 5 import io.netty.channel.ChannelFuture;
 6 import io.netty.channel.ChannelFutureListener;
 7 import io.netty.channel.ChannelHandlerContext;
 8 import io.netty.channel.ChannelInboundHandlerAdapter;
 9 
10 /**
11  * 服務器解碼器
12  * 連接建立時發送當前時間
13  * @author Coder
14  *
15  */
16 public class TimeServerHandlerPOJO extends ChannelInboundHandlerAdapter {
17     /**
18      * 連接建立的時候並且准備進行通信時被調用
19      */
20     @Override
21     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
22         // 發送當前時間信息
23         ChannelFuture f = ctx.writeAndFlush(new Time());
24         // 發送完畢之后關閉 Channel
25         f.addListener(ChannelFutureListener.CLOSE);
26     }
27     
28     @Override
29     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
30         cause.printStackTrace();
31         ctx.close();
32     }
33 }

 

 

4. 有了上面的代碼,我們就可以實現服務器程序了,如下:

 1 package com.coder.server;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 
12 public class TimeServerPOJO {
13 private int port;
14     
15     public TimeServerPOJO(int port) {
16         this.port = port;
17     }
18     
19     public void run() throws Exception {
20         EventLoopGroup bossGroup = new NioEventLoopGroup();        // 用來接收進來的連接
21         EventLoopGroup workerGroup = new NioEventLoopGroup();    // 用來處理已經被接收的連接
22         System.out.println("准備運行端口:" + port);
23         
24         try {
25             ServerBootstrap b = new ServerBootstrap();        // 啟動NIO服務的輔助啟動類
26             b.group(bossGroup, workerGroup)
27             .channel(NioServerSocketChannel.class)            // 這里告訴Channel如何接收新的連接
28             .childHandler( new ChannelInitializer<SocketChannel>() {
29                 @Override
30                 protected void initChannel(SocketChannel ch) throws Exception {
31                     // 自定義處理類
32                     // 注意添加順序
33                     ch.pipeline().addLast(new TimeEncoderPOJO(),new TimeServerHandlerPOJO());
34                 }
35             })
36             .option(ChannelOption.SO_BACKLOG, 128)
37             .childOption(ChannelOption.SO_KEEPALIVE, true);
38             
39             // 綁定端口,開始接收進來的連接
40             ChannelFuture f = b.bind(port).sync();
41             
42             // 等待服務器socket關閉
43             f.channel().closeFuture().sync();
44         } catch (Exception e) {
45             workerGroup.shutdownGracefully();
46             bossGroup.shutdownGracefully();
47         }
48     }
49     
50     public static void main(String[] args) throws Exception {
51         int port = 8080;
52         new TimeServer(port).run();
53     }
54 }

  執行代碼后如下:

  

  這時候服務器在等待客戶端的連接(非阻塞)。

 

二、客戶端實現

   客戶端的實現與服務器類似。

1. 自定義客戶端數據解碼類

 1 package com.coder.client;
 2 
 3 import java.util.List;
 4 
 5 import io.netty.buffer.ByteBuf;
 6 import io.netty.channel.ChannelHandlerContext;
 7 import io.netty.handler.codec.ByteToMessageDecoder;
 8 
 9 public class TimeDecoderPOJO extends ByteToMessageDecoder {
10     /**
11      * 有新數據接收時調用
12      * 為防止分包現象,先將數據存入內部緩存,到達滿足條件之后再進行解碼
13      */
14     @Override
15     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
16         if(in.readableBytes() < 4) {
17             return;
18         }
19         
20         // out添加對象則表示解碼成功
21         out.add(new Time(in.readUnsignedInt()));
22     }
23 }

 

 

2. 自定義客戶端業務邏輯類

 1 package com.coder.client;
 2 
 3 import io.netty.channel.ChannelHandlerContext;
 4 import io.netty.channel.ChannelInboundHandlerAdapter;
 5 
 6 /**
 7  * 客戶端數據處理類
 8  * @author Coder
 9  *
10  */
11 public class TimeClientHandlerPOJO extends ChannelInboundHandlerAdapter {
12     @Override
13     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
14         // 直接將信息轉換成Time類型輸出即可
15         Time time = (Time)msg;
16         System.out.println(time);
17         ctx.close();
18     }
19     
20     @Override
21     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
22         cause.printStackTrace();
23         ctx.close();
24     }
25 }

 

 

3. 客戶端程序實現

   Netty 客戶端的通信步驟大致為:

  1.  創建一個 NIO 線程組,用於處理服務器與客戶端的連接,客戶端不需要用到 boss worker。
  2.  創建一個 Bootstrap 對象,配置 Netty 的一系列參數,由於客戶端 SocketChannel 沒有父親,所以不需要使用 childoption。
  3.  創建一個用於實際處理數據的類ChannelInitializer,進行初始化的准備工作,比如設置接受傳出數據的字符集、格式以及實際處理數據的接口。
  4.  配置服務器 IP 和端口號,建立與服務器的連接。
 1 package com.coder.client;
 2 
 3 import io.netty.bootstrap.Bootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioSocketChannel;
11 
12 public class TimeClientPOJO {
13     public static void main(String[] args) throws Exception{
14         String host = "127.0.0.1";            // ip
15         int port = 8080;                    // 端口
16         EventLoopGroup workerGroup = new NioEventLoopGroup();
17         
18         try {
19             Bootstrap b = new Bootstrap();            // 與ServerBootstrap類似
20             b.group(workerGroup);                    // 客戶端不需要boss worker
21             b.channel(NioSocketChannel.class);
22             b.option(ChannelOption.SO_KEEPALIVE, true);    // 客戶端的socketChannel沒有父親
23             b.handler(new ChannelInitializer<SocketChannel>() {
24                 @Override
25                 protected void initChannel(SocketChannel ch) throws Exception {
26                     // POJO
27                     ch.pipeline().addLast(new TimeDecoderPOJO() ,new TimeClientHandlerPOJO());
28                 }
29             });
30             
31             // 啟動客戶端,客戶端用connect連接
32             ChannelFuture f = b.connect(host, port).sync();
33             
34             // 等待連接關閉
35             f.channel().closeFuture().sync();
36         } finally {
37             workerGroup.shutdownGracefully();
38         }
39     }
40 }

 

 

 

 

 三、測試

   先運行服務器程序,運行結果如下圖:

  

  然后運行客戶端程序,運行結果如下圖:

  

  需要注意的是,Eclipse 是可以同時運行多個 Java 程序的,可以通過點擊

  

  來切換不同程序的控制台輸出窗口。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM