在這個例子中,我在服務器和客戶端連接被創立時發送一個消息,然后在客戶端解析收到的消息並輸出。並且,在這個項目中我使用 POJO 代替 ByteBuf 來作為傳輸對象。
一、服務器實現
1. 首先我們自定義傳輸數據對象
1 package com.coder.client; 2 3 import java.util.Date; 4 5 /** 6 * 自定義時間數據類 7 * @author Coder 8 * 9 */ 10 public class Time { 11 private final long value; 12 13 public Time() { 14 // 除以1000是為了使時間精確到秒 15 this(System.currentTimeMillis() / 1000L); 16 } 17 18 public Time(long value) { 19 this.value = value; 20 } 21 22 public long value() { 23 return value; 24 } 25 26 @Override 27 public String toString() { 28 return new Date((value()) * 1000L).toString(); 29 } 30 }
2. 然后我們需要自定義服務器數據編碼類
1 package com.coder.server; 2 3 import com.coder.client.Time; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.channel.ChannelHandlerContext; 7 import io.netty.handler.codec.MessageToByteEncoder; 8 9 /** 10 * 服務器數據編碼類 11 * @author Coder 12 * 13 */ 14 public class TimeEncoderPOJO extends MessageToByteEncoder<Time> { 15 16 // 發送數據時調用 17 @Override 18 protected void encode(ChannelHandlerContext ctx, Time msg, ByteBuf out) throws Exception { 19 // 只傳輸當前時間,精確到秒 20 out.writeInt((int)msg.value()); 21 } 22 23 }
3. 也需要自定義服務器的業務邏輯類,如下:
1 package com.coder.server; 2 3 import com.coder.client.Time; 4 5 import io.netty.channel.ChannelFuture; 6 import io.netty.channel.ChannelFutureListener; 7 import io.netty.channel.ChannelHandlerContext; 8 import io.netty.channel.ChannelInboundHandlerAdapter; 9 10 /** 11 * 服務器解碼器 12 * 連接建立時發送當前時間 13 * @author Coder 14 * 15 */ 16 public class TimeServerHandlerPOJO extends ChannelInboundHandlerAdapter { 17 /** 18 * 連接建立的時候並且准備進行通信時被調用 19 */ 20 @Override 21 public void channelActive(final ChannelHandlerContext ctx) throws Exception { 22 // 發送當前時間信息 23 ChannelFuture f = ctx.writeAndFlush(new Time()); 24 // 發送完畢之后關閉 Channel 25 f.addListener(ChannelFutureListener.CLOSE); 26 } 27 28 @Override 29 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 30 cause.printStackTrace(); 31 ctx.close(); 32 } 33 }
4. 有了上面的代碼,我們就可以實現服務器程序了,如下:
1 package com.coder.server; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 12 public class TimeServerPOJO { 13 private int port; 14 15 public TimeServerPOJO(int port) { 16 this.port = port; 17 } 18 19 public void run() throws Exception { 20 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用來接收進來的連接 21 EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用來處理已經被接收的連接 22 System.out.println("准備運行端口:" + port); 23 24 try { 25 ServerBootstrap b = new ServerBootstrap(); // 啟動NIO服務的輔助啟動類 26 b.group(bossGroup, workerGroup) 27 .channel(NioServerSocketChannel.class) // 這里告訴Channel如何接收新的連接 28 .childHandler( new ChannelInitializer<SocketChannel>() { 29 @Override 30 protected void initChannel(SocketChannel ch) throws Exception { 31 // 自定義處理類 32 // 注意添加順序 33 ch.pipeline().addLast(new TimeEncoderPOJO(),new TimeServerHandlerPOJO()); 34 } 35 }) 36 .option(ChannelOption.SO_BACKLOG, 128) 37 .childOption(ChannelOption.SO_KEEPALIVE, true); 38 39 // 綁定端口,開始接收進來的連接 40 ChannelFuture f = b.bind(port).sync(); 41 42 // 等待服務器socket關閉 43 f.channel().closeFuture().sync(); 44 } catch (Exception e) { 45 workerGroup.shutdownGracefully(); 46 bossGroup.shutdownGracefully(); 47 } 48 } 49 50 public static void main(String[] args) throws Exception { 51 int port = 8080; 52 new TimeServer(port).run(); 53 } 54 }
執行代碼后如下:
這時候服務器在等待客戶端的連接(非阻塞)。
二、客戶端實現
客戶端的實現與服務器類似。
1. 自定義客戶端數據解碼類
1 package com.coder.client; 2 3 import java.util.List; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.channel.ChannelHandlerContext; 7 import io.netty.handler.codec.ByteToMessageDecoder; 8 9 public class TimeDecoderPOJO extends ByteToMessageDecoder { 10 /** 11 * 有新數據接收時調用 12 * 為防止分包現象,先將數據存入內部緩存,到達滿足條件之后再進行解碼 13 */ 14 @Override 15 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { 16 if(in.readableBytes() < 4) { 17 return; 18 } 19 20 // out添加對象則表示解碼成功 21 out.add(new Time(in.readUnsignedInt())); 22 } 23 }
2. 自定義客戶端業務邏輯類
1 package com.coder.client; 2 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelInboundHandlerAdapter; 5 6 /** 7 * 客戶端數據處理類 8 * @author Coder 9 * 10 */ 11 public class TimeClientHandlerPOJO extends ChannelInboundHandlerAdapter { 12 @Override 13 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 14 // 直接將信息轉換成Time類型輸出即可 15 Time time = (Time)msg; 16 System.out.println(time); 17 ctx.close(); 18 } 19 20 @Override 21 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 22 cause.printStackTrace(); 23 ctx.close(); 24 } 25 }
3. 客戶端程序實現
Netty 客戶端的通信步驟大致為:
- 創建一個 NIO 線程組,用於處理服務器與客戶端的連接,客戶端不需要用到 boss worker。
- 創建一個 Bootstrap 對象,配置 Netty 的一系列參數,由於客戶端 SocketChannel 沒有父親,所以不需要使用 childoption。
- 創建一個用於實際處理數據的類ChannelInitializer,進行初始化的准備工作,比如設置接受傳出數據的字符集、格式以及實際處理數據的接口。
- 配置服務器 IP 和端口號,建立與服務器的連接。
1 package com.coder.client; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioSocketChannel; 11 12 public class TimeClientPOJO { 13 public static void main(String[] args) throws Exception{ 14 String host = "127.0.0.1"; // ip 15 int port = 8080; // 端口 16 EventLoopGroup workerGroup = new NioEventLoopGroup(); 17 18 try { 19 Bootstrap b = new Bootstrap(); // 與ServerBootstrap類似 20 b.group(workerGroup); // 客戶端不需要boss worker 21 b.channel(NioSocketChannel.class); 22 b.option(ChannelOption.SO_KEEPALIVE, true); // 客戶端的socketChannel沒有父親 23 b.handler(new ChannelInitializer<SocketChannel>() { 24 @Override 25 protected void initChannel(SocketChannel ch) throws Exception { 26 // POJO 27 ch.pipeline().addLast(new TimeDecoderPOJO() ,new TimeClientHandlerPOJO()); 28 } 29 }); 30 31 // 啟動客戶端,客戶端用connect連接 32 ChannelFuture f = b.connect(host, port).sync(); 33 34 // 等待連接關閉 35 f.channel().closeFuture().sync(); 36 } finally { 37 workerGroup.shutdownGracefully(); 38 } 39 } 40 }
三、測試
先運行服務器程序,運行結果如下圖:
然后運行客戶端程序,運行結果如下圖:
需要注意的是,Eclipse 是可以同時運行多個 Java 程序的,可以通過點擊
來切換不同程序的控制台輸出窗口。