Netty框架的簡單實現
一、Netty的原理分析圖
二、使用Netty框架的簡單實現(實現client和server的消息收發)
(1)NettyServer
1 public class NettyServer { 2 public static void main(String[] args) throws Exception{ 3 4 //創建兩個線程池 5 6 //創建一個線程組,接收客戶端的連接 7 EventLoopGroup bossGroup = new NioEventLoopGroup(); 8 //創建一個線程組,用於處理網絡操作 9 EventLoopGroup workerGroup = new NioEventLoopGroup(); 10 //創建服務器端啟動助手(用於配置參數) 11 ServerBootstrap serverBootstrap = new ServerBootstrap(); 12 serverBootstrap.group(bossGroup,workerGroup)//設置兩個線程組 13 .channel(NioServerSocketChannel.class)//精華部分,設置通道的底層實現, 14 //通過NioServerSocketChannel 15 //這也是Netty的與NIO搭配的地方(此處作為服務器端通道的實現) 16 .option(ChannelOption.SO_BACKLOG, 12)//設置線程隊列中等待連接的個數 17 .childOption(ChannelOption.SO_KEEPALIVE, true) 18 //是否啟用心跳保活機制。在雙方TCP套接字建立連接后(即都進入ESTABLISHED狀態)並 19 //且在兩個小時左右 20 //上層沒有任何數據傳輸的情況下,這套機制才會被激活。 21 * */ 22 .childHandler(new ChannelInitializer<SocketChannel>() {//(用內部類的方法) 23 //創建一個通道初始化對象 24 public void initChannel(SocketChannel sc){ 25 sc.pipeline().addLast(new NettyServerHandler());//往pipeline鏈中添加 26 //自定義的handler類 27 } 28 }); 29 System.out.println("...Server is Ready..."); 30 //ChannelFuture接口,用於在之后的某個時間點確定結果 31 ChannelFuture sf = serverBootstrap.bind(9999).sync();//綁定端口 非阻塞 異步 32 System.out.println("....Server is Start...."); 33 //關閉通道,關閉線程組 34 sf.channel().closeFuture().sync(); 35 bossGroup.shutdownGracefully(); 36 workerGroup.shutdownGracefully(); 37 } 38 }
(2)NettyServerHandler
1 //服務器中的業務處理類 2 public class NettyServerHandler extends ChannelInboundHandlerAdapter { 3 4 //數據讀取事件 5 public void channelRead(ChannelHandlerContext ctx,Object msg){ 6 //傳來的消息包裝成字節緩沖區 7 ByteBuf byteBuf = (ByteBuf) msg; 8 //Netty提供了字節緩沖區的toString方法,並且可以設置參數為編碼格式:CharsetUtil.UTF_8 9 System.out.println("客戶端發來的消息:" + byteBuf.toString(CharsetUtil.UTF_8)); 10 } 11 12 //數據讀取完畢事件 13 public void channelReadComplete(ChannelHandlerContext ctx){ 14 //數據讀取完畢,將信息包裝成一個Buffer傳遞給下一個Handler,Unpooled.copiedBuffer會返回一個Buffer 15 //調用的是事件處理器的上下文對象的writeAndFlush方法 16 //意思就是說將 你好 傳遞給了下一個handler 17 ctx.writeAndFlush(Unpooled.copiedBuffer("你好!", CharsetUtil.UTF_8)); 18 } 19 20 //異常發生的事件 21 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){ 22 //異常發生時關閉上下文對象 23 ctx.close(); 24 } 25 }
(3)NettyClient
1 //網絡客戶端 2 public class NettyClient { 3 public static void main(String[] args) throws Exception{ 4 //創建一個線程組(不像服務端需要有連接等待的線程池) 5 EventLoopGroup group = new NioEventLoopGroup(); 6 //創建客戶端的服務啟動助手完成相應配置 7 Bootstrap b = new Bootstrap(); 8 b.group(group) 9 .channel(NioSocketChannel.class) 10 .handler(new ChannelInitializer<SocketChannel>() {//創建一個通道初始化對象 11 @Override 12 protected void initChannel(SocketChannel socketChannel) throws Exception { 13 socketChannel.pipeline().addLast(new NettyClientHandler());//往pipeline中添加自定義的handler 14 } 15 }); 16 System.out.println("...Client is Ready..."); 17 //啟動客戶端去連接服務器端(通過啟動助手) 18 ChannelFuture cf = b.connect("127.0.0.1", 9999).sync(); 19 //關閉連接(異步非阻塞) 20 cf.channel().closeFuture().sync(); 21 22 } 23 }
(4)NettyClientHandler
1 //客戶端業務處理類 2 public class NettyClientHandler extends ChannelInboundHandlerAdapter { 3 4 //通道就緒事件(就是在bootstrap啟動助手配置中addlast了handler之后就會觸發此事件) 5 //但我覺得也可能是當有客戶端連接上后才為一次通道就緒 6 public void channelActive(ChannelHandlerContext ctx){ 7 System.out.println("Client :" + ctx); 8 //向服務器端發消息 9 ctx.writeAndFlush(Unpooled.copiedBuffer("你好啊!", CharsetUtil.UTF_8)); 10 } 11 //數據讀取事件 12 public void channelRead(ChannelHandlerContext ctx,Object msg){ 13 //傳來的消息包裝成字節緩沖區 14 ByteBuf byteBuf = (ByteBuf) msg; 15 //Netty提供了字節緩沖區的toString方法,並且可以設置參數為編碼格式:CharsetUtil.UTF_8 16 System.out.println("服務器端發來的消息:" + byteBuf.toString(CharsetUtil.UTF_8)); 17 } 18 19 }
通道的消息處理都是通過channelHandlerContext對象的writeAndFlush方法來處理的。