一、EventLoop、EventLoopGroup概述
由下圖所示,NioEventLop是EventLoop的一個具體實現,EventLoop是EventLoopGroup的一個屬性,NioEventLoopGroup是EventLoopGroup的具體實現,都是基於ExecutorService進行的線程池管理,因此EventLoop、EventLoopGroup組件的核心作用就是進行Selector的維護以及線程池的維護。
其中EventLoop進行的是Selector的維護,如下圖左;EventLoopGroup用於線程組維護,並發控制,任務處理,如下圖右。
關於EventLoop以及EventLoopGroup的映射關系為:
- 一個EventLoopGroup 包含一個或者多個EventLoop;
- 一個EventLoop 在它的生命周期內只和一個Thread 綁定;
- 所有由EventLoop 處理的I/O 事件都將在它專有的Thread 上被處理;
- 一個Channel 在它的生命周期內只注冊於一個EventLoop;
- 一個EventLoop 可能會被分配給一個或多個Channel。
Channel 為Netty 網絡操作抽象類,EventLoop 主要是為Channel 處理 I/O 操作,兩者配合參與 I/O 操作。當一個連接到達時,Netty 就會注冊一個 Channel,然后從 EventLoopGroup 中分配一個 EventLoop 綁定到這個Channel上,在該Channel的整個生命周期中都是有這個綁定的 EventLoop 來服務的。
二、NioEventLoopGroup實現
NioEventLoopGroup對象可以理解為一個線程池,內部維護了一組線程,每個線程負責處理多個Channel上的事件,而一個Channel只對應於一個線程,這樣可以回避多線程下的數據同步問題。如下代碼
// 服務器端應用程序使用兩個NioEventLoopGroup創建兩個EventLoop的組,EventLoop這個相當於一個處理線程,是Netty接收請求和處理IO請求的線程。 // 主線程組, 用於接受客戶端的連接,但是不做任何處理,跟老板一樣,不做事 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 從線程組, 當boss接受連接並注冊被接受的連接到worker時,處理被接受連接的流量。 EventLoopGroup workerGroup = new NioEventLoopGroup();
其職責如下
:
- 作為服務端
Acceptor 線程
,負責處理客戶端的請求接入。 - 作為客戶端
Connector 線程
,負責注冊監聽連接操作位,用於判斷異步連接結果。 - 作為
IO 線程
,監聽網絡讀操作位,負責從 SocketChannel 中讀取報文。 - 作為
IO
線程,負責向 SocketChannel 寫入報文發送給對方,如果發生寫半包,會自動注冊監聽寫事件,用 於后續繼續發送半包數據,直到數據全部發送完成。 - 作為
定時任務線程
,可以執行定時任務,例如鏈路空閑檢測和發送心跳消息等。 - 作為線程執行器可以執行普通的任務線程(Runnable)。
上面的代碼 創建bossGroup及workerGroup時,使用了NioEventLoopGroup的無參構造方法,因此我們查看下NioEventLoopGroup的具體實現:
/** * 1、首先我們看看NioEventLoopGroup的無參構造方法: * 作用:線程數為0 */ public NioEventLoopGroup() { this(0); } /** * 2、繼續調用構造函數。 * 作用:指定線程為0,且Executor為null */ public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } /** * 3、繼續調用構造函數 * 作用:此構造方法它會指定selector的輔助類 "SelectorProvider.provider()" */ public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); } /** * 4、繼續調用構造函數 * 作用:初始化了一個默認的選擇策略工廠,用於生成select策略 */ public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } /** * 5、繼續調用構造函數 * 作用:指定拒絕策略:RejectedExecutionHandlers.reject() */ public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
經過上面一系列的構造方法調用,此時參數值對應如下:
- nThreads: 0
- executor: null
- selectorProvider: SelectorProvider.provider()
- selectStrategyFactory: DefaultSelectStrategyFactory.INSTANCE
- 以及指定了拒絕策略: RejectedExecutionHandlers.reject()
繼續分析剩余代碼:
/** * 6、從這里開始 調用父類 MultithreadEventLoopGroup 的構造函數 * 作用: 就是當指定的線程數為0時,使用默認的線程數DEFAULT_EVENT_LOOP_THREADS, * 而DEFAULT_EVENT_LOOP_THREAD是在靜態代碼塊中就被執行。 */ protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } /** * 6.1 我們看下靜態代碼塊 * 作用:到這一步得出關鍵的一點:`如果初始化NioEventLoopGroup未指定線程數,默認是CPU核心數*2`。 */ private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)) } /** * 7、繼續調用父類 MultithreadEventLoopGroup 構造函數 * 作用:指定了一個EventExecutor的選擇工廠DefaultEventExecutorChooserFactory, * 此工廠主要是用於選擇下一個可用的EventExecutor */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } /** * 8、繼續調用父類 MultithreadEventLoopGroup 構造函數 這里就是核心代碼 刪除部分非核心代碼 * 作用單獨分析 */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //1、 //executor校驗非空, 如果為空就創建ThreadPerTaskExecutor, 該類實現了 Executor接口 // 這個executor 是用來執行線程池中的所有的線程,也就是所有的NioEventLoop,其實從 //NioEventLoop構造器中也可以知道,NioEventLoop構造器中都傳入了executor這個參數。 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //2、 //這里的children數組, 其實就是線程池的核心實現,線程池中就是通過指定的線程數組來實現線程池; //數組中每個元素其實就是一個EventLoop,EventLoop是EventExecutor的子接口。 children = new EventExecutor[nThreads]; //for循環實例化children數組,NioEventLoop對象 for (int i = 0; i < nThreads; i++) { boolean success = false; //3、 //newChild(executor, args) 函數在NioEventLoopGroup類中實現了, // 實質就是就是存入了一個 NIOEventLoop類實例 children[i] = newChild(executor, args); success = true; } //4、實例化線程工廠執行器選擇器: 根據children獲取選擇器 chooser = chooserFactory.newChooser(children); //5、為每個EventLoop線程添加 線程終止監聽器 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; //6、將children 添加到對應的set集合中去重, 表示只可讀。 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } } /** * 8.3.1 我們再來看下 newChild(executor, args) 里的方法 * 我們可以看到 返回的就是一個 NioEventLoop */ @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
總結以上:
1. NioEventLoopGroup初始化時未指定線程數,那么會使用默認線程數,即 `線程數 = CPU核心數 * 2`; 2. 每個NioEventLoopGroup對象內部都有一組可執行的`NioEventLoop數組`,其大小是 nThreads, 這樣就構成了一個線程池, `一個NIOEventLoop可以理解成就是一個線程`。 3. 所有的NIOEventLoop線程是使用相同的 executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler以及是屬於某一個 NIOEventLoopGroup的。這一點從 newChild(executor, args); 方法就可以看出:newChild()的實現是在NIOEventLoopGroup中實現的。 4. 當有IO事件來時,需要從線程池中選擇一個線程出來執行,這時候的NioEventLoop選擇策略是由GenericEventExecutorChooser實現的,並調用該類的next()方法。 5. 每個NioEventLoopGroup對象都有一個NioEventLoop選擇器與之對應,其會根據NioEventLoop的個數,動態選擇chooser(如果是2的冪次方,則按位運算,否則使用普通的輪詢)
綜上所述,得出NioEventLoopGroup主要功能就是為了創建一定數量的NioEventLoop,而真正的重點就在NioEventLoop中,它是整個netty線程執行的關鍵。