Netty ServerBootstrap如何綁定端口


ServerBootstrap監聽端口

接下來帶他們通過源碼去分析下ServerBootstrap是如何監聽端口

流程

源碼分析

1. 先看一下啟動demo

            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });
            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

2. ServerBootstrap.bind(PORT)

首先從ServerBootstrap.bind(PORT)入手,開始看下他是如何去監聽端口,完成Nio底層的一些封裝。直接看其抽象類AbstractBootstrap的方法實現

    private ChannelFuture doBind(final SocketAddress localAddress) {
        
        final ChannelFuture regFuture = initAndRegister(); // 初始化並且去注冊channel
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

3. 我們先分析下initAndRegister()到底干了什么?

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel(); // 這邊是ReflectiveChannelFactory類通過反射去創建我們初始化bootstrap設置的Channel,這里由於我們是服務端,那就是NioServerSocketChannel
            init(channel);  
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

4. ServerBootstrap類 init方法

分析一下這個init(channel)干了什么

  void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger); // 設置channelOptions
        setAttributes(channel, newAttributesArray());// 設置Attributes

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

        // 往ChannelPipeline添加了一個ChannelInitializer,此時channelPipeline里結構為。Head-> ChannelInitializer -> Tail
        p.addLast(new ChannelInitializer <Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

總結一下 init大致就是設置一些配置屬性以及添加了一個ChannelInitializer,這個ChannelInitializer看他的方法好像是設置一個ServerBootstrapAcceptor,具體哪里執行這個ChannelInitializer不清楚,帶着疑惑我們繼續往下看。

5. MultithreadEventLoopGroup類 register方法

回到步驟3我們看下這行代碼

ChannelFuture regFuture = config().group().register(channel);

config().group() 這個代碼就是通過ServerBootstrapConfig的group()方法去獲取我們設置的NioEventLoopGroup(boss線程)

NioEventLoopGroup類的register方法在父類MultithreadEventLoopGroup中實現:

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

MultithreadEventLoopGroup 種next()返回的實例是 SingleThreadEventLoop,因此我們直接看SingleThreadEventLoop的registry方法,通過方法的調用鏈路最終找到下面這個方法:

    @Deprecated
    @Override
    public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        ObjectUtil.checkNotNull(channel, "channel");
        channel.unsafe().register(this, promise);
        return promise;
    }

這下出來了一個新的東西 channel.unsafe(),我們先分析下這個東西返回的是什么,因為我們知道我們的channel是NioServerSocketChannel,所以我們直接去看NioServerSocketChannel的unsafe()方法:

AbstractNioChannel.unsafe() -> AbstractChannel的unsafe變量 -> AbstractNioMessageChannel.newUnsafe()
最終我們可以確定返回的是NioMessageUnsafe;

那我直接看NioMessageUnsafe的register方法,這個方法是在父類AbstractUnsafe中定義

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            // 將NioServerSocketChannel的eventLoop 綁定到 MultithreadEventLoopGroup的next()返回的eventLoop
            AbstractChannel.this.eventLoop = eventLoop;
            // 如果當前線程是eventLoop則直接執行
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // 提交一個eventLoop任務,任務會在EventLoop線程啟動后去之行,下面會講EventLoop線程是如何啟動的
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

繼續看下AbstractUnsafe的register0方法,(此方法不是立馬執行,而是等EventLoop線程啟動之后,這邊可以順便分許下這個方法)


        private void register0(ChannelPromise promise) {
            try {
                //代碼省略
                doRegister(); // 開始注冊,由外部類AbstractChannel實現
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
             //代碼省略
            if (isActive()) {
                    if (firstRegistration) {
                        // 第一次注冊通過處罰ChannelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // 設置感興趣的事件
                        beginRead();
                    }
             }
                   
        }

NioServerSocketChannel繼承圖

由上圖我們找到doRegister方法在AbstractNioChannel中實現,AbstractChannel里僅僅是個空實現,

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 注冊java的ServerSocketChannel到EventLoop的Selector上去,並且把當前的netty的channel綁定到java的attachment上去,第二次參數0代表不訂閱事件
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
  1. 這邊注冊完成后但是沒有完成Accept事件的注冊,我們繼續研究下是怎么完成Accept事件的注冊,通過代碼我們得知如果不是第一次注冊直接調用AbstractChannel的beginRead()->AbstractNioChannel的doBeginRead(),然后完成注冊,
  2. 第一次調用的話是通過PipeLine觸發ChannelActive事件 ,然后調用HeadContext的channelActive方法,然后調用readIfIsAutoRead方法
      // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
        readPending = true;
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }

NioSeverSocketChannel在新建時候初始化到父類AbstractNioChannel是一個SelectionKey.OP_ACCEPT事件,因此這邊完成的是連接事件的監聽

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

到這里完成了nio ServerSocketChannel selector的注冊,

6. EventLoop類 run方法

看到這里有同學有疑問,這個提交任務,但是沒有看到哪里啟動了EventLoop的線程?帶着這個疑惑我們看下eventLoop的execute方法。

   @Override
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
    
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop(); // 判斷是不是當前EventLoop線程
        addTask(task); // 提交job
        if (!inEventLoop) { //如果不是在EventLoop線程中
            startThread(); // 這個是開啟線程嗎?下面我會給分析下這個代碼
            // 移除job的一些操作
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
    

SingleThreadEventExecutor的startThread()這個方法是開啟EventLoop的線程(如果線程沒有啟動的話)

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            // cas判斷下避免多線程開啟線程,
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    // 開啟當前的EventLoop
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

SingleThreadEventExecutor的doStartThread()方法

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
              // 其他代碼省略。。。
              SingleThreadEventExecutor.this.run();
              // 其他代碼省略。。。
            } 
        });
    }

接下來我們直接看 SingleThreadEventExecutor.this.run()這個方法,其運行的是子類NioEventLoop類中的run方法:

 @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
          //代碼省略 這里面大致就是處理IO事件 以及 自定義Job事件  
        }
    }

7. ServerBootstrap類 doBind0方法

通過下面我們可以看到,此時像EventLoop線程池中提交了一個Runnable,里面會調用channel.bind(localAddress, promise)去綁定端口

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    // 綁定ip端口邏輯
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

那我們直接來看下channel.bind(localAddress, promise)具體看了什么,因為是服務端我們知道channel是NioServerSocketChannel,那我們去這里面尋找答案,果然在里面找到了最關鍵的一個方法,調用了pipeline的bind方法。pipeline默認是DefaultChannelPipeline

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

我們繼續往下看DefaultChannelPipeline的bind方法,調用了Tail節點的bind方法,然后往Head節點傳播

    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        // 調用tail節點的bind
        return tail.bind(localAddress, promise);
    }
  

tail的bind方法定義在其父類AbstractChannelHandlerContext中

    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        ObjectUtil.checkNotNull(localAddress, "localAddress");
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            // 提交job,最終會被EventLoop執行
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null, false);
        }
        return promise;
    }

這時候我們發現又是提交了一個Runnable去調用下一個的invokeBind方法

   private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                // 這行實際調用的是Header節點中的bind(this, localAddress, promise)
                ((ChannelOutboundHandler) handler()).bind()(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }

直接看HeadContext中的實現方法

        @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            // 這個unsafe是NioServerScoketChannel中產生的
            unsafe.bind(localAddress, promise);
        }

分析代碼在其方法里找到了AbstractUnsafe類最終調用的外部類(NioServerScoketChannel)doBind方法,我們得知道NioServerScoketChannel中肯定存在doBind方法的實現l類

        @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();
            // 代碼省略
             try {
                //核心代碼出現,  
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
          // 代碼省略
        }

NioServerScoketChannel的doBind方法

    @SuppressJava6Requirement(reason = "Usage guarded by java version check")
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        //獲取java的channel然后開始綁定端口
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

綜上自此完成端口的綁定

總結一下

根據以上源碼分析,我們大致能夠清晰看到Netty是如何去封裝服務端的端口綁定,下面我們總結下主要流程

  1. 初始化netty channel,設置一些屬性,初始化pipeline等操作
  2. 注冊channel
  • 綁定channel設置EventLoop
  • 將初始化的java channel綁定到EventLoop的selector上
  • 啟動EventLoop的run方法,用於處理Io事件
  • pipeline觸發fireChannelActive注冊Accept事件
  1. 執行bind方法,

結束

識別下方二維碼!回復: 入群 ,掃碼加入我們交流群!

點贊是認可,在看是支持


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM