Tomcat NIO


說起Tomcat的NIO,不得不提的就是Connector這個Tomcat組件。Connector是Tomcat的連接器,其主要任務是負責處理收到的請求,並創建一個Request和Response的對象,然后用一個線程用於處理請求,Connector會把Request和Response對象傳遞給該線程,該線程的具體的處理過程是Container容器的事了。

在tomcat啟動過程中,會初始化Connector,並調用Connector的startInternal()方法開啟Connector,開始監聽、處理請求。

 

想了解Tomcat NIO的工作方式,就得先了解一下Connector的實現原理。下面從三個方面來了解一下Connector組件:Connector的數據結構、Connector初始化以及Connector開啟。

 

Connector

Connector的數據結構

先了解一下Connector的數據結構。Connector的一個主要的屬性:ProtocolHandler protocolHandler(協議)

 

protocolHandler(協議)

  • 維護服務器使用的協議,如http1.1等。ProtocolHandler是接口,實現類有Http11Nio2Protocol 、Http11Nio2Protocol等
  • 維護服務提供的IO方式,負責EndPoint的初始化、啟動。目前有BIO、NIO、AIO等IO方式,來實現監聽端口、讀寫socket數據的功能。通過EndPoint封裝實現不同的IO方式
  • EndPoint監聽到IO讀寫,交給Tomcat線程池中的一個線程來處理,SocketProcessor會根據protocolHandler采用的協議,調用協議的process方法處理請求。
  • 維護adapter(適配器),可以將請求/響應數據進行適配

 

protocolHandler會找到socket對應的處理器(如Http11Processor),然后進行數據讀寫、適配,處理。請求由adapter最終會交給servlet處理

 

常說的BIO、NIO,主要的應用就在protocolHandler中。protocolHandler負責維護Connector使用的協議以及IO方式。在protocolHandler中,不同的IO方式,會使用不同的EndPoint,具體采用哪種IO方式,取決於采用哪個EndPoint,每一個EndPoint的實現類,都封裝了一種IO策略。若采用NIO,則為NioEndpoint。

 

Connector初始化

 

創建Connector時,會拿到Tomcat目錄下conf/server.xml中Connector的協議配置,利用反射創建ProtocolHandler:

/**
 * Coyote Protocol handler class name.
 * Defaults to the Coyote HTTP/1.1 protocolHandler.
 */
protected String protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";


public Connector(String protocol) {
	//設置protocolHandlerClassName類名
    setProtocol(protocol);
    // Instantiate protocol handler
    ProtocolHandler p = null;
    try {
		//根據server.xml中<connector/>標簽的protocol屬性值,獲取到對應的http協議類
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    if (Globals.STRICT_SERVLET_COMPLIANCE) {
        uriCharset = StandardCharsets.ISO_8859_1;
    } else {
        uriCharset = StandardCharsets.UTF_8;
    }
}

//設置protocolHandlerClassName類名
public void setProtocol(String protocol) {

    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

	//若配置了protocol="HTTP/1.1"或者沒配,則默認是Http11NioProtocol或者Http11AprProtocol
    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
        }
    } else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
        }
    } else {
		//直接取配置的類名
        setProtocolHandlerClassName(protocol);
    }
}

 

以Tomcat8.5.20為例,這里默認是http1.1的NIO。

 

 

Connector.start()開啟

 

Connector初始化后,調用start方法開啟。主要涉及一下幾個方法:

 

Connector的startInternal()方法,會調用protocolHandler.start();

protocolHandler中會調用endpoint.start(),從而達到開啟endpoint、監聽端口、讀寫Socket的目的:

以Tomcat8.5.20為例,這里默認是http1.1的NIO。


Connector.start()開啟

Connector初始化后,調用start方法開啟。主要涉及一下幾個方法:

Connector的startInternal()方法,會調用protocolHandler.start();
protocolHandler中會調用endpoint.start(),從而達到開啟endpoint、監聽端口、讀寫Socket的目的:

  

至此,Connector完成了開啟的過程,開啟監聽端口、可以讀寫Socket了。

 

總結一下,關於Connector:

創建Connector時,會拿到Tomcat目錄下conf/server.xml中Connector的協議配置,利用反射創建ProtocolHandler。

ProtocolHandler負責維護Connector使用的協議以及IO方式,不同的IO方式如BIO、NIO、AIO封裝在EndPoint中

開啟Connector時,會開啟protocolHandler,從而達到EndPoint的開啟,開始監聽端口、讀寫socket數據了

protocolHandler中將請求拿到的數據進行適配,通過adapter適配成Request和Response對象,最終交給Container去處理

 

下面重點就來了,NIO。

Tomcat NIO

 

Tomcat在處理客戶端請求時,讀寫socket數據是一種網絡IO操作。目前Tomcat有幾種IO方式,分別是BIO(同步阻塞),NIO(同步非阻塞)和AIO(異步非阻塞)。不同IO方式的讀寫機制,被封裝在了Endpoint中。BIO、AIO不再贅述。這里主要看NIO。

Tomcat NIO模型

當然要了解一下Tomcat NIO的模型了。Tomcat NIO是基於Java NIO實現的,其基本原理如下:

 

Tomcat NIO是對Java NIO的一種典型的應用方式:通過JDK提供的同步非阻塞的IO方式,實現了IO多路復用,即一個線程管理多個客戶端的連接。了解Java NIO,可以看一下Java NIO

Tomcat在NIO模式下,所有客戶端的請求先由一個接收線程接收,然后由若干個(一般為CPU的個數)線程輪詢讀寫事件,最后將具體的讀寫操作交由線程池處理。

NioEndpoint

 

要了解Tomcat的NIO實現,其實就是了解NioEndpoint的實現原理。

 

數據結構

它一共包含LimitLatch、Acceptor、Poller、SocketProcessor、Excutor5個部分

 

  • LimitLatch是連接控制器,它負責維護連接數的計算,nio模式下默認是10000,達到這個閾值后,就會拒絕連接請求。
  • Acceptor負責接收連接,默認是1個線程來執行,將請求的事件注冊到事件列表
  • Poller來負責輪詢上述產生的事件。Poller線程數量是cpu的核數Math.min(2,Runtime.getRuntime().availableProcessors())。由Poller將就緒的事件生成SocketProcessor,然后交給Excutor去執行。
  • SocketProcessor繼承了SocketProcessorBase,實現了Runnable接口,可以提交給線程池Excutor來執行。它里面的doRun()方法,封裝了讀寫Socket、完成Container調用的邏輯
  • Excutor線程池是一個Tomcat線程池。用來執行Poller創建的SocketProcessor。Excutor線程池的大小就是我們在Connector節點配置的maxThreads的值。

 

SocketProcessor被一個線程執行的時候,會完成從socket中讀取http request,解析成HttpServletRequest對象,分派到相應的servlet並完成邏輯,然后將response通過socket發回client。在從socket中讀數據和往socket中寫數據的過程,並沒有像典型的非阻塞的NIO的那樣,注冊OP_READ或OP_WRITE事件到主Selector,而是直接通過socket完成讀寫,這時是阻塞完成的,但是在timeout控制上,使用了NIO的Selector機制,但是這個Selector並不是Poller線程維護的主Selector,而是BlockPoller線程中維護的Selector,稱之為輔Selector,實現可見org.apache.coyote.http11.Http11InputBuffer#fill。

 

了解了NioEndPoint的數據結構之后,可以看一下它們的關系圖

NioEndpoint組件關系圖

以上過程就以同步非阻塞的方式完成了網絡IO。

 

其實是一個Reactor模型:

  • 一個Acceptor(當然多個也行,不過一般場景一個夠了)負責accept事件,把接收到SocketChannel注冊到按某種算法從Reactor池中取出的一個Reactor上,注冊的事件為讀,寫等,之后這個Socket Channel的所有IO事件都和Acceptor沒關系,都由被注冊到的那個Reactor來負責。
  • 每個Acceptor和每個Reactor都各自持有一個Selector
  • 當然每個Acceptor和Reactor都是一個線程

 

這里的Poller池其實就是一個Reactor池,可以是多個線程。

 

 

NioEndPoint實現

 

工作原理簡單了解了一下,接下來看一下具體的代碼實現吧。先上一個NioEndpoint的UML圖:

NioEndPoint啟動

 

AbstractEndpoint里實現了一些EndPoint的抽象的通用的方法,其中主要的一個入口方法是org.apache.tomcat.util.net.AbstractEndpoint#start方法

 
NioEndPoint啟動

AbstractEndpoint里實現了一些EndPoint的抽象的通用的方法,其中主要的一個入口方法是org.apache.tomcat.util.net.AbstractEndpoint#start方法

  

其中,bind()方法和startInternal()方法,由其子類具體實現。

bind()方法用於初始化endpoint,綁定監聽端口等、設置最大線程數、ssl等。

startInternal()方法在EndPoint初始化完畢后,創建pollers輪詢線程以及acceptors線程並開啟。

其中,bind()方法和startInternal()方法,由其子類具體實現。
bind()方法用於初始化endpoint,綁定監聽端口等、設置最大線程數、ssl等。
startInternal()方法在EndPoint初始化完畢后,創建pollers輪詢線程以及acceptors線程並開啟。

  

NioEndPoint時序圖

 

看完了開啟EndPoint的過程,再來詳細看一下NioEndpoint處理的的時序圖:

通過上面的時序圖,結合代碼來詳細了解一下Acceptor和Poller的工作方式。

 

Acceptor接收請求

 

NioEndPoint中的Acceptor方法實現了Runnable接口,主要干的活就是上述圖中的3,4,5,6,7

 

@Override  
public void run() {  
  
    int errorDelay = 0;  
  
    // 循環,直到收到一個關閉的命令  
    while (running) {  
  
        // 如果EndPoint被暫停,則循環sleep  
        while (paused && running) {  
            state = AcceptorState.PAUSED;  
            try {  
                Thread.sleep(50);  
            } catch (InterruptedException e) {  
                // Ignore  
            }  
        }  
  
        if (!running) {  
            break;  
        }  
        state = AcceptorState.RUNNING;  
  
        try {  
            //如果達到了最大連接數,則等待  
            countUpOrAwaitConnection();  
  
            SocketChannel socket = null;  
            try {  
                // 創建一個socketChannel,接收下一個從服務器進來的連接  
                socket = serverSock.accept();  
            } catch (IOException ioe) {  
                // We didn't get a socket  
                countDownConnection();  
                if (running) {  
                    // Introduce delay if necessary  
                    errorDelay = handleExceptionWithDelay(errorDelay);  
                    // re-throw  
                    throw ioe;  
                } else {  
                    break;  
                }  
            }  
            // 成功接收,重置error delay  
            errorDelay = 0;  
  
            // 如果處於EndPoint處於running狀態並且沒有沒暫停,Configure the socket  
            if (running && !paused) {  
                // setSocketOptions()將把socket傳遞給適當的處理器。如果成功,會關閉socket。  
                // 否則,在這里關閉socket  
                if (!setSocketOptions(socket)) {  
                    closeSocket(socket);  
                }  
            } else {  
                closeSocket(socket);  
            }  
        } catch (Throwable t) {  
            ExceptionUtils.handleThrowable(t);  
            log.error(sm.getString("endpoint.accept.fail"), t);  
        }  
    }  
    state = AcceptorState.ENDED;  
}  

  

看的出來,Acceptor使用serverSock.accept()阻塞的監聽端口,如果有連接進來,拿到了socket,並且EndPoint處於正常運行狀態,則調用NioEndPoint的setSocketOptions方法,一頓操作。

至於setSocketOptions做了什么,概括來說就是根據socket構建一個NioChannel,然后把這個的NioChannel注冊到Poller的事件列表里面,等待poller輪詢。

 

看下setSocketOptions的代碼:

 
/**
 * 處理指定的連接
 * @param socket The socket channel
 * @return  
 *  如果socket配置正確,並且可能會繼續處理,返回true 
 *  如果socket需要立即關閉,則返回false
 */
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //非阻塞模式
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
		
		//從緩存中拿一個nioChannel  若沒有,則創建一個。將socket傳進去
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
		//從pollers數組中獲取一個Poller對象,注冊這個nioChannel
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

  

顯然,下面的重點就是register這個方法了。這個方法是NioEndPoint中的Poller實現的,主要干的事就是在Poller注冊新創建的套接字。

/**
 * 使用輪詢器注冊新創建的socket
 *
 * @param socket    新創建的socket
 */
public void register(final NioChannel socket) {
    socket.setPoller(this);
	//創建一個NioSocketWrapper,包裝一下socket。然后一頓設置。
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getSocketProperties().getSoTimeout());
    ka.setWriteTimeout(getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());


	//從緩存中取出一個PollerEvent對象,若沒有則創建一個。將socket和NioSocketWrapper設置進去
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
    else r.reset(socket,ka,OP_REGISTER);


	//添到到該Poller的事件列表
    addEvent(r);
}

  

總結一下,從Acceptor接收到請求,它做了這么些工作:

  • 如果達到了最大連接數,則等待。否則,阻塞監聽端口。
  • 監聽到有連接,則創建一個socketChannel。若服務正常運行,則把socket傳遞給適當的處理器。如果成功,會關閉socket。

 

在這里,適當的處理是指調用NioEndPoint的setSocketOptions方法,處理指定的連接:

  • 將socket設置為非阻塞
  • 從緩存中拿一個nioChannel  若沒有,則創建一個。將socket傳進去。
  • 從pollers數組中獲取一個Poller對象,把nioChannel注冊到該Poller中。

 

其中最后一步注冊的過程,是調用Poller的register()方法:

  • 創建一個NioSocketWrapper,包裝socket。然后配置相關屬性,設置感興趣的操作為SelectionKey.OP_READ
  • PollerEvent。PollerEvent可以是從緩存中取出來的,若沒有則創建一個。初始化或者重置此Event對象,設置感興趣的操作為OP_REGISTER (Poller輪詢時會用到)
  • 將新的PollerEvent添加到這個Poller的事件列表events,等待Poller線程輪詢。

 

Poller輪詢

 

其實上面已經提到了Poller將一個事件注冊到事件隊列的過程。接下來便是Poller線程如何處理這些事件了,這就是Poller線程的工作機制。

Poller作為一個線程,實現了Runnable接口的run方法,在run方法中會輪詢事件隊列events,將每個PollerEvent中的SocketChannel感興趣的事件注冊到Selector中,然后將PollerEvent從隊列里移除。之后就是SocketChanel通過Selector調度來進行非阻塞的讀寫數據了。

看下Poller.run()代碼:

/**
 * The background thread that adds sockets to the Poller, checks the
 * poller for triggered events and hands the associated socket off to an
 * appropriate processor as events occur.
 */
@Override
public void run() {
    // 循環直到 destroy() 被調用
    while (true) {

        boolean hasEvents = false;

        try {
            if (!close) {
				//將events隊列,將每個事件中的通道感興趣的事件注冊到Selector中
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //如果走到了這里,代表已經有就緒的IO通道
                    //調用非阻塞的select方法,直接返回就緒通道的數量
                    keyCount = selector.selectNow();
                } else {
					//阻塞等待操作系統返回 數據已經就緒的通道,然后被喚醒
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("",x);
            continue;
        }
        //如果上面select方法超時,或者被喚醒,先將events隊列中的通道注冊到Selector上。
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());

        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // 遍歷已就緒的通道,並調用processKey來處理該Socket的IO。
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            // 如果其它線程已調用,則Attachment可能為空
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
				//創建一個SocketProcessor,放入Tomcat線程池去執行
                processKey(sk, attachment);
            }
        }//while

        //process timeouts
        timeout(keyCount,hasEvents);
    }//while

    getStopLatch().countDown();
}

  

讀取已就緒通道的部分,是常見的Java NIO的用法,Selector調用selectedKeys(),獲取IO數據已經就緒的通道,遍歷並調用processKey方法來處理每一個通道就緒的事件。而processKey方法會創建一個SocketProcessor,然后丟到Tomcat線程池中去執行。

其中需要注意的一個點是,events()方法,用來處理PollerEvent事件,執行PollerEvent.run(),然后將PollerEvent重置再次放入緩存中,以便對象復用。

/**
 * Processes events in the event queue of the Poller.
 *
 * @return <code>true</code> if some events were processed,
 *   <code>false</code> if queue was empty
 */
public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    while ( (pe = events.poll()) != null ) {
        result = true;
        try {
			//把SocketChannel感興趣的事件注冊到Selector中
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error("",x);
        }
    }

    return result;
}

 

可以看出,PollerEvent.run()方法才是重點:

public void run() {
	//Acceptor調用Poller.register()方法時,創建的PollerEvent感興趣的事件為OP_REGISTER,因此走這個分支
    if (interestOps == OP_REGISTER) {
        try {
			//將SocketChannel的讀事件注冊到Poller線程的Selector中,使用Selector來調度IO。
            socket.getIOChannel().register(
                    socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
        } catch (Exception x) {
            log.error(sm.getString("endpoint.nio.registerFail"), x);
        }
    } else {
        final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        try {
            if (key == null) {
                // The key was cancelled (e.g. due to socket closure)
                // and removed from the selector while it was being
                // processed. Count down the connections at this point
                // since it won't have been counted down when the socket
                // closed.
                socket.socketWrapper.getEndpoint().countDownConnection();
            } else {
                final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                if (socketWrapper != null) {
                    //we are registering the key to start with, reset the fairness counter.
                    int ops = key.interestOps() | interestOps;
                    socketWrapper.interestOps(ops);
                    key.interestOps(ops);
                } else {
                    socket.getPoller().cancelledKey(key);
                }
            }
        } catch (CancelledKeyException ckx) {
            try {
                socket.getPoller().cancelledKey(key);
            } catch (Exception ignore) {}
        }
    }
}

  

至此,可以看出Poller線程的作用

  • 將Acceptor接收到的請求注冊到Poller的事件隊列中
  • Poller輪詢事件隊列中,處理到達的事件,將PollerEvent中的通道注冊到Poller的Selector中
  • 輪詢已就緒的通道,對每個就緒通道創建一個SocketProcessor,交個Tomcat線程池去處理

 

剩下的事情,就是SocketProcessor怎么適配客戶端發來請求的數據、然后怎樣交給Tomcat容器去處理了。

 

SocketProcessor處理請求

 

簡單提一下SocketProcessor的處理過程,不是這篇文章的重點。通過上面可以知道,具體處理一個請求,是在SocketProcessor通過線程池去執行的。執行一次請求的時序圖

SocketProcessor中通過Http11ConnectionHandler,取到Htpp11Processor,Htpp11Processor調用prepareRequest方法,准備好請求數據。然后調用CoyoteAdapter的service方法進行request和response的適配,之后交給容器進行處理。

 

在CoyoteAdapter的service方法中,主要干了2件事:

  • org.apache.coyote.Request -> org.apache.catalina.connector.Request extends HttpServletRequest,org.apache.coyote.Response -> org.apache.catalina.connector. Response extends HttpServletResponse
  • 將請求交給StandardEngineValue處理

 

將請求交給Tomcat容器處理后,后將請求一層一層傳遞到Engin、Host、Context、Wrapper,最終經過一系列Filter,來到了Servlet,執行我們自己具體的代碼邏輯。其中,容器之間數據的傳遞用到了管道流的機制。這里就不在贅述,以后有時間專門寫一篇Tomcat容器的工作原理。

 

參考文章:

《Tomcat內核設計剖析》

深度解讀Tomcat中的NIO模型


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM