說起Tomcat的NIO,不得不提的就是Connector這個Tomcat組件。Connector是Tomcat的連接器,其主要任務是負責處理收到的請求,並創建一個Request和Response的對象,然后用一個線程用於處理請求,Connector會把Request和Response對象傳遞給該線程,該線程的具體的處理過程是Container容器的事了。
在tomcat啟動過程中,會初始化Connector,並調用Connector的startInternal()方法開啟Connector,開始監聽、處理請求。
想了解Tomcat NIO的工作方式,就得先了解一下Connector的實現原理。下面從三個方面來了解一下Connector組件:Connector的數據結構、Connector初始化以及Connector開啟。
Connector
Connector的數據結構
先了解一下Connector的數據結構。Connector的一個主要的屬性:ProtocolHandler protocolHandler(協議)
protocolHandler(協議)
- 維護服務器使用的協議,如http1.1等。ProtocolHandler是接口,實現類有Http11Nio2Protocol 、Http11Nio2Protocol等
- 維護服務提供的IO方式,負責EndPoint的初始化、啟動。目前有BIO、NIO、AIO等IO方式,來實現監聽端口、讀寫socket數據的功能。通過EndPoint封裝實現不同的IO方式
- EndPoint監聽到IO讀寫,交給Tomcat線程池中的一個線程來處理,SocketProcessor會根據protocolHandler采用的協議,調用協議的process方法處理請求。
- 維護adapter(適配器),可以將請求/響應數據進行適配
protocolHandler會找到socket對應的處理器(如Http11Processor),然后進行數據讀寫、適配,處理。請求由adapter最終會交給servlet處理
常說的BIO、NIO,主要的應用就在protocolHandler中。protocolHandler負責維護Connector使用的協議以及IO方式。在protocolHandler中,不同的IO方式,會使用不同的EndPoint,具體采用哪種IO方式,取決於采用哪個EndPoint,每一個EndPoint的實現類,都封裝了一種IO策略。若采用NIO,則為NioEndpoint。
Connector初始化
創建Connector時,會拿到Tomcat目錄下conf/server.xml中Connector的協議配置,利用反射創建ProtocolHandler:
/** * Coyote Protocol handler class name. * Defaults to the Coyote HTTP/1.1 protocolHandler. */ protected String protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol"; public Connector(String protocol) { //設置protocolHandlerClassName類名 setProtocol(protocol); // Instantiate protocol handler ProtocolHandler p = null; try { //根據server.xml中<connector/>標簽的protocol屬性值,獲取到對應的http協議類 Class<?> clazz = Class.forName(protocolHandlerClassName); p = (ProtocolHandler) clazz.getConstructor().newInstance(); } catch (Exception e) { log.error(sm.getString( "coyoteConnector.protocolHandlerInstantiationFailed"), e); } finally { this.protocolHandler = p; } if (Globals.STRICT_SERVLET_COMPLIANCE) { uriCharset = StandardCharsets.ISO_8859_1; } else { uriCharset = StandardCharsets.UTF_8; } } //設置protocolHandlerClassName類名 public void setProtocol(String protocol) { boolean aprConnector = AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseAprConnector(); //若配置了protocol="HTTP/1.1"或者沒配,則默認是Http11NioProtocol或者Http11AprProtocol if ("HTTP/1.1".equals(protocol) || protocol == null) { if (aprConnector) { setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol"); } else { setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol"); } } else if ("AJP/1.3".equals(protocol)) { if (aprConnector) { setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol"); } else { setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol"); } } else { //直接取配置的類名 setProtocolHandlerClassName(protocol); } }
以Tomcat8.5.20為例,這里默認是http1.1的NIO。
Connector.start()開啟
Connector初始化后,調用start方法開啟。主要涉及一下幾個方法:
Connector的startInternal()方法,會調用protocolHandler.start();
protocolHandler中會調用endpoint.start(),從而達到開啟endpoint、監聽端口、讀寫Socket的目的:
以Tomcat8.5.20為例,這里默認是http1.1的NIO。 Connector.start()開啟 Connector初始化后,調用start方法開啟。主要涉及一下幾個方法: Connector的startInternal()方法,會調用protocolHandler.start(); protocolHandler中會調用endpoint.start(),從而達到開啟endpoint、監聽端口、讀寫Socket的目的:
至此,Connector完成了開啟的過程,開啟監聽端口、可以讀寫Socket了。
總結一下,關於Connector:
創建Connector時,會拿到Tomcat目錄下conf/server.xml中Connector的協議配置,利用反射創建ProtocolHandler。
ProtocolHandler負責維護Connector使用的協議以及IO方式,不同的IO方式如BIO、NIO、AIO封裝在EndPoint中
開啟Connector時,會開啟protocolHandler,從而達到EndPoint的開啟,開始監聽端口、讀寫socket數據了
protocolHandler中將請求拿到的數據進行適配,通過adapter適配成Request和Response對象,最終交給Container去處理
下面重點就來了,NIO。
Tomcat NIO
Tomcat在處理客戶端請求時,讀寫socket數據是一種網絡IO操作。目前Tomcat有幾種IO方式,分別是BIO(同步阻塞),NIO(同步非阻塞)和AIO(異步非阻塞)。不同IO方式的讀寫機制,被封裝在了Endpoint中。BIO、AIO不再贅述。這里主要看NIO。
Tomcat NIO模型
當然要了解一下Tomcat NIO的模型了。Tomcat NIO是基於Java NIO實現的,其基本原理如下:
Tomcat NIO是對Java NIO的一種典型的應用方式:通過JDK提供的同步非阻塞的IO方式,實現了IO多路復用,即一個線程管理多個客戶端的連接。了解Java NIO,可以看一下Java NIO。
Tomcat在NIO模式下,所有客戶端的請求先由一個接收線程接收,然后由若干個(一般為CPU的個數)線程輪詢讀寫事件,最后將具體的讀寫操作交由線程池處理。
NioEndpoint
要了解Tomcat的NIO實現,其實就是了解NioEndpoint的實現原理。
數據結構
它一共包含LimitLatch、Acceptor、Poller、SocketProcessor、Excutor5個部分
- LimitLatch是連接控制器,它負責維護連接數的計算,nio模式下默認是10000,達到這個閾值后,就會拒絕連接請求。
- Acceptor負責接收連接,默認是1個線程來執行,將請求的事件注冊到事件列表
- Poller來負責輪詢上述產生的事件。Poller線程數量是cpu的核數Math.min(2,Runtime.getRuntime().availableProcessors())。由Poller將就緒的事件生成SocketProcessor,然后交給Excutor去執行。
- SocketProcessor繼承了SocketProcessorBase,實現了Runnable接口,可以提交給線程池Excutor來執行。它里面的doRun()方法,封裝了讀寫Socket、完成Container調用的邏輯
- Excutor線程池是一個Tomcat線程池。用來執行Poller創建的SocketProcessor。Excutor線程池的大小就是我們在Connector節點配置的maxThreads的值。
SocketProcessor被一個線程執行的時候,會完成從socket中讀取http request,解析成HttpServletRequest對象,分派到相應的servlet並完成邏輯,然后將response通過socket發回client。在從socket中讀數據和往socket中寫數據的過程,並沒有像典型的非阻塞的NIO的那樣,注冊OP_READ或OP_WRITE事件到主Selector,而是直接通過socket完成讀寫,這時是阻塞完成的,但是在timeout控制上,使用了NIO的Selector機制,但是這個Selector並不是Poller線程維護的主Selector,而是BlockPoller線程中維護的Selector,稱之為輔Selector,實現可見org.apache.coyote.http11.Http11InputBuffer#fill。
了解了NioEndPoint的數據結構之后,可以看一下它們的關系圖
NioEndpoint組件關系圖
以上過程就以同步非阻塞的方式完成了網絡IO。
其實是一個Reactor模型:
- 一個Acceptor(當然多個也行,不過一般場景一個夠了)負責accept事件,把接收到SocketChannel注冊到按某種算法從Reactor池中取出的一個Reactor上,注冊的事件為讀,寫等,之后這個Socket Channel的所有IO事件都和Acceptor沒關系,都由被注冊到的那個Reactor來負責。
- 每個Acceptor和每個Reactor都各自持有一個Selector
- 當然每個Acceptor和Reactor都是一個線程
這里的Poller池其實就是一個Reactor池,可以是多個線程。
NioEndPoint實現
工作原理簡單了解了一下,接下來看一下具體的代碼實現吧。先上一個NioEndpoint的UML圖:
NioEndPoint啟動
AbstractEndpoint里實現了一些EndPoint的抽象的通用的方法,其中主要的一個入口方法是org.apache.tomcat.util.net.AbstractEndpoint#start方法
NioEndPoint啟動 AbstractEndpoint里實現了一些EndPoint的抽象的通用的方法,其中主要的一個入口方法是org.apache.tomcat.util.net.AbstractEndpoint#start方法
其中,bind()方法和startInternal()方法,由其子類具體實現。
bind()方法用於初始化endpoint,綁定監聽端口等、設置最大線程數、ssl等。
startInternal()方法在EndPoint初始化完畢后,創建pollers輪詢線程以及acceptors線程並開啟。
其中,bind()方法和startInternal()方法,由其子類具體實現。 bind()方法用於初始化endpoint,綁定監聽端口等、設置最大線程數、ssl等。 startInternal()方法在EndPoint初始化完畢后,創建pollers輪詢線程以及acceptors線程並開啟。
NioEndPoint時序圖
看完了開啟EndPoint的過程,再來詳細看一下NioEndpoint處理的的時序圖:
通過上面的時序圖,結合代碼來詳細了解一下Acceptor和Poller的工作方式。
Acceptor接收請求
NioEndPoint中的Acceptor方法實現了Runnable接口,主要干的活就是上述圖中的3,4,5,6,7
@Override public void run() { int errorDelay = 0; // 循環,直到收到一個關閉的命令 while (running) { // 如果EndPoint被暫停,則循環sleep while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //如果達到了最大連接數,則等待 countUpOrAwaitConnection(); SocketChannel socket = null; try { // 創建一個socketChannel,接收下一個從服務器進來的連接 socket = serverSock.accept(); } catch (IOException ioe) { // We didn't get a socket countDownConnection(); if (running) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // 成功接收,重置error delay errorDelay = 0; // 如果處於EndPoint處於running狀態並且沒有沒暫停,Configure the socket if (running && !paused) { // setSocketOptions()將把socket傳遞給適當的處理器。如果成功,會關閉socket。 // 否則,在這里關閉socket if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; }
看的出來,Acceptor使用serverSock.accept()阻塞的監聽端口,如果有連接進來,拿到了socket,並且EndPoint處於正常運行狀態,則調用NioEndPoint的setSocketOptions方法,一頓操作。
至於setSocketOptions做了什么,概括來說就是根據socket構建一個NioChannel,然后把這個的NioChannel注冊到Poller的事件列表里面,等待poller輪詢。
看下setSocketOptions的代碼:
/** * 處理指定的連接 * @param socket The socket channel * @return * 如果socket配置正確,並且可能會繼續處理,返回true * 如果socket需要立即關閉,則返回false */ protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //非阻塞模式 socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); //從緩存中拿一個nioChannel 若沒有,則創建一個。將socket傳進去 NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } //從pollers數組中獲取一個Poller對象,注冊這個nioChannel getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
顯然,下面的重點就是register這個方法了。這個方法是NioEndPoint中的Poller實現的,主要干的事就是在Poller注冊新創建的套接字。
/** * 使用輪詢器注冊新創建的socket * * @param socket 新創建的socket */ public void register(final NioChannel socket) { socket.setPoller(this); //創建一個NioSocketWrapper,包裝一下socket。然后一頓設置。 NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getSocketProperties().getSoTimeout()); ka.setWriteTimeout(getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); //從緩存中取出一個PollerEvent對象,若沒有則創建一個。將socket和NioSocketWrapper設置進去 PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); //添到到該Poller的事件列表 addEvent(r); }
總結一下,從Acceptor接收到請求,它做了這么些工作:
- 如果達到了最大連接數,則等待。否則,阻塞監聽端口。
- 監聽到有連接,則創建一個socketChannel。若服務正常運行,則把socket傳遞給適當的處理器。如果成功,會關閉socket。
在這里,適當的處理是指調用NioEndPoint的setSocketOptions方法,處理指定的連接:
- 將socket設置為非阻塞
- 從緩存中拿一個nioChannel 若沒有,則創建一個。將socket傳進去。
- 從pollers數組中獲取一個Poller對象,把nioChannel注冊到該Poller中。
其中最后一步注冊的過程,是調用Poller的register()方法:
- 創建一個NioSocketWrapper,包裝socket。然后配置相關屬性,設置感興趣的操作為SelectionKey.OP_READ
- PollerEvent。PollerEvent可以是從緩存中取出來的,若沒有則創建一個。初始化或者重置此Event對象,設置感興趣的操作為OP_REGISTER (Poller輪詢時會用到)
- 將新的PollerEvent添加到這個Poller的事件列表events,等待Poller線程輪詢。
Poller輪詢
其實上面已經提到了Poller將一個事件注冊到事件隊列的過程。接下來便是Poller線程如何處理這些事件了,這就是Poller線程的工作機制。
Poller作為一個線程,實現了Runnable接口的run方法,在run方法中會輪詢事件隊列events,將每個PollerEvent中的SocketChannel感興趣的事件注冊到Selector中,然后將PollerEvent從隊列里移除。之后就是SocketChanel通過Selector調度來進行非阻塞的讀寫數據了。
看下Poller.run()代碼:
/** * The background thread that adds sockets to the Poller, checks the * poller for triggered events and hands the associated socket off to an * appropriate processor as events occur. */ @Override public void run() { // 循環直到 destroy() 被調用 while (true) { boolean hasEvents = false; try { if (!close) { //將events隊列,將每個事件中的通道感興趣的事件注冊到Selector中 hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { //如果走到了這里,代表已經有就緒的IO通道 //調用非阻塞的select方法,直接返回就緒通道的數量 keyCount = selector.selectNow(); } else { //阻塞等待操作系統返回 數據已經就緒的通道,然后被喚醒 keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //如果上面select方法超時,或者被喚醒,先將events隊列中的通道注冊到Selector上。 if ( keyCount == 0 ) hasEvents = (hasEvents | events()); Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // 遍歷已就緒的通道,並調用processKey來處理該Socket的IO。 while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // 如果其它線程已調用,則Attachment可能為空 if (attachment == null) { iterator.remove(); } else { iterator.remove(); //創建一個SocketProcessor,放入Tomcat線程池去執行 processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); }
讀取已就緒通道的部分,是常見的Java NIO的用法,Selector調用selectedKeys(),獲取IO數據已經就緒的通道,遍歷並調用processKey方法來處理每一個通道就緒的事件。而processKey方法會創建一個SocketProcessor,然后丟到Tomcat線程池中去執行。
其中需要注意的一個點是,events()方法,用來處理PollerEvent事件,執行PollerEvent.run(),然后將PollerEvent重置再次放入緩存中,以便對象復用。
/** * Processes events in the event queue of the Poller. * * @return <code>true</code> if some events were processed, * <code>false</code> if queue was empty */ public boolean events() { boolean result = false; PollerEvent pe = null; while ( (pe = events.poll()) != null ) { result = true; try { //把SocketChannel感興趣的事件注冊到Selector中 pe.run(); pe.reset(); if (running && !paused) { eventCache.push(pe); } } catch ( Throwable x ) { log.error("",x); } } return result; }
可以看出,PollerEvent.run()方法才是重點:
public void run() { //Acceptor調用Poller.register()方法時,創建的PollerEvent感興趣的事件為OP_REGISTER,因此走這個分支 if (interestOps == OP_REGISTER) { try { //將SocketChannel的讀事件注冊到Poller線程的Selector中,使用Selector來調度IO。 socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { if (key == null) { // The key was cancelled (e.g. due to socket closure) // and removed from the selector while it was being // processed. Count down the connections at this point // since it won't have been counted down when the socket // closed. socket.socketWrapper.getEndpoint().countDownConnection(); } else { final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment(); if (socketWrapper != null) { //we are registering the key to start with, reset the fairness counter. int ops = key.interestOps() | interestOps; socketWrapper.interestOps(ops); key.interestOps(ops); } else { socket.getPoller().cancelledKey(key); } } } catch (CancelledKeyException ckx) { try { socket.getPoller().cancelledKey(key); } catch (Exception ignore) {} } } }
至此,可以看出Poller線程的作用
- 將Acceptor接收到的請求注冊到Poller的事件隊列中
- Poller輪詢事件隊列中,處理到達的事件,將PollerEvent中的通道注冊到Poller的Selector中
- 輪詢已就緒的通道,對每個就緒通道創建一個SocketProcessor,交個Tomcat線程池去處理
剩下的事情,就是SocketProcessor怎么適配客戶端發來請求的數據、然后怎樣交給Tomcat容器去處理了。
SocketProcessor處理請求
簡單提一下SocketProcessor的處理過程,不是這篇文章的重點。通過上面可以知道,具體處理一個請求,是在SocketProcessor通過線程池去執行的。執行一次請求的時序圖
SocketProcessor中通過Http11ConnectionHandler,取到Htpp11Processor,Htpp11Processor調用prepareRequest方法,准備好請求數據。然后調用CoyoteAdapter的service方法進行request和response的適配,之后交給容器進行處理。
在CoyoteAdapter的service方法中,主要干了2件事:
- org.apache.coyote.Request -> org.apache.catalina.connector.Request extends HttpServletRequest,org.apache.coyote.Response -> org.apache.catalina.connector. Response extends HttpServletResponse
- 將請求交給StandardEngineValue處理
將請求交給Tomcat容器處理后,后將請求一層一層傳遞到Engin、Host、Context、Wrapper,最終經過一系列Filter,來到了Servlet,執行我們自己具體的代碼邏輯。其中,容器之間數據的傳遞用到了管道流的機制。這里就不在贅述,以后有時間專門寫一篇Tomcat容器的工作原理。
參考文章:
《Tomcat內核設計剖析》