1.前言
第7節初步學習了一下Java原本的線程池是如何工作的,以及Future的為什么能夠達到其效果,這些知識對於理解本章有很大的幫助,不了解的可以先看上一節。
Netty為什么會高效?回答就是良好的線程模型,和內存管理。在Java的NIO例子中就我將客戶端的操作單獨放在一個線程中處理了,這么做的原因在於如果將客戶端連接串起來,后來的連接就要等前一個處理完,當然這並不意味着多線程比單線程有優勢,而是在於每個客戶端都需要進行讀取准備好的緩存數據,再執行一些業務邏輯。如果業務邏輯耗時很久,那么順序執行的方式沒有多線程優勢大。另一個方面目前多核CPU很常見了,多線程是個不錯的選擇。這些在第一節就說明過,也提到過NIO並不是提升了IO操作的速度,而是減少了CPU的浪費時間,這些概念不能搞混。
本節不涉及內存管理,只介紹相關的線程模型。
2.核心類
上圖就是我們需要關注的體系內容了,主要從EventExecutorGroup開始往下看,再上層的父接口是JDK提供的並發包內的內容,基礎是線程池中可以執行周期任務的線程池服務。所以從這我們可以知道Netty可以實現周期任務,比如心跳檢測。接口定義下面將逐一介紹。
2.1 EventExecutorGroup
isShuttingDown():是否正在關閉,或者是已經關閉。
shutdownGracefully():優雅停機,等待所有執行中的任務執行完成,並不再接收新的任務。
terminationFuture():返回一個該線程池管理的所有線程都terminated的時候觸發的future。
shutdown():廢棄了的關閉方法,被shutdownGracefully取代。
next():返回一個被該Group管理的EventExecutor。
iterator():所有管理的EventExecutor的迭代器。
submit():提交一個線程任務。
schedule():周期執行一個任務。
上述方法基本上是對周期線程池的一個封裝,但是擴展了EventExecuotr概念,即分了若干個小組,處理事件。另外一個比較實用的就是優雅停機了。
2.2 EventLoopGroup
EventLoopGroup中的方法很少,其主要是和channel結合了,就多了一個將channel注冊到線程池中的方法。
2.3 EventExecutor
EventExecutor繼承自EventExecutorGroup,這個之前也提到過該類,相當於Group中的一個子集。
next():就是找group中下一個子集
parent():就是所屬group
inEventLoop():當前線程是否是在該子集中
newXXX():這個是下一節內容,此處不介紹。
2.4 EventLoop
該接口就一個方法,就是parent();EventLoop和EventLoopGroup與EventExecutor和EventExecutorGroup是一組相似的概念。了解這些就可以了。
3 實現細節
EventLoop和EventLoopGroup的實現十分簡單,簡單看下就可以了,這里介紹幾個重要的實現類。
3.1 AbstractEventExecutor
該類繼承自上節說過的AbstractExecutorService,其最重要的是execute方法未實現。該類是對AbstractExecutorService的一個進一步加工,添加了group的概念,和不同的Future創建方法。這里不要被之前的Java線程池模型所干擾,其不一定是線程池。回到上一節線程池的介紹,最終的樣子都是Execute方法決定的。
3.2 AbstractScheduledEventExecutor
該類是對AbstractEventExecutor的一個進一步實現,其實現了周期任務的執行。原理是內部持有一個優先隊列ScheduledFutureTask。所有周期任務都添加到這個隊列中,也實現了取出周期任務的方法,但是該抽象類並沒有具體執行周期任務的實現。
3.3 SingleThreadEventExecutor
該類是對AbstractScheduledEventExecutor的一個實現,其基本上是我們最終的一個EventLoop的雛形了,很多不同協議的EventLoop都是基於它實現的。
雖然名字叫做單線程執行器,但是其不一定是單個線程。Executor默認使用的是ThreadPerTaskExecutor,其executor會為每一個任務創建一個線程並執行,當然你也可以傳入自己的executor。Queue使用的是LinkedBlockingQueue,無容量限制的任務隊列。其提供了添加任務到任務隊列,從任務隊列中獲取任務的方法。
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { fetchedAll = fetchFromScheduledTaskQueue(); if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } afterRunningAllTasks(); return ranAtLeastOne; } protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { Runnable task = pollTaskFrom(taskQueue); if (task == null) { return false; } for (;;) { safeExecute(task); task = pollTaskFrom(taskQueue); if (task == null) { return true; } } }
執行過程如上:1.先獲取所有的周期任務,放入taskQueue;2.不斷的執行taskQueue中的任務;3.afterRunningAllTasks就是一個自由發揮的方法。safeExecute就是直接執行run方法。
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } }); }
上面是該Executor初始化過程,run方法又是交給子類進行初始化了。
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { if (quietPeriod < 0) { throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); } if (timeout < quietPeriod) { throw new IllegalArgumentException( "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); } if (unit == null) { throw new NullPointerException("unit"); } if (isShuttingDown()) { return terminationFuture(); } boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; for (;;) { if (isShuttingDown()) { return terminationFuture(); } int newState; wakeup = true; oldState = state; if (inEventLoop) { newState = ST_SHUTTING_DOWN; } else { switch (oldState) { case ST_NOT_STARTED: case ST_STARTED: newState = ST_SHUTTING_DOWN; break; default: newState = oldState; wakeup = false; } } if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { break; } } gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); gracefulShutdownTimeout = unit.toNanos(timeout); if (oldState == ST_NOT_STARTED) { try { doStartThread(); } catch (Throwable cause) { STATE_UPDATER.set(this, ST_TERMINATED); terminationFuture.tryFailure(cause); if (!(cause instanceof Exception)) { // Also rethrow as it may be an OOME for example PlatformDependent.throwException(cause); } return terminationFuture; } } if (wakeup) { wakeup(inEventLoop); } return terminationFuture(); }
上面是一個優雅停機的過程,改變該Executor的狀態成ST_SHUTTING_DOWN,這里要注意addTask的時候只有shutdown狀態才會拒絕,所以此時這里的邏輯還不會拒絕新任務添加,然后返回了一個terminationFuture,這里不做介紹。
3.4 SingleThreadEventLoop
此類繼承自上面講解的SingleThreadEventExecutor,這里多了一個tailTask隊列,用於每次事件循環后置任務處理,暫且不管。重要的在於很早提到了register方法,將channel注冊到線程中。
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
實際上就是生成了一個DefaultChannelPromise,將channel和線程綁定,最后都放入了unsafe對象中。
3.5 NioEventLoop
上面講了一些雜亂無章的內容,這里借助NioEventLoop好好梳理一下整個設計流程。NioEventLoop繼承自SingleThreadEventLoop,先對之前的相關研究進行總結:
1.EventExecutorGroup接口是繼承自Java的周期任務接口,是一個事件處理器組的概念,其相關方法有:
是否正在關閉;優雅停機;獲取一個事件處理器;提交一個任務;提交一個周期任務
2.EventExecutor接口是事件處理器,其繼承自EventExecutorGroup,目的並不是說每一個事件處理器都是一個事件處理器組,而是為了復用接口定義的方法。每個處理器都應該具備優雅停機,提交任務,判斷是否關閉的方法,其它方法有:
獲取處理器組;獲取下一個處理器(覆蓋了父接口的next方法);判斷是否在事件循環內;創建Promise、ProgressivePromise、SucceededFuture、FailedFuture
3.EventLoopGroup繼承自EventExecutorGroup,更新了父接口的含義,EventExecutor的定位是處理單個事件,group就是處理事件組了。EventLoop的定位是處理一個連接的生命周期過程中的周期事件,group是多個EventLoop的集合了。這里又有一個尷尬的地方,group按照定義本不需要定義其它方法,但是由於Server端的設計(之前說過服務端的channel也是一個線程),使用的是group,所以group必須承擔單個EventLoop的職責。最終添加了額外的方法:
獲取下一個EventLoop;注冊channel;
4.EventLoop,事件循環,其也是一個處理器,最終繼承自EventExecutor和EventLoopGroup,方法只有一個:
獲取父事件循環組EventLoopGroup。
上述接口光看名稱很容易陷入誤解,實際上定義是想將單個loop和group分離,但是實現上由於Server端包含一個服務端監聽連接線程,一個客戶端連接線程,其Group承擔了單個的職責,所以定義了一些本該由單個執行器處理的方法,又為了復用方法,導致loop繼承了group,這樣看起來怪怪的,接口理解起來就混亂了。結合上面的描述,再看一遍繼承圖就更清楚了:
理解了上面的設定,我們再來看看客戶端的事件處理是如何設計的,即總結上訴抽象類做了哪些事情。
1. AbstractEventExecutor:入參就一個parent,該類完成了一個基本處理:
a.將next設置成自己(上面說過繼承的group,這個操作就和group區分開了)。
b.優雅停機調用的是帶有超時的停機方案,超時為15秒
c.覆蓋了Java提供的newTask包裝成FutureTask的方法,使用了自己的PromiseTask
d.提供安全執行方法:safeExecute,直接調用的run方法
該類是最基礎的一個抽象類,基本作用就是與group在定義混亂上做了一個區分。提供了執行器與Future關聯方法和一個基本的執行任務的方法。
2.AbstractScheduledEventExecutor:入參也是一個parent,該類對AbstractEventExecutor未處理的周期任務提供了具體的完成方法:
a.提供計算當前距服務啟動的時間
b.提供存儲ScheduledFutureTask的優先隊列
c.提供了取消所有周期任務的方法
d.提供了獲取一個符合的周期任務的方法,要滿足時間,並獲取后移除
e.提供了獲取距最近一個周期任務的時間多久
f.提供了移除一個周期任務的方法
g.提供添加周期任務的方法
該類提供了周期任務執行的一些基本方法,涉及添加周期任務,移除,獲取等方法。
3.SingleThreadEventExecutor:入參包括parent,addTaskWakesUp標志,maxPendingTasks最大任務隊列數(16和io.netty.eventexecutor.maxPendingTasks(default Integer.MAX_VALUE)參數更大的那個值),executor執行器(默認是ThreadPerTaskExecutor,每個任務創建一個線程執行),taskQueue任務隊列(默認是LinkedBlockingQueue),rejectedExecutionHandler拒絕任務的處理類(默認直接拋出RejectedExecutionException)。該類主要完成了一個單線程的EventExecutor的基本操作:
a.創建一個taskQueue
b.中斷線程
c.從任務隊列中獲取一個任務,takeTask連同周期任務也會獲取
d.添加任務到任務隊列
e.移除任務隊列中指定任務
f.運行所有任務,會先將周期任務存入taskQueue,再使用safeExecute方法執行任務
g.實現了execute方法,會添加任務到任務隊列,如果當前線程不是事件循環線程,開啟一個線程。通過的就是持有的executor來開啟的線程任務。execute方法調用了run方法,該類沒有實現run方法。任務的添加都不是通過execute直接執行了,而是走的添加任務到taskQueue,由未實現的run線程來處理這些事件。
h.優雅停機
這樣就有了一個基礎的單線程模型了,開啟線程,保存,取出任務的方法都有了,只有在開啟線程中執行任務的run()方法還未實現。
4.SingleThreadEventLoop:入參和SingleThreadEventExecutor一致,不同的是多了一個tailTasks。該類主要是針對netty自身的事件循環的定義來實現方法了:
a.注冊channel,實際上是生成了一個DefaultChannelPromise對象,持有了channel,和運行該channel的EventExecutor,然后將該對象交給最底層的unsafe處理。
b.添加一個事件周期結束后執行的尾任務tailTasks
c.執行尾任務
d.刪除指定尾任務
該類就很簡單,沒有過多的內容,只是增加了一個每個事件周期后執行的任務而已。
回顧完了,上面4個父類構建了一個基本的帶定時任務,普通任務,事件循環后置任務的EventLoop,每個channel綁定了一個線程執行器,通過DefaultChannelPromise持有兩者,最終交給Unsafe操作。子類只需要實現run方法,處理任務隊列中的任務。下面就是重頭戲NioEventLoop這個客戶端的線程是如何設計的了:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }
調用的就是父類方法,不過在創建EventLoop的時候創建了selector,這個是NIO中也提到過的。該EventLoop是在Group中newChild創建的。
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
上面是我們需要關注的run方法,該方法被單獨的線程執行。通過一個策略來判斷執行哪個,默認的策略是任務隊列中有任務,select執行一下,返回當前的事件數,沒任務返回SelectStrategy.SELECT。簡單的說就是有任務就補充新到來的事件執行所有的任務,沒任務就執行新到的事件。先處理IO讀寫任務,再處理其他任務,ioRation設置的耗時比例是IO任務占一個執行周期的百分比,默認50,意思是IO執行了50秒,其他任務也會得到50秒的執行時間。后續操作就是獲取所有select的key,執行所有的任務了。這里就有一個判斷如果是停機狀態,就會closeAll(),之前說優雅停機的時候就是設置了一個這個標志,最后是在執行任務之后判斷。processSelectedKey環節都交給unsafe類完成了,這里就掛上了handler相關觸發,handler的執行也就說明都在該線程內了。
上面的描述雖然把整個過程都關聯上了,但是最主要的問題還是混亂的:如何做到一個channel創建一個線程的?上面只是說明了channel和EventExecutor是綁定在DefaultChannelPromise並交給了Unsafe類,並沒有看到是如何創建線程的。而且另一個問題在於,processSelectedKey是選擇了所有的key,這不是所有的channel共享了一個線程嗎?
要解決該問題要回到Bootstrap的channel建立過程:initAndRegister()方法中,通過channelFactory創建了一個channel對象,而后ChannelFuture regFuture = config().group().register(channel);主要就是觀察register方法,該類是設置的線程池NioEventLoopGroup提供的方法,其繼承的是MultithreadEventLoopGroup,是調用了next方法獲取的EventLoop,最后接上上面channel和eventloop綁定的內容。next中獲取的EventLoop早在類初始化的時候就生成了,在構造方法中MultithreadEventExecutorGroup,children就是Eventloop,next不過是挑選了一個線程池而已,默認數量是CPU核數的2倍。這個也就是前面說的,線程數量不是越多越好。
這樣我們明白了,客戶端注冊的時候是分配了一個線程給它。客戶端並不需要多線程,但是還是繼續看后面的內容:AbstractUnsafe的register方法給出了相關解答。channel持有了該EventLoop,此時線程還是未運行狀態,只是有了這么一個對象而已。
if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
這里execute方法,這個是之前我們講過的一個方法,其會判斷當前線程是否是該channel的線程,目前都沒有初始化線程肯定不是,將任務放入任務隊列,開啟一個線程(線程處於未啟動狀態才會開啟,否則不執行),這個設計使得execute的時候只會開啟一次線程,而所有的任務都會被放入任務隊列,由這個線程執行。再回到run方法,這個是channel線程執行的方法,目前每個NioEventLoop都執行了Select等方法啊,這不是處理了所有的channel的工作嗎?並沒有達到一個channel生命周期控制在一個線程中啊。
這里實際上是JAVA NIO的例子帶來的誤解,認為必須一個線程來使用select,然后遍歷事件分配線程給channel執行讀寫操作。實際上在Netty中不一樣,Netty所有線程都在執行select並獲取相關事件,但是實際上其並沒有執行所有的事件。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop != this || eventLoop == null) { return; } unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }
看這里,if (eventLoop != this || eventLoop == null) ,如果channel的eventLoop不是當前的eventLoop就不執行,這個就一小段代碼,但是就直接決定了EventLoop只對自己所綁定的channel感興趣,最終達到了只處理自己相關的任務的目的。
3.6 NioEventLoopGroup
該類沒有太多需要說明的內容,前面已經講解了很多。
1.AbstractEventExecutorGroup,實現了基本的方法,所有方法都是調用next()即挑選出一個線程執行器完成的。
2.MultithreadEventExecutorGroup,實現了一個基本的線程池,持有子線程,主要工作是初始化了子線程數組,提供了next方法。
3.MultithreadEventLoopGroup,實現了Netty的channel線程池,提供了register方法,雖然也是調用next的register方法。
4.NioEventLoopGroup,實現了創建子線程數組時newChild方法,所有的EventLoop都是這個方法創建的。
group就是兩個重點,一個next()挑選事件執行器,一個newChild()創建線程執行器對象。
4.總結
本節耗費了大量的篇幅講解了Netty的線程模型的設計思路,主要看點如下:
1.解釋了EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup這四者之間的關系,和這么復雜混亂的繼承關系的原因。
2.解釋了group是如何初始化線程,並綁定channel的,next().register()
3.解釋了eventloop為什么和channel綁定了,execute()開啟線程,以及每個eventloop都在獲取IO事件,但是通過channel的eventloop是否等於當前過濾掉其它的事件,只處理自己綁定的channel事件。
由於每個線程都在獲取IO事件,所以這段邏輯變的非常復雜,這也就是我之前說的寫好很IO很困難。
最后附上一張對前面所有內容總結的一個圖,清醒一下頭腦,從復雜的代碼中脫身:
這個圖就是一個基本的執行過程圖了,可能有遺漏的地方,但是大體情況如圖所示。