Netty源碼分析之NioEventLoop(一)—NioEventLoop的創建


一、NioEventLoop的概述

NioEventLoop做為Netty線程模型的核心部分,從本質上講是一個事件循環執行器,每個NioEventLoop都會綁定一個對應的線程通過一個for(;;)循環來處理與 Channel 相關的 IO 操作, 包括 調用 select 等待就緒的 IO 事件、讀寫數據與數據的處理等;其次作為任務隊列, 執行 taskQueue 中的任務, 例如eventLoop.schedule 提交的定時任務也是這個線程執行的。而NioEventLoopGroup顧名思義,它是維護了一組這樣的事件循環器,這也是Netty基於Reactor模式的具體設計體現。

接下來我們就結合具體的代碼,對NioEventLoop的整個創建流程進行一個說明與總結

二、NioEventLoop的創建

我們基於Netty構建服務端還是客戶端時,都首先需要創建NioEventLoopGroup 實例

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

 NioEventLoopGroup 做為基於NIO的處理channle相關IO操作的事件循環器組,它的類層次結構如下

通過NioEventLoopGroup構造函數傳入線程數量

    /**
     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

 NioEventLoopGroup最終的構造函數中會包含以下幾個函數

1、nThreads:傳入的線程數量

2、executor :線程執行器Executor接口,默認為空

3、selectorProvider:用於創建Selector的SelectorProvider 

4、selectStrategyFactory:傳入DefaultSelectStrategyFactory.INSTANCE,  一個使用默認選擇策略的工廠。

5、RejectedExecutionHandlers.reject():Netty自定義線程拒絕策略

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

 在父類MultithreadEventLoopGroup中,會根據你傳入nThreads大小,確定初始化的線程數量,為0且沒有設置io.netty.eventLoopThreads參數項,則會以當前系統的核心線程數*2做為默認的線程數量

    static {
        //如果沒有設置io.netty.eventLoopThreads參數項,則會以當前運行系統的核心線程數*2作為線程數
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

接下來在MultithreadEventExecutorGroup的構造函數中我們會根據傳入的線程數,去初始化和創建一組NioEventLoop

首先我們看下NioEventLoop的類層次結構

 

下面在MultithreadEventExecutorGroup構造函數中主要完成以下幾個功能:

1、初始化ThreadPerTaskExecutor線程執行器,並傳入一個線程創建工廠,用於NioEventLoop對應線程的創建

2、根據傳入的線程數,初始化一個EventExecutor數組,用於放置創建的NioEventLoop對象

3、循環數組,通過newChild方法創建NioEventLoop對象。

    /**  
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            // 創建線程工廠,netty根據需要指定了線程的命名方式、優先級、是否是守護線程等屬性
            // 該線程池沒有任何隊列,提交任務后,創建任何線程類型都是 FastThreadLocalRunnable, 並且立即start。
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        //初始化一組事件循環執行器
        children = new EventExecutor[nThreads];

        //根據傳入的線程數,初始化一個線程數組
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 創建 new NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

 繼續跟蹤進入newChild(executor, args)內部,看到它會返回一個NioEventLoop對象

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

繼續查看NioEventLoop構造函數和他的父類構造函數

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

父類構造函數

    /**
     * Create a new instance
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param executor          the {@link Executor} which will be used for executing
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
     * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

通過上面的代碼我們可以看到,初始化NioEventLoop主要完成了以下的功能

1、保存線程執行器ThreadPerTaskExecutor

2、創建一個selector 

3、基於LinkedBlockingQueue創建一個taskQueue任務隊列,用於保存要執行的任務

這些都是為了后續的循環執行Channel 相關事件所做准備。

到這里其實我們創建了一組NioEventLoop,也就是一組事件循環執行器,每個NioEventLoop中都有對應的一個線程和一個selector ;創建完畢之后,自然就是要為每一個連接分配對應的NioEventLoop。Netty中通過

實現EventLoopGroup接口中的next()方法來返回一個可以使用的的NioEventLoop

public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * Return the next {@link EventLoop} to use
     */
    @Override
    EventLoop next();
}

在MultithreadEventExecutorGroup中我們可以查看它的具體實現方式

   chooser = chooserFactory.newChooser(children);
    
   @Override
    public EventExecutor next() {
        return chooser.next();
    }

進入代碼內部我們可以看到Netty針對數組大小,對數組下標的計算方式進行了優化

/**
 * Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
 */
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        //判斷是否是二的次冪,如果為true返回PowerOfTwoEventExecutorChooser,反之GenericEventExecutorChooser
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        //通過&運算的方式循環獲取數組下標
        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        //通過取模的方式循環獲取數組下標
        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

到此我們基本把Netty中NioEventLoop及NioEventLoopGroup的創建流程及核心代碼梳理了一遍。NioEventLoop做為Netty線程模型的核心部分包含的內容比較多,上面只是初始化及創建的一部分內容,后續的部分我會陸續的補齊,其中有錯誤和不足之處還請指正與海涵。

 

關注微信公眾號,查看更多技術文章。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM