Connector實例的創建已經在Spring Boot啟動過程(四):Spring Boot內嵌Tomcat啟動中提到了:
Connector是LifecycleMBeanBase的子類,先是設置LifecycleState為LifecycleState.NEW,構造首先執行setProtocol,設置protocolHandlerClassName為"org.apache.coyote.http11.Http11NioProtocol"事實上它默認值就是這個,然后通過反射創建此協議處理器的實例,此時開始執行Http11NioProtocol的構造函數:
public Http11NioProtocol() { super(new NioEndpoint()); }
初始化NioEndpoint過程中初始化了NioSelectorPool,NioSelectorShared默認為true,即所有的SocketChannel共享一個Selector;設置pollerThreadCount,socket超時時間等。然后就是將new出來的NioEndPoint一路super,直到AbstractProtocol:
public AbstractProtocol(AbstractEndpoint<S> endpoint) { this.endpoint = endpoint; setSoLinger(Constants.DEFAULT_CONNECTION_LINGER); setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); }
關於soLinger可以參考內嵌Tomcat的Connector對象的靜態代碼塊。之后是外層AbstractHttp11Protocol的構造函數,Handler就是這里初始化並set的,這部分和上一塊所有的set最后都是到endpoint的:
public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) { super(endpoint); setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); ConnectionHandler<S> cHandler = new ConnectionHandler<>(this); setHandler(cHandler); getEndpoint().setHandler(cHandler); }
回到Connector將初始化好的Http11NioProtocol傳給this.protocolHandler(AbstractProtocol<S>實現了protocolHandler),之后就是下面這幾句代碼就結束Connector初始化了:
if (!Globals.STRICT_SERVLET_COMPLIANCE) { URIEncoding = "UTF-8"; URIEncodingLower = URIEncoding.toLowerCase(Locale.ENGLISH); }
之后就是啟動了,在Spring Boot啟動過程(二)提到過一點,在finishRefresh中,由於AbstractApplicationContext被EmbeddedWebApplicationContext實現,所以執行的是:
@Override protected void finishRefresh() { super.finishRefresh(); EmbeddedServletContainer localContainer = startEmbeddedServletContainer(); if (localContainer != null) { publishEvent( new EmbeddedServletContainerInitializedEvent(this, localContainer)); } }
startEmbeddedServletContainer方法中的localContainer.start的前幾句代碼:
addPreviouslyRemovedConnectors(); Connector connector = this.tomcat.getConnector(); if (connector != null && this.autoStart) { startConnector(connector); }
addPreviouslyRemovedConnectors方法將Spring Boot啟動過程(四):Spring Boot內嵌Tomcat啟動中提到的從Service中解綁的Connector綁定回來了。具體綁定方法:
public void addConnector(Connector connector) { synchronized (connectorsLock) { connector.setService(this); Connector results[] = new Connector[connectors.length + 1]; System.arraycopy(connectors, 0, results, 0, connectors.length); results[connectors.length] = connector; connectors = results; if (getState().isAvailable()) { try { connector.start(); } catch (LifecycleException e) { log.error(sm.getString( "standardService.connector.startFailed", connector), e); } } // Report this property change to interested listeners support.firePropertyChange("connector", null, connector); } }
加了同步鎖,綁定了一下service,start流程之前說過好多次,就不細說了。Connector的initInternal,先是super(LifecycleMBeanBase);之后兩句引入了CoyoteAdapter:protected Adapter adapter = new CoyoteAdapter(this),protocolHandler.setAdapter(adapter);確保parseBodyMethodsSet有默認值,沒有就設置HTTP方法為支持請求體的POST方法為默認;接着初始化protocolHandler,先是關於ALPN支持的,我這里沒走,直接進入spuer.init(AbstractProtocol),生成ObjectName(MBean之前說過的):並注冊Registry.getRegistry(null, null).registerComponent(this, oname, null),生成並注冊線程池和Global Request Processor:
接着endpoint的init,首先是testServerCipherSuitesOrderSupport方法,這個方法只判斷jdk7及以下版本,我這不走也沒什么內容其實;然后是super.init(AbstractEndpoint),然而此時其實並沒有走:
public void init() throws Exception { if (bindOnInit) { bind(); bindState = BindState.BOUND_ON_INIT; } }
Connetor的init結束了。然后Connetor的startInternal,這里其實只做了一件事:protocolHandler.start()。協議處理器的start又start了endpoint:
public final void start() throws Exception { if (bindState == BindState.UNBOUND) { bind(); bindState = BindState.BOUND_ON_START; } startInternal(); }
這次走到了bind,首先ServerSocketChannel.open:
public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); }
然后設置超時時間,綁定端口和IP,設置積壓(backlog)數量,配置阻塞serverSock.configureBlocking(true)關於阻塞模式,在網上摘抄了一段:
Tomcat在使用Java NIO的時候,將ServerSocketChannel配置成阻塞模式,這樣可以方便地對ServerSocketChannel編寫程序。當accept方法獲得一個SocketChannel,並沒有立即從線程池中取出一個線程來處理這個SocketChannel,而是構建一個OP_REGISTER類型的PollerEvent,並放到Poller.events隊列中。Poller線程會處理這個PollerEvent,發現是OP_REGISTER類型,會在Poller.selector上注冊一個這個SocketChannel的OP_READ就緒事件。因為Java NIO的wakeup特性,使用wakeupCount信號量控制Selector.wakeup()方法,非阻塞方法Selector.selectNow()和阻塞方法Selector.select()的調用。我們在編寫Java NIO程序時候也可以參考這種方式。在SocketChannel上讀的時候,分成非阻塞模式和阻塞模式。 非阻塞模式,如果讀不到數據,則直接返回了;如果讀到數據則繼續讀。 阻塞模式。如果第一次讀取不到數據,會在NioSelectorPool提供的Selector對象上注冊OP_READ就緒事件,並循環調用Selector.select(long)方法,超時等待OP_READ就緒事件。如果OP_READ事件已經就緒,並且接下來讀到數據,則會繼續讀。read()方法整體會根據readTimeout設置進行超時控制。若超時,則會拋出SocketTimeoutException異常。 在SocketChannel上寫的時候也分成非阻塞模式和阻塞模式。 非阻塞模式,寫數據之前不會監聽OP_WRITE事件。如果沒有成功,則直接返回。 阻塞模式。第一次寫數據之前不會監聽OP_WRITE就緒事件。如果沒有寫成功,則會在NioSelectorPool提供的selector注冊OP_WRITE事件。並循環調用Selector.select(long)方法,超時等待OP_WRITE就緒事件。如果OP_WRITE事件已經就緒,並且接下來寫數據成功,則會繼續寫數據。write方法整體會根據writeTimeout設置進行超時控制。如超時,則會拋出SocketTimeoutException異常。 在寫數據的時候,開始沒有監聽OP_WRITE就緒事件,直接調用write()方法。這是一個樂觀設計,估計網絡大部分情況都是正常的,不會擁塞。如果第一次寫沒有成功,則說明網絡可能擁塞,那么再等待OP_WRITE就緒事件。 阻塞模式的讀寫方法沒有在原有的Poller.selector上注冊就緒事件,而是使用NioSelectorPool類提供的Selector對象注冊就緒事件。這樣的設計可以將各個Channel的就緒事件分散注冊到不同的Selector對象中,避免大量Channel集中注冊就緒事件到一個Selector對象,影響性能。 http://www.linuxidc.com/Linux/2015-02/113900p2.htm
為了注釋,貼一下接下來的代碼:
// Initialize thread count defaults for acceptor, poller if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount <= 0) { //minimum one poller thread pollerThreadCount = 1; }
實例化private volatile CountDownLatch stopLatch為pollerThreadCount數量,用閉鎖應該是希望多個pollerThread同時開始執行吧,后面確定一下。如果有需要初始化SSL:initialiseSsl吐槽下這里命名,方法體里用的SSL偏偏這里首字母大寫,大家不要學。selectorPool.open():
public void open() throws IOException { enabled = true; getSharedSelector(); if (SHARED) { blockingSelector = new NioBlockingSelector(); blockingSelector.open(getSharedSelector()); } }
}
AbstractEndpoint的bind方法就執行完了,綁定狀態設為BOUND_ON_START然后執行startInternal,這個方法在NioEndpoint中。先判斷是否是正在運行狀態,如果不是就置為是,暫停狀態置為否,然后初始化了三個SynchronizedStack,這是Tomcat自定義的簡化同步棧,自定義的結構好處就是既能滿足需要又能提高時間空間利用率,最合適自己的場景:
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool());
createExecutor線程池並設置給任務隊列:
initializeConnectionLatch根據配置設定最大允許的並發連接數maxConnections,實現方法是自定義的鎖結構LimitLatch:
if (maxConnections==-1) return null; if (connectionLimitLatch==null) { connectionLimitLatch = new LimitLatch(getMaxConnections()); }
啟動poller線程的代碼直接看吧,沒啥好解釋的:
pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); }
啟動acceptor線程是startAcceptorThreads一樣的方式,代碼就不貼了。endpoint的start之后,啟動了一個異步超時線程(例Thread[http-nio-8080-AsyncTimeout,5,main]),這個線程會每隔一段時間檢查每一個等待隊列中的協議處理器,判斷如果超時了,就會給它發一個超時事件:socketWrapper.processSocket(SocketEvent.TIMEOUT, true)
while (asyncTimeoutRunning) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } long now = System.currentTimeMillis(); for (Processor processor : waitingProcessors) { processor.timeoutAsync(now); } // Loop if endpoint is paused while (endpoint.isPaused() && asyncTimeoutRunning) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } }
while (asyncTimeoutRunning) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } long now = System.currentTimeMillis(); for (Processor processor : waitingProcessors) { processor.timeoutAsync(now); } // Loop if endpoint is paused while (endpoint.isPaused() && asyncTimeoutRunning) { try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore } } }
至此connector.start執行結束,因為我也沒注冊什么監聽器,所以這部分沒提,而且相關內容之前都有寫。當循環綁定connect結束后,暫存的綁定信息也就沒用了,移除掉。
回到TomcatEmbeddedServletContainer,接下來:
if (connector != null && this.autoStart) { startConnector(connector); }
startConnector:
for (Container child : this.tomcat.getHost().findChildren()) { if (child instanceof TomcatEmbeddedContext) { ((TomcatEmbeddedContext) child).deferredLoadOnStartup(); } }
// Earlier versions of Tomcat used a version that returned void. If that // version is used our overridden loadOnStart method won't have been called // and the original will have already run. super.loadOnStartup(findChildren());
具體什么版本會怎么樣,就不考證了,反正該執行的都會執行,只不過位置可能不一樣。實際上,方法取出了所有的Wapper(StandardWrapper),執行了它們的load方法。load方法取出了Servlet綁定並記錄加載時間,並設置了jsp監控的mbean,期間還檢查了servlet的安全注解比如是否允許訪問和傳輸是否加密(EmptyRoleSemantic,TransportGuarantee):
InstanceManager instanceManager = ((StandardContext)getParent()).getInstanceManager();
servlet = (Servlet) instanceManager.newInstance(servletClass);
processServletSecurityAnnotation(servlet.getClass());
初始化servlet,比如輸入輸出的buffer sizes大小是否合理(最小256):
instanceInitialized = true;
在load前后都有判斷如果bean加載器和當前線程上下文加載器不同時用在用的bean加載器替換當前線程上下文類加載器,因為用上下文加載器的話,這一步加載的東西就不能被多個Context共享了,后面又來了一次,推測是為了防止加載的過程中為了避免SPI的加載問題而被替換為線程上下文加載器。其實這里已經和本篇博客沒什么關系了,但是秉着流水賬的風格,把它貼完:
Context context = findContext();
ContextBindings.unbindClassLoader(context, getNamingToken(context), getClass().getClassLoader());
if (ContextAccessController.checkSecurityToken(obj, token)) { Object o = clObjectBindings.get(classLoader); if (o == null || !o.equals(obj)) { return; } clBindings.remove(classLoader); clObjectBindings.remove(classLoader); }
本來是因為Tomcat一個BUG造成CLOSE_WAIT問題屢的這些代碼,然而這里其實別沒有直接到,因為只是啟動,還沒到運行,以后如果有機會寫運行部分再說吧。
以上。
==========================================================
咱最近用的github:https://github.com/saaavsaaa
微信公眾號: