這節我們着重介紹netty最為核心的組件EventLoopGroup和EventLoop
EventLoopGroup:顧名思義就是EventLoop的組,下面來看它們的繼承結構
在netty中我們可以把EventLoop看做一個線程,當然線程不單是jdk中的的線程,它們都從Executor一路繼承過來,NioEventLoop繼承SinfleThreadEventLoop,從名字可以看出它是一個單線程的EventLoop,
我們先來看NioEventLoop是如何構造的
public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } ~~~~~~~~~~~~~ public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory, final RejectedExecutionHandler rejectedExecutionHandler) { super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler); }
NioEventLoop構造函數非常多,每個參數都可以定制,我就不全貼出來了,最后回到這個參數最全的構造函數,下面我們挨個解釋每個參數的作用
- nThreads: 線程數,對應EventLoop的數量,為0時 默認數量為CPU核心數*2
- executor: 這個我們再熟悉不過,最終用來執行EventLoop的線程
- chooserFactor: 當我們提交一個任務到線程池,chooserFactor會根據策略選擇一個線程來執行
- selectorProvider:用來實例化jdk中的selector,沒一個EventLoop都有一個selector
- selectStrategyFactory:用來生成后續線程運行時對應的選擇策略工廠
- rejectedExecutionHandler:跟jdk中線程池中的作用一樣,用於處理線程池沒有多余線程的情況,默認直接拋出異常
接着我們進入父類MultithreadEventLoopGroup的構造函數
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 創建nThreads大小的EventLoop數組 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 創建具體的EventLoop,會調用子類NioEventLoopGruop中的方法 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { // 如果其中有一個創建失敗,把之前創建好的都關閉掉 for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 把剛才創建好的EventLoop提供給EventExecutorChooser,用於后續選擇 chooser = chooserFactory.newChooser(children); // 添加一個EventLoop監聽器,用來監聽EventLoop終止狀態 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { // 循環加入 e.terminationFuture().addListener(terminationListener); } // 將EventLoop數組轉成一個只讀的set Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
我們繼續跟到父類NioEventLoopGroup中的newChild
protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
可以看出先用來創建EventLoopGroup的參數其實都是用來創建EventLoop的,我們繼續跟NioEventLoop的構造
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; // 創建selector,由此可見每一個EventLoop都會有一個selector final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = ObjectUtil.checkNotNull(executor, "executor"); // 用來添加task的阻塞隊列 鏈表結構 taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }
這里我們只是創建EventLoop,同時設置了執行的線程池、selector、taskQueue,並添加一個監聽器用來監聽它們的關閉狀態,當所有線程都全部處於關閉狀態terminationFuture會被置為true,到此還是沒有實際創建可執行的thread。
后續我們在介紹channel的時候就知道EventLoop和channel如何建立關系,什么時候執行線程,我們姑且把EventLoop當做一個可執行runnable的netty線程