摘要:本文主要介紹了tomcat內部處理HTTP請求的Connector部分
在上一篇文章中已經介紹了tomcat在能處理HTTP請求之前所做的准備,今天這篇文章就開始正式開始介紹tomcat處理HTTP請求。在上篇文章說到下面代碼:
//代碼清單1 在JIOEndpoint中
@Override
protected AbstractEndpoint.Acceptor createAcceptor() {
return new Acceptor();
}
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
JIOEndpoint的內部類
*/
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
//各種條件判斷略
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
countUpOrAwaitConnection();
Socket socket = null;
try {
// Accept the next incoming connection from the server
// socket
//1111111111
socket = serverSocketFactory.acceptSocket(serverSocket);
} catch (IOException ioe) {
//異常處理 略
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
//22222222 設置socket的各種屬性
if (running && !paused && setSocketOptions(socket)) {
// Hand this socket off to an appropriate processor
//33333333
if (!processSocket(socket)) {
countDownConnection();
// Close socket right away
closeSocket(socket);
}
} else {
countDownConnection();
// Close socket right away
closeSocket(socket);
}
} catch (IOException x) {
//異常處理 略
} catch (NullPointerException npe) {
//異常處理 略
} catch (Throwable t) {
//異常處理 略
}
}
state = AcceptorState.ENDED;
}
}
上篇文章我們知道在標注1的地方會阻塞,一直到有請求過來才會繼續往下走。在標注2的地方setSocketOptions(socket)
設置socket
的各種屬性:
//代碼清單2
protected boolean setSocketOptions(Socket socket) {
try {
// 1: Set socket options: timeout, linger, etc
// 設置socket的參數
socketProperties.setProperties(socket);
} catch (SocketException s) {
//error here is common if the client has reset the connection
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.unexpected"), s);
}
// Close the socket
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.err.unexpected"), t);
// Close the socket
return false;
}
return true;
}
public void setProperties(Socket socket) throws SocketException{
if (rxBufSize != null)
socket.setReceiveBufferSize(rxBufSize.intValue());
if (txBufSize != null)
socket.setSendBufferSize(txBufSize.intValue());
if (ooBInline !=null)
socket.setOOBInline(ooBInline.booleanValue());
if (soKeepAlive != null)
socket.setKeepAlive(soKeepAlive.booleanValue());
if (performanceConnectionTime != null && performanceLatency != null &&
performanceBandwidth != null)
socket.setPerformancePreferences(
performanceConnectionTime.intValue(),
performanceLatency.intValue(),
performanceBandwidth.intValue());
if (soReuseAddress != null)
socket.setReuseAddress(soReuseAddress.booleanValue());
if (soLingerOn != null && soLingerTime != null)
socket.setSoLinger(soLingerOn.booleanValue(),
soLingerTime.intValue());
if (soTimeout != null && soTimeout.intValue() >= 0)
socket.setSoTimeout(soTimeout.intValue());
if (tcpNoDelay != null)
socket.setTcpNoDelay(tcpNoDelay.booleanValue());
}
繼續看processSocket(socket)
:
//代碼清單3
protected boolean processSocket(Socket socket) {
// Process the request from this socket
try {
//將socket包裝成了 socketWrapper類
SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
wrapper.setSecure(isSSLEnabled());
// During shutdown, executor may be null - avoid NPE
if (!running) {
return false;
}
//獲取線程池執行 `SocketProcessor` 任務
getExecutor().execute(new SocketProcessor(wrapper));
} catch (RejectedExecutionException x) {
//異常處理 略
} catch (Throwable t) {
//異常處理 略
}
return true;
}
可以看到先把socket
包裝成socketWrapper
類,緊接着將socketwrapper
對象作為構造函數的參數傳遞給SocketProcessor
類並且提交給線程池執行,可見SocketProcessor
類是個Runnable,查看SocketProcessor
類的run
方法。
//代碼清單4
@Override
public void run() {
boolean launch = false;
synchronized (socket) {
try {
SocketState state = SocketState.OPEN;
try {
// SSL handshake
serverSocketFactory.handshake(socket.getSocket());
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.handshake"), t);
}
// Tell to close the socket
state = SocketState.CLOSED;
}
if ((state != SocketState.CLOSED)) {
if (status == null) {
state = handler.process(socket, SocketStatus.OPEN_READ);
} else {
//1111111111 使用handler 處理socket
state = handler.process(socket,status);
}
}
if (state == SocketState.CLOSED) {
// Close socket
if (log.isTraceEnabled()) {
log.trace("Closing socket:"+socket);
}
countDownConnection();
try {
socket.getSocket().close();
} catch (IOException e) {
// Ignore
}
} else if (state == SocketState.OPEN ||
state == SocketState.UPGRADING ||
state == SocketState.UPGRADING_TOMCAT ||
state == SocketState.UPGRADED){
socket.setKeptAlive(true);
socket.access();
launch = true;
} else if (state == SocketState.LONG) {
socket.access();
waitingRequests.add(socket);
}
} finally {
if (launch) {
try {
getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
} catch (RejectedExecutionException x) {
log.warn("Socket reprocessing request was rejected for:"+socket,x);
try {
//unable to handle connection at this time
handler.process(socket, SocketStatus.DISCONNECT);
} finally {
countDownConnection();
}
} catch (NullPointerException npe) {
if (running) {
log.error(sm.getString("endpoint.launch.fail"),
npe);
}
}
}
}
}
socket = null;
// Finish up this request
}
}
代碼雖然長,但是都是對很多情況做了很多判斷處理。主要執行的是在標注1的地方調用handler
的process(socket)
處理socket
。而因為SocketProcessor
是JIoEndpoint
的內部類,所以handler
變量是JIoEndpoint
的成員變量。在上一篇文章中,我們在說JIoEndPoint
是在Http11Protocol
類的構造函數中初始化的,而handler
變量也是在這里設置的:
//代碼清單5
public Http11Protocol() {
endpoint = new JIoEndpoint();
cHandler = new Http11ConnectionHandler(this);
((JIoEndpoint) endpoint).setHandler(cHandler);
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
所以handler
變量是Http11ConnectionHandler
的實例。我們繼續查看其process(socket)
方法,最終我們在其父類AbstractConnectionHandler
中找到了實現:(方法有刪減)
//代碼清單6
public SocketState process(SocketWrapper<S> wrapper,SocketStatus status) {
//基本判斷
if (wrapper == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
//獲取包裝類的socket
S socket = wrapper.getSocket();
if (socket == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
//從Socket的連接緩存connections(用於緩存長連接的Socket)中獲取Socket對應的Http11Processor
Processor<S> processor = connections.get(socket);
if (status == SocketStatus.DISCONNECT && processor == null) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}
wrapper.setAsync(false);
ContainerThreadMarker.markAsContainerThread();
try {
//如果連接緩存connections中不存在Socket對應的Http11Processor,則從可以循環使用的recycledProcessors(類型為ConcurrentLinkedQueue)中獲取
if (processor == null) {
processor = recycledProcessors.poll();
}
//如果recycledProcessors中也沒有可以使用的Http11Processor,則調用createProcessor方法創建ttp11Processor
if (processor == null) {
//1111111
processor = createProcessor();
}
//如果Connector 標簽設置了 SSLEnabled=true 需要給Http11Processor 設置SSL相關的屬性
initSsl(wrapper, processor);
SocketState state = SocketState.CLOSED;
do {
if (status == SocketStatus.CLOSE_NOW) {
processor.errorDispatch();
state = SocketState.CLOSED;
} else if (status == SocketStatus.DISCONNECT &&
!processor.isComet()) {
// Do nothing here, just wait for it to get recycled
// Don't do this for Comet we need to generate an end
// event (see BZ 54022)
} else if (processor.isAsync()) {
//222222222
//如果 processor 是異步的 調用 asyncDispatch 方法處理
state = processor.asyncDispatch(status);
} else if (state == SocketState.ASYNC_END) {
state = processor.asyncDispatch(status);
// release() won't get called so in case this request
// takes a long time to process remove the socket from
// the waiting requests now else the async timeout will
// fire
getProtocol().endpoint.removeWaitingRequest(wrapper);
if (state == SocketState.OPEN) {
// There may be pipe-lined data to read. If the data
// isn't processed now, execution will exit this
// loop and call release() which will recycle the
// processor (and input buffer) deleting any
// pipe-lined data. To avoid this, process it now.
state = processor.process(wrapper);
}
} else if (processor.isComet()) {
state = processor.event(status);
} else if (processor.getUpgradeInbound() != null) {
state = processor.upgradeDispatch();
} else if (processor.isUpgrade()) {
state = processor.upgradeDispatch(status);
} else {
// 33333333
state = processor.process(wrapper);
}
if (state != SocketState.CLOSED && processor.isAsync()) {
state = processor.asyncPostProcess();
}
if (state == SocketState.UPGRADING) {
// Get the HTTP upgrade handler
HttpUpgradeHandler httpUpgradeHandler =
processor.getHttpUpgradeHandler();
// Release the Http11 processor to be re-used
release(wrapper, processor, false, false);
// Create the upgrade processor
processor = createUpgradeProcessor(
wrapper, httpUpgradeHandler);
// Mark the connection as upgraded
wrapper.setUpgraded(true);
// Associate with the processor with the connection
connections.put(socket, processor);
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
// This cast should be safe. If it fails the error
// handling for the surrounding try/catch will deal with
// it.
httpUpgradeHandler.init((WebConnection) processor);
} else if (state == SocketState.UPGRADING_TOMCAT) {
// Get the UpgradeInbound handler
org.apache.coyote.http11.upgrade.UpgradeInbound inbound =
processor.getUpgradeInbound();
// Release the Http11 processor to be re-used
release(wrapper, processor, false, false);
// Create the light-weight upgrade processor
processor = createUpgradeProcessor(wrapper, inbound);
inbound.onUpgradeComplete();
}
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + wrapper +
"], Status in: [" + status +
"], State out: [" + state + "]");
}
} while (state == SocketState.ASYNC_END ||
state == SocketState.UPGRADING ||
state == SocketState.UPGRADING_TOMCAT);
//一直到最后都是 socket的后續處理
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
connections.put(socket, processor);
longPoll(wrapper, processor);
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
connections.remove(socket);
release(wrapper, processor, false, true);
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
// closed. If it works, the socket will be re-added to the
// poller
connections.remove(socket);
release(wrapper, processor, false, false);
} else if (state == SocketState.UPGRADED) {
// Need to keep the connection associated with the processor
connections.put(socket, processor);
// Don't add sockets back to the poller if this was a
// non-blocking write otherwise the poller may trigger
// multiple read events which may lead to thread starvation
// in the connector. The write() method will add this socket
// to the poller if necessary.
if (status != SocketStatus.OPEN_WRITE) {
longPoll(wrapper, processor);
}
} else {
// Connection closed. OK to recycle the processor. Upgrade
// processors are not recycled.
connections.remove(socket);
if (processor.isUpgrade()) {
processor.getHttpUpgradeHandler().destroy();
} else if (processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor) {
// NO-OP
} else {
release(wrapper, processor, true, false);
}
}
return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
getLog().debug(sm.getString(
"abstractConnectionHandler.socketexception.debug"), e);
} catch (java.io.IOException e) {
// IOExceptions are normal
getLog().debug(sm.getString(
"abstractConnectionHandler.ioexception.debug"), e);
}
// Future developers: if you discover any other
// rare-but-nonfatal exceptions, catch them here, and log as
// above.
catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
// any other exception or error is odd. Here we log it
// with "ERROR" level, so it will show up even on
// less-than-verbose logs.
getLog().error(
sm.getString("abstractConnectionHandler.error"), e);
}
// Make sure socket/processor is removed from the list of current
// connections
connections.remove(socket);
// Don't try to add upgrade processors back into the pool
if (!(processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor)
&& !processor.isUpgrade()) {
release(wrapper, processor, true, false);
}
return SocketState.CLOSED;
}
源碼雖然長了點,但是主要做的事情就在標注2和標注2的地方,在標注2的地方主要針對的是異步的處理,標注3的地方針對的是同步的處理,我們以同步為例繼續查看其源碼之前,我們先看下標注1的地方如果沒有找到合適的processor
處理器,我們需要新建一個processor
,查看下createProcessor()
方法的源碼:
//代碼清單7
@Override
protected Http11Processor createProcessor() {
Http11Processor processor = new Http11Processor(
proto.getMaxHttpHeaderSize(), (JIoEndpoint)proto.endpoint,
proto.getMaxTrailerSize(), proto.getAllowedTrailerHeadersAsSet(),
proto.getMaxExtensionSize(), proto.getMaxSwallowSize());
//1111111111
processor.setAdapter(proto.adapter);
processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());
processor.setConnectionUploadTimeout(
proto.getConnectionUploadTimeout());
processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
processor.setCompressionMinSize(proto.getCompressionMinSize());
processor.setCompression(proto.getCompression());
processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
processor.setSocketBuffer(proto.getSocketBuffer());
processor.setMaxSavePostSize(proto.getMaxSavePostSize());
processor.setServer(proto.getServer());
processor.setDisableKeepAlivePercentage(
proto.getDisableKeepAlivePercentage());
register(processor);
return processor;
}
這個方法就是新建個Http11Processor
對象然后為這個對象設置許多屬性,這里我們主要看標注1的地方,為processor
對象設置adapter
對象,指向的是proto
變量的adapter
對象,我們查看下聲明:
//代碼清單8
protected Http11Protocol proto;
指向的是Http11Protocol
類,而createProcessor()
是Http11ConnectionHandler
類的方法,而類Http11ConnectionHandler
初始化的時候是在Http11Protocol
的構造函數中:
//代碼清單9
public Http11Protocol() {
endpoint = new JIoEndpoint();
cHandler = new Http11ConnectionHandler(this);
((JIoEndpoint) endpoint).setHandler(cHandler);
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
所以其實指向的就是一開始初始化的Http11Protocol
對象。而我們在上一篇文章中講到Connector
的init()
方法的時候,看到其中有這樣的幾行代碼:
//代碼清單10
// Initialize adapter
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);
這里protocolHandler
指向的就是Http11Protocol
實例,所以就是在Connector
的init()
方法里設置了adapter
對象,而adapter
對象指向的是CoyoteAdapter
的實例,所以在創建Http11Processor
對象的時候,設置他的adapter
屬性指向的也是這個CoyoteAdapter
實例。那么createProcessor()
方法就看完了,我們繼續查看上面標注3的processor.process(wrapper)
方法(同步socket處理),最終我們在AbstractHttp11Processor
類中找到了process()
方法:
//代碼清單11
@Override
public SocketState process(SocketWrapper<S> socketWrapper)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the I/O
setSocketWrapper(socketWrapper);
getInputBuffer().init(socketWrapper, endpoint);//初始化 輸入流
getOutputBuffer().init(socketWrapper, endpoint);//初始化 輸出流
// Flags
keepAlive = true;
comet = false;
openSocket = false;
sendfileInProgress = false;
readComplete = true;
if (endpoint.getUsePolling()) {
keptAlive = false;
} else {
keptAlive = socketWrapper.isKeptAlive();
}
if (disableKeepAlive()) {
socketWrapper.setKeepAliveLeft(0);
}
while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
upgradeInbound == null &&
httpUpgradeHandler == null && !endpoint.isPaused()) {
// Parsing the request header
try {
setRequestLineReadTimeout();
// 解析請求行
if (!getInputBuffer().parseRequestLine(keptAlive)) {
if (handleIncompleteRequestLineRead()) {
break;
}
}
if (endpoint.isPaused()) {
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
// Currently only NIO will ever return false here
// 解析請求頭
if (!getInputBuffer().parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!disableUploadTimeout) {
setSocketTimeout(connectionUploadTimeout);
}
}
} catch (IOException e) {
//異常處理 略
} catch (Throwable t) {
//異常處理 略
}
if (!getErrorState().isError()) {
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {
//設置過濾器 協議,方法,請求頭等
prepareRequest();
} catch (Throwable t) {
//異常處理 略
}
}
if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}
// Process the request in the adapter
if (!getErrorState().isError()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
//111111111111 將request response 交給adapter處理
adapter.service(request, response);
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !getErrorState().isError() && (
response.getErrorException() != null ||
(!isAsync() &&
statusDropsConnection(response.getStatus())))) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
setCometTimeouts(socketWrapper);
} catch (InterruptedIOException e) {
//異常處理 略
} catch (HeadersTooLargeException e) {
//異常處理 略
} catch (Throwable t) {
//異常處理 略
}
}
// Finish the handling of the request
rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
if (!isAsync() && !comet) {
if (getErrorState().isError()) {
// If we know we are closing the connection, don't drain
// input. This way uploading a 100GB file doesn't tie up the
// thread if the servlet has rejected it.
getInputBuffer().setSwallowInput(false);
} else {
// Need to check this again here in case the response was
// committed before the error that requires the connection
// to be closed occurred.
checkExpectationAndResponseStatus();
}
endRequest();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
if (getErrorState().isError()) {
response.setStatus(500);
}
request.updateCounters();
if (!isAsync() && !comet || getErrorState().isError()) {
if (getErrorState().isIoAllowed()) {
getInputBuffer().nextRequest();
getOutputBuffer().nextRequest();
}
}
if (!disableUploadTimeout) {
if(endpoint.getSoTimeout() > 0) {
setSocketTimeout(endpoint.getSoTimeout());
} else {
setSocketTimeout(0);
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
if (breakKeepAliveLoop(socketWrapper)) {
break;
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
if (getErrorState().isError() || endpoint.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync() || comet) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else if (getUpgradeInbound() != null) {
return SocketState.UPGRADING_TOMCAT;
} else {
if (sendfileInProgress) {
return SocketState.SENDFILE;
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;
}
} else {
return SocketState.CLOSED;
}
}
}
}
代碼比較重要的地方都加上了注釋,其中關於請求行請求頭的解析因為篇幅問題就不展開敘述了,需要的可以自行查看,代碼不算太復雜。在代碼中我們能看到Http11Processor
最終把socket
封裝成為request
和response
,然后把request
和response
交給了adapter
來處理,我們繼續查看adapter
的處理方法service()
:
//代碼清單12
/**
* Service method.
*/
@Override
public void service(org.apache.coyote.Request req,org.apache.coyote.Response res) throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);
// Link objects
request.setResponse(response);
response.setRequest(request);
// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
// Set query string encoding
req.getParameters().setQueryStringEncoding
(connector.getURIEncoding());
}
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}
boolean comet = false;
boolean async = false;
boolean postParseSuccess = false;
try {
// Parse and set Catalina and configuration specific
// request parameters
req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName());
//1111111111 解析request 包含了對request的相當多的操作
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
//22222222222 調用StandardEngine的pipeline 處理request和response
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
if (request.isComet()) {
if (!response.isClosed() && !response.isError()) {
if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) {
// Invoke a read event right away if there are available bytes
if (event(req, res, SocketStatus.OPEN_READ)) {
comet = true;
res.action(ActionCode.COMET_BEGIN, null);
} else {
return;
}
} else {
comet = true;
res.action(ActionCode.COMET_BEGIN, null);
}
} else {
// Clear the filter chain, as otherwise it will not be reset elsewhere
// since this is a Comet request
request.setFilterChain(null);
}
}
}
if (request.isAsync()) {
async = true;
} else if (!comet) {
try {
request.finishRequest();
response.finishResponse();
} finally {
if (postParseSuccess) {
// Log only if processing was invoked.
// If postParseRequest() failed, it has already logged it.
// If context is null this was the start of a comet request
// that failed and has already been logged.
((Context) request.getMappingData().context).logAccess(
request, response,
System.currentTimeMillis() - req.getStartTime(),
false);
}
req.action(ActionCode.POST_REQUEST , null);
}
}
} catch (IOException e) {
// Ignore
} finally {
req.getRequestProcessor().setWorkerThreadName(null);
AtomicBoolean error = new AtomicBoolean(false);
res.action(ActionCode.IS_ERROR, error);
// Recycle the wrapper request and response
if (!comet && !async || error.get()) {
request.recycle();
response.recycle();
} else {
// Clear converters so that the minimum amount of memory
// is used by this processor
request.clearEncoders();
response.clearEncoders();
}
}
}
方法很長,但是需要關注的地方就標注1的地方和標注的2的地方。在標注1的地方調用了私有方法postParseRequest()
來將org.apache.coyote.Request
中的內容解析到org.apache.catalina.connector.Request
。在標注2的地方connector.getService().getContainer()
返回的是StandardEngine
容器,因為StandardEngine
只有基本閥所以getPipeline().getFirst()
返回的應該是StandardEngineValve
,所以標注2的地方最終是調用了StandardEngineValve
的invoke()
方法,傳遞參數是解析完成的org.apache.catalina.connector.Request
和org.apache.catalina.connector.Response
(關於容器和pipeline的疑問可以查看之前相關文章)。
我們來查看下私有方法postParseRequest()
:
//代碼清單13
/**
* Parse additional request parameters.
*/
protected boolean postParseRequest(org.apache.coyote.Request req,
Request request,
org.apache.coyote.Response res,
Response response)
throws Exception {
// XXX the processor may have set a correct scheme and port prior to this point,
// in ajp13 protocols dont make sense to get the port from the connector...
// otherwise, use connector configuration
if (! req.scheme().isNull()) {
// use processor specified scheme to determine secure state
request.setSecure(req.scheme().equals("https"));
} else {
// use connector scheme and secure configuration, (defaults to
// "http" and false respectively)
req.scheme().setString(connector.getScheme());
request.setSecure(connector.getSecure());
}
// FIXME: the code below doesnt belongs to here,
// this is only have sense
// in Http11, not in ajp13..
// At this point the Host header has been processed.
// Override if the proxyPort/proxyHost are set
String proxyName = connector.getProxyName();
int proxyPort = connector.getProxyPort();
if (proxyPort != 0) {
req.setServerPort(proxyPort);
}
if (proxyName != null) {
req.serverName().setString(proxyName);
}
// Copy the raw URI to the decodedURI
MessageBytes decodedURI = req.decodedURI();
decodedURI.duplicate(req.requestURI());
// Parse the path parameters. This will:
// - strip out the path parameters
// - convert the decodedURI to bytes
//解析請求中的參數
parsePathParameters(req, request);
// URI decoding
// %xx decoding of the URL
try {
//URI decoding的轉換
req.getURLDecoder().convert(decodedURI, false);
} catch (IOException ioe) {
res.setStatus(400);
res.setMessage("Invalid URI: " + ioe.getMessage());
connector.getService().getContainer().logAccess(
request, response, 0, true);
return false;
}
// Normalization
//處理請求中的特殊字符 比如// ./ ../ \ 等
if (!normalize(req.decodedURI())) {
res.setStatus(400);
res.setMessage("Invalid URI");
connector.getService().getContainer().logAccess(
request, response, 0, true);
return false;
}
// Character decoding
//將字節轉換為字符
convertURI(decodedURI, request);
// Check that the URI is still normalized
// 判斷URI中是否包含特殊字符
if (!checkNormalize(req.decodedURI())) {
res.setStatus(400);
res.setMessage("Invalid URI character encoding");
connector.getService().getContainer().logAccess(
request, response, 0, true);
return false;
}
// Request mapping.
MessageBytes serverName;
if (connector.getUseIPVHosts()) {
serverName = req.localName();
if (serverName.isNull()) {
// well, they did ask for it
res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null);
}
} else {
serverName = req.serverName();
}
if (request.isAsyncStarted()) {
//TODO SERVLET3 - async
//reset mapping data, should prolly be done elsewhere
request.getMappingData().recycle();
}
// Version for the second mapping loop and
// Context that we expect to get for that version
String version = null;
Context versionContext = null;
boolean mapRequired = true;
while (mapRequired) {
// This will map the the latest version by default
//匹配請求對應的Host Context Wrapper
// 1111111111111
connector.getMapper().map(serverName, decodedURI, version,
request.getMappingData());
request.setContext((Context) request.getMappingData().context);
request.setWrapper((Wrapper) request.getMappingData().wrapper);
// If there is no context at this point, it is likely no ROOT context
// has been deployed
if (request.getContext() == null) {
res.setStatus(404);
res.setMessage("Not found");
// No context, so use host
Host host = request.getHost();
// Make sure there is a host (might not be during shutdown)
if (host != null) {
host.logAccess(request, response, 0, true);
}
return false;
}
// Now we have the context, we can parse the session ID from the URL
// (if any). Need to do this before we redirect in case we need to
// include the session id in the redirect
String sessionID;
if (request.getServletContext().getEffectiveSessionTrackingModes()
.contains(SessionTrackingMode.URL)) {
// Get the session ID if there was one
sessionID = request.getPathParameter(
SessionConfig.getSessionUriParamName(
request.getContext()));
if (sessionID != null) {
request.setRequestedSessionId(sessionID);
request.setRequestedSessionURL(true);
}
}
// Look for session ID in cookies and SSL session
//從cookie中解析sessionId
parseSessionCookiesId(req, request);
//從ssl中解析sessionId
parseSessionSslId(request);
sessionID = request.getRequestedSessionId();
mapRequired = false;
if (version != null && request.getContext() == versionContext) {
// We got the version that we asked for. That is it.
} else {
version = null;
versionContext = null;
Object[] contexts = request.getMappingData().contexts;
// Single contextVersion means no need to remap
// No session ID means no possibility of remap
if (contexts != null && sessionID != null) {
// Find the context associated with the session
for (int i = (contexts.length); i > 0; i--) {
Context ctxt = (Context) contexts[i - 1];
if (ctxt.getManager().findSession(sessionID) != null) {
// We found a context. Is it the one that has
// already been mapped?
if (!ctxt.equals(request.getMappingData().context)) {
// Set version so second time through mapping
// the correct context is found
version = ctxt.getWebappVersion();
versionContext = ctxt;
// Reset mapping
request.getMappingData().recycle();
mapRequired = true;
// Recycle session info in case the correct
// context is configured with different settings
request.recycleSessionInfo();
}
break;
}
}
}
}
if (!mapRequired && request.getContext().getPaused()) {
// Found a matching context but it is paused. Mapping data will
// be wrong since some Wrappers may not be registered at this
// point.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Should never happen
}
// Reset mapping
request.getMappingData().recycle();
mapRequired = true;
}
}
// Possible redirect
MessageBytes redirectPathMB = request.getMappingData().redirectPath;
if (!redirectPathMB.isNull()) {
String redirectPath = urlEncoder.encode(redirectPathMB.toString());
String query = request.getQueryString();
if (request.isRequestedSessionIdFromURL()) {
// This is not optimal, but as this is not very common, it
// shouldn't matter
redirectPath = redirectPath + ";" +
SessionConfig.getSessionUriParamName(
request.getContext()) +
"=" + request.getRequestedSessionId();
}
if (query != null) {
// This is not optimal, but as this is not very common, it
// shouldn't matter
redirectPath = redirectPath + "?" + query;
}
response.sendRedirect(redirectPath);
request.getContext().logAccess(request, response, 0, true);
return false;
}
// Filter trace method
if (!connector.getAllowTrace()
&& req.method().equalsIgnoreCase("TRACE")) {
Wrapper wrapper = request.getWrapper();
String header = null;
if (wrapper != null) {
String[] methods = wrapper.getServletMethods();
if (methods != null) {
for (int i=0; i<methods.length; i++) {
if ("TRACE".equals(methods[i])) {
continue;
}
if (header == null) {
header = methods[i];
} else {
header += ", " + methods[i];
}
}
}
}
res.setStatus(405);
res.addHeader("Allow", header);
res.setMessage("TRACE method is not allowed");
request.getContext().logAccess(request, response, 0, true);
return false;
}
doConnectorAuthenticationAuthorization(req, request);
return true;
}
方法很長但是沒做任何刪減,重要的邏輯都加了注釋,因為在講解session的文章中我們查看過解析session的部分方法,所以這里我們只看一個比較重要的,其他的有興趣可以自行查看。根據請求匹配host,context的方法:
//代碼清單14
connector.getMapper().map(serverName,decodedURI, version,request.getMappingData());
我們查看connector
的getMapper()
方法:
//代碼清單15
/**
* Mapper.
*/
protected Mapper mapper = new Mapper();
/**
* Return the mapper.
*/
public Mapper getMapper() {
return (mapper);
}
可以看到返回的mapper
對象就是上篇文章中介紹的Mapper
的實例對象,map()
方法調用的就是Mapper
類的map()
方法:
//代碼清單16
/**
* Map the specified host name and URI, mutating the given mapping data.
*
* @param host Virtual host name
* @param uri URI
* @param mappingData This structure will contain the result of the mapping
* operation
*/
public void map(MessageBytes host, MessageBytes uri, String version,
MappingData mappingData)
throws Exception {
if (host.isNull()) {
host.getCharChunk().append(defaultHostName);
}
host.toChars();
uri.toChars();
internalMap(host.getCharChunk(), uri.getCharChunk(), version,
mappingData);
}
最后調用了internalMap()
方法:
//代碼清單17
/**
* Map the specified URI.
*/
private final void internalMap(CharChunk host, CharChunk uri,
String version, MappingData mappingData) throws Exception {
if (mappingData.host != null) {
// The legacy code (dating down at least to Tomcat 4.1) just
// skipped all mapping work in this case. That behaviour has a risk
// of returning an inconsistent result.
// I do not see a valid use case for it.
throw new AssertionError();
}
uri.setLimit(-1);
// Virtual host mapping
Host[] hosts = this.hosts;
//查找匹配的host
Host mappedHost = exactFindIgnoreCase(hosts, host);
if (mappedHost == null) {
if (defaultHostName == null) {
return;
}
//如果查找不到 使用defaultHostName再次進行查詢
mappedHost = exactFind(hosts, defaultHostName);
if (mappedHost == null) {
return;
}
}
//設置匹配到的host
mappingData.host = mappedHost.object;
// Context mapping
ContextList contextList = mappedHost.contextList;
Context[] contexts = contextList.contexts;
int nesting = contextList.nesting;
//查找匹配的context
int pos = find(contexts, uri);
if (pos == -1) {
return;
}
int lastSlash = -1;
int uriEnd = uri.getEnd();
int length = -1;
boolean found = false;
Context context = null;
while (pos >= 0) {
context = contexts[pos];
//比對context
if (uri.startsWith(context.name)) {
length = context.name.length();
if (uri.getLength() == length) {
found = true;
break;
} else if (uri.startsWithIgnoreCase("/", length)) {
found = true;
break;
}
}
if (lastSlash == -1) {
lastSlash = nthSlash(uri, nesting + 1);
} else {
lastSlash = lastSlash(uri);
}
uri.setEnd(lastSlash);
pos = find(contexts, uri);
}
uri.setEnd(uriEnd);
if (!found) {
if (contexts[0].name.equals("")) {
context = contexts[0];
} else {
context = null;
}
}
if (context == null) {
return;
}
mappingData.contextPath.setString(context.name);
ContextVersion contextVersion = null;
ContextVersion[] contextVersions = context.versions;
final int versionCount = contextVersions.length;
if (versionCount > 1) {
Object[] contextObjects = new Object[contextVersions.length];
for (int i = 0; i < contextObjects.length; i++) {
contextObjects[i] = contextVersions[i].object;
}
mappingData.contexts = contextObjects;
if (version != null) {
contextVersion = exactFind(contextVersions, version);
}
}
if (contextVersion == null) {
// Return the latest version
// The versions array is known to contain at least one element
contextVersion = contextVersions[versionCount - 1];
}
//設置匹配的context
mappingData.context = contextVersion.object;
mappingData.contextSlashCount = contextVersion.slashCount;
// Wrapper mapping
if (!contextVersion.isPaused()) {
//設置匹配的wrapper
internalMapWrapper(contextVersion, uri, mappingData);
}
}
看到這里postParseRequest()
方法就看完了,可以看到該方法對將org.apache.coyote.Request
中的內容解析到org.apache.catalina.connector.Request
中。再多說一句 adapter
是適配器的意思他起到的作用正是把org.apache.coyote.Request
適配給org.apache.catalina.connector.Request
,這里使用的正是設計模式中的適配器模式。
一個HTTP請求的流轉從tomcat接收開始,從endPoint
接收請求到交由Http11ConnectionHandler
處理,再交由Http11Processor
處理,再交由CoyoteAdapter
處理,到這里那么HTTP請求的處理Connector
部分以及將請求從Connector
適配到Container
處理CoyoteAdapter
部分都已講解完畢,后面我們會繼續查看處理的后半段也就是Container
部分是如何處理Connector
傳遞過來的HTTP請求的。