Netty源碼學習(三)NioEventLoop


0. NioEventLoop簡介

NioEventLoop如同它的名字,它是一個無限循環(Loop),在循環中不斷處理接收到的事件(Event)

在Reactor模型中,NioEventLoop就是Worker的角色,關聯於多個Channel,監聽這些Channel上的read/write事件,一旦有事件發生,就做出相應的處理

 

1. NioEventLoop類圖

繼承關系可以說是相當復雜了,我們慢慢分析

 

2.  NioEventLoop的構造方法

 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }


    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();//利用JDK提供的SelectorProvider直接創建一個Selector
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEYSET_OPTIMIZATION) {//如果沒有開啟KEYSET優化,將上面的那個Selector直接返回
            return new SelectorTuple(unwrappedSelector);
        }

        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();//創建一個專門存放SelectionKey的Set對象

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {//用反射的方式創建一個SelectorImpl對象
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
                // ensure the current selector implementation is what we can instrument.
                !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);//強制將SelectorImpl中的selectedKeys域替換為優化版的SelectedSelectionKeySet對象
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);//強制將SelectorImpl中的publicSelectedKeys域替換為優化版的SelectedSelectionKeySet對象
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }

構造方法的主要作用是創建了一個SelectorImpl對象,如果沒有設置DISABLE_KEYSET_OPTIMIZATION屬性,SelectorImpl中類型為Set<SelectionKey>的selectedKeys與publicSelectedKeys域會被替換為一個SelectedSelectionKeySet對象。

為什么搞得這么麻煩呢?因為默認的域是Set類型,插入元素的開銷是o(log n),而優化版的SelectedSelectionKeySet繼承了AbstractSet,具有Set的功能,但是內部是用數組實現,只具有add功能,而且其開銷為o(1)。

 

3. NioEventLoop.run()

run方法是NioEventLoop的核心,此方法會在無限循環中監聽關聯的Channel上是否有新事件產生

    @Override
    protected void run() {
        for (;;) {//無限循環
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {//如果任務隊列中有任務,調用selectNow()方法,如果沒有,則直接返回SelectStrategy.SELECT
                    case SelectStrategy.CONTINUE://沒搞懂這個分支的目的是什么,全局搜了一下SelectStrategy.CONTINUE,沒發現有賦這個值的地方
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));//調用select()方法,嘗試從關聯的channel里讀取IO事件。需要注意的是這個select方法相當復雜,因為它悄悄的解決了老版本的JDK的select方法存在的bug,有興趣的可以仔細分析一下相關源碼
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;//ioRatio代表EventLoop會花多少時間在IO事件上
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();//處理IO事件
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();//處理CPU事件
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();//處理IO事件
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;//本次循環處理IO事件的耗時
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//分給CPU事件的耗時
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }


    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();//處理IO事件。由於這里采用的是優化版的SelectorImpl,IO事件已經被寫在selectedKeys屬性里了,所以無需額外傳參
        } else {
            processSelectedKeysPlain(selector.selectedKeys());//未優化版的SelectorImpl,需要調用selectedKeys()方法才能獲取准備好的IO事件
        }
    }


    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {//遍歷已經准備好的IO事件
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;//手動將數組元素賦為null,以幫助gc(因為在系統壓力大的時候,SelectionKey數組靠后的部分會被占用,如果不手動將用過的元素設置為null,那么在系統壓力小的時候,這些元素是不會被釋放的,也就是內存泄漏了)

            final Object a = k.attachment();//附件是這個事件所關聯的Channel,后續的代碼會直接從這個Channel上讀取數據

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);//處理某個IO事件
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }


    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();//獲取事件的類型
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//不太理解這段代碼的原因
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {//如果是可寫事件,則flush緩存
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//如果是可寫事件,則調用unsafe.read()方法
                unsafe.read();//此處的unsafe為NioSocketChannel.NioSocketChannelUnsafe
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

代碼很長,大概邏輯如下:

a. 在無限循環中調用Selector.select方法

b. 使用ioRatio控制IO事件與CPU事件的耗時比例(ioRatio的默認值為50)

c. 如果有IO事件發生,遍歷所有IO事件並調用processSelectedKey方法

d. 在processSelectedKey中,如果發現事件是READ/ACCEPT類型,則調用NioSocketChannel.NioSocketChannelUnsafe.read()方法對事件進行處理。其實際實現位於AbstractNioByteChannel中:

        @Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();//獲取用戶注冊的pipeline
            final ByteBufAllocator allocator = config.getAllocator();//默認值為PooledByteBufAllocator,也就是申請direct memory
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);//開辟一塊內存作為buffer
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));//doReadBytes方法會從綁定的Channel里讀取數據到buffer里。順便記錄一下本次讀了多少字節的數據出來
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }

                    allocHandle.incMessagesRead(1);//計數
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);//觸發pipeline的channelRead事件
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();//觸發pipeline的channelReadComplete事件

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

這段代碼主要做了兩件事情:

a. 在循環中將Channel可讀的數據讀到一個臨時的ByteBuf中

b. 調用pipeline.fireChannelRead方法,處理讀取到數據。(具體的處理邏輯在后續文章中闡述)

 

現在我們已經基本搞清楚NioEventLoop的工作邏輯了:

在無限循環中監聽綁定的Channel上的事件,如果有ACCEPT/READ事件發生,則從Channel里讀取數據,並調用關聯的pipeline的fireChannelRead方法進行處理。

 

目前仍不清楚的地方是:NioEventLoop是如何與Channel/Pipeline綁定的?

且聽后文分解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM