1、netty 是什么
2、netty 架構設計
2.1、線程模型
2.2、傳統阻塞 I/O 服務模型
2.3、Reactor 模式
2.4、單 Reactor 單線程
2.5、單 Reactor 多線程
2.6、主從 Reactor 多線程
2.7、Netty工作原理架構圖
3、Netty 編程之 helloworld
4、自定義 ChannelInboundHandlerAdapter 收發消息
5、任務隊列 taskQueue 和 scheduledTaskQueue
6、Netty 異步模型
7、Netty 入門案例--HTTP 服務
1、netty 是什么 <--返回目錄
Netty 是一個異步事件驅動的網絡應用框架,用於快速開發可維護的高性能服務器和客戶端。
下面是我總結的使用 Netty 不使用 JDK 原生 NIO 的原因
- 使用 JDK 自帶的 NIO 需要了解太多的概念,編程復雜,一不小心 bug 橫飛
- Netty 底層 IO 模型隨意切換,而這一切只需要做微小的改動,改改參數,Netty 可以直接從 NIO 模型變身為 IO 模型
- Netty 自帶的拆包解包,異常檢測等機制讓你從 NIO 的繁重細節中脫離出來,讓你只需要關心業務邏輯
- Netty 解決了 JDK 的很多包括空輪詢在內的 bug
- Netty 底層對線程,selector 做了很多細小的優化,精心設計的 reactor 線程模型做到非常高效的並發處理
- 自帶各種協議棧讓你處理任何一種通用協議都幾乎不用親自動手
- Netty 社區活躍,遇到問題隨時郵件列表或者 issue
- Netty 已經歷各大 rpc 框架,消息中間件,分布式通信中間件線上的廣泛驗證,健壯性無比強大
2、netty 架構設計 <--返回目錄
不同的線程模式,對程序的性能有很大影響,為了搞清 netty 線程模式,我們來系統分析下各個線程模式,最后看看 netty 線程模型有什么優越性。
2.1、線程模型 <--返回目錄
目前存在的線程模型有:
- 傳統阻塞 I/O 服務模型
- Reactor 模式(反應器模式、分發者模式 Dispatcher、通知者模式 Notifier)
根據 Reactor 的數量和處理資源池線程的數量不同,有 3 種典型的實現:
- 單 Reactor 單線程
- 單 Reactor 多線程
- 主從 Reactor 多線程
netty 線程模式:netty 主要基於主從 Reactor 多線程模型做了一定的改進,其中主從 Reactor 多線程模式有多個 Reactor
2.2、傳統阻塞 I/O 服務模型 <--返回目錄
模型特點:
- 采用阻塞 IO 模式 獲取輸入的數據
- 每個連接都需要獨立的線程完成數據的輸入,業務處理,數據返回
問題分析:
- 當並發數很大,就會創建大量的線程,占用很大系統資源
- 連接創建后,如果當前線程暫時沒有數據可讀,該線程回阻塞在 read 操作,造成線程資源的浪費
2.3、Reactor 模式 <--返回目錄
針對傳統阻塞 IO 服務模型的 2 個缺點,解決方案:
- 基於 IO 復用模型:多個連接共用一個阻塞對象,應用程序只需要在一個阻塞對象等待,無需阻塞等待所有連接。當某個連接有新的數據可以處理時,操作系統通知應用程序,線程從阻塞狀態返回,開始進行業務處理。
- 基於線程池復用線程資源:不必為每個連接創建線程,將連接完成后的業務處理任務分配給線程進行處理,一個線程可以處理多個連接的業務。
IO 復用結合線程池,就是 Reactor 模式基本設計思想。
2.4、單 Reactor 單線程 <--返回目錄
方案說明:
- select 是前面 IO 復用模型介紹的標准網絡編程 API,可以實現應用程序通過一個阻塞對象監聽多路連接請求
- Reactor 對象通過 Select 監控客戶端請求事件,收到事件后通過 Dispatch 進行分發
- 如果是建立連接請求事件,則由 Acceptor 通過 accept 處理連接請求,然后創建一個 handler 對象處理連接完成后的后續業務處理
- 如果不是建立連接事件,則 Reactor 會分發調用連接對應的 handler 來響應
- handler 會完成 read -> 業務處理 -> send 的完整業務流程
服務器端用一個線程通過多路復用搞定所有的 IO 操作(包括連接、讀寫等),編碼簡單,清晰明了,但是如果客戶端連接數量較多,將無法支撐。
2.5、單 Reactor 多線程 <--返回目錄
方案說明:
- Reactor 對象通過 select 監控客戶端請求事件,收到事件后,通過 dispatch 進行分發
- 如果建立連接請求,則由 Acceptor 通過 accept 處理連接請求,然后創建一個 handler 對象處理完成連接后的各種事件
- 如果不是連接請求,則由 reactor 分發調用連接對象對應的 handler 來處理
- handler 之負責響應事件,不做具體的業務處理,通過 read 讀取數據后,會分發給后面的 worker 線程池的某個線程處理業務
- worker 線程池會分配獨立線程完成真正的業務,並將結果返回給 handler
- handler 收到響應后,通過 send 將結果返回給 cliet
缺點:reactor 處理所有的事件的監聽和響應,在單線程運行,在高並發場景容易出現性能瓶頸。
2.6、主從 Reactor 多線程 <--返回目錄
主 Reactor 負責連接事件;子 Reactor 負責監聽讀寫事件
2.7、Netty工作原理架構圖 <--返回目錄
1)Netty 抽象出兩組線程池,BossGroup 專門負責接受客戶端的連接,WorkerGroup 專門負責網絡的讀寫
2)BossGroup 和 WorkGroup 類型都是 NioEventLoopGroup
3)NioEventLoopGroup 相當於一個事件循環組,這個組含有多個事件循環,每個事件循環時 NioEventLoop
4)NioEventLoop 表示一個不斷循環的執行處理任務的線程,每個 NioEventLoop 都有一個 Selector,用於監聽綁定在其上的 socket 的網絡通訊
5)NioEventLoopGroup 可以有多個線程,即可以含有多個 NioEventLoop
6)每個 Boss NioEventLoop 循環執行的步驟
- 輪詢 accept 事件
- 處理 accept 事件,與 client 建立連接,生成 NiobiumSocketChannel,並將其注冊到 worker NioEveltLoop 上的 Selector
- 處理任務隊列的任務,即 runAllTasks
7) 每個 Worker NioEventLoop 循環執行的步驟
- 輪詢 read/write 事件
- 處理 read/write 事件,在對應 NioSocketChannel 處理
- 處理任務隊列的任務,即 runAllTasks
3、Netty 編程之 helloworld <--返回目錄
pom 引入依賴

<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.22.Final</version> </dependency>
NettyServer
package com.oy; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; public class NettyServer { public static void main(String[] args) throws Exception { // 1.創建 BossGroup 和 workerGroup NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); // 2.創建服務器端的啟動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 3.鏈式編程,配置參數 serverBootstrap .group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列得到連接個數 .childOption(ChannelOption.SO_KEEPALIVE, true) // 設置保持獲得連接狀態 .childHandler(new ChannelInitializer<NioSocketChannel>() { // 給 WorkerGroup 的 EventLoop 對應的管道設置處理器 protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }); // 4.綁定端口,運行服務器 ChannelFuture future = serverBootstrap.bind(8000).sync(); System.out.println("server started and listen " + 8000); // 5.對關閉通道進行監聽 future.channel().closeFuture().sync(); } }
NettyClient
package com.oy; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import java.util.Date; public class NettyClient { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }); Channel channel = bootstrap.connect("127.0.0.1", 8000).channel(); while (true) { channel.writeAndFlush(new Date() + ": hello world11111111!"); Thread.sleep(2000); } } }
4、自定義 ChannelInboundHandlerAdapter 收發消息 <--返回目錄
NettyServer
package com.oy; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; public class NettyServer { private int port; public static void main(String[] args) { new NettyServer(8080).start(); } public NettyServer(int port) { this.port = port; } public void start() { /** * 創建兩個EventLoopGroup,即兩個線程池,boss線程池用於接收客戶端的連接,一個線程監聽一個端口,一般只會監聽一個端口所以只需一個線程 * work池用於處理網絡連接數據讀寫或者后續的業務處理(可指定另外的線程處理業務,work完成數據讀寫) */ EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup work = new NioEventLoopGroup(); try { /** * 實例化一個服務端啟動類, * group()指定線程組 * channel()指定用於接收客戶端連接的類,對應java.nio.ServerSocketChannel * childHandler()設置編碼解碼及處理連接的類 */ ServerBootstrap server = new ServerBootstrap() .group(boss, work) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() //.addLast("decoder", new StringDecoder()) //.addLast("encoder", new StringEncoder()) .addLast(new NettyServerHandler()); } }); // 綁定端口 ChannelFuture future = server.bind().sync(); System.out.println("server started and listen " + port); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } public static class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("HelloWorldServerHandler active"); } /** * 讀取客戶端發送的數據 * ChannelHandlerContext ctx: 上下文對象,含有管道 pipeline,通道 channel,連接地址 * Object msg: 客戶端發送的數據 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server channelRead..."); // 讀取客戶端發送的數據 ByteBuf buf = (ByteBuf) msg; System.out.println("from " + ctx.channel().remoteAddress() + ", " + buf.toString(CharsetUtil.UTF_8)); //System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString()); // 返回消息 //ctx.write("server write, 收到消息" + msg); //ctx.flush(); } /** * 數據讀取完畢 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端", CharsetUtil.UTF_8)); } /** * 處理異常,關閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } } }
NettyClient
package com.oy; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; public class NettyClient { private static final String HOST = "127.0.0.1"; private static final int PORT= 8080; public static void main(String[] args){ new NettyClient().start(HOST, PORT); } public void start(String host, int port) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap client = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() //.addLast("decoder", new StringDecoder()) //.addLast("encoder", new StringEncoder()) .addLast(new NettyClientHandler()); } }); ChannelFuture future = client.connect(host, port).sync(); //future.channel().writeAndFlush("Hello Netty Server ,I am a netty client"); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 通道就緒觸發該方法 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("HelloWorldClientHandler Active"); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服務器", CharsetUtil.UTF_8)); } /** * 當通道有讀取事件時觸發該方法 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("收到服務器響應: " + buf.toString(CharsetUtil.UTF_8)); } } }
5、任務隊列 taskQueue 和 scheduledTaskQueue <--返回目錄
任務隊列中的 Task 有三種典型使用場景
- 用戶程序自定義的普通任務
- 用戶自定義定時任務
- 非當前 Reactor 線程調用 Channel 的各種方法。例如在推送系統的業務線程里,根據用戶的標識,找到對應的 Channel 引用,然后調用 Write 類方法向該用戶推送消息,就會進入到這種場景。最終的 Write 會提交到任務隊列中被異步消費
public static class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("HelloWorldServerHandler active"); } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server channelRead..."); // 讀取客戶端發送的數據 ByteBuf buf = (ByteBuf) msg; System.out.println("from " + ctx.channel().remoteAddress() + ", " + buf.toString(CharsetUtil.UTF_8)); // 模擬業務處理耗時 //Thread.sleep(5 * 1000); //ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端1\n", CharsetUtil.UTF_8)); // 用戶自定義的任務,任務添加到 taskQueue 中 ctx.channel().eventLoop().execute(new Runnable() { public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客戶端1\n", CharsetUtil.UTF_8)); } catch (Exception e) { e.printStackTrace(); } } }); // 用戶自定義定義任務, 任務添加到 scheduledTaskQueue 中 ctx.channel().eventLoop().schedule(new Runnable() { public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客戶端 shedule\n", CharsetUtil.UTF_8)); } catch (Exception e) { e.printStackTrace(); } } },5 , TimeUnit.SECONDS); } /** * 數據讀取完畢 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客戶端2", CharsetUtil.UTF_8)); } /** * 處理異常,關閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } }
6、Netty 異步模型 <--返回目錄
異步的概念和同步相對。當一個異步過程調用發出后,調用者不能立刻得到結果。實際處理這個調用的組件在完成后,通過狀態、通知和回調來通知調用者。
Netty 中的 IO 操作時異步的,包括 Bind、Write、Connect 等操作會簡單的返回一個 ChannelFuture。
調用者並不能立刻獲得結果,而是通過 Future-Listener 機制,用戶可以方便的主動獲取或者通過通知機制獲得 IO 操作結果。
Netty 的異步模型是建立在 Future 和 callback 之上的,callback 就是回調。重點說 Future,它的核心思想是:假設一個方法 fun,計算過程可能非常耗時,等待 fun 返回顯然不合適,那么可以在調用 fun 的時候,立馬返回一個 Future,后續可以通過 Future 取監控方法 fun 的處理過程(即 Future-Listener 機制)。
// 綁定端口 final ChannelFuture future = server.bind(8080).sync(); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { if (future.isDone()) { System.out.println("監聽端口 8080 成功"); } else { System.out.println("監聽端口 8080 失敗"); } } });
7、Netty 入門案例--HTTP 服務 <--返回目錄
HttpServer

package com.oy.http; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class HttpServer { public static void main(String[] args) { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap server = new ServerBootstrap() .group(boss, work) .channel(NioServerSocketChannel.class) //.localAddress(new InetSocketAddress(port)) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new MyChannelInitializer()); // 綁定端口 ChannelFuture future = server.bind(8080).sync(); System.out.println("server started and listen " + 8080); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } }
MyChannelInitializer

package com.oy.http; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; public class MyChannelInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel socketChannel) throws Exception { /* 向管道加入處理器 */ ChannelPipeline pipeline = socketChannel.pipeline(); // HttpServerCodec: netty 提供的處理 http 的編-解碼器 pipeline.addLast("MyHttpServerCodec", new HttpServerCodec()); // 添加自定義的處理器 pipeline.addLast("MyHttpServerHandler", new MyHttpServerHandler()); } }
MyHttpServerHandler

package com.oy.http; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; import java.net.URI; public class MyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> { protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { System.out.println("==================================="); System.out.println("msg 類型: " + msg.getClass().getName()); System.out.println("客戶端地址:" + ctx.channel().remoteAddress()); // 判斷 msg 是否是 http request 請求 if (msg instanceof HttpRequest) { HttpRequest request = (HttpRequest) msg; URI uri = new URI(request.uri()); System.out.println("請求 uri: " + uri.getPath()); if ("/favicon.ico".equals(uri.getPath())) { System.out.println("請求 favicon.icon,不做響應"); return; } // 回復信息給瀏覽器 ByteBuf content = Unpooled.copiedBuffer("hello, 我是服務器", CharsetUtil.UTF_8); // 構造一個 http 的響應,即 http response DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); ctx.writeAndFlush(response); } } }
參考:
1)netty 官網:https://netty.io/
3)掘金小冊:Netty 入門與實戰:仿寫微信 IM 即時通訊系統