深入理解NIO(二)—— Tomcat中對NIO的應用
老哥行行好,轉載和我說一聲好嗎,我不介意轉載的,但是請把原文鏈接貼大點好嗎
Tomcat大致架構
先貼兩張圖大致看一眼Tomcat的架構
- Tomcat中只有一個Server,一個Server可以有多個Service,一個Service可以有多個Connector和一個Container;
- Service 是對外提供服務的;
- Connector用於接受請求並將請求封裝成Request和Response來具體處理;
- Container用於封裝和管理Servlet,以及具體處理request請求;
接下來我們只解析Connector部分的源碼,因為它底層是NIO實現的
我們先啟動一個Tomcat試試:
Tomcat tomcat = new Tomcat(); // 1.先分析這個 Connector connector = new Connector("HTTP/1.1"); connector.setPort(8080); tomcat.setConnector(connector); // 2.再分析這個 tomcat.start(); tomcat.getServer().await();
Connector初始化
在 Tomcat 中,使用 Connector 來處理連接,一個 Tomcat 可以配置多個 Connector,分別用於監聽不同端口,或處理不同協議。
在 Connector 的構造方法中,我們可以傳 HTTP/1.1
或 AJP/1.3
用於指定協議,也可以傳入相應的協議處理類,org.apache.coyote.http11.Http11NioProtocol:對應非阻塞 IO。
public Connector(String protocol) { . . . . . . . ProtocolHandler p = null; try { // 前面我刪了一部分代碼,這里的protocolHandlerClassName其實就是傳入的參數protocol // 所以下面這段代碼會利用反射構造了一個Http11NioProtocol對象作為ProtocolHandler Class<?> clazz = Class.forName(protocolHandlerClassName); p = (ProtocolHandler) clazz.getConstructor().newInstance(); } catch (Exception e) { log.error(sm.getString( "coyoteConnector.protocolHandlerInstantiationFailed"), e); } finally { this.protocolHandler = p; } }
Connector就是使用ProtocolHandler來處理請求的,不同的ProtocolHandler代表不同的連接類型,其中ProtocolHandler包含了三個部件:Endpoint、Processor、Adapter。
下面是Http11NioProtocol類的構造方法,里面構造了一個NioEndpoint對象
public Http11NioProtocol() { super(new NioEndpoint()); }
當然這里只是構造了NioEndpoint,還沒有用它去綁定某個端口,也就是還沒開始初始化
在設置端口 connector.setPort(8080); 並調用 start() 之后,才正式開始綁定端口
start()
// Tomcat public void start() throws LifecycleException { getServer(); // 從這里進去 server.start(); } // LifecycleBase public final synchronized void start() throws LifecycleException { // 這里省略其他代碼,我們直接進到第173行的init方法 if (state.equals(LifecycleState.NEW)) { init(); } // 第183行,一會的第4小節我們會分析到 startInternal(); } // 還是LifecycleBase public final synchronized void init() throws LifecycleException { // 第136行 initInternal(); } // 進到Connector中 protected void initInternal() throws LifecycleException { // 第932行有這樣一行代碼,它調用了AbstractProtocol的初始化init()方法 protocolHandler.init(); }
// AbstractProtocol public void init() throws Exception { ... String endpointName = getName(); endpoint.setName(endpointName.substring(1, endpointName.length()-1)); endpoint.setDomain(domain); // endpoint 的 name=http-nio-8089,domain=Tomcat endpoint.init(); } // AbstractEndpoint public final void init() throws Exception { if (bindOnInit) { bind(); // 這里對應的當然是子類 NioEndpoint 的 bind() 方法 bindState = BindState.BOUND_ON_INIT; } ... }
接下來終於進入正題了,也就是 NioEndpoint的綁定端口方法了
Endpoint
雖然Endpoint有多種,但是這里我們只講NioEndpoint,Tomcat 使用不同的 Endpoint 來處理不同的協議請求
bind()
追隨着Tomcat的start方法中的init()初始化部分,我們來到了屬於NioEndpoint的init() 方法,也就是bind() 方法
// NioEndpoint public void bind() throws Exception { // initServerSocket(); 原代碼是這行,我們 “內聯” 過來一起說 // 開啟 ServerSocketChannel serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); // getPort() 會返回我們最開始設置的 8080,得到我們的 address 是 0.0.0.0:8080 InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort())); // ServerSocketChannel 綁定地址、端口, // 第二個參數 backlog 默認為 100,超過 100 的時候,新連接會被拒絕(不過源碼注釋也說了,這個值的真實語義取決於具體實現) serverSock.socket().bind(addr,getAcceptCount()); // ※※※ 設置 ServerSocketChannel 為阻塞模式 ※※※ serverSock.configureBlocking(true); // 設置 acceptor 和 poller 的數量,至於它們是什么角色,待會說 // acceptorThreadCount 默認為 1 if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads // 作者想表達的意思應該是:使用多個 acceptor 線程並不見得性能會更好 acceptorThreadCount = 1; } // poller 線程數,默認值定義如下,所以在多核模式下,默認為 2 // pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors()); if (pollerThreadCount <= 0) { pollerThreadCount = 1; } // setStopLatch(new CountDownLatch(pollerThreadCount)); // 初始化 ssl,我們忽略 ssl initialiseSsl(); // 打開 NioSelectorPool,先忽略它 selectorPool.open(); }
- ServerSocketChannel 已經打開,並且綁定要了之前指定的 8080 端口,設置成了阻塞模式。
- 設置了 acceptor 的線程數為 1
- 設置了 poller 的線程數,單核 CPU 為 1,多核為 2
到這里我們就正式 init() 結束了,接下來我們就該分析剛剛提到的 startInternal(); 方法了
// LifecycleBase public final synchronized void start() throws LifecycleException { // 這里省略其他代碼,我們直接進到第173行的init方法 if (state.equals(LifecycleState.NEW)) { init(); } // 第183行,一會的第4小節我們會分析到 startInternal(); } // Connector protected void startInternal() throws LifecycleException { // 第957行 protocolHandler.start(); } // AbstractProtocol public void start() throws Exception { ... // 調用 endpoint 的 start 方法 endpoint.start(); } // AbstractEndpoint public final void start() throws Exception { // 按照我們的流程,剛剛 init 的時候,已經把 bindState 改為 BindState.BOUND_ON_INIT 了, // 所以下面的 if 分支我們就不進去了 if (bindState == BindState.UNBOUND) { bind(); bindState = BindState.BOUND_ON_START; } // 往里看 NioEndpoint 的實現 startInternal(); }
startInternal
// NioEndpoint public void startInternal() throws Exception { if (!running) { running = true; paused = false; // 以下幾個是緩存用的,之后我們也會看到很多這樣的代碼,為了減少 new 很多對象出來 processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); // 創建【工作線程池】,Tomcat 自己包裝了一下 ThreadPoolExecutor, // 1. 為了在創建線程池以后,先啟動 corePoolSize 個線程 // 2. 自己管理線程池的增長方式(默認 corePoolSize 10, maxPoolSize 200),keepAliveTime是60秒,workQueue是LinkBlockingQueue if ( getExecutor() == null ) { createExecutor(); } // 設置一個柵欄(tomcat 自定義了類 LimitLatch),控制最大的連接數,默認是 10000 initializeConnectionLatch(); // 開啟 poller 線程 // 還記得之前 init 的時候,默認地設置了 poller 的數量為 2,所以這里啟動 2 個 poller 線程 pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } // 開啟 acceptor 線程,和開啟 poller 線程組差不多。 // init 的時候,默認地,acceptor 的線程數是 1 startAcceptorThreads(); } }
- Tomcat自己管理線程池的增長方式(默認 corePoolSize 10, maxPoolSize 200),keepAliveTime是60秒,workQueue是LinkBlockingQueue
- 該創建的工作線程池、 poller 線程組、acceptor 線程組這里都創建完畢
- 設置一個柵欄(tomcat 自定義了類 LimitLatch),控制最大的連接數,默認是 10000
線程模型圖
在開始講這幾個線程之前,我們先看一下這張線程模型圖,有個大概的印象。
Acceptor
它的結構非常簡單,在構造函數中,已經把 endpoint 傳進來了,此外就只有 threadName 和 state 兩個簡單的屬性。
private final AbstractEndpoint<?,U> endpoint; private String threadName; protected volatile AcceptorState state = AcceptorState.NEW; public Acceptor(AbstractEndpoint<?,U> endpoint) { this.endpoint = endpoint; }
threadName 就是一個線程名字而已,Acceptor 的狀態 state 主要是隨着 endpoint 來的。
public enum AcceptorState { NEW, RUNNING, PAUSED, ENDED }
接下來我們直接來看 acceptor 的 run 方法吧:
run()
public void run() { int errorDelay = 0; // 只要 endpoint 處於 running,這里就一直循環 while (endpoint.isRunning()) { // 如果 endpoint 處於 pause 狀態,這邊 Acceptor 用一個 while 循環將自己也掛起 while (endpoint.isPaused() && endpoint.isRunning()) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { } } // endpoint 結束了,Acceptor 自然也要結束嘛 if (!endpoint.isRunning()) { break; } state = AcceptorState.RUNNING; try { // 如果此時達到了最大連接數(之前我們說過,默認是10000),就等待 endpoint.countUpOrAwaitConnection(); if (endpoint.isPaused()) { continue; } U socket = null; try { // 這里就是接收下一個進來的 SocketChannel // 之前我們設置了 ServerSocketChannel 為阻塞模式,所以這邊的 accept 是阻塞的 // 這里的實現是return serverSock.accept(); 返回的是一個SocketChannel socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { endpoint.countDownConnection(); if (endpoint.isRunning()) { errorDelay = handleExceptionWithDelay(errorDelay); throw ioe; } else { break; } } // accept 成功,將 errorDelay 設置為 0 errorDelay = 0; if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() 是這里的關鍵方法,也就是說前面千辛萬苦都是為了能到這里進行處理 if (!endpoint.setSocketOptions(socket)) { // 如果上面的方法返回 false,關閉 SocketChannel endpoint.closeSocket(socket); } } else { // 由於 endpoint 不 running 了,或者處於 pause 了,將此 SocketChannel 關閉 endpoint.destroySocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); String msg = sm.getString("endpoint.accept.fail"); if (t instanceof Error) { Error e = (Error) t; if (e.getError() == 233) { log.warn(msg, t); } else { log.error(msg, t); } } else { log.error(msg, t); } } } state = AcceptorState.ENDED; }
- 大家應該發現了,Acceptor 繞來繞去,都是在調用 NioEndpoint 的方法
- acceptor 啟動以后就開始循環調用 ServerSocketChannel 的 accept() 方法獲取新的連接,然后調用 endpoint.setSocketOptions(socket) 處理新的連接,之后再進入循環 accept 下一個連接。
setSocketOptions
protected boolean setSocketOptions(SocketChannel socket) { try { // 設置該 SocketChannel 為非阻塞模式 socket.configureBlocking(false); Socket sock = socket.socket(); // 設置 socket 的一些屬性 socketProperties.setProperties(sock); // 還記得 startInternal 的時候,說過了 nioChannels 是緩存用的。 // 限於篇幅,這里的 NioChannel 就不展開了,它包括了 socket 和 buffer NioChannel channel = nioChannels.pop(); if (channel == null) { // 主要是創建讀和寫的兩個 buffer,默認地,讀和寫 buffer 都是 8192 字節,8k(名字叫handler哦handler,是不是突然就懂了呢,寫出見名知意的代碼是真的很重要) SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } // getPoller0() 會選取所有 poller 中的一個 poller getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
- 這里創建了一個包着SocketChannel和BufferHandler的NioChannel
- 之后往 poller 中注冊了這個 NioChannel 實例
Poller
之前我們看到 acceptor 將一個 NioChannel 實例 register 到了一個 poller 中。在看 register 方法之前,我們需要先對 poller 要有個簡單的認識。(雖然你有打開源碼就會知道,但我還是提一下,這里的Poller其實還是NioEnpoint的內部類哦!)
public class Poller implements Runnable { public Poller() throws IOException { // 每個 poller 開啟一個 Selector this.selector = Selector.open(); } private Selector selector; // events 隊列,此類的核心 private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>(); private volatile boolean close = false; private long nextExpiration = 0;// 這個值后面有用,記住它的初始值為 0 private AtomicLong wakeupCounter = new AtomicLong(0); private volatile int keyCount = 0; ... }
每個 poller 關聯了一個 Selector。
register()
回來看看剛剛Acceptor類中把NioChannel注冊到Poller中的register方法
public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. // 注意第三個參數值 OP_REGISTER if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); // 添加 event 到 poller 中 addEvent(r); }
這里將這個 socket(包含 socket 和 buffer 的 NioChannel 實例) 包裝為一個 PollerEvent,然后添加到 events 隊列中
將NioChannel注冊進Poller之后,Acceptor的任務完成,之后的事情都交給這個作為Selector的Poller線程了
之后我們就來看看Poller線程是怎么工作的吧:
Poller.run ()
public void run() { while (true) { boolean hasEvents = false; try { if (!close) { // 執行 events 隊列中每個 event 的 run() 方法
// events() 方法比較簡單,就是取出當前隊列中的 PollerEvent 對象,逐個執行 event.run() 方法。 hasEvents = events(); // wakeupCounter 的初始值為 0,這里設置為 -1 if (wakeupCounter.getAndSet(-1) > 0) { keyCount = selector.selectNow(); } else { // timeout 默認值 1 秒 keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } // 篇幅所限,我們就不說 close 的情況了 if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } // 這里沒什么好說的,頂多就再執行一次 events() 方法 if ( keyCount == 0 ) hasEvents = (hasEvents | events()); // 如果剛剛 select 有返回 ready keys,進行處理 Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); if (attachment == null) { iterator.remove(); } else { iterator.remove(); // ※※※※※ 處理 ready key ※※※※※ processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); }
- 大概就是調用 events() 方法調用每個剛剛注冊到events隊列中的PollerEvent類的run方法
- 處理注冊到 Selector 上的 ready key
events()方法比較簡單,就是調用每個剛剛注冊到events隊列中的PollerEvent類的run方法,所以這里就不再分析
我們直接來看PollerEvent類的run方法(注意,在調用PollerEvent類的run方法之前,雖然NioChannel已經注冊給Poller,但是並沒有真的注冊到Selector里面)
PollerEvent.run()
public void run() { // 對於新來的連接,前面我們說過,interestOps == OP_REGISTER if (interestOps == OP_REGISTER) { try { // 將這個新連接 SocketChannel 注冊到該 poller 的 Selector 中, // 設置監聽 OP_READ 事件, // 將 socketWrapper 設置為 attachment 進行傳遞(這個對象可是什么鬼都有,往上看就知道了) socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { /* else 這塊不介紹*/ } }
SocketChannel 注冊到了 Poller 內部的 Selector 中,監聽 OP_READ 事件
剛剛Poller方法的run中,除了調用events()以外,它的主要職責就是對是 readable 狀態的SocketChannel調用processKey方法
所以接下來我們來看看processKey方法
processKey()
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() || sk.isWritable() ) { // 忽略 sendfile if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { // unregister 相應的 interest set, // 如接下來是處理 SocketChannel 進來的數據,那么就不再監聽該 channel 的 OP_READ 事件 unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { // 處理讀 if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { // 處理寫 if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk); } } } } else { cancelledKey(sk); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } }
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { // 創建一個 SocketProcessorBase 的實例 sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null) { // 將任務放到之前建立的 worker 線程池中執行 executor.execute(sc); } else { sc.run(); // ps: 如果 dispatch 為 false,那么就當前線程自己執行 } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
上面兩段代碼,大概是創建了一個SocketProcessorBase類后交給之前的worker線程池中去執行
本來想分析一下SocketProcessorBase類的run方法的,看了一下,好長,算了,主題也不是Tomcat源碼分析,只是看一下其中NIO的應用而已,就到此為止吧。
PS:管 ™ 的找不找得到工作,管 ™ 的面試會不會問,老子的原則是:淦 ™ 的源碼看爆,水文水爆。
下一篇:NIO原理及部分源碼的解析
參考資料:
https://javadoop.com/post/tomcat-nio 大量搬運自此文章
https://blog.csdn.net/qq_38245537/article/details/79009448 參考圖片