Netty 實現聊天功能


Netty 是一個 Java NIO 客戶端服務器框架,使用它可以快速簡單地開發網絡應用程序,比如服務器和客戶端的協議。Netty 大大簡化了網絡程序的開發過程比如 TCP 和 UDP 的 socket 服務的開發。更多關於 Netty 的知識,可以參閱《Netty 4.x 用戶指南》(https://github.com/waylau/netty-4-user-guide

下面,就基於 Netty 快速實現一個聊天小程序。

准備

  • JDK 7+
  • Maven 3.2.x
  • Netty 4.x
  • Eclipse 4.x

服務端

讓我們從 handler (處理器)的實現開始,handler 是由 Netty 生成用來處理 I/O 事件的。

SimpleChatServerHandler.java

public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1) public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) Channel incoming = ctx.channel(); // Broadcast a message to multiple Channels channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n"); channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) Channel incoming = ctx.channel(); // Broadcast a message to multiple Channels channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 離開\n"); // A closed Channel is automatically removed from ChannelGroup, // so there is no need to do "channels.remove(ctx.channel());" } @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4) Channel incoming = ctx.channel(); for (Channel channel : channels) { if (channel != incoming){ channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n"); } else { channel.writeAndFlush("[you]" + s + "\n"); } } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常"); // 當出現異常就關閉連接 cause.printStackTrace(); ctx.close(); } } 
  1. SimpleChatServerHandler 繼承自 SimpleChannelInboundHandler,這個類實現了 ChannelInboundHandler 接口,ChannelInboundHandler 提供了許多事件處理的接口方法,然后你可以覆蓋這些方法。現在僅僅只需要繼承 SimpleChannelInboundHandler 類而不是你自己去實現接口方法。

  2. 覆蓋了 handlerAdded() 事件處理方法。每當從服務端收到新的客戶端連接時,客戶端的 Channel 存入 ChannelGroup 列表中,並通知列表中的其他客戶端 Channel

  3. 覆蓋了 handlerRemoved() 事件處理方法。每當從服務端收到客戶端斷開時,客戶端的 Channel 自動從 ChannelGroup 列表中移除了,並通知列表中的其他客戶端 Channel

  4. 覆蓋了 channelRead0() 事件處理方法。每當從服務端讀到客戶端寫入信息時,將信息轉發給其他客戶端的 Channel。其中如果你使用的是 Netty 5.x 版本時,需要把 channelRead0() 重命名為messageReceived()

  5. 覆蓋了 channelActive() 事件處理方法。服務端監聽到客戶端活動

  6. 覆蓋了 channelInactive() 事件處理方法。服務端監聽到客戶端不活動

  7. exceptionCaught() 事件處理方法是當出現 Throwable 對象才會被調用,即當 Netty 由於 IO 錯誤或者處理器在處理事件時拋出的異常時。在大部分情況下,捕獲的異常應該被記錄下來並且把關聯的 channel 給關閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不同的實現,比如你可能想在關閉連接之前發送一個錯誤碼的響應消息。

SimpleChatServerInitializer.java

SimpleChatServerInitializer 用來增加多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleChatServerHandler 等。

public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleChatServerHandler()); System.out.println("SimpleChatClient:"+ch.remoteAddress() +"連接上"); } } 

SimpleChatServer.java

編寫一個 main() 方法來啟動服務端。

public class SimpleChatServer { private int port; public SimpleChatServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new SimpleChatServerInitializer()) //(4) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) System.out.println("SimpleChatServer 啟動了"); // 綁定端口,開始接收進來的連接 ChannelFuture f = b.bind(port).sync(); // (7) // 等待服務器 socket 關閉 。 // 在這個例子中,這不會發生,但你可以優雅地關閉你的服務器。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("SimpleChatServer 關閉了"); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new SimpleChatServer(port).run(); } } 
  1. NioEventLoopGroup 是用來處理I/O操作的多線程事件循環器,Netty 提供了許多不同的 EventLoopGroup 的實現用來處理不同的傳輸。在這個例子中我們實現了一個服務端的應用,因此會有2個 NioEventLoopGroup 會被使用。第一個經常被叫做‘boss’,用來接收進來的連接。第二個經常被叫做‘worker’,用來處理已經被接收的連接,一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。如何知道多少個線程已經被使用,如何映射到已經創建的 Channel上都需要依賴於 EventLoopGroup 的實現,並且可以通過構造函數來配置他們的關系。

  2. ServerBootstrap 是一個啟動 NIO 服務的輔助啟動類。你可以在這個服務中直接使用 Channel,但是這會是一個復雜的處理過程,在很多情況下你並不需要這樣做。

  3. 這里我們指定使用 NioServerSocketChannel 類來舉例說明一個新的 Channel 如何接收進來的連接。

  4. 這里的事件處理類經常會被用來處理一個最近的已經接收的 Channel。SimpleChatServerInitializer 繼承自ChannelInitializer 是一個特殊的處理類,他的目的是幫助使用者配置一個新的 Channel。也許你想通過增加一些處理類比如 SimpleChatServerHandler 來配置一個新的 Channel 或者其對應的ChannelPipeline 來實現你的網絡程序。當你的程序變的復雜時,可能你會增加更多的處理類到 pipline 上,然后提取這些匿名類到最頂層的類上。

  5. 你可以設置這里指定的 Channel 實現的配置參數。我們正在寫一個TCP/IP 的服務端,因此我們被允許設置 socket 的參數選項比如tcpNoDelay 和 keepAlive。請參考 ChannelOption 和詳細的 ChannelConfig 實現的接口文檔以此可以對ChannelOption 的有一個大概的認識。

  6. option() 是提供給NioServerSocketChannel 用來接收進來的連接。childOption() 是提供給由父管道 ServerChannel 接收到的連接,在這個例子中也是 NioServerSocketChannel。

  7. 我們繼續,剩下的就是綁定端口然后啟動服務。這里我們在機器上綁定了機器所有網卡上的 8080 端口。當然現在你可以多次調用 bind() 方法(基於不同綁定地址)。

恭喜!你已經完成了基於 Netty 聊天服務端程序。

客戶端

SimpleChatClientHandler.java

客戶端的處理類比較簡單,只需要將讀到的信息打印出來即可

public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { System.out.println(s); } } 

SimpleChatClientInitializer.java

與服務端類似

public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleChatClientHandler()); } } 

SimpleChatClient.java

編寫一個 main() 方法來啟動客戶端。

public class SimpleChatClient { public static void main(String[] args) throws Exception{ new SimpleChatClient("localhost", 8080).run(); } private final String host; private final int port; public SimpleChatClient(String host, int port){ this.host = host; this.port = port; } public void run() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new SimpleChatClientInitializer()); Channel channel = bootstrap.connect(host, port).sync().channel(); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); while(true){ channel.writeAndFlush(in.readLine() + "\r\n"); } } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } } } 

運行效果

先運行 SimpleChatServer,再可以運行多個 SimpleChatClient,控制台輸入文本繼續測試

源碼

見 https://github.com/waylau/netty-4-user-guide-demos 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM