Tomcat-如何建立連接獲取http請求


tomcat-如何建立連接,獲取遠程請求

學習探討tomcat如何建立網絡連接協議,並處理客戶端過來的請求

建立網絡連接,指定http1.1通信協議

tomcat在創建時,會創建連接對象,負責處理客戶端的請求,基於socket

connector 連接 protocol 協議 endpoint終端 socket插座,端口連接

創建初始化

connector -> protocol -> endpoint -> socket

接收請求創建任務

acceptor.socket.acceptor()->

​ socketWrapper(攜帶通信信息)

​ -> poller(socketWrapper)

​ -> execute(socketWrapper) 創建線程

創建連接器

Conector類

org.apache.catalina.connector.Connector

空參構造connector() -> connector(http/1.1)

/**
 * Defaults to using HTTP/1.1 NIO implementation.
 */
public Connector() {
    this("HTTP/1.1");
}

指定通信協議http11

org.apache.coyote.http11.Http11NioProtocol

-> new Http11NioProtocol()

public Http11NioProtocol() {
       super(new NioEndpoint());
   }

指定服務終端處理模型非阻塞nio

org.apache.tomcat.util.net.NioEndpoint

-> new NioEndPoint()

創建之后如何被啟動?見springboot啟動tomcat方式

終端處理線程和線程池初始化

啟動之后

NioEndpoint執行bind()方法,

一些初始化,綁定端口

@Override
    public void bind() throws Exception {
        initServerSocket();
        setStopLatch(new CountDownLatch(1));
        // Initialize SSL if needed
        initialiseSsl();
        selectorPool.open(getName());
    }
	
	//socket相關  initServerSocket()具體如下
	// Separated out to make it easier for folks that extend NioEndpoint to
    // implement custom [server]sockets
    protected void initServerSocket() throws Exception {
            //.......
        	//根據平台不同,反回具體底層類對象(windows,linux,unix)
            serverSock = ServerSocketChannel.open();
            socketProperties.setProperties(serverSock.socket());
            //綁定地址和端口號
            InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
            serverSock.socket().bind(addr,getAcceptCount());
        	//.......
    }

NioEndpoint初始化之后,調用start()執行startInternal()

代碼如下

// Create worker collection
if (getExecutor() == null) {
    //創建線程池
    createExecutor();
}

initializeConnectionLatch();
// Start poller thread
// 創建客戶端隊列(客戶端過來的請求)

poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();

//創建接收遠程請求線程
startAcceptorThread();

初始化線程池配置

-> createExecutor() 用於處理用戶請求

指定 備用線程,對大線程數,隊列類型,超時時間,和線程工廠

public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }

創建Poller線程

poller = new Poller();
Thread pollerThread = new Thread(poller, getName() + "-ClientPoller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();

創建Acceptor線程

protected void startAcceptorThread() {
        acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor";
        acceptor.setThreadName(threadName);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }

處理請求的相關對象(線程)

Acceptor

org.apache.tomcat.util.net.Acceptor

Acceptor 負責循環等待遠程請求,將請求以socket形式攜帶信息,調用setSocketOptions()將socket包裝配置為socketWrapper,

setSocketOptions: 對socket包裝處理配置,使用poller對象注冊到隊列,讓poller線程做后續的處理

Acceptor 類的run方法:

public void run() {

        int errorDelay = 0;
		//......以下省略部分代碼
        try {
            // Loop until we receive a shutdown command
            // 一直循環等待遠程請求
            while (!stopCalled) {
                // Accept the next incoming connection from the server socket
                // 1 接收請求
                socket = endpoint.serverSocketAccept();
                    
                // setSocketOptions() will hand the socket off to
                // 2 處理請求,setSocketOptions() 內部調用poller 將新請求任務放入隊列
                if (!endpoint.setSocketOptions(socket)) {
                    endpoint.closeSocket(socket);
                }
                        
            }
        } finally {
            stopLatch.countDown();
        }
        state = AcceptorState.ENDED;
    }

Poller

org.apache.tomcat.util.net.NioEndpoint.Poller

Poller負責接收包裝后的socket請求,放入隊列,

並在run方法中循環去poll()請求任務,將與流讀寫有關的組件IOChannel Selector socketWrapper 綁定關聯

再通過selector獲取selectionKeys

迭代循環獲取對應的socket,提交任務(線程),線程讀寫處理socketWrapper等后續操作

public void run() {
            // Loop until destroy() is called
            while (true) {
                // events()方法 poller隊列任務處理  將IOChannel Selector socketWrapper 關聯 
		hasEvents = events();
                //......省略
				
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // 非阻塞io api 任務處理
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper != null) {
                        // 如果有等待處理的任務,則處理
                        processKey(sk, socketWrapper);
                        //processKey內部會調用processSocket方法,最終用線程池提交任務
                    }
                }

                // Process timeouts
                timeout(keyCount,hasEvents);
            }

            getStopLatch().countDown();
        }

調用線程池處理請求

org.apache.tomcat.util.net.AbstractEndpoint

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
                sc = processorCache.pop();
            }
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor(); //獲取線程池
            if (dispatch && executor != null) {
                executor.execute(sc); //最終通過線程池處理配置后的請求
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;

events隊列

private final SynchronizedQueue<PollerEvent> events =
                new SynchronizedQueue<>(); //事件隊列(socket請求)
//注冊請求到隊列
public void rigister(final NioSocketWrapper socketWrapper)
{
    event = new PollerEvent(socketWrapper, OP_REGISTER);
	addEvent(event);
}

private void addEvent(PollerEvent event) {
    events.offer(event);
    if (wakeupCounter.incrementAndGet() == 0) {
        selector.wakeup();
    }
}

其他

events()綁定及后面的 processSocket()最終提交實際處理任務到線程

/**
         * Processes events in the event queue of the Poller.
         *
         * @return <code>true</code> if some events were processed,
         *   <code>false</code> if queue was empty
         */
        public boolean events() {
            boolean result = false;

            PollerEvent pe = null;
            for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
                result = true;
                NioSocketWrapper socketWrapper = pe.getSocketWrapper();
                SocketChannel sc = socketWrapper.getSocket().getIOChannel();
                int interestOps = pe.getInterestOps();
                if (sc == null) {
                    log.warn(sm.getString("endpoint.nio.nullSocketChannel"));
                    socketWrapper.close();
                } else if (interestOps == OP_REGISTER) {
                    try {
                        //注冊綁定
                        sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper);
                    } catch (Exception x) {
                        log.error(sm.getString("endpoint.nio.registerFail"), x);
                    }
                } else {
                    final SelectionKey key = sc.keyFor(getSelector());
                    if (key == null) {
                        // The key was cancelled (e.g. due to socket closure)
                        // and removed from the selector while it was being
                        // processed. Count down the connections at this point
                        // since it won't have been counted down when the socket
                        // closed.
                        socketWrapper.close();
                    } else {
                        final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment();
                        if (attachment != null) {
                            // We are registering the key to start with, reset the fairness counter.
                            try {
                                int ops = key.interestOps() | interestOps;
                                attachment.interestOps(ops);
                                key.interestOps(ops);
                            } catch (CancelledKeyException ckx) {
                                cancelledKey(key, socketWrapper);
                            }
                        } else {
                            cancelledKey(key, socketWrapper);
                        }
                    }
                }
                if (running && !paused && eventCache != null) {
                    pe.reset();//清空任務socketWrapper
                    eventCache.push(pe);
                }
            }

            return result;
        }

setSocketOptions 中的socket任務注冊

protected boolean setSocketOptions(SocketChannel socket) {
        NioSocketWrapper socketWrapper = null;
        try {
            // Allocate channel and wrapper
            NioChannel channel = null;
            if (nioChannels != null) {
                channel = nioChannels.pop();
            }
            //...... 部分省略
            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
            
            socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            poller.register(socketWrapper);
            return true;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error(sm.getString("endpoint.socketOptionsError"), t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            if (socketWrapper == null) {
                destroySocket(socket);
            }
        }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM