Spring Boot啟動過程(七):Connector初始化


  Connector實例的創建已經在Spring Boot啟動過程(四):Spring Boot內嵌Tomcat啟動中提到了:

  

   Connector是LifecycleMBeanBase的子類,先是設置LifecycleState為LifecycleState.NEW,構造首先執行setProtocol,設置protocolHandlerClassName為"org.apache.coyote.http11.Http11NioProtocol"事實上它默認值就是這個,然后通過反射創建此協議處理器的實例,此時開始執行Http11NioProtocol的構造函數:

    public Http11NioProtocol() {
        super(new NioEndpoint());
    }

  初始化NioEndpoint過程中初始化了NioSelectorPool,NioSelectorShared默認為true,即所有的SocketChannel共享一個Selector;設置pollerThreadCount,socket超時時間等。然后就是將new出來的NioEndPoint一路super,直到AbstractProtocol:

    public AbstractProtocol(AbstractEndpoint<S> endpoint) {
        this.endpoint = endpoint;
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }

  關於soLinger可以參考內嵌Tomcat的Connector對象的靜態代碼塊。之后是外層AbstractHttp11Protocol的構造函數,Handler就是這里初始化並set的,這部分和上一塊所有的set最后都是到endpoint的:

    public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
        super(endpoint);
        setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
        ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
        setHandler(cHandler);
        getEndpoint().setHandler(cHandler);
    }

  回到Connector將初始化好的Http11NioProtocol傳給this.protocolHandler(AbstractProtocol<S>實現了protocolHandler),之后就是下面這幾句代碼就結束Connector初始化了:

        if (!Globals.STRICT_SERVLET_COMPLIANCE) {
            URIEncoding = "UTF-8";
            URIEncodingLower = URIEncoding.toLowerCase(Locale.ENGLISH);
        }

  之后就是啟動了,在Spring Boot啟動過程(二)提到過一點,在finishRefresh中,由於AbstractApplicationContext被EmbeddedWebApplicationContext實現,所以執行的是:

    @Override
    protected void finishRefresh() {
        super.finishRefresh();
        EmbeddedServletContainer localContainer = startEmbeddedServletContainer();
        if (localContainer != null) {
            publishEvent(
                    new EmbeddedServletContainerInitializedEvent(this, localContainer));
        }
    }

  startEmbeddedServletContainer方法中的localContainer.start的前幾句代碼:

            addPreviouslyRemovedConnectors();
            Connector connector = this.tomcat.getConnector();
            if (connector != null && this.autoStart) {
                startConnector(connector);
            }

  addPreviouslyRemovedConnectors方法將Spring Boot啟動過程(四):Spring Boot內嵌Tomcat啟動中提到的從Service中解綁的Connector綁定回來了。具體綁定方法:

public void addConnector(Connector connector) {

        synchronized (connectorsLock) {
            connector.setService(this);
            Connector results[] = new Connector[connectors.length + 1];
            System.arraycopy(connectors, 0, results, 0, connectors.length);
            results[connectors.length] = connector;
            connectors = results;

            if (getState().isAvailable()) {
                try {
                    connector.start();
                } catch (LifecycleException e) {
                    log.error(sm.getString(
                            "standardService.connector.startFailed",
                            connector), e);
                }
            }

            // Report this property change to interested listeners
            support.firePropertyChange("connector", null, connector);
        }

    }

  加了同步鎖,綁定了一下service,start流程之前說過好多次,就不細說了。Connector的initInternal,先是super(LifecycleMBeanBase);之后兩句引入了CoyoteAdapter:protected Adapter adapter = new CoyoteAdapter(this),protocolHandler.setAdapter(adapter);確保parseBodyMethodsSet有默認值,沒有就設置HTTP方法為支持請求體的POST方法為默認;接着初始化protocolHandler,先是關於ALPN支持的,我這里沒走,直接進入spuer.init(AbstractProtocol),生成ObjectName(MBean之前說過的):並注冊Registry.getRegistry(null, null).registerComponent(this, oname, null),生成並注冊線程池和Global Request Processor:

 

 

  接着endpoint的init,首先是testServerCipherSuitesOrderSupport方法,這個方法只判斷jdk7及以下版本,我這不走也沒什么內容其實;然后是super.init(AbstractEndpoint),然而此時其實並沒有走:

    public void init() throws Exception {
        if (bindOnInit) {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
    }

  Connetor的init結束了。然后Connetor的startInternal,這里其實只做了一件事:protocolHandler.start()。協議處理器的start又start了endpoint:

    public final void start() throws Exception {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }

  這次走到了bind,首先ServerSocketChannel.open:

    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }

  然后設置超時時間,綁定端口和IP,設置積壓(backlog)數量,配置阻塞serverSock.configureBlocking(true)關於阻塞模式,在網上摘抄了一段:

Tomcat在使用Java NIO的時候,將ServerSocketChannel配置成阻塞模式,這樣可以方便地對ServerSocketChannel編寫程序。當accept方法獲得一個SocketChannel,並沒有立即從線程池中取出一個線程來處理這個SocketChannel,而是構建一個OP_REGISTER類型的PollerEvent,並放到Poller.events隊列中。Poller線程會處理這個PollerEvent,發現是OP_REGISTER類型,會在Poller.selector上注冊一個這個SocketChannel的OP_READ就緒事件。因為Java NIO的wakeup特性,使用wakeupCount信號量控制Selector.wakeup()方法,非阻塞方法Selector.selectNow()和阻塞方法Selector.select()的調用。我們在編寫Java NIO程序時候也可以參考這種方式。在SocketChannel上讀的時候,分成非阻塞模式和阻塞模式。

非阻塞模式,如果讀不到數據,則直接返回了;如果讀到數據則繼續讀。
阻塞模式。如果第一次讀取不到數據,會在NioSelectorPool提供的Selector對象上注冊OP_READ就緒事件,並循環調用Selector.select(long)方法,超時等待OP_READ就緒事件。如果OP_READ事件已經就緒,並且接下來讀到數據,則會繼續讀。read()方法整體會根據readTimeout設置進行超時控制。若超時,則會拋出SocketTimeoutException異常。

在SocketChannel上寫的時候也分成非阻塞模式和阻塞模式。

非阻塞模式,寫數據之前不會監聽OP_WRITE事件。如果沒有成功,則直接返回。
阻塞模式。第一次寫數據之前不會監聽OP_WRITE就緒事件。如果沒有寫成功,則會在NioSelectorPool提供的selector注冊OP_WRITE事件。並循環調用Selector.select(long)方法,超時等待OP_WRITE就緒事件。如果OP_WRITE事件已經就緒,並且接下來寫數據成功,則會繼續寫數據。write方法整體會根據writeTimeout設置進行超時控制。如超時,則會拋出SocketTimeoutException異常。

在寫數據的時候,開始沒有監聽OP_WRITE就緒事件,直接調用write()方法。這是一個樂觀設計,估計網絡大部分情況都是正常的,不會擁塞。如果第一次寫沒有成功,則說明網絡可能擁塞,那么再等待OP_WRITE就緒事件。

阻塞模式的讀寫方法沒有在原有的Poller.selector上注冊就緒事件,而是使用NioSelectorPool類提供的Selector對象注冊就緒事件。這樣的設計可以將各個Channel的就緒事件分散注冊到不同的Selector對象中,避免大量Channel集中注冊就緒事件到一個Selector對象,影響性能。

http://www.linuxidc.com/Linux/2015-02/113900p2.htm

  為了注釋,貼一下接下來的代碼:

        // Initialize thread count defaults for acceptor, poller
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            //minimum one poller thread
            pollerThreadCount = 1;
        }

  實例化private volatile CountDownLatch stopLatch為pollerThreadCount數量,用閉鎖應該是希望多個pollerThread同時開始執行吧,后面確定一下。如果有需要初始化SSL:initialiseSsl吐槽下這里命名,方法體里用的SSL偏偏這里首字母大寫,大家不要學。selectorPool.open():

    public void open() throws IOException {
        enabled = true;
        getSharedSelector();
        if (SHARED) {
            blockingSelector = new NioBlockingSelector();
            blockingSelector.open(getSharedSelector());
        }
    }

 

}

  AbstractEndpoint的bind方法就執行完了,綁定狀態設為BOUND_ON_START然后執行startInternal,這個方法在NioEndpoint中。先判斷是否是正在運行狀態,如果不是就置為是,暫停狀態置為否,然后初始化了三個SynchronizedStack,這是Tomcat自定義的簡化同步棧,自定義的結構好處就是既能滿足需要又能提高時間空間利用率,最合適自己的場景:

            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool());

  createExecutor線程池並設置給任務隊列:

  initializeConnectionLatch根據配置設定最大允許的並發連接數maxConnections,實現方法是自定義的鎖結構LimitLatch:

        if (maxConnections==-1) return null;
        if (connectionLimitLatch==null) {
            connectionLimitLatch = new LimitLatch(getMaxConnections());
        }

  啟動poller線程的代碼直接看吧,沒啥好解釋的:

            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

  啟動acceptor線程是startAcceptorThreads一樣的方式,代碼就不貼了。endpoint的start之后,啟動了一個異步超時線程(例Thread[http-nio-8080-AsyncTimeout,5,main]),這個線程會每隔一段時間檢查每一個等待隊列中的協議處理器,判斷如果超時了,就會給它發一個超時事件:socketWrapper.processSocket(SocketEvent.TIMEOUT, true)

            while (asyncTimeoutRunning) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore
                }
                long now = System.currentTimeMillis();
                for (Processor processor : waitingProcessors) {
                   processor.timeoutAsync(now);
                }
                // Loop if endpoint is paused
                while (endpoint.isPaused() && asyncTimeoutRunning) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
            }
            while (asyncTimeoutRunning) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore
                }
                long now = System.currentTimeMillis();
                for (Processor processor : waitingProcessors) {
                   processor.timeoutAsync(now);
                }
                // Loop if endpoint is paused
                while (endpoint.isPaused() && asyncTimeoutRunning) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
            }

  至此connector.start執行結束,因為我也沒注冊什么監聽器,所以這部分沒提,而且相關內容之前都有寫。當循環綁定connect結束后,暫存的綁定信息也就沒用了,移除掉。

  回到TomcatEmbeddedServletContainer,接下來:

            if (connector != null && this.autoStart) {
                startConnector(connector);
            }

  startConnector:

            for (Container child : this.tomcat.getHost().findChildren()) {
                if (child instanceof TomcatEmbeddedContext) {
                    ((TomcatEmbeddedContext) child).deferredLoadOnStartup();
                }
            }
            // Earlier versions of Tomcat used a version that returned void. If that
            // version is used our overridden loadOnStart method won't have been called
            // and the original will have already run.
            super.loadOnStartup(findChildren());

  具體什么版本會怎么樣,就不考證了,反正該執行的都會執行,只不過位置可能不一樣。實際上,方法取出了所有的Wapper(StandardWrapper),執行了它們的load方法。load方法取出了Servlet綁定並記錄加載時間,並設置了jsp監控的mbean,期間還檢查了servlet的安全注解比如是否允許訪問和傳輸是否加密(EmptyRoleSemantic,TransportGuarantee):

            InstanceManager instanceManager = ((StandardContext)getParent()).getInstanceManager();
                servlet = (Servlet) instanceManager.newInstance(servletClass);
            processServletSecurityAnnotation(servlet.getClass());

  初始化servlet,比如輸入輸出的buffer sizes大小是否合理(最小256):

            instanceInitialized = true;

  在load前后都有判斷如果bean加載器和當前線程上下文加載器不同時用在用的bean加載器替換當前線程上下文類加載器,因為用上下文加載器的話,這一步加載的東西就不能被多個Context共享了,后面又來了一次,推測是為了防止加載的過程中為了避免SPI的加載問題而被替換為線程上下文加載器。其實這里已經和本篇博客沒什么關系了,但是秉着流水賬的風格,把它貼完:

            Context context = findContext();
            ContextBindings.unbindClassLoader(context, getNamingToken(context), getClass().getClassLoader());
        if (ContextAccessController.checkSecurityToken(obj, token)) {
            Object o = clObjectBindings.get(classLoader);
            if (o == null || !o.equals(obj)) {
                return;
            }
            clBindings.remove(classLoader);
            clObjectBindings.remove(classLoader);
        }

  本來是因為Tomcat一個BUG造成CLOSE_WAIT問題屢的這些代碼,然而這里其實別沒有直接到,因為只是啟動,還沒到運行,以后如果有機會寫運行部分再說吧。

  以上。

==========================================================

咱最近用的github:https://github.com/saaavsaaa

微信公眾號:

                      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM