從連接器組件看Tomcat的線程模型——NIO模式


Tomcat8之后,針對Http協議默認使用org.apache.coyote.http11.Http11NioProtocol,也就是NIO模式。通過之前的博客分析,我們知道Connector組件在初始化和start的時候會觸發它子組件(Http11NioProtocol、NIOEndpoint的初始化和start)。

NIO模式工作時序圖

還是像之前那樣,我們先整理出NIO模式啟動時的時序圖。

從上面的時序圖可以看出,整個流程的重點時在NioEndpoint這個類中。下面我們通過源代碼看下這幾個重點方法。

//NIO模式綁定端口
public void bind() throws Exception {
        //初始化套接字服務,需要注意的是在NIO模式下,這個ServerSocketChannel還是阻塞模式的
        initServerSocket();
        //設置默認的acceptor線程數,默認是1個,這個參數暫時好像沒法修改(??)
        //注意這個參數和acceptCount(接收請求連接的數量)之間的區別
        if (acceptorThreadCount == 0) {
            acceptorThreadCount = 1;
        }
        //設置pollerThreadCount,根據CPU的核數來,CPU大於2個設置為2,否則為1
        if (pollerThreadCount <= 0) 
            pollerThreadCount = 1;
        }
        //設置CountDownLatch
        setStopLatch(new CountDownLatch(pollerThreadCount));
        initialiseSsl();
        selectorPool.open();
    }

這個代碼主要做了些初始化工作,初始化套接字服務,初始化acceptorThreadCount和pollerThreadCount等。

再看看startInternal代碼:

@Override
public void startInternal() throws Exception {

    if (!running) {
        running = true;
        paused = false;
        //創建3個緩存
        //頻繁創建SocketProcessor成本高
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                                 socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                             socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                                              socketProperties.getBufferPool());
        //一般情況下,我們自己不配置線程池,所以會進入這個方法,也可以自己在server.xml中配置這個線程池。
        if ( getExecutor() == null ) {
            //創建一個核心線程數是10,最大線程數是200,隊列長度是Integer.MaxValue的線程池
            //注意下,這邊線程池的邏輯和JDK中線程池的邏輯不一樣,默認創建10個線程,當請求數
            //超過10個的話會繼續創建,最大創建200個線程,超過200個后,任務就會進入阻塞隊列

            //值得注意的是Tomcat的線程池繼承了JDK的ThreadPoolExecutor,但是重寫了線程池的默認
            //機制。Tomcat的線程池會默認創建corePoolSize個線程,此時線程池中的線程都是空閑的。
            //隨着不斷向線程池中添加任務,空閑線程逐漸減少,當線程池中的空閑線程耗盡之前,任務
            //都會直接被提交到線程池的隊列中(這些任務會立即被空閑線程消費),當線程池中沒有空閑
            //線程而且線程池中的線程總數沒達到MaximumPoolSize,會創建一個新的線程來執行新的任務;
            //當線程池的大小達到MaximumPoolSize時,直接將任務放進隊列,等到有線程空閑下來后再處理
            //這個任務。(參考TaskQueue的offer方法)
            createExecutor();
        }

        initializeConnectionLatch();
        // Start poller threads
        //開啟poller線程,如果CPU是多核就開啟2個,否則開啟一個
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        //開啟acceptor線程,默認開啟一個acceptor線程
        startAcceptorThreads();
    }
}

Acceptor線程分析

acceptor線程的作用是接收客戶端請求,啟動之后一個loop線程一直在監聽用戶請求。值得注意的是,如果用戶一直沒法請求過來,這個線程也是會一直阻塞的,直到有請求過來。

//Acceptor這個類是NIOEndpoint的一個內部類
public void run() {
    int errorDelay = 0;
    // 一直會監聽,直到關閉tomcat
    while (endpoint.isRunning()) {
        // Loop if endpoint is paused
        while (endpoint.isPaused() && endpoint.isRunning()) {
            state = AcceptorState.PAUSED;
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                // Ignore
            }
        }
        if (!endpoint.isRunning()) {
            break;
        }
        state = AcceptorState.RUNNING;
        try {
            //如果已經接受的請求超過maxAcceptCount,那么accept線程進入wait狀態
            endpoint.countUpOrAwaitConnection();
            if (endpoint.isPaused()) {
                continue;
            }
            U socket = null;
            try {
                //接受socket,這個方法會阻塞,因為NIOEndpoint在初始化的時候
                //將ServerSocketChannel設置成了阻塞模式
                socket = endpoint.serverSocketAccept();
            } catch (Exception ioe) {
                endpoint.countDownConnection();
                if (endpoint.isRunning()) {
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                } else {
                    break;
                }
            }
            errorDelay = 0;
            if (endpoint.isRunning() && !endpoint.isPaused()) {
                //這邊委托給NioEndpoint的setSocketOptions方法處理
                if (!endpoint.setSocketOptions(socket)) {
                    endpoint.closeSocket(socket);
                }
            } else {
                endpoint.destroySocket(socket);
            }
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            String msg = sm.getString("endpoint.accept.fail");
            if (t instanceof Error) {
                Error e = (Error) t;
                if (e.getError() == 233) {
                    log.warn(msg, t);
                } else {
                    log.error(msg, t);
                }
            } else {
                log.error(msg, t);
            }
        }
    }
    state = AcceptorState.ENDED;
}

下面看下NioEndpoint的setSocketOptions(SocketChannel socket)方法:

protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
            //使用緩存的NioChannel,沒有緩存的則新建
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                //使用緩存的channel,但是需要重新reset這個信道
                channel.reset();
            }
            //將socket注冊到poller隊列中
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

Tomcat以NIO模式啟動時NioEndpoint組件將啟動某個端口的監聽,一個連接到來后將被注冊到NioChannel隊列中,由Poller(輪詢器)負責檢測通道的讀寫事件,並在創建任務后扔進線程池中,線程池進行任務處理。處理過程中將通過協議解析器Http11NioProcessor組件對HTTP協議解析,同時通過適配器(Adapter)匹配到指定的容器進行處理並響應客戶端。

LimitLatch組件負責對連接數的控制,Acceptor組件負責接收套接字連接並注冊到通道隊列里面,Poller組件負責輪詢檢查事件列表,Poller池包含了若干Poller組件,SocketProcessor組件是任務定義器,Executor組件是負責處理套接字的線程池。下面將對每個組件的結構與作用進行解析。

連接數控制器LimitLatch

NIO模式中的LimitLatch組件和BIO模式中的LimitLatch組件功能一致,作用也是對最大連接數的限制。

與BIO中的控制器不同的是,控制閥門的大小不相同,BIO模式受本身模式的限制,它的連接數與線程數比例是1:1的關系,所以當連接數太多時將導致線程數也很多,JVM線程數過多將導致線程間切換成本很高。默認情況下,Tomcat處理連接池的線程數為200,所以BIO流量控制閥門大小也默認設置為200。但NIO模式能克服BIO連接數的不足,它能基於事件同時維護大量的連接,對於事件的遍歷只須交給同一個或少量的線程,再把具體的事件執行邏輯交給線程池。例如,Tomcat把套接字接收工作交給一個線程,而把套接字讀寫及處理工作交給N個線程,N一般為CPU核數。對於NIO模式,Tomcat默認把流量閥門大小設置為10 000,如果你想更改大小,可以通過server.xml中 節點的maxConnections屬性修改,同時要注意,連接數到達最大值后,操作系統仍然會接收客戶端連接,直到操作系統接收隊列被塞滿。隊列默認長度為100,可通過server.xml中 節點的acceptCount屬性配置。

Acceptor組件

Acceptor的主要職責也是監聽是否有客戶端連接進來並接收連接,這里需要注意的是,accept操作是阻塞的。假如用戶一直沒有請求發送過來,acceptor線程將一直阻塞。

Acceptor接收SocketChannel對象后要把它設置為非阻塞,這是因為后面對客戶端所有的連接都采取非阻塞模式處理。接着設置套接字的一些屬性,再封裝成非阻塞通道對象。非阻塞通道可能是NioChannel也可能是SecureNioChannel,這取決於使用HTTP通信還是使用HTTPS通信。最后將非阻塞通道對象注冊到通道隊列中並由Poller負責檢測事件。

任務定義器SocketProcessor

與JIoEndpoint組件相似,將任務放到線程池中處理前需要定義好任務的執行邏輯。根據線程池的約定,它必須擴展Runnable接口:

protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
    //NIO方式讀取套接字處理,並返回
    //連接數減一
    //關閉連接
}

因為NIO與BIO模式有很大不同,其中一個很大不同在於BIO每次返回都肯定能獲取若干字節,而NIO無法保證每次讀取的字節量,可多可少甚至可能沒有,所以對於NIO模式,只能“嘗試”處理請求報文。例如,第一次只讀取了請求頭部的一部分,不足以開始處理,但並不會阻塞,而是繼續往下執行,直到下次循環到來,此時可能請求頭部的另外一部分已經被讀取,則可以開始處理請求頭部。

連接輪詢器Poller

NIO模型需要同時對很多連接進行管理,管理的方式則是不斷遍歷事件列表,對相應連接的相應事件做出處理,而遍歷的工作正是交給Poller負責。Poller負責的工作可以用下圖簡單表示出來,在Java層面上看,它不斷輪詢事件列表,一旦發現相應的事件則封裝成任務定義器SocketProcessor,進而扔進線程池中執行任務。當然,由於NioEndpoint組件內有一個Poller池,因此如果不存在線程池,任務將由Poller直接執行。

Poller內部依賴JDK的Selector對象進行輪詢,Selector會選擇出待處理的事件,每輪詢一次就選出若干需要處理的通道,例如從通道中讀取字節、將字節寫入Channel等。在NIO模式下,因為每次讀取的數據是不確定的,對於HTTP協議來說,每次讀取的數據可能既包含了請求行也包含了請求頭部,也可能不包含請求頭部,所以每次只能嘗試去解析報文。若解析不成功則等待下次輪詢讀取更多的數據后再嘗試解析,若解析報文成功則做一些邏輯處理后對客戶端響應,而這些報文解析、邏輯處理、響應等都是在任務定義器中定義的。

Poller池子

在NIO模式下,對於客戶端連接的管理都是基於事件驅動的,上一節提到NioEndpoint組件包含了Poller組件,Poller負責的工作就是檢測事件並處理事件。但假如整個Tomcat的所有客戶端連接都交給一個線程來處理,那么即使這個線程是不阻塞的,整體處理性能也可能無法達到最佳或較佳的狀態。為了提升處理性能,Tomcat設計成由多個Poller共同處理所有客戶端連接,所有連接均攤給每個Poller處理,而這些Poller便組成了Poller池。

整個結構如圖6.40所示,客戶端連接由Acceptor組件接收后按照一定的算法放到通道隊列上。這里使用的是輪詢調度算法,從第1個隊列到第N個隊列循環分配,假如這里有3個Poller,則第1個連接分配給第1個Poller對應的通道列表,第2個連接分配給第2個Poller對應的通道列表,以此類推,到第4個連接又分配到第1個Poller對應的通道列表上。這種算法基本保證了每個Poller所對應處理的連接數均勻,每個Poller各自輪詢檢測自己對應的事件列表,一旦發現需要處理的連接則對其進行處理。這時如果NioEndpoint組件包含任務執行器(Executor)則會將任務處理交給它,但假如沒有Executor組件,Poller則自己處理任務。

Poller池的大小多少比較合適呢?Tomcat使用了一個經典的算法Math.min(2, Runtime. getRuntime().availableProcessors()),即會根據Tomcat運行環境決定Poller組件的數量。所以在Tomcat中一般會有兩個Poller組件,而如果運行在更多處理器的機器上,則JVM可用處理器個數等於Poller組件的個數。

BIO、NIO和AIO的對比

Java對BIO、NIO、AIO的支持

Java BIO : 同步並阻塞,服務器實現模式為一個連接一個線程,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理,如果這個連接不做任何事情會造成不必要的線程開銷,當然可以通過線程池機制改善(Tomcat中就引入了線程池、但是即使使用了線程池,你accept到socket立馬就將其扔到線程池,此時請求的數據可能還沒到,線程池中的的線程還是會阻塞)。

Java NIO : 同步非阻塞,服務器實現模式為一個請求一個線程,即客戶端發送的連接請求都會注冊到多路復用器上,多路復用器輪詢到連接有I/O請求時才啟動一個線程進行處理(數據准備好了再扔到線程池,防止線程無味的阻塞)。

Java AIO(NIO.2) : 異步非阻塞,服務器實現模式為一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務器應用去啟動線程進行處理(數據好了,操作系統主動通知,避免NIO中一直輪訓的操作)

BIO、NIO、AIO適用場景分析

BIO方式適用於連接數目比較小且固定的架構,這種方式對服務器資源要求比較高,並發局限於應用中,JDK1.4以前的唯一選擇,但程序直觀簡單易理解。

NIO方式適用於連接數目多且連接比較短(輕操作)的架構,比如聊天服務器,並發局限於應用中,編程比較復雜,JDK1.4開始支持。

AIO方式使用於連接數目多且連接比較長(重操作)的架構,比如相冊服務器,充分調用OS參與並發操作,編程比較復雜,JDK7開始支持。

參考

http://server.51cto.com/sOS-595052.html

https://nod0620.iteye.com/blog/998215

https://www.jianshu.com/p/370af4895545

https://www.jianshu.com/p/901a6e35b3d9

http://m.elecfans.com/article/632834.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM