netty4.1.32學習
官方api:https://netty.io/4.1/api/index.html
Netty 實戰(精髓):https://waylau.gitbooks.io/essential-netty-in-action/
一、簡單介紹
本文是根據李林峰書籍《Netty權威指南》(第2版)總結而來。
二、Netty簡介
Netty是一個高性能、異步事件驅動的NIO框架,提供了對TCP、UDP和文件傳輸的支持,作為一個異步NIO框架,Netty的所有IO操作都是異步非阻塞的,通過Future-Listener機制,用戶可以方便的主動獲取或者通過通知機制獲得IO操作結果。
作為當前最流行的NIO框架,Netty在互聯網領域、大數據分布式計算領域、游戲行業、通信行業等獲得了廣泛的應用,一些業界著名的開源組件也基於Netty構建,比如RPC框架、zookeeper等。
三、Netty學習路線(與書本順序基本一致)
1、NIO入門
2、Netty入門應用介紹(helloworld:時間服務器)
3、TCP粘包/拆包問題解決方法:
(1)LineBasedFromeDecoder+StringDecoder:按行切換的文本解碼器
(2)DelimiterBasedFrameDecoder+StringDecoder:自動完成以分隔符做結束標志的消息的解碼器
(3)FixedLengthFrameDecoder+StringDecoder:自動完成對定長消息的解碼器
4、Netty序列化問題解決方法:
(1)介紹Java序列化的缺點
(2)MessagePack
(3)Google的ProtoBuf
(4)Facebook的Thrift
(5)JBoss Marshalling
5、Netty多協議開發和應用
(1)HTTP協議介紹
(2)Netty HTTP+XML協議棧開發
四、Netty實例(源碼)
1、解決粘包/拆包問題(LineBasedFromeDecoder實現)

1 package com.rs.test.timeserver; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.codec.LineBasedFrameDecoder; 12 import io.netty.handler.codec.string.StringDecoder; 13 14 public class TimeServer { 15 16 public void bind(int port) throws Exception { 17 // 配置服務端的NIO線程組 18 EventLoopGroup bossGroup = new NioEventLoopGroup(); 19 EventLoopGroup workerGroup = new NioEventLoopGroup(); 20 21 try { 22 ServerBootstrap b = new ServerBootstrap(); 23 b.group(bossGroup, workerGroup) 24 .channel(NioServerSocketChannel.class) 25 .option(ChannelOption.SO_BACKLOG, 1024) 26 .childHandler(new ChildChannelHandler()); 27 28 // 綁定端口,同步等待成功 29 ChannelFuture f = b.bind(port).sync(); 30 31 // 等待服務端監聽端口關閉 32 f.channel().closeFuture().sync(); 33 } finally { 34 // 優雅退出,釋放線程池資源 35 bossGroup.shutdownGracefully(); 36 workerGroup.shutdownGracefully(); 37 } 38 } 39 40 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 41 42 @Override 43 protected void initChannel(SocketChannel ch) throws Exception { 44 // 核心在下面兩行,加入了LineBasedFrameDecoder和StringDecoder兩個解碼器 45 // 所以當消息到達我們的業務處理handler即TimerServerHandler,所看到的消息 46 // 都是前面兩個解碼器經過處理之后的結果 47 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); 48 ch.pipeline().addLast(new StringDecoder()); 49 ch.pipeline().addLast(new TimeServerHandler()); 50 } 51 52 } 53 54 public static void main(String[] args) throws Exception { 55 int port = 8080; 56 if(args != null && args.length > 0) { 57 try { 58 port = Integer.valueOf(port); 59 } catch (NumberFormatException e) { 60 // TODO: handle exception 61 } 62 } 63 new TimeServer().bind(port); 64 } 65 66 }

1 package com.rs.test.timeserver; 2 3 import java.sql.Date; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.Unpooled; 7 import io.netty.channel.ChannelHandlerAdapter; 8 import io.netty.channel.ChannelHandlerContext; 9 import io.netty.channel.ChannelInboundHandlerAdapter; 10 11 public class TimeServerHandler extends ChannelInboundHandlerAdapter { 12 13 private int counter = 0; 14 15 @Override 16 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 17 String body = (String) msg; 18 // counter的作用是標記這是第幾次收到客戶端的請求 19 System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter); 20 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? 21 new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 22 currentTime = currentTime + System.getProperty("line.separator"); 23 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 24 ctx.write(resp); 25 } 26 27 @Override 28 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 29 ctx.flush(); 30 } 31 32 @Override 33 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 34 ctx.close(); 35 } 36 }

1 package com.rs.test.timeserver; 2 import io.netty.bootstrap.Bootstrap; 3 import io.netty.channel.ChannelFuture; 4 import io.netty.channel.ChannelInitializer; 5 import io.netty.channel.ChannelOption; 6 import io.netty.channel.EventLoopGroup; 7 import io.netty.channel.nio.NioEventLoopGroup; 8 import io.netty.channel.socket.SocketChannel; 9 import io.netty.channel.socket.nio.NioSocketChannel; 10 import io.netty.handler.codec.LineBasedFrameDecoder; 11 import io.netty.handler.codec.string.StringDecoder; 12 13 public class TimeClient { 14 15 public void connect(int port, String host) throws Exception { 16 // 配置客戶端NIO線程組 17 EventLoopGroup group = new NioEventLoopGroup(); 18 try { 19 Bootstrap b = new Bootstrap(); 20 b.group(group).channel(NioSocketChannel.class) 21 .option(ChannelOption.TCP_NODELAY, true) 22 .handler(new ChannelInitializer<SocketChannel>() { 23 24 @Override 25 protected void initChannel(SocketChannel ch) throws Exception { 26 // 核心在下面兩行,加入了LineBasedFrameDecoder和StringDecoder兩個解碼器 27 // 所以當消息到達我們的業務處理handler即TimerServerHandler,所看到的消息 28 // 都是前面兩個解碼器經過處理之后的結果 29 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); 30 ch.pipeline().addLast(new StringDecoder()); 31 ch.pipeline().addLast(new TimeClientHandler()); 32 } 33 }); 34 // 發起異步連接操作 35 ChannelFuture f = b.connect(host, port).sync(); 36 37 // 等待客戶端鏈路關閉 38 f.channel().closeFuture().sync(); 39 } finally { 40 // 優雅退出,釋放NIO線程組 41 group.shutdownGracefully(); 42 } 43 } 44 45 public static void main(String[] args) throws Exception { 46 int port = 8080; 47 if(args != null && args.length > 0) { 48 try { 49 port = Integer.valueOf(port); 50 } catch (NumberFormatException e) { 51 // 采用默認值 52 } 53 } 54 new TimeClient().connect(port, "localhost"); 55 } 56 }

1 package com.rs.test.timeserver; 2 import java.util.logging.Logger; 3 4 import io.netty.buffer.ByteBuf; 5 import io.netty.buffer.Unpooled; 6 import io.netty.channel.ChannelHandlerAdapter; 7 import io.netty.channel.ChannelHandlerContext; 8 import io.netty.channel.ChannelInboundHandlerAdapter; 9 10 public class TimeClientHandler extends ChannelInboundHandlerAdapter { 11 12 private static final Logger logger = Logger.getLogger(TimeServerHandler.class.getName()); 13 14 private int counter; 15 16 private byte[] req; 17 18 public TimeClientHandler() { 19 req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); 20 } 21 22 @Override 23 public void channelActive(ChannelHandlerContext ctx) { 24 ByteBuf message = null; 25 for(int i = 0; i < 100; i++) { 26 message = Unpooled.buffer(req.length); 27 message.writeBytes(req); 28 ctx.writeAndFlush(message); 29 } 30 } 31 32 @Override 33 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 34 String body = (String) msg; 35 // counter的作用是標記這是第幾次收到客戶端的請求 36 System.out.println("Now is : " + body + " ; the counter is : " + ++counter); 37 } 38 39 @Override 40 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 41 logger.warning("Unexpected exception from downstream : "); 42 ctx.close(); 43 } 44 45 }
2.使用protobuf作為編解碼技術(包括半包處理及序列化),心跳機制動態將客戶端關閉
https://github.com/carsonWuu/Netty/tree/master/src/com/rs/test/protobuf
五、相關技術學習
1、Netty斷線重連解決方案(client端):①使用心跳機制去檢測②啟動時連接重試③運行中連接斷開時重試