7.Netty中 handler 的執行順序


1.Netty中handler的執行順序 

  Handler在Netty中,無疑占據着非常重要的地位。Handler與Servlet中的filter很像,通過Handler可以完成通訊報文的解碼編碼、攔截指定的報文、

統一對日志錯誤進行處理、統一對請求進行計數、控制Handler執行與否。一句話,沒有它做不到的只有你想不到的

  Netty中的所有handler都實現自ChannelHandler接口。按照輸入輸出來分,分為ChannelInboundHandler、ChannelOutboundHandler兩大類

ChannelInboundHandler對從客戶端發往服務器的報文進行處理,一般用來執行解碼、讀取客戶端數據、進行業務處理等;ChannelOutboundHandler

對從服務器發往客戶端的報文進行處理,一般用來進行編碼、發送報文到客戶端

  Netty中可以注冊多個handler。ChannelInboundHandler按照注冊的先后順序執行;ChannelOutboundHandler按照注冊的先后順序逆序執行

如下圖所示,按照注冊的先后順序對Handler進行排序,request進入Netty后的執行順序為:

 

 

2.Netty中handler執行順序代碼示例:

客戶端代碼同上

服務端代碼及業務邏輯類:

 

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;/**
 * • 配置服務器功能,如線程、端口 • 實現服務器處理程序,它包含業務邏輯,決定當有一個請求連接或接收數據時該做什么
 */
public class EchoServer {
    private final int port;
    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup eventLoopGroup = null;
        try {
            //server端引導類
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //連接池處理數據
            eventLoopGroup = new NioEventLoopGroup();
            serverBootstrap.group(eventLoopGroup)
            .channel(NioServerSocketChannel.class)//指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel
            .localAddress("localhost",port)//設置InetSocketAddress讓服務器監聽某個端口已等待客戶端連接。
            .childHandler(new ChannelInitializer<Channel>() {//設置childHandler執行所有的連接請求
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    // 注冊兩個InboundHandler,執行順序為注冊順序,所以應該是InboundHandler1 InboundHandler2
                    ch.pipeline().addLast(new EchoOutHandler1());
                    ch.pipeline().addLast(new EchoOutHandler2()); 
                    // 注冊兩個OutboundHandler,執行順序為注冊順序的逆序,所以應該是OutboundHandler2 OutboundHandler1
                    ch.pipeline().addLast(new EchoInHandler1());
                    ch.pipeline().addLast(new EchoInHandler2());
                }
                    });
            // 最后綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,然后服務器等待通道關閉,因為使用sync(),所以關閉操作也會被阻塞。
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            System.out.println("開始監聽,端口為:" + channelFuture.channel().localAddress());
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(20000).start();
    }
}

 

業務類EchoInHandler1

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

import cn.itcast_03_netty.sendobject.bean.Person;

public class EchoInHandler1 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("in1");
         // 通知執行下一個InboundHandler
 ctx.fireChannelRead(msg);//ChannelInboundHandler之間的傳遞,通過調用 ctx.fireChannelRead(msg) 實現
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();//刷新后才將數據發出到SocketChannel
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

業務類 EchoInHandler2:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;

public class EchoInHandler2 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("in2");
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("接收客戶端數據:" + body);
        //向客戶端寫數據
        System.out.println("server向client發送數據");
        String currentTime = new Date(System.currentTimeMillis()).toString();
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp); //ctx.write(msg) 將傳遞到 ChannelOutboundHandler
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();//刷新后才將數據發出到SocketChannel
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

業務邏輯類 EchoOutHandler2

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {

     @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("out2");
            // 執行下一個OutboundHandler
            super.write(ctx, msg, promise);
        }
}

業務邏輯類 EchoOutHandler1:

import java.util.Date;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter {
    @Override
    // 向client發送消息
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("out1");
        String currentTime = new Date(System.currentTimeMillis()).toString();
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp); ctx.flush(); //ctx.write()方法執行后,需要調用flush()方法才能令它立即執行
       }
}

運行結果:

開始監聽,端口為:/127.0.0.1:20000
in1
in2
接收客戶端數據:QUERY TIME ORDER
server向client發送數據
out2
out1

總結: 

在使用Handler的過程中,需要注意:

  1、ChannelInboundHandler之間的傳遞,通過調用 ctx.fireChannelRead(msg) 實現;調用ctx.write(msg) 將傳遞到ChannelOutboundHandler

  2、ctx.write()方法執行后,需要調用flush()方法才能令它立即執行。

  3、ChannelOutboundHandler 在注冊的時候需要放在最后一個ChannelInboundHandler之前,否則將無法傳遞到ChannelOutboundHandler

   (流水線pipeline中outhander不能放到最后,否則不生效)

  4、Handler的消費處理放在最后一個處理。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM