netty核心組件之EventLoopGroup和EventLoop


這節我們着重介紹netty最為核心的組件EventLoopGroup和EventLoop

EventLoopGroup:顧名思義就是EventLoop的組,下面來看它們的繼承結構

 

 

 

 

 

 在netty中我們可以把EventLoop看做一個線程,當然線程不單是jdk中的的線程,它們都從Executor一路繼承過來,NioEventLoop繼承SinfleThreadEventLoop,從名字可以看出它是一個單線程的EventLoop,

我們先來看NioEventLoop是如何構造的

public NioEventLoopGroup() {
    this(0);
}
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
~~~~~~~~~~~~~
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                         final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory,
                         final RejectedExecutionHandler rejectedExecutionHandler) {
    super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}

NioEventLoop構造函數非常多,每個參數都可以定制,我就不全貼出來了,最后回到這個參數最全的構造函數,下面我們挨個解釋每個參數的作用

  • nThreads: 線程數,對應EventLoop的數量,為0時 默認數量為CPU核心數*2
  • executor: 這個我們再熟悉不過,最終用來執行EventLoop的線程
  • chooserFactor: 當我們提交一個任務到線程池,chooserFactor會根據策略選擇一個線程來執行
  • selectorProvider:用來實例化jdk中的selector,沒一個EventLoop都有一個selector
  • selectStrategyFactory:用來生成后續線程運行時對應的選擇策略工廠
  • rejectedExecutionHandler:跟jdk中線程池中的作用一樣,用於處理線程池沒有多余線程的情況,默認直接拋出異常

接着我們進入父類MultithreadEventLoopGroup的構造函數

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 創建nThreads大小的EventLoop數組
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 創建具體的EventLoop,會調用子類NioEventLoopGruop中的方法
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // 如果其中有一個創建失敗,把之前創建好的都關閉掉
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    // 把剛才創建好的EventLoop提供給EventExecutorChooser,用於后續選擇
    chooser = chooserFactory.newChooser(children);

    // 添加一個EventLoop監聽器,用來監聽EventLoop終止狀態
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        // 循環加入
        e.terminationFuture().addListener(terminationListener);
    }
    // 將EventLoop數組轉成一個只讀的set
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

我們繼續跟到父類NioEventLoopGroup中的newChild

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

可以看出先用來創建EventLoopGroup的參數其實都是用來創建EventLoop的,我們繼續跟NioEventLoop的構造

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // 創建selector,由此可見每一個EventLoop都會有一個selector
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    // 用來添加task的阻塞隊列 鏈表結構
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

 

這里我們只是創建EventLoop,同時設置了執行的線程池、selector、taskQueue,並添加一個監聽器用來監聽它們的關閉狀態,當所有線程都全部處於關閉狀態terminationFuture會被置為true,到此還是沒有實際創建可執行的thread。

 后續我們在介紹channel的時候就知道EventLoop和channel如何建立關系,什么時候執行線程,我們姑且把EventLoop當做一個可執行runnable的netty線程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM