本文基於Nett4.0.26.Final版本淺析Client與Server端通訊,先看服務器端:
public class Server { public static void run(int port) { /**Netty創建ServerSocketChannel,默認SelectionKey.OP_ACCEPT*/ EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) // 設置Channel Type .option(ChannelOption.SO_BACKLOG, 1024) // 設置Channel屬性 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new SimpleChannelHandler()); } }); /**服務器端綁定監聽端口並對Channel進行初始化 * 1-ChannelConfig由ChannelOption初始化 * 2-ChannelPipeline(默認DefaultChannelPipeline)添加ChannelHandler * 3-注冊Channel並添加監聽器ChannelFutureListener.CLOSE_ON_FAILURE * 以異步的方式等待上述操作的完成 * */ ChannelFuture channelFuture = bootstrap.bind(port).sync(); if (channelFuture.isDone()) { System.out.println(String.format("server bind port %s sucess", port)); } /**CloseFuture異步方式關閉*/ channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } public static void main(String []args) { Server.run(8080); } } public class SimpleChannelHandler implements ChannelInboundHandler { private static final Gson GSON = new GsonBuilder().create(); /** * the method called when new connect come * */ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println(String.format("last channel handler [%s] add", ctx.pipeline().last().getClass().getSimpleName())); } /** * the method called when client close connect * */ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { ctx.disconnect(ctx.newPromise()); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } /** * register port for connect channel * */ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { String connect = ctx.channel().remoteAddress().toString().substring(1); System.out.println(String.format("remote connecter address %s", connect)); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request req = GSON.fromJson(String.valueOf(msg), Request.class); String json = GSON.toJson(new Response(String.format("server get client status [%s]", req.getStatus()), new Random().nextInt(10))); ctx.write(json); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { } }
服務器端的ChannelHandler的handlerRemoved方法是當客戶端關閉鏈接時該方法被觸發,服務器應當關閉當前與客戶端的連接,完成TCP的四次揮手過程。
客戶端的實現:
public class Client { public static void run(String host, int port) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new SimpleClientChannelHandler()); } }); /**客戶端向服務器發起連接請求 * 1-ChannelConfig由ChannelOption初始化 * 2-ChannelPipeline(默認DefaultChannelPipeline)添加ChannelHandler * 3-注冊Channel並添加監聽器ChannelFutureListener.CLOSE_ON_FAILURE * 以異步的方式等待上述操作的完成 * */ ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); if (channelFuture.isSuccess()) { System.out.println(String.format("connect server(%s:%s) sucess", host, port)); } channelFuture.channel().closeFuture().sync(); System.out.println("client close sucess"); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static void main(String []args) { for (int i = 0 ; i < 3 ; ++i) { Client.run("127.0.0.1", 8080); System.out.println(); } // Client.run("127.0.0.1", 8080); } } public class SimpleClientChannelHandler implements ChannelInboundHandler{ private static final Gson GSON = new GsonBuilder().create(); /** * the method called when client add channel handler(1) * */ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ChannelHandler channelHandler = ctx.channel().pipeline().last(); System.out.println("client last channel handle " + channelHandler.getClass().getSimpleName()); } /** * the method called when server disconnect * */ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel ch = ctx.channel(); SocketAddress local = ch.localAddress(); SocketAddress remote = ch.remoteAddress(); System.out.println(String.format("server(%s) diconnect and client(%s) close connect", remote.toString().substring(1), local.toString().substring(1))); ctx.close(); } /** * the method called for register port before connect server(2) * */ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("client start to register port"); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } /** * the method called when channel active(3) * */ public void channelActive(ChannelHandlerContext ctx) throws Exception { String json = GSON.toJson(new Request("client status", new Random().nextInt(10))); ctx.writeAndFlush(json); System.out.println(String.format("connect established and send to server message [%s]", json)); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } /** * close after receive response from server(server also should close connect) * */ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(String.format("client receive message [%s]", String.valueOf(msg))); ctx.disconnect(ctx.newPromise()); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("77777"); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("88888"); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { System.out.println("99999"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
在客戶端的ChannelHandler中有幾個關鍵方法:
channelActive方法:客戶端與服務器建立連接且Channel被激活時該方法被調用,本文在客戶端與服務器端建立連接就緒時向服務器發送數據
channelRead方法:當服務器端有數據發送時方法被調用,本文在收到服務器端響應時關閉當前連接(此時服務器端的handlerRemoved方法被調用)
handlerRemoved方法:當服務器確認斷開連接時該方法被調用,客戶端應關閉Channel(TCP四次揮手結束)