Netty 是一個 Java NIO 客戶端服務器框架,使用它可以快速簡單地開發網絡應用程序,比如服務器和客戶端的協議。Netty 大大簡化了網絡程序的開發過程比如 TCP 和 UDP 的 socket 服務的開發。更多關於 Netty 的知識,可以參閱《Netty 4.x 用戶指南》(https://github.com/waylau/netty-4-user-guide)
下面,就基於 Netty 快速實現一個聊天小程序。
准備
- JDK 7+
- Maven 3.2.x
- Netty 4.x
- Eclipse 4.x
服務端
讓我們從 handler (處理器)的實現開始,handler 是由 Netty 生成用來處理 I/O 事件的。
SimpleChatServerHandler.java
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1) public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) Channel incoming = ctx.channel(); // Broadcast a message to multiple Channels channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n"); channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) Channel incoming = ctx.channel(); // Broadcast a message to multiple Channels channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 離開\n"); // A closed Channel is automatically removed from ChannelGroup, // so there is no need to do "channels.remove(ctx.channel());" } @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4) Channel incoming = ctx.channel(); for (Channel channel : channels) { if (channel != incoming){ channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n"); } else { channel.writeAndFlush("[you]" + s + "\n"); } } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常"); // 當出現異常就關閉連接 cause.printStackTrace(); ctx.close(); } }
-
SimpleChatServerHandler 繼承自 SimpleChannelInboundHandler,這個類實現了 ChannelInboundHandler 接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法。現在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實現接口方法。
-
覆蓋了 handlerAdded() 事件處理方法。每當從服務端收到新的客戶端連接時,客戶端的 Channel 存入 ChannelGroup 列表中,並通知列表中的其他客戶端 Channel
-
覆蓋了 handlerRemoved() 事件處理方法。每當從服務端收到客戶端斷開時,客戶端的 Channel 自動從 ChannelGroup 列表中移除了,並通知列表中的其他客戶端 Channel
-
覆蓋了 channelRead0() 事件處理方法。每當從服務端讀到客戶端寫入信息時,將信息轉發給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時,需要把 channelRead0() 重命名為messageReceived()
-
覆蓋了 channelActive() 事件處理方法。服務端監聽到客戶端活動
-
覆蓋了 channelInactive() 事件處理方法。服務端監聽到客戶端不活動
-
exceptionCaught() 事件處理方法是當出現 Throwable 對象才會被調用,即當 Netty 由於 IO 錯誤或者處理器在處理事件時拋出的異常時。在大部分情況下,捕獲的異常應該被記錄下來並且把關聯的 channel 給關閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不同的實現,比如你可能想在關閉連接之前發送一個錯誤碼的響應消息。
SimpleChatServerInitializer.java
SimpleChatServerInitializer 用來增加多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleChatServerHandler 等。
public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleChatServerHandler()); System.out.println("SimpleChatClient:"+ch.remoteAddress() +"連接上"); } }
SimpleChatServer.java
編寫一個 main() 方法來啟動服務端。
public class SimpleChatServer { private int port; public SimpleChatServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new SimpleChatServerInitializer()) //(4) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) System.out.println("SimpleChatServer 啟動了"); // 綁定端口,開始接收進來的連接 ChannelFuture f = b.bind(port).sync(); // (7) // 等待服務器 socket 關閉 。 // 在這個例子中,這不會發生,但你可以優雅地關閉你的服務器。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("SimpleChatServer 關閉了"); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new SimpleChatServer(port).run(); } }
-
NioEventLoopGroup 是用來處理I/O操作的多線程事件循環器,Netty 提供了許多不同的 EventLoopGroup 的實現用來處理不同的傳輸。在這個例子中我們實現了一個服務端的應用,因此會有2個 NioEventLoopGroup 會被使用。第一個經常被叫做‘boss’,用來接收進來的連接。第二個經常被叫做‘worker’,用來處理已經被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。如何知道多少個線程已經被使用,如何映射到已經創建的 Channel上都需要依賴於 EventLoopGroup 的實現,並且可以通過構造函數來配置他們的關系。
-
ServerBootstrap 是一個啟動 NIO 服務的輔助啟動類。你可以在這個服務中直接使用 Channel,但是這會是一個復雜的處理過程,在很多情況下你並不需要這樣做。
-
這里我們指定使用 NioServerSocketChannel 類來舉例說明一個新的 Channel 如何接收進來的連接。
-
這里的事件處理類經常會被用來處理一個最近的已經接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。也許你想通過增加一些處理類比如 SimpleChatServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網絡程序。當你的程序變的復雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。
-
你可以設置這里指定的 Channel 實現的配置參數。我們正在寫一個TCP/IP 的服務端,因此我們被允許設置 socket 的參數選項比如tcpNoDelay 和 keepAlive。請參考 ChannelOption 和詳細的 ChannelConfig 實現的接口文檔以此可以對ChannelOption 的有一個大概的認識。
-
option() 是提供給NioServerSocketChannel 用來接收進來的連接。childOption() 是提供給由父管道 ServerChannel 接收到的連接,在這個例子中也是 NioServerSocketChannel。
-
我們繼續,剩下的就是綁定端口然后啟動服務。這里我們在機器上綁定了機器所有網卡上的 8080 端口。當然現在你可以多次調用 bind() 方法(基於不同綁定地址)。
恭喜!你已經完成了基於 Netty 聊天服務端程序。
客戶端
SimpleChatClientHandler.java
客戶端的處理類比較簡單,只需要將讀到的信息打印出來即可
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { System.out.println(s); } }
SimpleChatClientInitializer.java
與服務端類似
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleChatClientHandler()); } }
SimpleChatClient.java
編寫一個 main() 方法來啟動客戶端。
public class SimpleChatClient { public static void main(String[] args) throws Exception{ new SimpleChatClient("localhost", 8080).run(); } private final String host; private final int port; public SimpleChatClient(String host, int port){ this.host = host; this.port = port; } public void run() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new SimpleChatClientInitializer()); Channel channel = bootstrap.connect(host, port).sync().channel(); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); while(true){ channel.writeAndFlush(in.readLine() + "\r\n"); } } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } } }
運行效果
先運行 SimpleChatServer,再可以運行多個 SimpleChatClient,控制台輸入文本繼續測試