netty 處理客戶端連接


Netty如何處理連接事件

上文講了Netty如何綁定端口,現在我們來閱讀下netty如何處理connect事件。上文我們說了NioEventLoop啟動后不斷去調用select的事件,當客戶端連接時候,回觸發processSelectedKeys方法,然后調用 processSelectedKey方法

SelectKey 說明
OP_READ 讀 1
OP_WRITE 寫 4
OP_CONNECT 客戶端connect 8
OP_ACCEPT 連接 16
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // 省略。。。
        try {
            // 16 是連接事件
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
               // 調用NioMessageUnsafe的read方法
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

通過代碼我們知道調用的是NioMessageUnsafe的read方法,進入方法我們源碼,我門發現其調用了NioServerSocketChannel的doReadMessages方法,

      try {
                    do {
                        // 調用NioServerSocketChannel的doReadMessages方法
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (continueReading(allocHandle));
                } catch (Throwable t) {
                    exception = t;
                }
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // 觸發fireChannelRead事件
                    pipeline.fireChannelRead(readBuf.get(i));
                }

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
       //這里處理java的accpet事件
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                // 丟到buf里 然后觸發channelRead事件
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

之前文章我門在綁定端口時候,pipiline中添加了一個ServerBootstrapAcceptor類,我門看下其channelRead方法的實現

      @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // 這個其實就是上面的NioSocketChannel
            final Channel child = (Channel) msg;
            // 我們啟動設置的ChannelInitializer
            child.pipeline().addLast(childHandler);
            //設置 options
            setChannelOptions(child, childOptions, logger);       //設置 attributes
            setAttributes(child, childAttrs);

            try {
                // 注冊child?
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

接下來我們繼續看下childGroup.register(child)如何注冊channel的。邏輯就是調用EveentLoopGrpoup的next方法分配SingleThreadEventLoop,調用其register方法

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // 最終走到NioSocketChannel的unsafe對象,registry這個方法在Abstract&AbstractUnsafe對象里實現
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

我們看下register方法實現,我們發現這個其實跟NioServerSocketChannel的注冊是一樣的,綁定EventLoop,並且開啟EventLoop,然后調用其

       @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            // 綁定當前的EventLoop
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                  // 第一次提交Runnable會啟動EventLoop線程去啟動事件,具體之前Netty綁定端口文章我寫過。如何去啟動的
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

繼續看下register0方法

 private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 開始注冊綁定selectKey
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    //第一次注冊時候會調用HeadContext的channelActive完成讀事件的注冊
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

這邊如何調用鏈路比較長 我這邊以斷點形式給大家展示一下

完成讀事件的注冊
完成讀事件的注冊

至此netty完成了處理客戶端的連接,綁定EventLoop,並且開啟EventLoop,完成讀事件的注冊,

結束

識別下方二維碼!回復: 入群 ,掃碼加入我們交流群!

點贊是認可,在看是支持
點贊是認可,在看是支持


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM