Netty4.0學習筆記系列之二:Handler的執行順序


Handler在netty中,無疑占據着非常重要的地位。Handler與Servlet中的filter很像,通過Handler可以完成通訊報文的解碼編碼、攔截指定的報文、統一對日志錯誤進行處理、統一對請求進行計數、控制Handler執行與否。一句話,沒有它做不到的只有你想不到的。

Netty中的所有handler都實現自ChannelHandler接口。按照輸出輸出來分,分為ChannelInboundHandler、ChannelOutboundHandler兩大類。ChannelInboundHandler對從客戶端發往服務器的報文進行處理,一般用來執行解碼、讀取客戶端數據、進行業務處理等;ChannelOutboundHandler對從服務器發往客戶端的報文進行處理,一般用來進行編碼、發送報文到客戶端。

Netty中,可以注冊多個handler。ChannelInboundHandler按照注冊的先后順序執行;ChannelOutboundHandler按照注冊的先后順序逆序執行,如下圖所示,按照注冊的先后順序對Handler進行排序,request進入Netty后的執行順序為:

基本的概念就說到這,下面用一個例子來進行驗證。該例子模擬Client與Server間的通訊,Server端注冊了2個ChannelInboundHandler、2個ChannelOutboundHandler。當Client連接到Server后,會向Server發送一條消息。Server端通過ChannelInboundHandler 對Client發送的消息進行讀取,通過ChannelOutboundHandler向client發送消息。最后Client把接收到的信息打印出來。

Server端一共有5個類:HelloServer InboundHandler1 InboundHandler2 OutboundHandler1 OutboundHandler2

1、HelloServer 代碼如下

[java]  view plain copy
  1. package com.guowl.testmultihandler;  
  2.   
  3. import io.netty.bootstrap.ServerBootstrap;  
  4. import io.netty.channel.ChannelFuture;  
  5. import io.netty.channel.ChannelInitializer;  
  6. import io.netty.channel.ChannelOption;  
  7. import io.netty.channel.EventLoopGroup;  
  8. import io.netty.channel.nio.NioEventLoopGroup;  
  9. import io.netty.channel.socket.SocketChannel;  
  10. import io.netty.channel.socket.nio.NioServerSocketChannel;  
  11.   
  12. public class HelloServer {  
  13.     public void start(int port) throws Exception {  
  14.         EventLoopGroup bossGroup = new NioEventLoopGroup();   
  15.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  16.         try {  
  17.             ServerBootstrap b = new ServerBootstrap();   
  18.             b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)   
  19.                     .childHandler(new ChannelInitializer<SocketChannel>() {   
  20.                                 @Override  
  21.                                 public void initChannel(SocketChannel ch) throws Exception {  
  22.                                     // 注冊兩個OutboundHandler,執行順序為注冊順序的逆序,所以應該是OutboundHandler2 OutboundHandler1  
  23.                                     ch.pipeline().addLast(new OutboundHandler1());  
  24.                                     ch.pipeline().addLast(new OutboundHandler2());  
  25.                                     // 注冊兩個InboundHandler,執行順序為注冊順序,所以應該是InboundHandler1 InboundHandler2  
  26.                                     ch.pipeline().addLast(new InboundHandler1());  
  27.                                     ch.pipeline().addLast(new InboundHandler2());  
  28.                                 }  
  29.                             }).option(ChannelOption.SO_BACKLOG, 128)   
  30.                     .childOption(ChannelOption.SO_KEEPALIVE, true);   
  31.   
  32.             ChannelFuture f = b.bind(port).sync();   
  33.   
  34.             f.channel().closeFuture().sync();  
  35.         } finally {  
  36.             workerGroup.shutdownGracefully();  
  37.             bossGroup.shutdownGracefully();  
  38.         }  
  39.     }  
  40.   
  41.     public static void main(String[] args) throws Exception {  
  42.         HelloServer server = new HelloServer();  
  43.         server.start(8000);  
  44.     }  
  45. }  


2、InboundHandler1

[java]  view plain copy
  1. package com.guowl.testmultihandler;  
  2.   
  3. import io.netty.channel.ChannelHandlerContext;  
  4. import io.netty.channel.ChannelInboundHandlerAdapter;  
  5.   
  6. import org.slf4j.Logger;  
  7. import org.slf4j.LoggerFactory;  
  8.   
  9. public class InboundHandler1 extends ChannelInboundHandlerAdapter {  
  10.     private static Logger   logger  = LoggerFactory.getLogger(InboundHandler1.class);  
  11.   
  12.     @Override  
  13.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
  14.         logger.info("InboundHandler1.channelRead: ctx :" + ctx);  
  15.         // 通知執行下一個InboundHandler  
  16.         ctx.fireChannelRead(msg);  
  17.     }  
  18.   
  19.     @Override  
  20.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
  21.         logger.info("InboundHandler1.channelReadComplete");  
  22.         ctx.flush();  
  23.     }  
  24. }  

3、InboundHandler2

[java]  view plain copy
  1. package com.guowl.testmultihandler;  
  2.   
  3. import io.netty.buffer.ByteBuf;  
  4. import io.netty.channel.ChannelHandlerContext;  
  5. import io.netty.channel.ChannelInboundHandlerAdapter;  
  6.   
  7. import org.slf4j.Logger;  
  8. import org.slf4j.LoggerFactory;  
  9.   
  10. public class InboundHandler2 extends ChannelInboundHandlerAdapter {  
  11.     private static Logger   logger  = LoggerFactory.getLogger(InboundHandler2.class);  
  12.   
  13.     @Override  
  14.     // 讀取Client發送的信息,並打印出來  
  15.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
  16.         logger.info("InboundHandler2.channelRead: ctx :" + ctx);  
  17.         ByteBuf result = (ByteBuf) msg;  
  18.         byte[] result1 = new byte[result.readableBytes()];  
  19.         result.readBytes(result1);  
  20.         String resultStr = new String(result1);  
  21.         System.out.println("Client said:" + resultStr);  
  22.         result.release();  
  23.   
  24.         ctx.write(msg);  
  25.     }  
  26.   
  27.     @Override  
  28.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
  29.         logger.info("InboundHandler2.channelReadComplete");  
  30.         ctx.flush();  
  31.     }  
  32.   
  33. }  

 

4、OutboundHandler1

[java]  view plain copy
  1. package com.guowl.testmultihandler;  
  2.   
  3. import io.netty.buffer.ByteBuf;  
  4. import io.netty.channel.ChannelHandlerContext;  
  5. import io.netty.channel.ChannelOutboundHandlerAdapter;  
  6. import io.netty.channel.ChannelPromise;  
  7.   
  8. import org.slf4j.Logger;  
  9. import org.slf4j.LoggerFactory;  
  10.   
  11. public class OutboundHandler1 extends ChannelOutboundHandlerAdapter {  
  12.     private static Logger   logger  = LoggerFactory.getLogger(OutboundHandler1.class);  
  13.     @Override  
  14.     // 向client發送消息  
  15.     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  
  16.         logger.info("OutboundHandler1.write");  
  17.         String response = "I am ok!";  
  18.         ByteBuf encoded = ctx.alloc().buffer(4 * response.length());  
  19.         encoded.writeBytes(response.getBytes());  
  20.         ctx.write(encoded);  
  21.         ctx.flush();  
  22.     }  
  23.       
  24.       
  25. }  

5、OutboundHandler2

[java]  view plain copy
  1. package com.guowl.testmultihandler;  
  2.   
  3. import io.netty.channel.ChannelHandlerContext;  
  4. import io.netty.channel.ChannelOutboundHandlerAdapter;  
  5. import io.netty.channel.ChannelPromise;  
  6.   
  7. import org.slf4j.Logger;  
  8. import org.slf4j.LoggerFactory;  
  9.   
  10. public class OutboundHandler2 extends ChannelOutboundHandlerAdapter {  
  11.     private static Logger   logger  = LoggerFactory.getLogger(OutboundHandler2.class);  
  12.       
  13.     @Override  
  14.     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  
  15.         logger.info("OutboundHandler2.write");  
  16.         // 執行下一個OutboundHandler  
  17.         super.write(ctx, msg, promise);  
  18.     }  
  19. }  


Client端有兩個類:HelloClient  HelloClientIntHandler

1、HelloClient  

[java]  view plain copy
  1. package com.guowl.testmultihandler;  
  2.   
  3. import io.netty.bootstrap.Bootstrap;  
  4. import io.netty.channel.ChannelFuture;  
  5. import io.netty.channel.ChannelInitializer;  
  6. import io.netty.channel.ChannelOption;  
  7. import io.netty.channel.EventLoopGroup;  
  8. import io.netty.channel.nio.NioEventLoopGroup;  
  9. import io.netty.channel.socket.SocketChannel;  
  10. import io.netty.channel.socket.nio.NioSocketChannel;  
  11.   
  12. public class HelloClient {  
  13.     public void connect(String host, int port) throws Exception {  
  14.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  15.   
  16.         try {  
  17.             Bootstrap b = new Bootstrap();  
  18.             b.group(workerGroup);  
  19.             b.channel(NioSocketChannel.class);  
  20.             b.option(ChannelOption.SO_KEEPALIVE, true);  
  21.             b.handler(new ChannelInitializer<SocketChannel>() {  
  22.                 @Override  
  23.                 public void initChannel(SocketChannel ch) throws Exception {  
  24.                     ch.pipeline().addLast(new HelloClientIntHandler());  
  25.                 }  
  26.             });  
  27.   
  28.             // Start the client.  
  29.             ChannelFuture f = b.connect(host, port).sync();  
  30.             f.channel().closeFuture().sync();  
  31.         } finally {  
  32.             workerGroup.shutdownGracefully();  
  33.         }  
  34.     }  
  35.   
  36.     public static void main(String[] args) throws Exception {  
  37.         HelloClient client = new HelloClient();  
  38.         client.connect("127.0.0.1", 8000);  
  39.     }  
  40. }  

 

2、HelloClientIntHandler

[java]  view plain copy
  1. package com.guowl.testmultihandler;  
  2.   
  3. import io.netty.buffer.ByteBuf;  
  4. import io.netty.channel.ChannelHandlerContext;  
  5. import io.netty.channel.ChannelInboundHandlerAdapter;  
  6.   
  7. import org.slf4j.Logger;  
  8. import org.slf4j.LoggerFactory;  
  9.   
  10. public class HelloClientIntHandler extends ChannelInboundHandlerAdapter {  
  11.     private static Logger   logger  = LoggerFactory.getLogger(HelloClientIntHandler.class);  
  12.     @Override  
  13.     // 讀取服務端的信息  
  14.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
  15.         logger.info("HelloClientIntHandler.channelRead");  
  16.         ByteBuf result = (ByteBuf) msg;  
  17.         byte[] result1 = new byte[result.readableBytes()];  
  18.         result.readBytes(result1);  
  19.         result.release();  
  20.         ctx.close();  
  21.         System.out.println("Server said:" + new String(result1));  
  22.     }  
  23.     @Override  
  24.     // 當連接建立的時候向服務端發送消息 ,channelActive 事件當連接建立的時候會觸發  
  25.     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
  26.         logger.info("HelloClientIntHandler.channelActive");  
  27.         String msg = "Are you ok?";  
  28.         ByteBuf encoded = ctx.alloc().buffer(4 * msg.length());  
  29.         encoded.writeBytes(msg.getBytes());  
  30.         ctx.write(encoded);  
  31.         ctx.flush();  
  32.     }  
  33. }  

 

server端執行結果為:

 

在使用Handler的過程中,需要注意:

1、ChannelInboundHandler之間的傳遞,通過調用 ctx.fireChannelRead(msg) 實現;調用ctx.write(msg) 將傳遞到ChannelOutboundHandler。

2、ctx.write()方法執行后,需要調用flush()方法才能令它立即執行。

3、ChannelOutboundHandler 在注冊的時候需要放在最后一個ChannelInboundHandler之前,否則將無法傳遞到ChannelOutboundHandler。

 

  • 如果調用的是ctx.channel().write()是從尾開始執行,不會有博主說的問題,如果是直接用ctx.write()時則會有博主說的問題


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM