EventLoopGroup 與Reactor:
前面的章節中我們已經知道了,一個Netty 程序啟動時,至少要指定一個EventLoopGroup(如果使用到的是NIO,通常是指NioEventLoopGroup),那么,這個NioEventLoopGroup 在Netty 中到底扮演着什么角色呢? 我們知道,Netty是Reactor 模型的一個實現,我們就從Reactor 的線程模型開始。
淺談Reactor 線程模型:
Reactor 的線程模型有三種:單線程模型、多線程模型、主從多線程模型。首先來看一下單線程模型,如下圖所示:
所謂單線程, 即Acceptor 處理和andler 處理都在同一個線程中處理。這個模型的壞處顯而易見:當其中某個Handler阻塞時, 會導致其他所有的Client 的Handler 都得不到執行,並且更嚴重的是,Handler 的阻塞也會導致整個服務不能接收新的Client 請求(因為Acceptor 也被阻塞了)。因為有這么多的缺陷,因此單線程Reactor 模型應用場景比較少。
那么,什么是多線程模型呢? Reactor 的多線程模型與單線程模型的區別就是Acceptor 是一個單獨的線程處理,並且有一組特定的NIO 線程來負責各個客戶端連接的IO 操作。Reactor 多線程模型如下圖所示:
Reactor 多線程模型有如下特點:
- 有專門一個線程,即Acceptor 線程用於監聽客戶端的TCP 連接請求。
- 客戶端連接的IO 操作都由一個特定的NIO 線程池負責.每個客戶端連接都與一個特定的NIO 線程綁定,因此在這個客戶端連接中的所有IO 操作都是在同一個線程中完成的。
- 客戶端連接有很多,但是NIO 線程數是比較少的,因此一個NIO 線程可以同時綁定到多個客戶端連接中。
接下來我們再來看一下Reactor 的主從多線程模型。一般情況下, Reactor 的多線程模式已經可以很好的工作了,但是我們想象一個這樣的場景:如果我們的服務器需要同時處理大量的客戶端連接請求或我們需要在客戶端連接時,進行一些權限的校驗,那么單線程的Acceptor 很有可能就處理不過來,造成了大量的客戶端不能連接到服務器。Reactor 的主從多線程模型就是在這樣的情況下提出來的,它的特點是:服務器端接收客戶端的連接請求不再是一個線程,而是由一個獨立的線程池組成。其線程模型如下圖所示:
可以看到,Reactor 的主從多線程模型和Reactor 多線程模型很類似,只不過Reactor 的主從多線程模型的Acceptor使用了線程池來處理大量的客戶端請求。
EventLoopGroup 與Reactor 關聯:
我們介紹了三種Reactor 的線程模型, 那么它們和NioEventLoopGroup 又有什么關系呢? 其實, 不同的設置NioEventLoopGroup 的方式就對應了不同的Reactor 的線程模型。
1、單線程模型,來看下面的應用代碼:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); ServerBootstrap server = new ServerBootstrap(); server.group(bossGroup);
注意,我們實例化了一個NioEventLoopGroup,然后接着我們調用server.group(bossGroup)設置了服務器端的EventLoopGroup。有人可能會有疑惑;我記得在啟動服務器端的Netty 程序時, 需要設置bossGroup 和workerGroup,為何這里只設置一個bossGroup?其實原因很簡單,ServerBootstrap 重寫了group 方法:
public ServerBootstrap group(EventLoopGroup group) { return group(group, group); }
因此當傳入一個group 時,那么bossGroup 和workerGroup 就是同一個NioEventLoopGroup 了。這時,因為bossGroup 和workerGroup 就是同一個NioEventLoopGroup,並且這個NioEventLoopGroup 線程池數量只設置了1個線程,也就是說Netty 中的Acceptor 和后續的所有客戶端連接的IO 操作都是在一個線程中處理的。那么對應到Reactor 的線程模型中,我們這樣設置NioEventLoopGroup 時,就相當於Reactor 的單線程模型。
2、多線程模型,再來看下面的應用代碼:
EventLoopGroup bossGroup = new NioEventLoopGroup(128);
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup);
從上面代碼中可以看出,我們只需要將bossGroup 的參數就設置為大於1 的數,其實就是Reactor 多線程模型。
3、主從線程模型,到這里相信大家都已經想到了, 實現主從線程模型的代碼如下:
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup);
bossGroup 為主線程,而workerGroup 中的線程是CPU 核心數乘以2,因此對應的到Reactor 線程模型中,我們知道, 這樣設置的NioEventLoopGroup 其實就是Reactor 主從多線程模型。
EventLoopGroup 的實例化:
首先,我們先縱覽一下EventLoopGroup 的類結構圖,如下圖所示:
在前面的章節中我們已經簡單地介紹了一下NioEventLoopGroup 初始化的基本過程,這里我們再回顧一下時序圖:
基本步驟如下:
- EventLoopGroup(其實是MultithreadEventExecutorGroup)內部維護一個類為EventExecutor children 數組,其大小是nThreads,這樣就初始化了一個線程池。
- 如果我們在實例化NioEventLoopGroup 時,如果指定線程池大小,則nThreads 就是指定的值,否則是CPU核數* 2。
- 在MultithreadEventExecutorGroup 中會調用newChild()抽象方法來初始化children 數組.
- 抽象方法newChild()實際是在NioEventLoopGroup 中實現的,由它返回一個NioEventLoop 實例。
- 初始化NioEventLoop 主要屬性:
- provider:在NioEventLoopGroup 構造器中通過SelectorProvider 的provider()方法獲取SelectorProvider。
- selector:在NioEventLoop 構造器中調用selector = provider.openSelector()方法獲取Selector 對象。
任務執行者EventLoop:
NioEventLoop 繼承自SingleThreadEventLoop,而SingleThreadEventLoop 又繼承自SingleThreadEventExecutor。而SingleThreadEventExecutor 是Netty 中對本地線程的抽象,它內部有一個Thread thread 屬性,存儲了一個本地Java線程。因此我們可以簡單地認為,一個NioEventLoop 其實就是和一個特定的線程綁定,並且在其生命周期內,綁定的線程都不會再改變。
NioEventLoop 的類層次結構圖還是有些復雜的,不過我們只需要關注幾個重要點即可。首先來看NioEventLoop 的繼承鏈:NioEventLoop->SingleThreadEventLoop->SingleThreadEventExecutor->AbstractScheduledEventExecutor。在AbstractScheduledEventExecutor 中, Netty 實現了NioEventLoop 的schedule 功能,即我們可以通過調用一個NioEventLoop 實例的schedule 方法來運行一些定時任務。而在SingleThreadEventLoop 中,又實現了任務隊列的功能,通過它,我們可以調用一個NioEventLoop 實例的execute()方法來向任務隊列中添加一個task,並由NioEventLoop進行調度執行。通常來說,NioEventLoop 負責執行兩個任務:第一個任務是作為IO 線程,執行與Channel 相關的IO 操作,包括調用Selector 等待就緒的IO 事件、讀寫數據與數據的處理等;而第二個任務是作為任務隊列,執行taskQueue 中的任務,例如用戶調用eventLoop.schedule 提交的定時任務也是這個線程執行的。
NioEventLoop 的實例化過程:
之前的章節我們分析過,SingleThreadEventExecutor 啟動時會調用doStartThread()方法,然后調用executor.execute()方法,將當前線程賦值給thread。在這個線程中所做的事情主要就是調用SingleThreadEventExecutor.this.run()方法,而因為NioEventLoop 實現了這個方法,因此根據多態性,其實調用的是NioEventLoop.run()方法。
EventLoop 與Channel 的關聯:
在Netty 中, 每個Channel 都有且僅有一個EventLoop 與之關聯, 它們的關聯過程如下(紅色部分):
從上圖中我們可以看到,當調用AbstractChannel$AbstractUnsafe.register()方法后,就完成了Channel 和EventLoop的關聯。在AbstractChannel$AbstractUnsafe.register() 方法中, 會將一個EventLoop 賦值給AbstractChannel 內部的eventLoop 字段,這句代碼就是完成EventLoop 與Channel 的關聯過程。
public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; register0(promise); }
EventLoop 的啟動:
在前面我們已經知道了,NioEventLoop 本身就是一個SingleThreadEventExecutor,因此NioEventLoop 的啟動,其實就是NioEventLoop 所綁定的本地Java 線程的啟動。按照這個思路,我們只需要找到在哪里調用了SingleThreadEventExecutor 中thread 字段的start()方法就可以知道是在哪里啟動的這個線程了。從前面章節的分析中, 其實我們已經清楚: thread.start() 被封裝到了SingleThreadEventExecutor.startThread()方法中,來看代碼:
private void startThread() { if (STATE_UPDATER.get(this) == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) { this.doStartThread(); } }
STATE_UPDATER 是SingleThreadEventExecutor 內部維護的一個屬性,它的作用是標識當前的thread 的狀態。在初始的時候,STATE_UPDATER == ST_NOT_STARTED,因此第一次調用startThread()方法時,就會進入到if 語句內,進而調用到thread.start()方法。我們發現,startThread 是在SingleThreadEventExecutor 的execute()方法中調用的。既然如此,那現在我們的工作就變為了尋找在哪里第一次調用了SingleThreadEventExecutor 的execute()方法。
如果細心的小伙伴可能已想到了, 我們在前面章節中, 我們有提到到在注冊channel 的過程中, 會在AbstractChannel$AbstractUnsafe 的register()中調用eventLoop.execute()方法,在EventLoop 中進行Channel 注冊代碼的執行,AbstractChannel$AbstractUnsafe 的register()部分代碼如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } }
很顯然,一路從Bootstrap 的bind()方法跟蹤到AbstractChannel$AbstractUnsafe 的register()方法,整個代碼都是在主線程中運行的,因此上面的eventLoop.inEventLoop()返回為false,於是進入到else 分支,在這個分支中調用了eventLoop.execute()方法,而NioEventLoop 沒有實現execute()方法,因此調用的是SingleThreadEventExecutor 的execute()方法:
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } else { boolean inEventLoop = this.inEventLoop(); if (inEventLoop) { this.addTask(task); } else { this.startThread(); this.addTask(task); if (this.isShutdown() && this.removeTask(task)) { reject(); } } if (!this.addTaskWakesUp && this.wakesUpForTask(task)) { this.wakeup(inEventLoop); } } }
我們已經分析過了, inEventLoop == false , 因此執行到else 分支, 在這里就調用startThread() 方法來啟動SingleThreadEventExecutor 內部關聯的Java 本地線程了。
總結一句話:當EventLoop 的execute()第一次被調用時,就會觸發startThread()方法的調用,進而導致EventLoop所對應的Java 本地線程啟動。