[編織消息框架][netty源碼分析]3 EventLoop 實現類SingleThreadEventLoop職責與實現


eventLoop是基於事件系統機制,主要技術由線程池同隊列組成,是由生產/消費者模型設計,那么先搞清楚誰是生產者,消費者內容

SingleThreadEventLoop 實現

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    private final Queue<Runnable> tailTasks;

    @Override
    protected void afterRunningAllTasks() {
        runAllTasksFrom(tailTasks);
    }
}

SingleThreadEventLoop是個抽象類,從實現代碼上看出很簡單的邏輯邊界判斷

SingleThreadEventExecutor也是個抽象類,代碼量比較大,我們先看重要的成員屬性

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    //事件隊列
    private final Queue<Runnable> taskQueue;
    //執行事件線程,可以看出只有一個線程只要用來記錄executor的當前線程
    private volatile Thread thread;
    //主要負責監控該線程的生命周期,提取出當前線程然后用thread記錄
    private final Executor executor;
    //用Atomic*技術記錄當前線程狀態
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
                AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
}

//啟動線程做了比較判斷
private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}

private void doStartThread() {
    executor.execute(new Runnable() {
        @Override
        public void run() {
            //記錄當前執行線程
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                //這里調用的是子類,注意子類是死循環不停的執行任務
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                //更改線程結束狀態 省略部分代碼
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                try {
                    // 執行未完成任務同 shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        //最后清理操作,如 NioEventLoop實現 selector.close();
                        cleanup();
                    } finally {
                        //省略部分代碼
                    }
                }
            }
        }
    });
}

protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { Runnable task = pollTaskFrom(taskQueue); if (task == null) { return false; } for (;;) { //安全執行任務 safeExecute(task); //繼續執行剩余任務 task = pollTaskFrom(taskQueue); if (task == null) { return true; } } } protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) { for (;;) { Runnable task = taskQueue.poll(); //忽略WAKEUP_TASK類型任務 if (task == WAKEUP_TASK) { continue; } return task; } } protected boolean runAllTasks(long timeoutNanos) { //先執行周期任務 fetchFromScheduledTaskQueue(); //從taskQueue提一個任務,如果為空執行所有tailTasks Runnable task = pollTask(); //如果taskQueue沒有任務,立即執行子類的tailTasks if (task == null) { afterRunningAllTasks(); return false; } //計算出超時時間 = 當前 nanoTime + timeoutNanos final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { safeExecute(task); runTasks ++; //當執行任務次數大於64判斷是否超時,防止長時間獨占CPU if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }

 

//SingleThreadEventLoop run 實現
public class DefaultEventLoop extends SingleThreadEventLoop {

    @Override
    protected void run() {
        for (;;) {
            Runnable task = takeTask();
            if (task != null) {
                task.run();
                updateLastExecutionTime();
            }

            if (confirmShutdown()) {
                break;
            }
        }
    }
}

 

我們可以在SingleThreadEventExecutor  兩個runAllTasks 方法打上斷點,看執行任務時調用邏輯

 

 本人為了搞清楚 taskQueue 同tailTasks 類型任務,在任務入隊時打斷點,分別為 SingleThreadEventLoop executeAfterEventLoopIteration方法同 SingleThreadEventExecutor offerTask方法

 

 

ServerBootstrap[bind address] ->

NioEventLoopGroup [register Channel] ->  [ChannelPromise] ->

NioEventLoop [build and push register task]

從調用鏈可以清晰看出,啟動 netty server 綁定生成抽象 Channel 然后l轉換成ChannelPromise,再調用注冊實現register0

這里用了判斷是否為當前線程,如果是不用加入隊列馬上執行,目前減少上下文切換開削

if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    eventLoop.execute(new Runnable() {
        @Override
        public void run() {
            register0(promise);
        }
    });
}

 

總結:

1.SingleThreadEventLoop 任務執行加了超時限制,目的防止當前線程長時間執行任務獨占cpu

2.提交任務時做了減少上下文開削優化

3.執行任務優先級 1.周期任務 2.taskQueue 3.tailTasks

目前沒有看到任何調用 SingleThreadEventLoop executeAfterEventLoopIteration 方法,估計是擴展處理。

4.用到Atomic*技術解決並發問題,從Executor提取當前線程,把單一線程維護交給Executor 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM