Netty(6)源碼-服務端與客戶端創建


原生的NIO類圖使用有諸多不便,Netty向用戶屏蔽了細節,在與用戶交界處做了封裝。

一、服務端創建時序圖

步驟一:創建ServerBootstrap實例

ServerBootstrap是Netty服務端的啟動輔助類,它提供了一些列的方法用於設置參數,由於參數太多,使用builder模式。

步驟二:設置並且綁定好Reactor線程池

Netty中的Reactor線程池是EventLoopGroup,它實際上就是EventLoop數組。EventLoop的職責是處理所有注冊到本線程多路復用器Selector上的Channel。Selector的輪詢操作由綁定的EventLoop線程run方法驅動,在一個循環體內循環執行。值得說明的是,EventLoop的職責不僅僅是處理網絡I/O事件用戶自定義的Task定時任務Task也統一由EventLoop負責處理,這樣線程模型就實現了統一。從調度層面看,也不存在從EvenetLoop線程中再啟動其他類型的線程用於異步執行另外的任務,這樣就避免了多線程的並發操作和鎖競爭,提升了I/O線程的處理和調度性能。

步驟三:設置並綁定服務端Channel

服務端需要創建ServerSocketChannel,對原生NIO類庫進行了封裝,對應是NioServerSocketChannel

對用戶而言,不需要關心服務端Channel的底層實現細節和工作原理,只需要指定具體是哪種服務端

Netty的ServerBootstrap提供了channel方法用於指定服務端Channel的類型。Netty通過工廠反射創建NioServerSocketChannel對象。

public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

步驟四:鏈路建立的時候創建並且初始化ChannelPipeline。

它本質上是一個負載處理網絡事件的職責鏈,負載管理和執行ChannelHanler。網絡事件以事件流的形式在ChannelPipeline中流轉,由ChannelPipeline根據ChannelHandler的執行策略調度ChannelHandler的執行。典型的網絡事件如下:

(1)鏈路注冊;

(2)鏈路激活;

(3)鏈路斷開;

(4)接收到請求消息;

(5)請求消息接收並處理完畢;

(6)發送應答消息;

(7)鏈路發生異常;

(8)發生用戶自定義事件。

步驟五:初始化ChannelPipeline完成之后,添加並設置ChannelHandler。

ChannelHandler是Netty提供給用戶定制和擴展的關鍵接口。利用ChannelHandler用戶可以完成大多數的功能定制,例如消息編解碼、心跳、安全認證、TSL/SSL認證、流量控制和流量整形等。Netty同時也提供了大量的系統ChannelHandler供用戶使用,比較實用的系統ChannelHandler總結如下:

(1)系統編解碼框架——ByteToMessageCodec

(2)通用基於長度的半包解碼器——LengthFieldBasedFrameDecoder;

(3)碼流日志打印Handler——LoggingHandler

(4)SSL安全認證Handler——SslHandler

(5)鏈路空閑檢測Handler——IdleStateHandler

(6)流量整形Handler——ChannelTrafficShapingHandler;

(7)Base64編解碼——Base64DecoderBase64Encoder

.childHandler(new ChannelInitializer<SocketChannel>() {  
            @Override  
            public void initChannel(SocketChannel ch)  
                throws Exception {  
                ch.pipeline().addLast(  
                    new EchoServerHandler());  
            }  
            }); 

步驟6:綁定並啟動監聽端口。

在綁定監聽端口之前系統會做一系列的初始化和檢測工作,完成之后,會啟動監聽端口,並將ServerSocketChannel注冊到Selector上監聽客戶端連接,相關代碼如下。

   public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

步驟7:Selector輪詢。

由Reactor線程NioEventLoop負責調度和執行Selector輪詢操作,選擇准備就緒的Channel集合,相關代碼如下

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;  
        try {  
 
                //此處代碼省略...  
                int selectedKeys = selector.select(timeoutMillis);  
                selectCnt ++;  
                //此處代碼省略...  

步驟8:當輪詢到准備就緒的Channel之后,就由Reactor線程NioEventLoop執行ChannelPipeline的相應方法,最終調度並執行ChannelHandler

步驟9:執行Netty系統ChannelHandler和用戶添加定制的ChannelHandler。

ChannelPipeline根據網絡事件的類型,調度並執行ChannelHandler,相關代碼如下。

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

二、Netty服務端創建源碼分析

1. 創建線程組:

  通常會創建兩個EventLoopGroup,也可以只創建一個並共享。

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

NioEventLoopGroup實際就是Reactor線程池,負責調度和執行具體的任務:

  client接入

  網絡讀寫事件處理

  用戶自定義任務

  定時任務

通過ServerBootstrapgroup方法將2個EventLoopGroup實例傳入:

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        //1. 調用父類的group方法傳入parentGroup
        super.group(parentGroup);
        //2. 設置childGroup
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

如果只傳一個參數,則2個線程池會被重用。

    public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }

2. 設置服務端Channel用於端口監聽和客戶端鏈路接入:

根據傳入的channel class創建對應的服務端Channel,調用的是ReflectiveChannelFactory

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}

3. 下面要設定服務端TCP參數:

Netty使用一個LinkedHashMap來保存

private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

主要參數是TCP的backlog參數,底層C對應的接口為:

int listen(int fd, int backlog);

backlog指定了內核為此套接字接口排隊的最大連接個數,內核要為套接字維護2個隊列:未連接隊列已連接隊列,根據TCP三路握手的三個分節分隔這2個隊列。

服務器處於listen狀態時,收到客戶端syn分節(connect)時在未完成隊列中創建一個新的條目,然后用三路握手的第二個分節即服務器的syn響應客戶端,此條目在第三個分節到達前(客戶端對服務器syn的ack)一直保留在未完成隊列中,如果三路握手完成,該條目將從未完成連接隊列搬到已完成隊列尾部。

當進程調用accept時,從已完成隊列中的頭部取出一個條目給進程,當已完成隊列為空時進程將進入睡眠,直到有條目在已完成隊列中才喚醒。backlog則用來規定2個隊列總和的最大值,大多數實現值為5,但是在高並發場景中顯然不夠,比如Lighttpd中此值達到128*8。需要設置此值更大原因是未完成連接隊列可能因為客戶端syn的到達以及等待握手第三個分節的到達延時而增大。Netty默認是100,用戶可以調整。

4. 下面可以為啟動輔助類和其父類分別指定Handler。

本質區別就是:

  • ServerBootstrap中的Handler是NioServerSocketChannel使用的,所有連接該監聽端口的客戶端都會執行它;
  • 父類AbstractBootstrap中的Handler是個工廠類,它為每個新接入的客戶端都創建一個新的Handler;

5. 服務端啟動的最后一步,就是綁定本地端口,啟動服務:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

看紅色標注的initAndRegister方法:

調用 channelFactory.newChannel()常見NioServerSocketChannel

然后調用init(channel)方法,由具體子類實現,這里實現的子類是ServerBootrap.

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

下面關注init方法,主要完成了以下功能:

(1). 設置Socket參數和NioServerSocketChannel的附加屬性,代碼如下:

        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

(2).將AbstractBootstrap的Handler添加到NioServerSocketChannel的ChannelPipeline中,將用於服務端注冊的Handler ServerBootstrapAcceptor添加到ChannelPipeline中,代碼如下:

p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
                // In this case the initChannel(...) method will only be called after this method returns. Because
                // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
                // placed in front of the ServerBootstrapAcceptor.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

到此處,Netty服務端監聽的相關資源已經初始化完畢,就剩下最后一步-

6. 注冊NioServerSocketChannel到Reactor線程的多路復用器上,然后輪詢客戶端連接事件。

在分析注冊代碼之前,我們先通過下圖看看目前NioServerSocketChannel的ChannelPipeline的組成:

通過Debug最終發現在AbstractChannel類中的register方法上:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

首先判斷是否是NioEventLoop自身發起的操作,如果是,則不存在並發操作,直接執行Channel注冊;如果由其它線程發起,則封裝成一個Task放入消息隊列中異步執行。此處,由於是由ServerBootstrap所在線程執行的注冊操作,所以會將其封裝成Task投遞到NioEventLoop中執行:

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. //
                        // See https://github.com/netty/netty/issues/4805
 beginRead(); } }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

將NioServerSocketChannel注冊到NioEventLoop的Selector上,代碼如下:

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

大伙兒可能會很詫異,應該注冊OP_ACCEPT(16)到多路復用器上,怎么注冊0呢?0表示只注冊,不監聽任何網絡操作。這樣做的原因如下:

  • 注冊方法是多態的,它既可以被NioServerSocketChannel用來監聽客戶端的連接接入,也可以用來注冊SocketChannel,用來監聽網絡讀或者寫操作;
  • 通過SelectionKey的interestOps(int ops)方法可以方便的修改監聽操作位。所以,此處注冊需要獲取SelectionKey並給AbstractNioChannel的成員變量selectionKey賦值。

注冊成功之后,觸發ChannelRegistered事件,方法如下:

                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();

Netty的HeadHandler不需要處理ChannelRegistered事件,所以,直接調用下一個Handler,當ChannelRegistered事件傳遞到TailHandler后結束,TailHandler也不關心ChannelRegistered事件。

ChannelRegistered事件傳遞完成后,判斷ServerSocketChannel監聽是否成功,如果成功,需要出發NioServerSocketChannel的ChannelActive事件,判斷方法即isActive().

          pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }

其中isActive()是一個多態方法,如果是服務端則,判斷監聽是否啟動;如果是客戶端,判斷TCP連接是否完成。ChannelActive事件在ChannelPipeline()傳遞,完成之后根據配置決定是否自動觸發Channel的讀操作。

讀方法最終到:

        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

由於不同類型的Channel對讀的准備工作不同,因此doBeginRead也是個多態方法。

對於NIO通信,無論是客戶端,還是服務端,都要設置網絡監聽操作位為自己感興趣的,對於NioServerSocketChannel感興趣的是OP_ACCEPT(16),於是修改操作位:

 @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

某些情況下,當前監聽的操作類型和Channel關心的網絡事件是一致的,不需要重復注冊,所以增加了&的判斷。JDK SelectionKey有4種操作類型,分別為:

(1) OP_READ = 1 <<0;

(2) OP_WRTE = 1 <<2;

(3) OP_CONNECT = 1 <<3;

(4) OP_ACCEPT = 1 <<4;

此時,服務器監聽啟動部分源碼已經分析結束。

三、客戶端接入源碼分析

負責處理網絡讀寫、連接和客戶端請求介入的Reactor線程就是NioEventLoop,下面分析如何處理新的客戶端接入。

當多路復用器檢測到新的Channel時候,默認執行processSelectedKeysOptimized方法.

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

由於Channel的Attachment是NioServerSocketChannel,所以執行processSelectedKey方法。

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

繼續點入該方法debug,由於監聽的是連接操作,會執行unsafe.read()方法。由於不同的Channel執行不同的操作,所以NioUnsafe被設計為接口。

debug發現使用的是:

其read()方法如下所示:

 @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf); if (localRead == 0) {
                            break;
                        }
                       //代碼省略...
    }

doReadMessages方法進行分析,發現它實際就是接受新的客戶端連接並且創建NioSocketChannel:

    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept(); try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

接收到新的連接之后,觸發ChannelPipeLine的ChannelRead方法,代碼如下:

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }

於是觸發pipeLine調用鏈,事件在ChannelPipeline中傳遞,執行ServerBootstrapAcceptor中的channelRead方法,代碼如下:

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler); //(1) for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }
        //(2)
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        //(3)
            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

上面方法主要分為3個步驟:

(1) 加入childHandler到客戶端SocketChannel的ChannelPipeline中

(2) 設置SocketChannel的TCP參數

(3) 注冊SocketChannel到多路復用器

注意這里register注冊也是注冊操作位為0.

執行完注冊后,緊接着會觸發ChannelReadComplete事件。

Netty的Header和Tailer本身不關於這個事件,因此ChannelReadComplete是直接透傳, 執行完ChannelReadComplete后,接着執行PipeLine的read()方法,最終到HeadHandler的read()方法。read()方法在前面說過,會修改操作位,此時這里debug發現把操作位修改OP_READ。

此時,客戶端連接處理完成,可以進行網絡讀寫等I/O操作。

四、Netty客戶端創建流程

1、用戶線程創建Bootstrap

Bootstrap b = new Bootstrap();

Bootstrap是Socket客戶端創建工具類,通過API設置創建客戶端相關的參數,異步發起客戶端連接。

2、創建處理客戶端連接、IO讀寫的Reactor線程組NioEventLoopGroup

EventLoopGroup group = new NioEventLoopGroup();

3、通過Bootstrap的ChannelFactory和用戶指定的Channel類型創建用於客戶端連接的NioSocketChannel

b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)

 此處的NioSocketChannel類似於Java NIO提供的SocketChannel。

4、創建默認的channel Handler pipeline

b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>()
  {
       @Override
       public void initChannel(SocketChannel ch) throws Exception
       {
         ch.pipeline().addLast(new HelloClientHandler());
        }
  });

用於調度和執行網絡事件。

5、異步發起TCP連接

 // 發起異步連接操作
 ChannelFuture f = b.connect(host, port).sync();

SocketChannel執行connect()操作后有以下三種結果:

  • 連接成功,然會true;
  • 暫時沒有連接上,服務器端沒有返回ACK應答,連接結果不確定,返回false。此種結果下,需要將NioSocketChannel中的selectionKey設置為OP_CONNECT,監聽連接結果;
  • 接連失敗,直接拋出I/O異常

6、注冊對應的網絡監聽狀態位到多路復用器

7、由多路復用器在I/O中輪詢個Channel,處理連接結果

8、如果連接成功,設置Future結果,發送連接成功事件,觸發ChannelPipeline執行

9、由ChannelPipeline調度執行系統和用戶的ChannelHandler,執行業務邏輯

五、客戶端創建源碼分析

Netty客戶端創建流程非常繁瑣,這里只針對關鍵步驟進行分析。

5.1 客戶端連接輔助類Bootstrap

1. 設置I/O線程組:只需要一個線程組EventLoopGroup

2. TCP參數設置:

    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return (B) this;
    }

主要TCP參數如下:

  • (1) SO_TIMEOUT: 控制讀取操作將阻塞多少毫秒,如果返回值為0,計時器就被禁止了,該線程將被無限期阻塞。
  • (2) SO_SNDBUF: 套接字使用的發送緩沖區大小
  • (3) SO_RCVBUF: 套接字使用的接收緩沖區大小
  • (4) SO_REUSEADDR : 是否允許重用端口
  • (5) CONNECT_TIMEOUT_MILLIS: 客戶端連接超時時間,原生NIO不提供該功能,Netty使用的是自定義連接超時定時器檢測和超時控制
  • (6) TCP_NODELAY : 是否使用Nagle算法

3. channel接口

同樣使用反射創建NioSocketChannel

4. 設置Handler接口

Bootstrap為了簡化Handler的編排,提供了ChannelInitializer,當TCP鏈路注冊成功后,調用initChannel接口:

    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
        // the handler.
        if (initChannel(ctx)) {
            // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
            // miss an event.
            ctx.pipeline().fireChannelRegistered();
        } else {
            // Called initChannel(...) before which is the expected behavior, so just forward the event.
            ctx.fireChannelRegistered();
        }
    }

其中InitChannel為抽象接口,即下面紅色標注的代碼,用戶便是在這個方法中設置ChannelHandler:

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }
            return true;
        }
        return false;
    }

5.2 客戶端連接操作

1. 首先要創建和初始化NioSocketChannel

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
     //.....

創建之后,初始化,然后在注冊:

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel(); init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

2. 創建完成后,連接操作會異步執行,最終調用到HeadContext的connect方法.

        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }

的connect操作如下:

 if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
//...

3. doConnect三種可能結果

  • 連接成功,然會true;
  • 暫時沒有連接上,服務器端沒有返回ACK應答,連接結果不確定,返回false。此種結果下,需要將NioSocketChannel中的selectionKey設置為OP_CONNECT,監聽連接結果;
  • 接連失敗,直接拋出I/O異常

異步返回之后,需要判斷連接結果,如果成功,則觸發ChannelActive事件。最終會將NioSocketChannel中的selectionKey設置為SelectionKey.OP_READ,用於監聽網絡讀操作。

5.3 異步連接結果通知

NioEventLoop的Selector輪詢客戶端連接Channel,當服務端返回應答后,進行判斷。依舊是NioEventLoop中的processSelectedKey方法:

 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

下面分析finishConnect方法:

        @Override
        public final void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            assert eventLoop().inEventLoop();

            try {
                boolean wasActive = isActive();
                doFinishConnect(); //判斷SocketChannel的連接結果,true表示成功
                fulfillConnectPromise(connectPromise, wasActive); //觸發鏈路激活
            } catch (Throwable t) {
                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
            } finally {
                // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                // See https://github.com/netty/netty/issues/1770
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
            }
        }

fulfillConnectPromise方法則觸發鏈路激活事件,並由ChannelPipeline進行傳播:

        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return;
            }

            // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
            // We still need to ensure we call fireChannelActive() in this case.
            boolean active = isActive();

            // trySuccess() will return false if a user cancelled the connection attempt.
            boolean promiseSet = promise.trySuccess();

            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
            // because what happened is what happened.
            if (!wasActive && active) {
                pipeline().fireChannelActive();
            }

            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {
                close(voidPromise());
            }
        }

跟之前類似,將網絡監聽修改為讀操作。

5.4 客戶端連接超時機制

由Netty自己實現的客戶端超時機制,在AbstractNioChannel的connect方法中:

 public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            try {
                if (connectPromise != null) {
                    // Already a connect in process.
                    throw new ConnectionPendingException();
                }

                boolean wasActive = isActive();
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // Schedule connect timeout.
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); }

                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null; close(voidPromise()); } }
                    });
                }
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }

一旦超時定時器執行,則說明客戶端超時,構造異常,將異常結果設置到connectPromise中,同時關閉客戶端句柄。

如果在超時之前獲取結果,則直接刪除定時器,防止其被觸發。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM