Netty框架的主要線程就是I/O線程,線程模型設計的好壞,決定了系統的吞吐量、並發性和安全性等架構質量屬性。Netty的線程模型被精心地設計,既提升了框架的並發性能,又能在很大程度避免鎖,局部實現了無鎖化設計。
線程模型
一般首先會想到的是經典的Reactor線程模型,盡管不同的NIO框架對於Reactor模式的實現存在差異,但本質上還是遵循了Reactor的基礎線程模型。
Reactor單線程模型
Reactor單線程模型,是指所有的I/O操作都在同一個NIO線程上面完成。
NIO線程的職責如下:
- 作為NIO服務端,接收客戶端的TCP連接;
- 作為NIO客戶端,向服務端發起TCP連接;
- 讀取通信對端的請求或者應答消息;
- 向通信對端發送消息請求或者應答消息。
由於Reactor模式使用的是異步非阻塞I/O,所有的I/O操作都不會導致阻塞,理論上一個線程可以獨立處理所有I/O相關的操作。從架構層面看,一個NIO線程確實可以完成其承擔的職責。例如,通過Acceptor類接收客戶端的TCP連接請求消息,當鏈路建立成功之后,通過Dispatch將對應的ByteBuffer派發到指定的Handler上,進行消息解碼。用戶線程消息編碼后通過NIO線程將消息發送給客戶端。
在一些小容量應用場景下,可以使用單線程模型。但是這對於高負載、大並發的應用場景卻不合適,主要原因如下:
- 一個NIO線程同時處理成百上千的鏈路,性能上無法支撐,即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的編碼、解碼、讀取和發送。
- 當NIO線程負載過重之后,處理速度將變慢,這會導致大量客戶端連接超時,超時之后往往會進行重發,這更加重了NIO線程的負載,最終會導致大量消息積壓和處理超時,成為系統的性能瓶頸。
- 可靠性問題:一旦NIO線程意外跑飛,或者進入死循環,會導致整個系統通信模塊不可用,不能接收和處理外部消息,造成節點故障。
Reactor多線程模型
Rector多線程模型與單線程模型最大的區別就是有一組NIO線程來處理I/O操作。
Reactor多線程模型的特點如下:
- 有專門一個NIO線程——Acceptor線程用於監聽服務端,接收客戶端的TCP連接請求。
- 網絡I/O操作——讀、寫等由一個NIO線程池負責,線程池可以采用標准的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、解碼、編碼和發送。
- 一個NIO線程可以同時處理N條鏈路,但是一個鏈路只對應一個NIO線程,防止發生並發操作問題。
在絕大多數場景下,Reactor多線程模型可以滿足性能需求。但是,在個別特殊場景中,一個NIO線程負責監聽和處理所有的客戶端連接可能會存在性能問題。例如並發百萬客戶端連接,或者服務端需要對客戶端握手進行安全認證,但是認證本身非常損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足的問題,為了解決性能問題,產生了第三種Reactor線程模型——主從Reactor多線程模型。
主從Reactor多線程模型
主從Reactor線程模型的特點是:服務端用於接收客戶端連接的不再是一個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP連接請求並處理完成后(可能包含接入認證等),將新創建的SocketChannel注冊到I/O線程池(sub reactor線程池)的某個I/O線程上,由它負責SocketChannel的讀寫和編解碼工作。Acceptor線程池僅僅用於客戶端的登錄、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端subReactor線程池的I/O線程上,由I/O線程負責后續的I/O操作。
利用主從NIO線程模型,可以解決一個服務端監聽線程無法有效處理所有客戶端連接的性能不足問題。因此,在Netty的官方demo中,推薦使用該線程模型。
Netty的線程模型
Netty的線程模型並不是一成不變的,它實際取決於用戶的啟動參數配置。通過設置不同的啟動參數,Netty可以同時支持Reactor單線程模型、多線程模型和主從Reactor多線層模型。
下面讓我們通過一張原理圖(圖18-4)來快速了解Netty的線程模型。
可以通過Netty服務端啟動代碼來了解它的線程模型:
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(Channel ch)throws IOException{ ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4)); ch.pipeline().addLast(new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(50)); ch.pipeline().addLast(new LoginAuthRespHandler()); ch.pipeline().addLast("HeartBeatHandler",new HeartBeatRespHandler()); } }); // 綁定端口,同步等待成功 b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
服務端啟動的時候,創建了兩個NioEventLoopGroup,它們實際是兩個獨立的Reactor線程池。一個用於接收客戶端的TCP連接,另一個用於處理I/O相關的讀寫操作,或者執行系統Task、定時任務Task等。
Netty用於接收客戶端請求的線程池職責如下。
(1)接收客戶端TCP連接,初始化Channel參數;
(2)將鏈路狀態變更事件通知給ChannelPipeline。
Netty處理I/O操作的Reactor線程池職責如下。
(1)異步讀取通信對端的數據報,發送讀事件到ChannelPipeline;
(2)異步發送消息到通信對端,調用ChannelPipeline的消息發送接口;
(3)執行系統調用Task;
(4)執行定時任務Task,例如鏈路空閑狀態監測定時任務。
通過調整線程池的線程個數、是否共享線程池等方式,Netty的Reactor線程模型可以在單線程、多線程和主從多線程間切換,這種靈活的配置方式可以最大程度地滿足不同用戶的個性化定制。
為了盡可能地提升性能,Netty在很多地方進行了無鎖化的設計,例如在I/O線程內部進行串行操作,避免多線程競爭導致的性能下降問題。表面上看,串行化設計似乎CPU利用率不高,並發程度不夠。但是,通過調整NIO線程池的線程參數,可以同時啟動多個串行化的線程並行運行,這種局部無鎖化的串行線程設計相比一個隊列—多個工作線程的模型性能更優。
它的設計原理如圖:
Netty的NioEventLoop讀取到消息之后,直接調用ChannelPipeline的fireChannelRead (Object msg)。只要用戶不主動切換線程,一直都是由NioEventLoop調用用戶的Handler,期間不進行線程切換。這種串行化處理方式避免了多線程操作導致的鎖的競爭,從性能角度看是最優的。
最佳實踐
Netty的多線程編程最佳實踐如下。
(1)創建兩個NioEventLoopGroup,用於邏輯隔離NIO Acceptor和NIO I/O線程。
(2)盡量不要在ChannelHandler中啟動用戶線程(解碼后用於將POJO消息派發到后端業務線程的除外)。
(3)解碼要放在NIO線程調用的解碼Handler中進行,不要切換到用戶線程中完成消息的解碼。
(4)如果業務邏輯操作非常簡單,沒有復雜的業務邏輯計算,沒有可能會導致線程被阻塞的磁盤操作、數據庫操作、網路操作等,可以直接在NIO線程上完成業務邏輯編排,不需要切換到用戶線程。
(5)如果業務邏輯處理復雜,不要在NIO線程上完成,建議將解碼后的POJO消息封裝成Task,派發到業務線程池中由業務線程執行,以保證NIO線程盡快被釋放,處理其他的I/O操作。
NioEventLoop
Netty的NioEventLoop並不是一個純粹的I/O線程,它除了負責I/O的讀寫之外,還兼顧處理以下兩類任務:
- 系統Task:通過調用NioEventLoop的execute(Runnable task)方法實現,Netty有很多系統Task,創建它們的主要原因是:當I/O線程和用戶線程同時操作網絡資源時,為了防止並發操作導致的鎖競爭,將用戶線程的操作封裝成Task放入消息隊列中,由I/O線程負責執行,這樣就實現了局部無鎖化。
- 定時任務:通過調用NioEventLoop的schedule(Runnable command, long delay, TimeUnit unit)方法實現。
正是因為NioEventLoop具備多種職責,所以它的實現比較特殊,它並不是個簡單的Runnable。
它實現了EventLoop接口、EventExecutorGroup接口和ScheduledExecutorService接口,正是因為這種設計,導致NioEventLoop和其父類功能實現非常復雜。
NioEventLoop源碼分析
Selector的初始化
作為NIO框架的Reactor線程,NioEventLoop需要處理網絡I/O讀寫事件,因此它必須聚合一個多路復用器對象。
Selector的初始化非常簡單,直接調用Selector.open()方法就能創建並打開一個新的Selector。Netty對Selector的selectedKeys進行了優化,用戶可以通過io.netty.noKeySetOptimization開關決定是否啟用該優化項。默認不打開selectedKeys優化功能。
Selector selector; private SelectedSelectionKeySet selectedKeys; private final SelectorProvider provider; NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) { super(parent, executor, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } provider = selectorProvider; selector = openSelector(); } private Selector openSelector() { final Selector selector; try { //通過provider.openSelector()創建並打開多路復用器 selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } //如果沒有開啟selectedKeys優化開關,就立即返回。 if (DISABLE_KEYSET_OPTIMIZATION) { return selector; } //如果開啟了優化開關,需要通過反射的方式從Selector實例中獲取selectedKeys和publicSelectedKeys, //將上述兩個成員變量設置為可寫,通過反射的方式使用Netty構造的selectedKeys包裝類selectedKeySet將原JDK的selectedKeys替換掉。 try { SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Class<?> selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader()); // Ensure the current selector implementation is what we can instrument. if (!selectorImplClass.isAssignableFrom(selector.getClass())) { return selector; } Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, selectedKeySet); publicSelectedKeysField.set(selector, selectedKeySet); selectedKeys = selectedKeySet; logger.trace("Instrumented an optimized java.util.Set into: {}", selector); } catch (Throwable t) { selectedKeys = null; logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t); } return selector; }
run方法的實現
@Override protected void run() { //所有的邏輯操作都在for循環體內進行,只有當NioEventLoop接收到退出指令的時候,才退出循環,否則一直執行下去,這也是通用的NIO線程實現方式。 for (;;) { //首先需要將wakenUp還原為false,並將之前的wakeup狀態保存到oldWakenUp變量中。 oldWakenUp = wakenUp.getAndSet(false); try { //通過hasTasks()方法判斷當前的消息隊列中是否有消息尚未處理 if (hasTasks()) { //如果有則調用selectNow()方法立即進行一次select操作,看是否有准備就緒的Channel需要處理。 //Selector的selectNow()方法會立即觸發Selector的選擇操作,如果有准備就緒的Channel,則返回就緒Channel的集合,否則返回0。 //選擇完成之后,再次判斷用戶是否調用了Selector的wakeup方法,如果調用,則執行selector.wakeup()操作。 selectNow(); } else { //執行select()方法,由Selector多路復用器輪詢,看是否有准備就緒的Channel。 //取當前系統的納秒時間,調用delayNanos()方法計算獲得NioEventLoop中定時任務的觸發時間。 //計算下一個將要觸發的定時任務的剩余超時時間,將它轉換成毫秒,為超時時間增加0.5毫秒的調整值。 //對剩余的超時時間進行判斷,如果需要立即執行或者已經超時,則調用selector.selectNow()進行輪詢操作,將selectCnt設置為1,並退出當前循環。 //將定時任務剩余的超時時間作為參數進行select操作,每完成一次select操作,對select計數器selectCnt加1。 //Select操作完成之后,需要對結果進行判斷,如果存在下列任意一種情況,則退出當前循環: //if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {break} //1.有Channel處於就緒狀態,selectedKeys不為0,說明有讀寫事件需要處理; //2.oldWakenUp為true; //3.系統或者用戶調用了wakeup操作,喚醒當前的多路復用器; //4.消息隊列中有新的任務需要處理。 //如果本次Selector的輪詢結果為空,也沒有wakeup操作或是新的消息需要處理,則說明是個空輪詢. //有可能觸發了JDK的epoll bug,它會導致Selector的空輪詢,使I/O線程一直處於100%狀態。 //截止到當前最新的JDK7版本,該bug仍然沒有被完全修復。所以Netty需要對該bug進行規避和修正。 //該Bug的修復策略如下: //(1)對Selector的select操作周期進行統計; //(2)每完成一次空的select操作進行一次計數; //(3)在某個周期(例如100ms)內如果連續發生N次空輪詢,說明觸發了JDK NIO的epoll()死循環bug。 //監測到Selector處於死循環后,需要通過重建Selector的方式讓系統恢復正常,重建步驟如下: //(1)首先通過inEventLoop()方法判斷是否是其他線程發起的rebuildSelector, //如果由其他線程發起,為了避免多線程並發操作Selector和其他資源,需要將rebuildSelector封裝成Task, //放到NioEventLoop的消息隊列中,由NioEventLoop線程負責調用,這樣就避免了多線程並發操作導致的線程安全問題。 //(2)調用openSelector方法創建並打開新的Selector //(3)通過循環,將原Selector上注冊的SocketChannel從舊的Selector上去注冊,重新注冊到新的Selector上,並將老的Selector關閉。 //過銷毀舊的、有問題的多路復用器,使用新建的Selector,就可以解決空輪詢Selector導致的I/O線程CPU占用100%的問題。 select(); //判斷用戶是否調用了Selector的wakeup方法 if (wakenUp.get()) { //如果調用,則執行selector.wakeup()操作。 selector.wakeup(); } } cancelledKeys = 0; final long ioStartTime = System.nanoTime(); needsToSelectAgain = false; //如果輪詢到了處於就緒狀態的SocketChannel,則需要處理網絡I/O事件 if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { //由於默認未開啟selectedKeys優化功能,所以會進入processSelectedKeysPlain分支執行。 //1.對SelectionKey進行保護性判斷,如果為空則返回。 //2.獲取SelectionKey的迭代器進行循環操作,通過迭代器獲取SelectionKey和SocketChannel的附件對象, //3.將已選擇的選擇鍵從迭代器中刪除,防止下次被重復選擇和處理 //4.對SocketChannel的附件類型進行判讀, //如果是AbstractNioChannel類型,說明它是NioServerSocketChannel或者NioSocketChannel,需要進行I/O讀寫相關的操作 //步驟如下: //--首先從NioServerSocketChannel或者NioSocketChannel中獲取其內部類Unsafe,判斷當前選擇鍵是否可用, //--如果不可用,則調用Unsafe的close方法,釋放連接資源。 //--如果選擇鍵可用,則繼續對網絡操作位進行判斷,如下: //----如果是讀或者連接操作,則調用Unsafe的read方法。此處Unsafe的實現是個多態 //對於NioServerSocketChannel,它的讀操作就是接收客戶端的TCP連接。 //對於NioSocketChannel,它的讀操作就是從SocketChannel中讀取ByteBuffer。 //----如果網絡操作位為寫,則說明有半包消息尚未發送完成,需要繼續調用flush方法進行發送 //----如果網絡操作位為連接狀態,則需要對連接結果進行判讀,在進行finishConnect判斷之前,需要將網絡操作位進行修改,注銷掉SelectionKey.OP_CONNECT。 //如果它是NioTask,則對其進行類型轉換,調用processSelectedKey進行處理。由於Netty自身沒實現NioTask接口,所以通常情況下系統不會執行該分支,除非用戶自行注冊該Task到多路復用器。 processSelectedKeysPlain(selector.selectedKeys()); } //由於NioEventLoop需要同時處理I/O事件和非I/O任務,為了保證兩者都能得到足夠的CPU時間被執行,Netty提供了I/O比例供用戶定制。 //如果I/O操作多於定時任務和Task,則可以將I/O比例調大,反之則調小,默認值為50%。 //Task的執行時間根據本次I/O操作的執行時間計算得來。 final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio;//50% //處理完I/O事件之后,NioEventLoop需要執行非I/O操作的系統Task和定時任務 //首先從定時任務消息隊列中彈出消息進行處理,如果消息隊列為空,則退出循環。 //根據當前的時間戳進行判斷,如果該定時任務已經或者正處於超時狀態,則將其加入到執行Task Queue中,同時從延時隊列中刪除。 //定時任務如果沒有超時,說明本輪循環不需要處理,直接退出即可。 //執行Task Queue中原有的任務和從延時隊列中復制的已經超時或者正處於超時狀態的定時任務 //由於獲取系統納秒時間是個耗時的操作,每次循環都獲取當前系統納秒時間進行超時判斷會降低性能。 //為了提升性能,每執行60次循環判斷一次,如果當前系統時間已經到了分配給非I/O操作的超時時間,則退出循環。 //這是為了防止由於非I/O任務過多導致I/O操作被長時間阻塞。 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); //判斷系統是否進入優雅停機狀態,如果處於關閉狀態。 if (isShuttingDown()) { //調用closeAll方法,釋放資源。遍歷獲取所有的Channel,調用它的Unsafe.close()方法關閉所有鏈路,釋放線程池、ChannelPipeline和ChannelHandler等資源。 closeAll(); //並讓NioEventLoop線程退出循環,結束運行。 if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }