一、什么是Netty?
Netty 是一個廣泛使用的 Java 網絡編程框架(Netty 在 2011 年獲得了Duke's Choice Award,見 https://www.java.net/dukeschoice/2011)。它活躍和成長於用戶社區,像大型公司 Facebook 和 Instagram 以及流行 開源項目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其強大的對於網絡抽象的核心代碼。
二、Netty和Tomcat有什么區別?
Netty和Tomcat最大的區別就在於通信協議,Tomcat是基於Http協議的,他的實質是一個基於http協議的web容器,但是Netty不一樣,他能通過編程自定義各種協議,因為netty能夠通過codec自己來編碼/解碼字節流,完成類似redis訪問的功能,這就是netty和tomcat最大的不同。
有人說netty的性能就一定比tomcat性能高,其實不然,tomcat從6.x開始就支持了nio模式,並且后續還有APR模式——一種通過jni調用apache網絡庫的模式,相比於舊的bio模式,並發性能得到了很大提高,特別是APR模式,而netty是否比tomcat性能更高,則要取決於netty程序作者的技術實力了。
三、為什么Netty受歡迎?
如第一部分所述,netty是一款收到大公司青睞的框架,在我看來,netty能夠受到青睞的原因有三:
- 並發高
- 傳輸快
- 封裝好
四、Netty為什么並發高

Selector
。
當一個連接建立之后,他有兩個步驟要做,第一步是接收完客戶端發過來的全部數據,第二步是服務端處理完請求業務之后返回response給客戶端。NIO和BIO的區別主要是在第一步。
在BIO中,等待客戶端發數據這個過程是阻塞的,這樣就造成了一個線程只能處理一個請求的情況,而機器能支持的最大線程數是有限的,這就是為什么BIO不能支持高並發的原因。
而NIO中,當一個Socket建立好之后,Thread並不會阻塞去接受這個Socket,而是將這個請求交給Selector,Selector會不斷的去遍歷所有的Socket,一旦有一個Socket建立完成,他會通知Thread,然后Thread處理完數據再返回給客戶端—— 這個過程是不阻塞的,這樣就能讓一個Thread處理更多的請求了。
下面兩張圖是基於BIO的處理流程和netty的處理流程,輔助你理解兩種方式的差別:

除了BIO和NIO之外,還有一些其他的IO模型,下面這張圖就表示了五種IO模型的處理流程:

- BIO,同步阻塞IO,阻塞整個步驟,如果連接少,他的延遲是最低的,因為一個線程只處理一個連接,適用於少連接且延遲低的場景,比如說數據庫連接。
- NIO,同步非阻塞IO,阻塞業務處理但不阻塞數據接收,適用於高並發且處理簡單的場景,比如聊天軟件。
- 多路復用IO,他的兩個步驟處理是分開的,也就是說,一個連接可能他的數據接收是線程a完成的,數據處理是線程b完成的,他比BIO能處理更多請求。
- 信號驅動IO,這種IO模型主要用在嵌入式開發,不參與討論。
- 異步IO,他的數據請求和數據處理都是異步的,數據請求一次返回一次,適用於長連接的業務場景。
五、Netty為什么傳輸快
Netty針對這種情況,使用了NIO中的另一大特性——零拷貝,當他需要接收數據的時候,他會在堆內存之外開辟一塊內存,數據就直接從IO讀到了那塊內存中去,在netty里面通過ByteBuf可以直接對這些數據進行直接操作,從而加快了傳輸速度。

六、為什么說Netty封裝好?
要說Netty為什么封裝好,這種用文字是說不清的,直接上代碼:
- 阻塞I/O
public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); //1 try { for (;;) { final Socket clientSocket = socket.accept(); //2 System.out.println("Accepted connection from " + clientSocket); new Thread(new Runnable() { //3 @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //4 out.flush(); clientSocket.close(); //5 } catch (IOException e) { e.printStackTrace(); try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); //6 } } catch (IOException e) { e.printStackTrace(); } } }
- 非阻塞IO
public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ss = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ss.bind(address); //1 Selector selector = Selector.open(); //2 serverChannel.register(selector, SelectionKey.OP_ACCEPT); //3 final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); for (;;) { try { selector.select(); //4 } catch (IOException ex) { ex.printStackTrace(); // handle exception break; } Set<SelectionKey> readyKeys = selector.selectedKeys(); //5 Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { if (key.isAcceptable()) { //6 ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); //7 System.out.println( "Accepted connection from " + client); } if (key.isWritable()) { //8 SocketChannel client = (SocketChannel)key.channel(); ByteBuffer buffer = (ByteBuffer)key.attachment(); while (buffer.hasRemaining()) { if (client.write(buffer) == 0) { //9 break; } } client.close(); //10 } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { // 在關閉時忽略 } } } } } }
- Netty
public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer( Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8"))); EventLoopGroup group = new OioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); //1 b.group(group) //2 .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() {//3 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //4 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5 } }); } }); ChannelFuture f = b.bind().sync(); //6 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); //7 } } }
從代碼量上來看,Netty就已經秒殺傳統Socket編程了,但是這一部分博大精深,僅僅貼幾個代碼豈能說明問題,在這里給大家介紹一下Netty的一些重要概念,讓大家更理解Netty。
-
Channel
數據傳輸流,與channel相關的概念有以下四個,上一張圖讓你了解netty里面的Channel。
Channel一覽- Channel,表示一個連接,可以理解為每一個請求,就是一個Channel。
- ChannelHandler,核心處理業務就在這里,用於處理業務請求。
- ChannelHandlerContext,用於傳輸業務數據。
- ChannelPipeline,用於保存處理過程需要用到的ChannelHandler和ChannelHandlerContext。
- ByteBuf
ByteBuf是一個存儲字節的容器,最大特點就是使用方便,它既有自己的讀索引和寫索引,方便你對整段字節緩存進行讀寫,也支持get/set,方便你對其中每一個字節進行讀寫,他的數據結構如下圖所示:

他有三種使用模式:
- Heap Buffer 堆緩沖區
堆緩沖區是ByteBuf最常用的模式,他將數據存儲在堆空間。 - Direct Buffer 直接緩沖區
直接緩沖區是ByteBuf的另外一種常用模式,他的內存分配都不發生在堆,jdk1.4引入的nio的ByteBuffer類允許jvm通過本地方法調用分配內存,這樣做有兩個好處- 通過免去中間交換的內存拷貝, 提升IO處理速度; 直接緩沖區的內容可以駐留在垃圾回收掃描的堆區以外。
- DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的內存, GC對此”無能為力”,也就意味着規避了在高負載下頻繁的GC過程對應用線程的中斷影響.
- Composite Buffer 復合緩沖區
復合緩沖區相當於多個不同ByteBuf的視圖,這是netty提供的,jdk不提供這樣的功能。
除此之外,他還提供一大堆api方便你使用,在這里我就不一一列出了,具體參見ByteBuf字節緩存。
- Codec
Netty中的編碼/解碼器,通過他你能完成字節與pojo、pojo與pojo的相互轉換,從而達到自定義協議的目的。
在Netty里面最有名的就是HttpRequestDecoder和HttpResponseEncoder了。
七、實戰
1、引入相關依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.pubing</groupId> <artifactId>helloNetty</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency> </dependencies> </project>
2、服務端
package com.zhouzhiyao.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; //實現客戶端發送請求,服務器端會返回Hello Netty public class HelloNettyServer { public static void main(String[] args) throws InterruptedException { /** * 定義一對線程組(兩個線程池) * */ //主線程組,用於接收客戶端的鏈接,但不做任何處理 EventLoopGroup bossGroup = new NioEventLoopGroup(); //定義從線程組,主線程組會把任務轉給從線程組進行處理 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { /** * 服務啟動類,任務分配自動處理 * */ ServerBootstrap serverBootstrap = new ServerBootstrap(); //需要去針對一個之前的線程模型(上面定義的是主從線程) serverBootstrap.group(bossGroup, workerGroup) //設置NIO的雙向通道 .channel(NioServerSocketChannel.class) //子處理器,用於處理workerGroup /** * 設置chanel初始化器 * 每一個chanel由多個handler共同組成管道(pipeline) */ .childHandler(new HelloNettyServerInitializer()); /** * 啟動 * */ //綁定端口,並設置為同步方式,是一個異步的chanel ChannelFuture future = serverBootstrap.bind(8888).sync(); /** * 關閉 */ //獲取某個客戶端所對應的chanel,關閉並設置同步方式 future.channel().closeFuture().sync(); }catch (Exception e) { e.printStackTrace(); }finally { //使用一種優雅的方式進行關閉 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
3、建立初始化器HelloNettyServerInitializer
package com.zhouzhiyao.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; /** * 這是一個初始化器,chanel注冊后會執行里面相應的初始化方法(也就是將handler逐個進行添加) * * @author phubing * */ public class HelloNettyServerInitializer extends ChannelInitializer<SocketChannel>{ //對chanel進行初始化 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //通過socketChannel去獲得對應的管道 ChannelPipeline channelPipeline = socketChannel.pipeline(); /** * pipeline中會有很多handler類(也稱之攔截器類) * 獲得pipeline之后,可以直接.add,添加不管是自己開發的handler還是netty提供的handler * */ //一般來講添加到last即可,添加了一個handler並且取名為HttpServerCodec //當請求到達服務端,要做解碼,響應到客戶端做編碼 channelPipeline.addLast("HttpServerCodec", new HttpServerCodec()); //添加自定義的CustomHandler這個handler,返回Hello Netty channelPipeline.addLast("customHandler", new CustomHandler()); } }
4、建立自定義處理器CustomHandler
package com.zhouzhiyao.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.util.CharsetUtil; /** * 創建自定義助手類 * @author phubing * */ //處理的請求是:客戶端向服務端發起送數據,先把數據放在緩沖區,服務器端再從緩沖區讀取,類似於[ 入棧, 入境 ] public class CustomHandler extends SimpleChannelInboundHandler<HttpObject>{//Http請求,所以使用HttpObject @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel注冊"); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel注冊"); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel活躍狀態"); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶端與服務端斷開連接之后"); super.channelInactive(ctx); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channel讀取數據完畢"); super.channelReadComplete(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("用戶事件觸發"); super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { System.out.println("channel可寫事件更改"); super.channelWritabilityChanged(ctx); } @Override //channel發生異常,若不關閉,隨着異常channel的逐漸增多,性能也就隨之下降 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("捕獲channel異常"); super.exceptionCaught(ctx, cause); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("助手類添加"); super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("助手類移除"); super.handlerRemoved(ctx); } /** * ChannelHandlerContext:上下文對象 * * */ @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { //獲取當前channel Channel currentChannel = ctx.channel(); //判斷msg是否為一個HttpRequest的請求類型 if(msg instanceof HttpRequest) { //客戶端遠程地址 System.out.println(currentChannel.remoteAddress()); /** * * 未加判斷類型,控制台打印的遠端地址如下: * /0:0:0:0:0:0:0:1:5501 /0:0:0:0:0:0:0:1:5501 /0:0:0:0:0:0:0:1:5502 /0:0:0:0:0:0:0:1:5502 /0:0:0:0:0:0:0:1:5503 /0:0:0:0:0:0:0:1:5503 * * 原因是接收的MSG沒有進行類型判斷 * * * 增加了判斷,為何還會打印兩次? * /0:0:0:0:0:0:0:1:5605 /0:0:0:0:0:0:0:1:5605 * * 打開瀏覽器的network會發現,客戶端對服務端進行了兩次請求: * 1、第一次是所需的 * 2、第二次是一個icon * 因為沒有加路由(相當於Springmvc中的requestMapping),只要發起請求,就都會到handler中去 * */ /** * 在Linux中也可以通過CURL 本機Ip:端口號 發送請求(只打印一次,干凈的請求) */ //定義發送的消息(不是直接發送,而是要把數據拷貝到緩沖區,通過緩沖區) //Unpooed:是一個專門用於拷貝Buffer的深拷貝,可以有一個或多個 //CharsetUtil.UTF_8:Netty提供 ByteBuf content = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8); //構建一個HttpResponse,響應客戶端 FullHttpResponse response = /** * params1:針對Http的版本號 * params2:狀態(響應成功或失敗) * params3:內容 */ //HttpVersion.HTTP_1_1:默認開啟keep-alive new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); //設置當前內容長度、類型等 response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); //readableBytes:可讀長度 response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); //通過長下文對象,把響應刷到客戶端 ctx.writeAndFlush(response); } } }
5、客戶端
public class NettyClient extends SimpleChannelInboundHandler<Response> { private final String ip; private final int port; private Response response; public NettyClient(String ip, int port) { this.ip = ip; this.port = port; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception { this.response = response; } public Response client(Request request) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { // 創建並初始化 Netty 客戶端 Bootstrap 對象 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new RpcDecoder(Response.class)); pipeline.addLast(new RpcEncoder(Request.class)); pipeline.addLast(NettyClient.this); } }); bootstrap.option(ChannelOption.TCP_NODELAY, true); // String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":"); // 連接 RPC 服務器 ChannelFuture future = bootstrap.connect(ip, port).sync(); // 寫入 RPC 請求數據並關閉連接 Channel channel = future.channel(); channel.writeAndFlush(request).sync(); channel.closeFuture().sync(); return response; } finally { group.shutdownGracefully(); } }
6、心跳機制
服務端添加IdleStateHandler心跳檢測處理器,並添加自定義處理Handler類實現userEventTriggered()方法作為超時事件的邏輯處理;
設定IdleStateHandler心跳檢測每五秒進行一次讀檢測,如果五秒內ChannelRead()方法未被調用則觸發一次userEventTrigger()方法
ServerBootstrap b= new ServerBootstrap(); b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new HeartBeatServerHandler()); } });
- 自定義處理類Handler繼承ChannlInboundHandlerAdapter,實現其userEventTriggered()方法,在出現超時事件時會被觸發,包括讀空閑超時或者寫空閑超時;
class HeartBeatServerHandler extends ChannelInboundHandlerAdapter { private int lossConnectCount = 0; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("已經5秒未收到客戶端的消息了!"); if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.READER_IDLE){ lossConnectCount++; if (lossConnectCount>2){ System.out.println("關閉這個不活躍通道!"); ctx.channel().close(); } } }else { super.userEventTriggered(ctx,evt); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { lossConnectCount = 0; System.out.println("client says: "+msg.toString()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客戶端添加IdleStateHandler心跳檢測處理器,並添加自定義處理Handler類實現userEventTriggered()方法作為超時事件的邏輯處理;
設定IdleStateHandler心跳檢測每四秒進行一次寫檢測,如果四秒內write()方法未被調用則觸發一次userEventTrigger()方法,實現客戶端每四秒向服務端發送一次消息;
Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS)); socketChannel.pipeline().addLast(new StringEncoder()); socketChannel.pipeline().addLast(new HeartBeatClientHandler()); } });
- 自定義處理類Handler繼承ChannlInboundHandlerAdapter,實現自定義userEventTrigger()方法,如果出現超時時間就會被觸發,包括讀空閑超時或者寫空閑超時;
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("客戶端循環心跳監測發送: "+new Date()); if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.WRITER_IDLE){ if (curTime<beatTime){ curTime++; ctx.writeAndFlush("biubiu"); } } } }
7、客戶端重連
/** * 連接服務端 and 重連 */ protected void doConnect() { if (channel != null && channel.isActive()){ return; } ChannelFuture connect = bootstrap.connect("127.0.0.1", 8081); //實現監聽通道連接的方法 connect.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(channelFuture.isSuccess()){ channel = channelFuture.channel(); System.out.println("連接服務端成功"); }else{ System.out.println("每隔2s重連...."); channelFuture.channel().eventLoop().schedule(new Runnable() { @Override public void run() { doConnect(); } },2,TimeUnit.SECONDS); } } }); }