一.Netty介紹
1.什么是netty
Netty 是由 JBOSS 提供的一個 Java 開源框架。Netty 提供異步的、基於事件驅動的網絡應用程序框架,用以快速開發高性能、高可靠性的網絡 IO 程序,是目前最流行的 NIO 框架,Netty 在互聯網領域、大數據分布式計算領域、游戲行業、通信行業等獲得了廣泛的應用,知名的 Elasticsearch 、Dubbo 框架內部都采用了 Netty。
2.為什么要用netty
原生 NIO 存在問題:
1.NIO 的類庫和 API 繁雜
2.需要熟悉 Java 多線程編程,因為 NIO 編程涉及到 Reactor 模式,必須對多線程和網絡編程非常熟悉, 才能編寫出高質量的 NIO 程序
3.開發工作量和難度都非常大。例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常 流的處理等等處理起來難度會比較大。
4.JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它會導致 Selector 空輪詢,最終導致 CPU 100%。直到 JDK 1.7 版本該問題仍舊存在,沒有被根本解決。
3.Netty的優點
Netty 對 JDK 自帶的 NIO 的 API 進行了封裝,解決了上述問題。
1.設計優雅:適用於各種傳輸類型的統一 API 阻塞和非阻塞 Socket;基於靈活且可擴展的事件模型,可以清晰地分離關注點;高度可定制的線程模型 - 單線程,一個或多個線程池.
2.使用方便:詳細記錄的 Javadoc,用戶指南和示例;沒有其他依賴項,JDK 5(Netty 3.x)或 6(Netty 4.x)就足夠了。
3.高性能、吞吐量更高:延遲更低;減少資源消耗;最小化不必要的內存復制。
4.安全:完整的 SSL/TLS 和 StartTLS 支持。
5.社區活躍、不斷更新:社區活躍,版本迭代周期短,發現的 Bug 可以被及時修復,同時更多的新功能會被加入
二.Reactor三種線程模型
1.現有的三種線程模型
不同的線程模式,對程序的性能有很大影響,目前存在的線程模型有:
①.傳統阻塞 I/O 服務模型
②Reactor 模式
Reactor 模式又有 3 種典型的實現
單 Reactor 單線程;
單 Reactor 多線程;
主從 Reactor 多線程
Netty 的線程模型是主要是基於主從 Reactor 多線程模型改成了主從 Reactor 多線程模型有多個 Reactor模式
2.傳統阻塞 I/O 服務模型介紹
特點:
采用阻塞IO模式獲取輸入的數據
每個連接都需要創建單獨的線程完成數據的輸入,業務處理和數據的返回
缺點:
當並發數很大,就會創建大量的線程,占用很大系統資源,在線程開銷和上下文切換上降低處理性能
當連接創建后,如果當前線程暫時沒有數據可讀,該線程會阻塞在read 操作,造成線程資源的浪費。
黃色的框表示對象, 藍色的框表示線程 白色的框表示方法(API)
3. Reactor 模式
針對傳統阻塞 I/O 服務模型的 2 個缺點,解決方案:
I/O 復用模型:多個連接共用一個阻塞對象,應用程序只需要在一個阻塞對象上等待,無需阻塞等待所有連接。當某個連接有新的數據可以處理時,操作系統通知應用程序,線程從阻塞狀態返回,開始進行業務處理。
Reactor 對應的叫法: 1. 反應器模式 2. 分發者模式(Dispatcher) 3. 通知者模式(notifier)
基於線程池復用線程資源模式:不必再為每個連接創建線程,將連接完成后的業務處理任務分配給線程進行處理,一個線程可以處理多個連接的業務。
I/O 復用結合線程池,就是 Reactor 模式基本設計思想
Reactor 模式,通過一個或多個輸入同時傳遞給服務處理器的模式,(基於事件驅動)
服務器端程序處理傳入的多個請求,並將它們同步分派到相應的處理線程, 因此Reactor模式也叫 Dispatcher模式
Reactor 模式使用IO復用監聽事件, 收到事件后,分發給某個線程(進程), 這點就是網絡服務器高並發處理關鍵
4.單 Reactor 單線程
1.工作原理:
①Select 是前面 I/O 復用模型介紹的標准網絡編程 API,可以實現應用程序通過一個阻塞對象監聽多路連接請求
②Reactor 對象通過 Select 監控客戶端請求事件,收到事件后通過 Dispatch 進行分發
③如果是建立連接請求事件,則由 Acceptor 通過 Accept 處理連接請求,然后創建一個 Handler 對象處理連接完成后的后續業務處理
④如果不是建立連接事件,則 Reactor 會分發調用連接對應的 Handler 來響應
⑤Handler 會完成 Read→業務處理→Send 的完整業務流程
2.優點:
模型簡單,沒有多線程、進程通信、競爭的問題,全部都在一個線程中完成
3.缺點:
①性能問題,只有一個線程,無法完全發揮多核 CPU 的性能。
②可靠性問題,線程意外終止,或者進入死循環,會導致整個系統通信模塊不可用,不能接收和處理外部消息,造成節點故障
③服務器端用一個線程通過多路復用搞定所有的 IO 操作(包括連接,讀、寫等),編碼簡單,清晰明了,但是如果客戶端連接數量較多時,當對應多個讀時,還是會出現阻塞現象,當這種情況發生時將無法支撐高並發的場景。
4.應用場景:
客戶端的數量有限,業務處理非常快速(比如 Redis在業務處理的時間復雜度 O(1) 的情況)
5.單Reactor多線程
1.工作原理:
①Reactor 對象通過select 監控客戶端請求事件, 收到事件后,通過dispatch進行分發
②如果是建立連接請求, 則由Acceptor 通過accept 處理連接請求, 然后創建一個Handler對象處理完成連接后的各種事件
③如果不是連接請求,則由Reactor分發調用連接對應的handler 來處理
④handler 只負責響應事件,不做具體的業務處理, 通過read 讀取數據后,會分發給后面的worker線程池的某個線程處理業務。
⑤worker 線程池會分配獨立線程完成真正的業務,並將結果返回給handler,handler收到響應后,通過send 將結果返回給client.
2.優點:
可以充分的利用多核cpu 的處理能力
3.缺點:
多線程數據共享和訪問比較復雜,Reactor處理所有的事件的監聽和響應,在單線程運行時,在高並發場景容易出現性能瓶頸.
6.主從 Reactor 多線程
1.工作原理:
①Reactor主線程 MainReactor 對象通過select 監聽連接事件, 收到事件后,通過Acceptor 處理連接事件
②當 Acceptor 處理連接事件后,MainReactor 將連接分配給SubReactor
③subReactor 將連接加入到連接隊列進行監聽,並創建handler進行各種事件處理
④當有新事件發生時, subreactor 就會調用對應的handler處理
⑤handler 通過read 讀取數據,分發給后面的worker 線程處理
⑥worker 線程池分配獨立的worker 線程進行業務處理,並返回結果
⑦handler 收到響應的結果后,再通過send 將結果返回給client
⑧Reactor 主線程可以對應多個Reactor 子線程, 即MainRecator 可以關聯多個SubReactor
三.Netty線程模型
1.工作原理
Netty抽象出兩組線程池 BossGroup 專門負責接收客戶端的連接, WorkerGroup 專門負責網絡的讀寫
BossGroup 和 WorkerGroup 類型都是 NioEventLoopGroup
NioEventLoopGroup 相當於一個事件循環組, 這個組中含有多個事件循環 ,每一個事件循環是 NioEventLoop
NioEventLoop 表示一個不斷循環的執行處理任務的線程, 每個NioEventLoop 都有一個selector , 用於監聽綁定在該通道上的socket的網絡通訊
NioEventLoopGroup 可以有多個線程, 即可以含有多個NioEventLoop
每個Boss NioEventLoop 循環執行的步驟有3步
輪詢accept 事件
處理accept 事件 , 與client端建立連接 , 生成NioScocketChannel , 並將其注冊到某個worker NIOEventLoop 上的 selector 上
處理任務隊列的任務 , 即 runAllTasks
每個 Worker NIOEventLoop 循環執行的步驟
輪詢read, write 事件
處理i/o事件, 即read , write 事件,在對應NioScocketChannel 處理
處理任務隊列的任務 , 即 runAllTasks
每個Worker NIOEventLoop 處理業務時,會使用pipeline(管道), pipeline 中包含了boss group上NioEventLoop注冊到worker 的selector 的channel , 即通過pipeline 可以獲取到對應通道, 管道中維護了很多的處理器
四.Netty入門
1.引入java包
(JDK 5(Netty 3.x)或 6(Netty 4.x))
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
2.hello world 編寫
入門的編寫一共需要4個類
2.1.netty server 端編寫
package com.zpb.netty.netty.helloWorld; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoop; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @dec : netty入門 * @Date: 2019/11/24 * @Auther: pengbo.zhao * @version: 1.0 * @demand: * * {@link #main(String[] args)} * */ public class NettyServer { public static void main(String[] args) throws Exception{ //1.創建BossGroup 和 WorkerGroup //1.1 創建2個線程組 //bossGroup只處理連接請求 //workerGroup 處理客戶端的業務邏輯 //2個都是無限循環 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); //2.創建服務端的啟動對象,可以為服務端啟動配置一些服務參數 ServerBootstrap bootStrap = new ServerBootstrap(); //2.1使用鏈式編程來配置服務參數 bootStrap.group(bossGroup,workerGroup) //設置2個線程組 .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作為服務器的通道 .option(ChannelOption.SO_BACKLOG,128) //設置線程等待的連接個數 .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE) //設置保持活動連接狀態 .childHandler(new ChannelInitializer<SocketChannel>() { //給PipeLine設置處理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //通過socketChannel得到pipeLine,然后向pipeLine中添加處理的handle socketChannel.pipeline().addLast(new NettyServerHandle()); } }); //給workerGroup 的EventLoop對應的管道設置處理器(可以自定義/也可使用netty的) System.err.println("server is ready......"); //啟動服務器,並綁定1個端口且同步生成一個ChannelFuture 對象 ChannelFuture channelFuture = bootStrap.bind(8888).sync(); //對關閉通道進行監聽(netty異步模型) //當通道進行關閉時,才會觸發這個關閉動作 channelFuture.channel().closeFuture().sync(); } }
2.2.netty server handler編寫
package com.zpb.netty.netty.helloWorld; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil; /** * @dec : * @Date: 2019/11/24 * @Auther: pengbo.zhao * @version: 1.0 * @demand: */ public class NettyServerHandle extends ChannelInboundHandlerAdapter { /** * 讀取數據 * * @param: 1.ChannelHandlerContext ctx:上下文對象, 含有 管道 pipeline , 通道 channel, 地址 * @param: 2. Object msg: 就是客戶端發送的數據 默認 Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.err.println("服務器讀取線程 " + Thread.currentThread().getName()); System.out.println("server ctx =" + ctx); System.out.println("看看 channel 和 pipeline 的關系"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline(); //本質是一個雙向鏈接, 出站入站 //將 msg 轉成一個 ByteBuf,ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer. ByteBuf buf = (ByteBuf) msg; System.out.println("客戶端發送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客戶端地址:" + channel.remoteAddress()); } /** * 讀取數據完成后 * * @param: * @return: * @auther: * @date: */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //將數據寫入到緩存,並刷新 //一般講,我們對這個發送的數據進行編碼 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵", CharsetUtil.UTF_8)); } //處理異常, 一般是需要關閉通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
2.3.netty client端編寫
package com.zpb.netty.netty.helloWorld; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @dec : * @Date: 2019/11/24 * @Auther: pengbo.zhao * @version: 1.0 * @demand: */ public class NettyClient { public static void main(String[] args) throws Exception { //1.客戶端定義一個循環事件組 EventLoopGroup group = new NioEventLoopGroup(); try { //2.創建客戶端啟動對象 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) //設置線程組 .channel(NioSocketChannel.class) //設置客戶端通道實現類 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandle()); } }); System.err.println("client is ready......"); //3.啟動客戶端去連接服務端 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync(); //4.設置通道關閉監聽(當監聽到通道關閉時,關閉client) channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
2.4.netty client handler端編寫
package com.zpb.netty.netty.helloWorld; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * @dec : * @Date: 2019/11/24 * @Auther: pengbo.zhao * @version: 1.0 * @demand: */ public class NettyClientHandle extends ChannelInboundHandlerAdapter{ //如果client 端服務啟動完成后 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.err.println("client "+ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello,netty server...",CharsetUtil.UTF_8)); } //當通道有讀事件時 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.err.println("服務器端回復消息:"+byteBuf.toString(CharsetUtil.UTF_8)); System.err.println("服務器端地址是:"+ctx.channel().remoteAddress()); } //當通道有異常時 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
五.Netty三種任務隊列的使用
當我們在處理的handle中如果出現了阻塞的情況,或者處理業務邏輯比較耗時,我們不能讓程序處於阻塞,
當有客戶端請求時,我們想讓程序定時的去執行業務邏輯,
當需要對一些用戶需要進行推送活動時,根據用戶標識,找到對應的 Channel 引用,向該用戶推送特定消息時
可以采用以下三種任務隊列:
1.提交到execute(Runnable command)中時
ctx.channel().eventLoop().execute(new Runnable() { }) 業務邏輯交給線程去處理,線程不會阻塞在這里,而是直接返回,直到有數據才返回給客戶端,如果有多個線程runnable需要處理,那么只能等上一個處理完才會處理下一個,(假如第1個任務需要10S,第2個需要20s,執行完共需30S)
2.提交到 scheduledTaskQueue中
schedule(Runnable command, long delay, TimeUnit unit) ①Runnable command:執行業務邏輯處理的線程 ② long delay:定時時長 ③TimeUnit unit:定時類型 業務邏輯交給定時線程去處理。
3.通過傳輸的內容的標識
在解碼客戶端發送的內容中,讀取到客戶端的特殊標識,利用這個標識來進行推送消息處理,這個在粘包、拆包中進行說明