eventLoop是基於事件系統機制,主要技術由線程池同隊列組成,是由生產/消費者模型設計,那么先搞清楚誰是生產者,消費者內容
SingleThreadEventLoop 實現
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { private final Queue<Runnable> tailTasks; @Override protected void afterRunningAllTasks() { runAllTasksFrom(tailTasks); } }
SingleThreadEventLoop是個抽象類,從實現代碼上看出很簡單的邏輯邊界判斷
SingleThreadEventExecutor也是個抽象類,代碼量比較大,我們先看重要的成員屬性
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //事件隊列 private final Queue<Runnable> taskQueue; //執行事件線程,可以看出只有一個線程只要用來記錄executor的當前線程 private volatile Thread thread; //主要負責監控該線程的生命周期,提取出當前線程然后用thread記錄 private final Executor executor; //用Atomic*技術記錄當前線程狀態 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); } //啟動線程做了比較判斷 private void startThread() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { doStartThread(); } } } private void doStartThread() { executor.execute(new Runnable() { @Override public void run() { //記錄當前執行線程 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { //這里調用的是子類,注意子類是死循環不停的執行任務 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { //更改線程結束狀態 省略部分代碼 for (;;) { int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } try { // 執行未完成任務同 shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { //最后清理操作,如 NioEventLoop實現 selector.close(); cleanup(); } finally { //省略部分代碼 } } } } }); }
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { Runnable task = pollTaskFrom(taskQueue); if (task == null) { return false; } for (;;) { //安全執行任務 safeExecute(task); //繼續執行剩余任務 task = pollTaskFrom(taskQueue); if (task == null) { return true; } } } protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) { for (;;) { Runnable task = taskQueue.poll(); //忽略WAKEUP_TASK類型任務 if (task == WAKEUP_TASK) { continue; } return task; } } protected boolean runAllTasks(long timeoutNanos) { //先執行周期任務 fetchFromScheduledTaskQueue(); //從taskQueue提一個任務,如果為空執行所有tailTasks Runnable task = pollTask(); //如果taskQueue沒有任務,立即執行子類的tailTasks if (task == null) { afterRunningAllTasks(); return false; } //計算出超時時間 = 當前 nanoTime + timeoutNanos final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { safeExecute(task); runTasks ++; //當執行任務次數大於64判斷是否超時,防止長時間獨占CPU if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
//SingleThreadEventLoop run 實現 public class DefaultEventLoop extends SingleThreadEventLoop { @Override protected void run() { for (;;) { Runnable task = takeTask(); if (task != null) { task.run(); updateLastExecutionTime(); } if (confirmShutdown()) { break; } } } }
我們可以在SingleThreadEventExecutor 兩個runAllTasks 方法打上斷點,看執行任務時調用邏輯
本人為了搞清楚 taskQueue 同tailTasks 類型任務,在任務入隊時打斷點,分別為 SingleThreadEventLoop executeAfterEventLoopIteration方法同 SingleThreadEventExecutor offerTask方法
ServerBootstrap[bind address] ->
NioEventLoopGroup [register Channel] -> [ChannelPromise] ->
NioEventLoop [build and push register task]
從調用鏈可以清晰看出,啟動 netty server 綁定生成抽象 Channel 然后l轉換成ChannelPromise,再調用注冊實現register0
這里用了判斷是否為當前線程,如果是不用加入隊列馬上執行,目前減少上下文切換開削
if (eventLoop.inEventLoop()) { register0(promise); } else { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); }
總結:
1.SingleThreadEventLoop 任務執行加了超時限制,目的防止當前線程長時間執行任務獨占cpu
2.提交任務時做了減少上下文開削優化
3.執行任務優先級 1.周期任務 2.taskQueue 3.tailTasks
目前沒有看到任何調用 SingleThreadEventLoop executeAfterEventLoopIteration 方法,估計是擴展處理。
4.用到Atomic*技術解決並發問題,從Executor提取當前線程,把單一線程維護交給Executor