NioEventLoop的啟動時機是在服務端的NioServerSocketChannel中的ServerSocketChannel初始化完成,且注冊在NioEventLoop后執行的, 下一步就是去綁定端口,但是在綁定端口前,需要完成NioEventLoop的啟動工作, 因為程序運行到這個階段為止,依然只有MainThread一條線程,下面就開始閱讀源碼看NioEventLoop如何開啟新的線程自立家門的
總想說 NioEventLoop的整體結構,像極了這個圖
該圖為,是我畫的NioEventLoop啟動的流程草圖,很糙,但是不畫它,總覺的少了點啥...
NioEventLoop的繼承體系圖
NioEventLoop
的線程開啟之路
程序的入口是AbstractBootStrap, 這個抽象的啟動輔助類, 找到它准備綁定端口的doBind0()
方法,下面是源碼:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
// todo 此方法在觸發 channelRegistered() 之前調用, 給用戶一個機會,在 channelRegistered() 中設置pipeline
// todo 這是 eventLoop啟動的邏輯 , 下面的Runable就是一個 task任務, 什么任務的呢? 綁定端口
// todo 進入exeute()
System.out.println("00000");
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// todo channel綁定端口並且添加了一個listenner
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
我們關注上面的channel.execute(Runable)
方法, 如果我們直接使用鼠標點擊進去,會進入java.util.concurrent
包下的Executor
接口, 原因是因為,它是NioEventLoop繼承體系的超頂級接口,見上圖, 我們進入它的實現類,SingleThreadEventExcutor
, 也就是NioEventLoop
的間接父類, 源碼如下:
// todo eventLoop事件循環里面的task,會在本類SingleThreadEventExecutor里面: execute() 執行
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// todo 同樣判斷當前線程是不是 eventLoop里面的那條唯一的線程, 如果是的話, 就把當前任務放到任務隊列里面等着當前的線程執行
// todo ,不是的話就開啟新的線程去執行這個新的任務
// todo , eventLoop一生只會綁定一個線程,服務器啟動時只有一條主線程,一直都是在做初始化的工作,並沒有任何一次start()
// todo 所以走的是else, 在else中首先開啟新的線程,而后把任務添加進去
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
// todo 開啟線程 , 進入查看
startThread();
// todo 把任務丟進隊列
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
現在執行這些代碼的線程依然是主線程,主線程手上有綁定端口任務,但是它想把這個任務提交給NioEventLoop去執行,於是它就做出下面的判斷
boolean inEventLoop = inEventLoop();
// 方法實現
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
但是發現,主線程並不是NioEventLoop唯一綁定的那個線程, 於是他就准備下面兩件事:
- 開啟激活當前NioEventLoop中的線程
- 把綁定端口的任務添加到任務隊列
開啟新線程的邏輯在下面,我刪除了一些收尾,以及判斷的代碼,保留了主要的邏輯
private void doStartThread() {
assert thread == null;
// todo 斷言線程為空, 然后才創建新的線程
executor.execute(new Runnable() { // todo 每次Execute 都是在使用 默認的線程工廠,創建一個線程並執行 Runable里面的任務
@Override
public void run() {
// todo 獲取剛才創建出來的線程,保存在NioEventLoop中的 thread 變量里面, 這里其實就是在進行那個唯一的綁定
thread = Thread.currentThread();
updateLastExecutionTime();
try {
// todo 實際啟動線程, 到這里 NioEventLoop 就啟動完成了
SingleThreadEventExecutor.this.run();
}
}
主要做了兩件事第一波高潮來了 1. 調用了NioEventLoop的線程執行器的execute
,這個方法的源碼在下面,可以看到,excute,其實就是在創建線程, 線程創建完成后,立即把新創建出來的線程當作是NioEventLoop
相伴終生的線程;
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
// todo 必須實現 Executor 里面唯一的抽象方法, execute , 執行性 任務
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
創建/綁定完成了新的線程后,第二波高潮來了, SingleThreadEventExecutor.this.run();
這行代碼的意思是,調用本類的Run()
方法,這個Run()
方法就是真正在干活的事件循環,但是呢, 在本類中,Run()
是一個抽象方法,因此我們要去找他的子類,那么是誰重寫的這個Run()
呢? 就是NioEventLoop, 它根據自己需求,重寫了這個方法
小結: 到現在,NioEventLoop
的線程已經開啟了,下面的重頭戲就是看他是如何進行事件循環的
NioEventLoop
的事件循環run()
我們來到了NioEventLoop
的run()
, 他是個無限for循環, 主要完成了下面三件事
- 輪詢IO事件
- 處理IO事件
- 處理非IO任務
這是NioEventLoop
的run()
的源碼,刪除了部分注解和收尾工作,
/**
* todo select() 檢查是否有IO事件
* todo ProcessorSelectedKeys() 處理IO事件
* todo RunAllTask() 處理異步任務隊列
*/
@Override
protected void run() {
for (; ; ) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// todo 輪詢IO事件, 等待事件的發生, 本方法下面的代碼是處理接受到的感性趣的事件, 進入查看本方法
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio; // todo 默認50
// todo 如果ioRatio==100 就調用第一個 processSelectedKeys(); 否則就調用第二個
if (ioRatio == 100) {
try {
// todo 處理 處理發生的感性趣的事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// todo 用於處理 本 eventLoop外的線程 扔到taskQueue中的任務
runAllTasks();
}
} else {// todo 因為ioRatio默認是50 , 所以來else
// todo 記錄下開始的時間
final long ioStartTime = System.nanoTime();
try {
// todo 處理IO事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// todo 根據處理IO事件耗時 ,控制 下面的runAllTasks執行任務不能超過 ioTime 時間
final long ioTime = System.nanoTime() - ioStartTime;
// todo 這里面有聚合任務的邏輯
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
下面進入它的select()
,我們把select()
稱作: 基於deadline的任務穿插處理邏輯
下面直接貼出它的源碼:下面的代碼中我寫了一些注解了, 主要是分如下幾步走
- 根據當前時間計算出本次for()的最遲截止時間, 也就是他的deadline
- 判斷1 如果超過了 截止時間,
selector.selectNow();
直接退出 - 判斷2 如果任務隊列中出現了新的任務
selector.selectNow();
直接退出 - 經過了上面12兩次判斷后, netty 進行阻塞式
select(time)
,默認是1秒這時可會會出現空輪詢的Bug - 判斷3 如果經過阻塞式的輪詢之后,出現的感興趣的事件,或者任務隊列又有新任務了,或者定時任務中有新任務了,或者被外部線程喚醒了 都直接退出循環
- 如果前面都沒出問題,最后檢驗是否出現了JDK空輪詢的BUG
// todo 循環接受IO事件
// todo 每次進行 select() 操作時, oldWakenUp被標記為false
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
///todo ----------------------------------------- 如下部分代碼, 是 select()的deadLine及任務穿插處理邏輯-----------------------------------------------------
// todo selectCnt這個變量記錄了 循環 select的次數
int selectCnt = 0;
// todo 記錄當前時間
long currentTimeNanos = System.nanoTime();
// todo 計算出估算的截止時間, 意思是, select()操作不能超過selectDeadLineNanos這個時間, 不讓它一直耗着,外面也可能有任務等着當前線程處理
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
// -------for 循環開始 -------
for (; ; ) {
// todo 計算超時時間
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {// todo 如果超時了 , 並且selectCnt==0 , 就進行非阻塞的 select() , break, 跳出for循環
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// todo 判斷任務隊列中時候還有別的任務, 如果有任務的話, 進入代碼塊, 非阻塞的select() 並且 break; 跳出循環
//todo 通過cas 把線程安全的把 wakenU設置成true表示退出select()方法, 已進入時,我們設置oldWakenUp是false
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
///todo ----------------------------------------- 如上部分代碼, 是 select()的deadLine及任務穿插處理邏輯-----------------------------------------------------
///todo ----------------------------------------- 如下, 是 阻塞式的select() -----------------------------------------------------
// todo 上面設置的超時時間沒到,而且任務為空,進行阻塞式的 select() , timeoutMillis 默認1
// todo netty任務,現在可以放心大膽的 阻塞1秒去輪詢 channel連接上是否發生的 selector感性的事件
int selectedKeys = selector.select(timeoutMillis);
// todo 表示當前已經輪詢了SelectCnt次了
selectCnt++;
// todo 阻塞完成輪詢后,馬上進一步判斷 只要滿足下面的任意一條. 也將退出無限for循環, select()
// todo selectedKeys != 0 表示輪詢到了事件
// todo oldWakenUp 當前的操作是否需要喚醒
// todo wakenUp.get() 可能被外部線程喚醒
// todo hasTasks() 任務隊列中又有新任務了
// todo hasScheduledTasks() 當時定時任務隊列里面也有任務
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
///todo ----------------------------------------- 如上, 是 阻塞式的select() -----------------------------------------------------
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// todo 每次執行到這里就說明,已經進行了一次阻塞式操作 ,並且還沒有監聽到任何感興趣的事件,也沒有新的任務添加到隊列, 記錄當前的時間
long time = System.nanoTime();
// todo 如果 當前的時間 - 超時時間 >= 開始時間 把 selectCnt設置為1 , 表明已經進行了一次阻塞式操作
// todo 每次for循環都會判斷, 當前時間 currentTimeNanos 不能超過預訂的超時時間 timeoutMillis
// todo 但是,現在的情況是, 雖然已經進行了一次 時長為timeoutMillis時間的阻塞式select了,
// todo 然而, 我執行到當前代碼的 時間 - 開始的時間 >= 超時的時間
// todo 但是 如果 當前時間- 超時時間< 開始時間, 也就是說,並沒有阻塞select, 而是立即返回了, 就表明這是一次空輪詢
// todo 而每次輪詢 selectCnt ++; 於是有了下面的判斷,
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
// todo selectCnt如果大於 512 表示cpu確實在空輪詢, 於是rebuild Selector
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
// todo 它的邏輯創建一個新的selectKey , 把老的Selector上面的key注冊進這個新的selector上面 , 進入查看
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
// todo 解決了Select空輪詢的bug
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
//// -----------for 循環結束 --------------
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
什么是Jdk的Selector空輪詢
我們可以看到,上面的run()
方法,經過兩次判斷后進入了指定時長的阻塞式輪詢,而我們常說的空輪詢bug,指的就是本來該阻塞住輪詢,但是卻直接返回了, 在這個死循環中,它的暢通執行很可能使得CPU的使用率飆升, 於是把這種情況說是jdk的selector的空輪詢的bug
Netty 如何解決了Jdk的Selector空輪詢bug?
一個分支語句 if(){}else{}
, 首先他記錄下,現在執行判斷時的時間, 然后用下面的公式判斷
當前的時間t1 - 預訂的deadLine截止時間t2 >= 開始進入for循環的時間t3
我們想, 如果說,上面的阻塞式select(t2)
沒出現任何問題,那么 我現在來檢驗是否出現了空輪詢是時間t1 = t2+執行其他代碼的時間, 如果是這樣, 上面的等式肯定是成立的, 等式成立說沒bug, 順道把selectCnt = 1;
但是如果出現了空輪詢,select(t2)
並沒有阻塞,而是之間返回了, 那么現在的時間 t1 = 0+執行其他代碼的時間, 這時的t1相對於上一個沒有bug的大小,明顯少了一個t2, 這時再用t1-t2 都可能是一個負數, 等式不成立,就進入了else的代碼塊, netty接着判斷,是否是真的在空輪詢, 如果說循環的次數達到了512次, netty就確定真的出現了空輪詢, 於是nettyrebuild()
Selector ,從新開啟一個Selector, 循環老的Selector上面的上面的注冊的時間,重新注冊進新的 Selector上,用這個中替換Selector的方法,解決了空輪詢的bug
感性趣的事件,是何時添加到selectedkeys
中的?
ok, run()
的三部曲第一步輪詢已經完成了, 下一步就是處理輪詢出來的感興趣的IO事件,processSelectedKeys()
,下面我們進入這個方法, 如果這個selectedKeys不為空,就進去 processSelectedKeysOptimized();
繼續處理IO事件,
比較有趣的是,這個selectedKeys是誰? ,別忘了我們是在NioEventLoop
中,是它開啟了Selector,也是他使用反射的手段將Selector,存放感興趣事件的HashSet集合替換成了SelectedSelectionKeySet
這個名叫set,實為數組的數據結構, 當時的情況如下:
- 創建出
SelectedSelectionKeySet
的實例selectedKeySet
- 使用反射,將
unwrappedSelector
中的selectedKeysField
字段,替換成selectedKeySet
- 最后一步, 也很重要
selectedKeys = selectedKeySet;
看到第三步沒? 也就是說,我們現在再想獲取裝有感興趣Key的 HashSet集合,已經不可能了,取而代之的是更優秀的selectedKeySet
,也就是下面我們使用的selectedKeys
,於是我們想處理感性趣的事件,直接從selectedKeys
中取, Selector輪詢到感興趣的事件,也會直接往selectedKeys
中放
private void processSelectedKeys() {
// todo selectedKeys 就是經過優化后的keys(底層是數組)
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
下面接着跟進processSelectedKeysOptimized();
,關於這個方法的有趣的地方,我寫在這段代碼的下面
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// todo 數組輸出空項, 從而允許在channel 關閉時對其進行垃圾回收
// See https://github.com/netty/netty/issues/2363
// todo 數組中當前循環對應的keys質空, 這種感興趣的事件只處理一次就行
selectedKeys.keys[i] = null;
// todo 獲取出 attachment,默認情況下就是注冊進Selector時,傳入的第三個參數 this===> NioServerSocketChannel
// todo 一個Selector中可能被綁定上了成千上萬個Channel, 通過K+attachment 的手段, 精確的取出發生指定事件的channel, 進而獲取channel中的unsafe類進行下一步處理
final Object a = k.attachment();
// todo
if (a instanceof AbstractNioChannel) {
// todo 進入這個方法, 傳進入 感興趣的key + NioSocketChannel
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
NioEventLoop
是如何在千百條channel中,精確獲取出現指定感興趣事件的channel的?
上面這個方法,就是在真真正正的處理IO事件, 看看這段代碼, 我們發現了這樣一行代碼
final Object a = k.attachment();
並且,判斷出Key的類型后,執行處理邏輯的代碼中的入參都是一樣的processSelectedKey(a,k)
, 這是在干什么呢?
其實,我們知道,每個NioEventLoop
開始干活后,會有很多客戶端的連接channel前來和它建立連接,一個事件循環同時為多條channel服務,而且一條channel的整個生命周期都只和一個NioEventLoop關聯
現在好了,事件循環的選擇器輪詢出了諸多的channel中有channel出現了感興趣的事件,下一步處理這個事件的前提得知道,究竟是哪個channel?
使用的attachment特性,早在Channel注冊進Selector時,進存放進去了,下面是Netty中,Channel注冊進Selector的源碼
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// todo javaChannel() -- 返回SelectableChanel 可選擇的Channel,換句話說,可以和Selector搭配使用,他是channel體系的頂級抽象類, 實際的類型是 ServerSocketChannel
// todo eventLoop().unwrappedSelector(), -- > 獲取選擇器, 現在在AbstractNioChannel中 獲取到的eventLoop是BossGroup里面的
// todo 到目前看, 他是把ServerSocketChannel(系統創建的) 注冊進了 EventLoop的選擇器
// todo 到目前為止, 雖然注冊上了,但是它不關心任何事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
這里的 最后一個參數是 this是當前的channel , 意思是把當前的Channel當成是一個 attachment(附件) 綁定到selector上 作用如下:
- 當channel在這里注冊進 selector中返回一個selectionKey, 這個key告訴selector 這個channel是自己的
- 當selector輪詢到 有channel出現了自己的感興趣的事件時, 需要從成百上千的channel精確的匹配出 出現Io事件的channel,於是seleor就在這里提前把channel存放入 attachment中, 后來使用
- 最后一個 this 參數, 如果是服務啟動時, 他就是NioServerSocketChannel 如果是客戶端他就是 NioSocketChannel
ok, 現在就捋清楚了,挖坑,填坑的過程; 下面進入processSelectedKey(SelectionKey k, AbstractNioChannel ch)
執行IO任務, 源碼如下: 我們可以看到,具體的處理IO的任務都是用Channel的內部類unSafe()完成的, 到這里就不往下跟進了, 后續寫新博客連載
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// todo 這個unsafe 也是可channel 也是和Channel進行唯一綁定的對象
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) { // todo 確保Key的合法
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) { // todo 確保多線程下的安全性
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
// todo NioServerSocketChannel和selectKey都合法的話, 就進入下面的 處理階段
try {
// todo 獲取SelectedKey 的 關心的選項
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// todo 在read() write()之前我們需要調用 finishConnect() 方法, 否則 NIO JDK拋出異常
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps( );
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// todo 同樣是檢查 readOps是否為零, 來檢查是否出現了 jdk 空輪詢的bug
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
處理非IO任務
上面的處理IO事件結束后,第三波高潮就來了,處理任務隊列中的任務, runAllTask(timeOutMinils)
, 他也是有生命時長限制的 deadline, 它主要完成了如下的幾步:
- 聚合任務, 把到期的定時任務轉移到普通任務隊列
- 循環從普通隊列獲取任務
- 執行任務
- 每執行完64個任務,判斷是否到期了
- 收尾工作
源碼如下:
protected boolean runAllTasks(long timeoutNanos) {
// todo 聚合任務, 會把定時任務放入普通的任務隊列中 進入查看
fetchFromScheduledTaskQueue();
// todo 從普通的隊列中拿出一個任務
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
// todo 計算截止時間, 表示任務的執行,最好別超過這個時間
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
// todo for循環執行任務
for (;;) {
// todo 執行任務, 方法里調用 task.run();
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
// todo 因為 nanoTime();的執行也是個相對耗時的操作,因此沒執行完64個任務后,檢查有沒有超時
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
// todo 拿新的任務
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// todo 每個任務執行結束都有個收尾的構造
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
NioEventLoop
如何聚合任務?
聚合任務就是把已經到執行時間的任務從定時任務隊列中全部取出 ,放入普通任務隊列然后執行, 我們進入上的第一個方法fetchFromScheduledTaskQueue
,源碼如下,
private boolean fetchFromScheduledTaskQueue() {
// todo 拉取第一個聚合任務
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
// todo 從任務丟列中取出 截止時間是 nanoTime的定時任務 ,
// todo 往定時隊列中添加 ScheduledFutureTask任務, 排序的基准是 ScheduledFutureTask 的compare方法,按照時間,從小到大
// todo 於是當我們發現隊列中的第一個任務,也就是截止時間最近的任務的截止時間比我們的
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
// todo scheduledTask != null表示定時任務該被執行了, 於是將定時任務添加到 普通任務隊列
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
// todo 如果添加失敗了, 把這個任務從新放入到定時任務隊列中, 再嘗試添加
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
// todo 循環,嘗試拉取定時任務 , 循環結束后,所有的任務全部會被添加到 task里面
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
根據指定的截止時間,從定時任務隊列中取出任務,定時任務隊列中任務按照時間排序,時間越短的,排在前面, 時間相同,按照添加的順序排序, 現在的任務就是檢查定時任務隊列中任務,嘗試把里面的任務挨個取出來,於是netty使用這個方法 Runnable scheduledTask = pollScheduledTask(nanoTime);
然后馬上在while(){}
循環中判斷是否存在, 這個方法實現源碼如下, 不難看出,他是在根據時間判斷
/**
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
* You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}.
* todo 根據給定的納秒值,返回 Runable定時任務 , 並且,每次使用都要沖洗使用是nanoTime() 來矯正時間
*/
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
// todo 如果定時任務的截止時間<= 我們穿進來的時間, 就把他返回
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
// todo 否則返回kong,表示當前所有的定時任務都沒到期, 沒有可以執行的
return null;
}
經過循環之后,到期的任務,全被添加到 taskQueue里面了,下面就是執行TaskQueue里面的任務
任務隊列中的任務是怎么執行的?
safeExecute(task);
方法,執行任務隊列中的任務
源碼如下: 實際上就行執行了 task這個Runable的Run方法
/**
* Try to execute the given {@link Runnable} and just log if it throws a {@link Throwable}.
*/
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
總結一下: 到現在為止,EventLoop已經啟動了, 一說到NioEventLoop總是想起上圖, 現在他可以接受新的連接接入,輪詢,處理任務...