Netty Client和Server端實現


本文基於Netty 4.0.26.Final版本淺析Client與Server端通訊,先看服務器端:

/**
 * Minimal Netty TCP server that exchanges length-prefixed UTF-8 strings with its clients.
 *
 * <p>Each accepted connection gets its own pipeline: a 4-byte length-field frame codec,
 * a UTF-8 string codec and a {@code SimpleChannelHandler} that answers requests.
 */
public class Server {

    /**
     * Binds the server to {@code port} and blocks the calling thread until the server
     * channel is closed. Both event loop groups are always shut down before returning.
     *
     * @param port TCP port to listen on
     */
    public static void run(int port) {
        // First group is passed as the parent (acceptor) group, second as the child group
        // that serves the accepted connections.
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                     .channel(NioServerSocketChannel.class)   // channel implementation to instantiate
                     .option(ChannelOption.SO_BACKLOG, 1024)  // option applied to the listening channel
                     .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Inbound: read a 4-byte length field at offset 0 and strip it from the frame.
                            // NOTE(review): Integer.MAX_VALUE effectively disables the frame size limit;
                            // consider a smaller cap if peers are untrusted.
                            pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            // Outbound: prepend a 4-byte length field to every message.
                            pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                            pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new SimpleChannelHandler());
                        }
                    });
            // bind() is asynchronous; sync() blocks until it completes and rethrows the
            // failure cause if the bind did not succeed.
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            if (channelFuture.isSuccess()) {
                System.out.println(String.format("server bind port %s sucess", port));
            }
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    /**
     * Starts the demo server on port 8080.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Server.run(8080);
    }
}

/**
 * Server-side inbound handler: parses each incoming JSON string into a {@code Request}
 * and writes back a JSON-encoded {@code Response}.
 *
 * <p>It is installed as the last handler of every accepted channel's pipeline, so
 * inbound events are consumed here and not forwarded.
 */
public class SimpleChannelHandler implements ChannelInboundHandler {

    private static final Gson GSON = new GsonBuilder().create();

    /** Shared generator for the demo response value; avoids allocating a Random per message. */
    private static final Random RANDOM = new Random();

    /**
     * Called when this handler is added to a pipeline; logs the class of the
     * pipeline's last handler.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println(String.format("last channel handler [%s] add", ctx.pipeline().last().getClass().getSimpleName()));
    }

    /**
     * Called when this handler is removed from its pipeline; makes sure the
     * channel is closed.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // A single close() is enough: for a TCP socket channel a disconnect request
        // is carried out as a close as well, so issuing both was redundant.
        ctx.close();
    }

    /**
     * Logs the failure and closes the connection, so a request that cannot be
     * processed does not leave the channel open indefinitely.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Called when the channel is registered with its event loop; logs the peer address.
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // remoteAddress() may be null when the channel is not connected.
        java.net.SocketAddress remote = ctx.channel().remoteAddress();
        // substring(1) drops the leading '/' of the address' string form.
        String connect = remote == null ? "unknown" : remote.toString().substring(1);
        System.out.println(String.format("remote connecter address %s", connect));
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        // No-op.
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // No-op.
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // No-op.
    }

    /**
     * Decodes the request and queues the response; the actual flush happens in
     * {@link #channelReadComplete(ChannelHandlerContext)}.
     *
     * @param msg the decoded inbound message, converted with {@code String.valueOf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request req = GSON.fromJson(String.valueOf(msg), Request.class);
        if (req == null) {
            // Gson yields null for an empty document; there is nothing to answer,
            // so drop the connection instead of failing on req.getStatus().
            ctx.close();
            return;
        }
        String json = GSON.toJson(new Response(String.format("server get client status [%s]", req.getStatus()), RANDOM.nextInt(10)));
        ctx.write(json);
    }

    /** Flushes the responses queued by {@link #channelRead(ChannelHandlerContext, Object)}. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // No-op.
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        // No-op.
    }

}

服務器端ChannelHandler的handlerRemoved方法在該Handler從ChannelPipeline中被移除時觸發;當客戶端關閉鏈接、Channel被關閉並注銷後,Pipeline會移除其中的Handler,因此該方法會被調用。服務器應當關閉當前與客戶端的連接,完成TCP的四次揮手過程。

客戶端的實現:

/**
 * Minimal Netty TCP client that exchanges length-prefixed UTF-8 strings with the server.
 *
 * <p>The pipeline holds a 4-byte length-field frame codec, a UTF-8 string codec and a
 * {@code SimpleClientChannelHandler} that sends one request and handles the reply.
 */
public class Client {

	/**
	 * Connects to {@code host:port} and blocks the calling thread until the connection
	 * is closed. The event loop group is always shut down before returning.
	 *
	 * @param host server host name or address
	 * @param port server TCP port
	 */
	public static void run(String host, int port) {
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(group)
					 .channel(NioSocketChannel.class)
					 .option(ChannelOption.TCP_NODELAY, true)
					 .handler(new ChannelInitializer<SocketChannel>() {

						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ChannelPipeline pipeline = ch.pipeline();
							// Inbound: read a 4-byte length field at offset 0 and strip it from the frame.
							pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
							// Outbound: prepend a 4-byte length field to every message.
							pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
							pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
							pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
							pipeline.addLast(new SimpleClientChannelHandler());
						}

					});
			// connect() is asynchronous; sync() blocks until it completes and rethrows
			// the failure cause if the connection could not be established.
			ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
			if (channelFuture.isSuccess()) {
				System.out.println(String.format("connect server(%s:%s) sucess", host, port));
			}
			// Block until the channel is closed.
			channelFuture.channel().closeFuture().sync();
			System.out.println("client close sucess");
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can still observe the interruption.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} finally {
			group.shutdownGracefully();
		}
	}

	/**
	 * Runs three consecutive client sessions against 127.0.0.1:8080.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		for (int i = 0; i < 3; ++i) {
			Client.run("127.0.0.1", 8080);
			System.out.println();
		}
	}
}


/**
 * Client-side inbound handler: sends one JSON {@code Request} as soon as the channel
 * becomes active and disconnects after the first message received from the server.
 */
public class SimpleClientChannelHandler implements ChannelInboundHandler {

	private static final Gson GSON = new GsonBuilder().create();

	/** Shared generator for the demo request value; avoids allocating a Random per call. */
	private static final Random RANDOM = new Random();

	/**
	 * Called when this handler is added to a pipeline; logs the class of the
	 * pipeline's last handler.
	 */
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		ChannelHandler channelHandler = ctx.channel().pipeline().last();
		System.out.println("client last channel handle " + channelHandler.getClass().getSimpleName());
	}

	/**
	 * Called when this handler is removed from its pipeline; logs both endpoints
	 * and makes sure the channel is closed.
	 */
	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
		Channel ch = ctx.channel();
		String local = describe(ch.localAddress());
		String remote = describe(ch.remoteAddress());
		System.out.println(String.format("server(%s) diconnect and client(%s) close connect", remote, local));
		ctx.close();
	}

	/**
	 * Called when the channel is registered with its event loop.
	 */
	@Override
	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
		System.out.println("client start to register port");
	}

	@Override
	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
		// No-op.
	}

	/**
	 * Called when the channel becomes active; sends the request to the server.
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		String json = GSON.toJson(new Request("client status", RANDOM.nextInt(10)));
		ctx.writeAndFlush(json);
		System.out.println(String.format("connect established and send to server message [%s]", json));
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		// No-op.
	}

	/**
	 * Logs the received message and then disconnects from the server.
	 *
	 * @param msg the decoded inbound message, printed via {@code String.valueOf}
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println(String.format("client receive message [%s]", String.valueOf(msg)));
		ctx.disconnect(ctx.newPromise());
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		// Debug trace marker.
		System.out.println("77777");
	}

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		// Debug trace marker.
		System.out.println("88888");
	}

	@Override
	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
		// Debug trace marker.
		System.out.println("99999");
	}

	/**
	 * Logs the failure and closes the channel. Without the close, a caller blocked
	 * on the channel's close future would wait forever after a pipeline error.
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

	/**
	 * Formats an address for logging without its leading '/'.
	 *
	 * @param address the address to format; may be null when the channel is not bound or connected
	 * @return the address text, or {@code "unknown"} when {@code address} is null
	 */
	private static String describe(SocketAddress address) {
		if (address == null) {
			return "unknown";
		}
		return address.toString().substring(1);
	}

}

  在客戶端的ChannelHandler中有幾個關鍵方法:

channelActive方法:客戶端與服務器建立連接且Channel被激活時該方法被調用,本文在客戶端與服務器端建立連接就緒時向服務器發送數據

channelRead方法:當服務器端有數據發送時方法被調用,本文在收到服務器端響應時關閉當前連接(此時服務器端的handlerRemoved方法被調用)

handlerRemoved方法:當連接關閉、客戶端的Handler從ChannelPipeline中被移除時該方法被調用,客戶端應關閉Channel(TCP四次揮手結束)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM