Netty-ServerBootstrap


 

ServerBootstrap is Netty's helper class for setting up a server. Taking NIO as an example, the setup code looks like this:

 

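public static void main(String[] args) throws Exception {
    ServerBootstrap bs = new ServerBootstrap();

    // The first group (1 thread) accepts connections; the second handles I/O on accepted channels.
    bs.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new HttpServerCodec())
                            .addLast(new HttpObjectAggregator(65535))
                            // Controller is the application's own handler (not shown here).
                            .addLast(new Controller());
                }
            })
            // Bind port 8080, wait for the bind to complete, then block until the channel is closed.
            .bind(8080).sync().channel().closeFuture().sync();
}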

 

 

 

 

The core fields are as follows:

 
// Configuration properties, such as SO_KEEPALIVE
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
// The event loop group that accepted child channels are bound to
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;

 

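Initialization mainly proceeds as follows: create the server channel and register it with an EventLoop -> bind the local port -> register interest in accept events (Netty treats accept as the server channel's read operation) -> the EventLoop's main loop then keeps calling select on its registered channels and handles the events that fire.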
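As a rough illustration of this flow, here is a minimal sketch in plain java.nio rather than in Netty itself. The class name BindFlowSketch and the method acceptOnce are hypothetical, the port is picked by the OS, and a throwaway client connection is opened only so that the loop has an event to select. The sketch registers the server channel with a selector, binds it, switches on OP_ACCEPT interest, and then runs a select loop until one connection has been accepted.

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain java.nio sketch of the register -> bind -> accept-interest -> select flow (not Netty code).
final class BindFlowSketch {

    // Returns true once one ACCEPT event has been selected and the connection accepted.
    static boolean acceptOnce() throws Exception {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);

            // 1. Register with the selector first, with no interest ops yet.
            SelectionKey key = server.register(selector, 0);

            // 2. Bind the local port (port 0 lets the OS pick a free one).
            server.bind(new InetSocketAddress("127.0.0.1", 0));

            // 3. The channel is now active, so declare interest in OP_ACCEPT.
            key.interestOps(SelectionKey.OP_ACCEPT);

            // Demo-only client, so that the select loop below has something to report.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // 4. The "event loop": select ready keys and handle them.
                while (true) {
                    if (selector.select(5000) == 0) {
                        return false; // nothing became ready within the timeout
                    }
                    for (SelectionKey ready : selector.selectedKeys()) {
                        if (ready.isAcceptable()) {
                            try (SocketChannel child = server.accept()) {
                                return child != null;
                            }
                        }
                    }
                    selector.selectedKeys().clear();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accepted: " + acceptOnce());
    }
}
```

Netty runs step 4 on a dedicated EventLoop thread and dispatches the event into the channel pipeline; the sketch keeps everything on the calling thread only to stay short.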

 

 

 

Binding is performed first. The core logic lives in io.netty.bootstrap.AbstractBootstrap.doBind(SocketAddress) and io.netty.bootstrap.AbstractBootstrap.initAndRegister():

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    ..........
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // Binding logic
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
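The shape of doBind (bind right away if registration has already finished, otherwise bind from a completion listener) can be sketched with the JDK's CompletableFuture. This is only an analogy under stated assumptions: BindAfterRegisterSketch, bindAfter and the "bound" result are hypothetical names, and CompletableFuture stands in for Netty's ChannelFuture and ChannelPromise.

```java
import java.util.concurrent.CompletableFuture;

// Analogy for doBind using JDK futures; not Netty's implementation.
final class BindAfterRegisterSketch {

    // Completes the returned promise with "bound" once registration has succeeded,
    // or fails it with the registration's cause.
    static CompletableFuture<String> bindAfter(CompletableFuture<Void> registration) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Registration already completed successfully: "bind" immediately.
            promise.complete("bound");
        } else {
            // Not finished yet (or failed): decide when the registration completes.
            registration.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    promise.complete("bound");
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> promise = bindAfter(pending);
        System.out.println("done before registration: " + promise.isDone());
        pending.complete(null);
        System.out.println("result: " + promise.join());
    }
}
```

Either way the caller gets a promise back immediately, which is why the example at the top calls sync() on the result of bind(8080) before going further.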

 

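Let's look at initAndRegister first. Its core logic is to use the channelFactory to create a NioServerSocketChannel instance, apply the parameters from config to it, and then register it with an EventLoop. The registration is actually delegated to the channel's Unsafe, and the core logic that completes it is in AbstractUnsafe.register0.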

 

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // In this example this actually invokes the NioServerSocketChannel constructor,
        // which sets the channel's interested event type to OP_ACCEPT.
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
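To make the channelFactory.newChannel() step more concrete: passing a class to channel(...), as in the example at the top, leads Netty to create the instance reflectively through that class's no-argument constructor. The sketch below shows that idea with the JDK only. ReflectiveFactorySketch, Factory, forClass and FakeServerChannel are hypothetical names for illustration and do not reproduce Netty's own factory class.

```java
import java.lang.reflect.Constructor;

// Sketch of a reflective factory; names are illustrative, not Netty's.
final class ReflectiveFactorySketch {

    interface Factory<T> {
        T newInstance();
    }

    // Hypothetical stand-in for a channel type with a public no-arg constructor.
    static final class FakeServerChannel {
        final String readInterest = "OP_ACCEPT";

        public FakeServerChannel() {
        }
    }

    // Looks up the no-arg constructor once and reuses it for every new instance.
    static <T> Factory<T> forClass(Class<T> type) throws NoSuchMethodException {
        Constructor<T> constructor = type.getDeclaredConstructor();
        return () -> {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create " + type.getName(), e);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        Factory<FakeServerChannel> factory = forClass(FakeServerChannel.class);
        FakeServerChannel channel = factory.newInstance();
        System.out.println(channel.readInterest);
    }
}
```

Resolving the constructor up front means a class without a usable no-arg constructor is rejected when the factory is configured, not later when the first channel is created.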

 

 

void init(Channel channel) throws Exception {
    // Set options and attributes
    ..........

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // Add a default ChannelHandler, ServerBootstrapAcceptor, to the NioServerSocketChannel.
                    // When an accept event occurs, it registers the accepted channel with the child EventLoopGroup.
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

 

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // Register the channel with the EventLoop's selector
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // Fire the ChannelInboundHandler.channelRegistered event
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                // Fire the channelActive event, which also registers read interest for the channel
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

 

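Once initAndRegister has registered the channel successfully, the actual port binding is executed. The core logic is shown here as NioSocketChannel.doBind0(SocketAddress); for the NioServerSocketChannel used in this example, the equivalent step is NioServerSocketChannel.doBind(SocketAddress), which likewise binds the underlying JDK channel.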

private void doBind0(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        SocketUtils.bind(javaChannel(), localAddress);
    } else {
        SocketUtils.bind(javaChannel().socket(), localAddress);
    }
}

 

 

 

At this point the bind has succeeded. When an ACCEPT event fires, it triggers NioServerSocketChannel.doReadMessages -> ServerBootstrapAcceptor.channelRead, which registers the child channel with the child EventLoopGroup.

 

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // Register the child channel with the child group
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
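The hand-off performed by ServerBootstrapAcceptor (accept on one loop, then register the accepted channel with a different one) can be approximated in plain java.nio with two selectors. This is a sketch under stated assumptions, not Netty code: AcceptHandoffSketch and acceptAndRead are hypothetical names, the "boss" and "worker" selectors stand in for the two event loop groups, and both are driven from a single thread here so that the example stays deterministic.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Two-selector sketch of the accept -> child registration hand-off (not Netty code).
final class AcceptHandoffSketch {

    // Accepts one connection on the boss selector, registers the child channel with
    // the worker selector, and returns the 4 bytes the client sent (null on timeout).
    static String acceptAndRead() throws Exception {
        try (Selector boss = Selector.open();
             Selector worker = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.register(boss, SelectionKey.OP_ACCEPT);

            // Demo-only client that connects and sends a small message.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.US_ASCII)));

                // Boss side: wait for the ACCEPT event and accept the child channel.
                if (boss.select(5000) == 0) {
                    return null;
                }
                SocketChannel accepted = server.accept();
                if (accepted == null) {
                    return null;
                }
                try (SocketChannel child = accepted) {
                    // Hand-off: the child is registered with the worker selector, not the boss.
                    child.configureBlocking(false);
                    child.register(worker, SelectionKey.OP_READ);

                    // Worker side: read until the whole message has arrived.
                    ByteBuffer buffer = ByteBuffer.allocate(4);
                    while (buffer.hasRemaining()) {
                        if (worker.select(5000) == 0) {
                            return null;
                        }
                        worker.selectedKeys().clear();
                        if (child.read(buffer) < 0) {
                            break; // peer closed the connection early
                        }
                    }
                    buffer.flip();
                    return StandardCharsets.US_ASCII.decode(buffer).toString();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(acceptAndRead());
    }
}
```

In Netty the two loops run on separate threads, and childGroup.register(child) submits the registration to the child's own EventLoop, so the accepting thread does not have to wait for it.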

 

