0. NioEventLoop簡介
NioEventLoop如同它的名字,它是一個無限循環(Loop),在循環中不斷處理接收到的事件(Event)
在Reactor模型中,NioEventLoop就是Worker的角色,關聯於多個Channel,監聽這些Channel上的read/write事件,一旦有事件發生,就做出相應的處理
1. NioEventLoop類圖
繼承關系可以說是相當復雜了,我們慢慢分析
2. NioEventLoop的構造方法
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } private SelectorTuple openSelector() { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector();//利用JDK提供的SelectorProvider直接創建一個Selector } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } if (DISABLE_KEYSET_OPTIMIZATION) {//如果沒有開啟KEYSET優化,將上面的那個Selector直接返回 return new SelectorTuple(unwrappedSelector); } final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();//創建一個專門存放SelectionKey的Set對象 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try {//用反射的方式創建一個SelectorImpl對象 return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField); if (cause != null) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet);//強制將SelectorImpl中的selectedKeys域替換為優化版的SelectedSelectionKeySet對象 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);//強制將SelectorImpl中的publicSelectedKeys域替換為優化版的SelectedSelectionKeySet對象 return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }
構造方法的主要作用是創建了一個SelectorImpl對象,如果沒有設置DISABLE_KEYSET_OPTIMIZATION屬性,SelectorImpl中類型為Set<SelectionKey>的selectedKeys與publicSelectedKeys域會被替換為一個SelectedSelectionKeySet對象。
為什么搞得這么麻煩呢?因為默認的域是Set類型,插入元素的開銷是o(log n),而優化版的SelectedSelectionKeySet繼承了AbstractSet,具有Set的功能,但是內部是用數組實現,只具有add功能,而且其開銷為o(1)。
3. NioEventLoop.run()
run方法是NioEventLoop的核心,此方法會在無限循環中監聽關聯的Channel上是否有新事件產生
@Override protected void run() { for (;;) {//無限循環 try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {//如果任務隊列中有任務,調用selectNow()方法,如果沒有,則直接返回SelectStrategy.SELECT case SelectStrategy.CONTINUE://沒搞懂這個分支的目的是什么,全局搜了一下SelectStrategy.CONTINUE,沒發現有賦這個值的地方 continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));//調用select()方法,嘗試從關聯的channel里讀取IO事件。需要注意的是這個select方法相當復雜,因為它悄悄的解決了老版本的JDK的select方法存在的bug,有興趣的可以仔細分析一下相關源碼 if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio;//ioRatio代表EventLoop會花多少時間在IO事件上 if (ioRatio == 100) { try { processSelectedKeys();//處理IO事件 } finally { // Ensure we always run tasks. runAllTasks();//處理CPU事件 } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys();//處理IO事件 } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime;//本次循環處理IO事件的耗時 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//分給CPU事件的耗時 } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } } private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized();//處理IO事件。由於這里采用的是優化版的SelectorImpl,IO事件已經被寫在selectedKeys屬性里了,所以無需額外傳參 } else { processSelectedKeysPlain(selector.selectedKeys());//未優化版的SelectorImpl,需要調用selectedKeys()方法才能獲取准備好的IO事件 } } private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) {//遍歷已經准備好的IO事件 final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null;//手動將數組元素賦為null,以幫助gc(因為在系統壓力大的時候,SelectionKey數組靠后的部分會被占用,如果不手動將用過的元素設置為null,那么在系統壓力小的時候,這些元素是不會被釋放的,也就是內存泄漏了) final Object a = k.attachment();//附件是這個事件所關聯的Channel,后續的代碼會直接從這個Channel上讀取數據 if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a);//處理某個IO事件 } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } } private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps();//獲取事件的類型 // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//不太理解這段代碼的原因 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) {//如果是可寫事件,則flush緩存 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//如果是可寫事件,則調用unsafe.read()方法 unsafe.read();//此處的unsafe為NioSocketChannel.NioSocketChannelUnsafe } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
代碼很長,大概邏輯如下:
a. 在無限循環中調用Selector.select方法
b. 使用ioRatio控制IO事件與CPU事件的耗時比例(ioRatio的默認值為50)
c. 如果有IO事件發生,遍歷所有IO事件並調用processSelectedKey方法
d. 在processSelectedKey中,如果發現事件是READ/ACCEPT類型,則調用NioSocketChannel.NioSocketChannelUnsafe.read()方法對事件進行處理。其實際實現位於AbstractNioByteChannel中:
@Override public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline();//獲取用戶注冊的pipeline final ByteBufAllocator allocator = config.getAllocator();//默認值為PooledByteBufAllocator,也就是申請direct memory final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator);//開辟一塊內存作為buffer allocHandle.lastBytesRead(doReadBytes(byteBuf));//doReadBytes方法會從綁定的Channel里讀取數據到buffer里。順便記錄一下本次讀了多少字節的數據出來 if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } allocHandle.incMessagesRead(1);//計數 readPending = false; pipeline.fireChannelRead(byteBuf);//觸發pipeline的channelRead事件 byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete();//觸發pipeline的channelReadComplete事件 if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
這段代碼主要做了兩件事情:
a. 在循環中將Channel可讀的數據讀到一個臨時的ByteBuf中
b. 調用pipeline.fireChannelRead方法,處理讀取到數據。(具體的處理邏輯在后續文章中闡述)
現在我們已經基本搞清楚NioEventLoop的工作邏輯了:
在無限循環中監聽綁定的Channel上的事件,如果有ACCEPT/READ事件發生,則從Channel里讀取數據,並調用關聯的pipeline的fireChannelRead方法進行處理。
目前仍不清楚的地方是:NioEventLoop是如何與Channel/Pipeline綁定的?
且聽后文分解。