MainClientExec是HTTP請求處理鏈中最后一個請求執行環節,負責與另一終端的請求/響應交互,也是很重要的類。
源碼版本是4.5.2,主要看execute方法,並在里面添加注釋。接着詳細說下獲取連接的過程。
execute方法
@Override
public CloseableHttpResponse execute(
final HttpRoute route,
final HttpRequestWrapper request,
final HttpClientContext context,
final HttpExecutionAware execAware) throws IOException, HttpException {
Args.notNull(route, "HTTP route");
Args.notNull(request, "HTTP request");
Args.notNull(context, "HTTP context");
//Auth相關,這里沒關注
AuthState targetAuthState = context.getTargetAuthState();
if (targetAuthState == null) {
targetAuthState = new AuthState();
context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);
}
AuthState proxyAuthState = context.getProxyAuthState();
if (proxyAuthState == null) {
proxyAuthState = new AuthState();
context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);
}
if (request instanceof HttpEntityEnclosingRequest) {
RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);
}
//userToken后面作為state,用來從連接池中獲取連接的時候使用,默認是null。
//如果設置了值,會設置到連接中,再次獲取的時候,則優先取status相等的連接
Object userToken = context.getUserToken();
//ConnectionRequest用來獲取HttpClientConnection
//為每一個route設置一個連接池,大小可以配置,默認為2
//從route連接池獲取一個連接,優先取status等於userToken的。
//這里沒有實質的操作,只是創建一個ConnectionRequest,並將獲取連接的操作封裝在ConnectionRequest中。
final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
if (execAware != null) {
if (execAware.isAborted()) {
connRequest.cancel();
throw new RequestAbortedException("Request aborted");
} else {
execAware.setCancellable(connRequest);
}
}
final RequestConfig config = context.getRequestConfig();
final HttpClientConnection managedConn;
try {
final int timeout = config.getConnectionRequestTimeout();
//獲取連接,這里才執行從連接池中阻塞獲取連接的操作,並設置超時時間。
//這里返回的connection,不一定是有效的socket連接,長短連接處理方式不同。
//如果連接沒有打開或者不可用,后面會重新建立socket連接。
managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
} catch(final InterruptedException interrupted) {
Thread.currentThread().interrupt();
throw new RequestAbortedException("Request aborted", interrupted);
} catch(final ExecutionException ex) {
Throwable cause = ex.getCause();
if (cause == null) {
cause = ex;
}
throw new RequestAbortedException("Request execution failed", cause);
}
//將連接加入上下文中,暴露連接。
//context就是一個大容器,收藏各種東西,如果覺得有什么資源是需要在別的地方用到的,那就放入context吧。
context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);
//是否檢查連接的有效性。如果檢查不可用,就關閉連接。對於關閉的連接,后面會從三次握手開始,重新建立socket連接。
//如果配置檢查,就相當於一個悲觀鎖,每次請求都會消耗最多30ms來檢測,影響性能。4.4版本開始就過時了。
if (config.isStaleConnectionCheckEnabled()) {
// validate connection,首先判斷連接是否是打開的
if (managedConn.isOpen()) {
this.log.debug("Stale connection check");
//如果是打開的,進一步判斷是否可用
if (managedConn.isStale()) {
this.log.debug("Stale connection detected");
//不可用的時候,需要關閉連接,后面再重新建立連接
managedConn.close();
}
}
}
final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
try {
if (execAware != null) {
execAware.setCancellable(connHolder);
}
HttpResponse response;
for (int execCount = 1;; execCount++) {
//請求是否冪等的,如果不是,則不能retry,拋異常
if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
throw new NonRepeatableRequestException("Cannot retry request " +
"with a non-repeatable request entity.");
}
if (execAware != null && execAware.isAborted()) {
throw new RequestAbortedException("Request aborted");
}
//如果連接沒有打開,即連接使用的socket為null,則重新建立連接。
if (!managedConn.isOpen()) {
this.log.debug("Opening connection " + route);
try {
//建立socket連接。
//遍歷地址集,成功建立socket連接,就返回,封裝在connection中
establishRoute(proxyAuthState, managedConn, route, request, context);
} catch (final TunnelRefusedException ex) {
if (this.log.isDebugEnabled()) {
this.log.debug(ex.getMessage());
}
response = ex.getResponse();
break;
}
}
final int timeout = config.getSocketTimeout();
if (timeout >= 0) {
//設置socketTimeout
managedConn.setSocketTimeout(timeout);
}
if (execAware != null && execAware.isAborted()) {
throw new RequestAbortedException("Request aborted");
}
if (this.log.isDebugEnabled()) {
this.log.debug("Executing request " + request.getRequestLine());
}
if (!request.containsHeader(AUTH.WWW_AUTH_RESP)) {
if (this.log.isDebugEnabled()) {
this.log.debug("Target auth state: " + targetAuthState.getState());
}
this.authenticator.generateAuthResponse(request, targetAuthState, context);
}
if (!request.containsHeader(AUTH.PROXY_AUTH_RESP) && !route.isTunnelled()) {
if (this.log.isDebugEnabled()) {
this.log.debug("Proxy auth state: " + proxyAuthState.getState());
}
this.authenticator.generateAuthResponse(request, proxyAuthState, context);
}
//和服務器具體交互,發送請求頭,如果有響應,再接收響應。
response = requestExecutor.execute(request, managedConn, context);
//根據配置的策略,判斷是否保持連接,永久還是一段時長
// The connection is in or can be brought to a re-usable state.
if (reuseStrategy.keepAlive(response, context)) {
// Set the idle duration of this connection
final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
if (this.log.isDebugEnabled()) {
final String s;
if (duration > 0) {
s = "for " + duration + " " + TimeUnit.MILLISECONDS;
} else {
s = "indefinitely";
}
this.log.debug("Connection can be kept alive " + s);
}
connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
connHolder.markReusable();
} else {
connHolder.markNonReusable();
}
//跳過
if (needAuthentication(
targetAuthState, proxyAuthState, route, response, context)) {
// Make sure the response body is fully consumed, if present
final HttpEntity entity = response.getEntity();
if (connHolder.isReusable()) {
EntityUtils.consume(entity);
} else {
managedConn.close();
if (proxyAuthState.getState() == AuthProtocolState.SUCCESS
&& proxyAuthState.getAuthScheme() != null
&& proxyAuthState.getAuthScheme().isConnectionBased()) {
this.log.debug("Resetting proxy auth state");
proxyAuthState.reset();
}
if (targetAuthState.getState() == AuthProtocolState.SUCCESS
&& targetAuthState.getAuthScheme() != null
&& targetAuthState.getAuthScheme().isConnectionBased()) {
this.log.debug("Resetting target auth state");
targetAuthState.reset();
}
}
// discard previous auth headers
final HttpRequest original = request.getOriginal();
if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {
request.removeHeaders(AUTH.WWW_AUTH_RESP);
}
if (!original.containsHeader(AUTH.PROXY_AUTH_RESP)) {
request.removeHeaders(AUTH.PROXY_AUTH_RESP);
}
} else {
break;
}
}
if (userToken == null) {
userToken = userTokenHandler.getUserToken(context);
context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
}
if (userToken != null) {
connHolder.setState(userToken);
}
// check for entity, release connection if possible
//判斷是否讀取了全部的響應,如果是,則釋放連接回連接池,
//否則,也要返回連接,以便后面繼續從流中讀取響應。
final HttpEntity entity = response.getEntity();
if (entity == null || !entity.isStreaming()) {
// connection not needed and (assumed to be) in re-usable state
connHolder.releaseConnection();
return new HttpResponseProxy(response, null);
} else {
return new HttpResponseProxy(response, connHolder);
}
} catch (final ConnectionShutdownException ex) {
final InterruptedIOException ioex = new InterruptedIOException(
"Connection has been shut down");
ioex.initCause(ex);
throw ioex;
} catch (final HttpException ex) {
connHolder.abortConnection();
throw ex;
} catch (final IOException ex) {
connHolder.abortConnection();
throw ex;
} catch (final RuntimeException ex) {
connHolder.abortConnection();
throw ex;
}
}
總結一下關心的大致流程:
- 創建連接請求
- 根據連接請求的參數,從連接池中獲取一個連接
- 配置是否需要校驗連接可用性。如果檢查不可用,就關閉連接。
- 如果連接沒有打開,則創建一個底層的socket連接。
- 發送請求頭部(如果請求中帶有entity,則發送)
- 如果有響應,接收響應(先接收頭部,如果有請求主體,則接收)
這里有一點注意一下:
檢測連接有效性的時候,報的是SocketTimeOut異常,而真正讀響應的時候,報的是Connection reset異常。為什么不一樣呢?我還沒找到方法驗證,但這里很可能是檢測的時間很短,只有1ms,首先觸發了SocketTimeOut異常,而實際讀響應的時候,是不會這么短時間的。
獲取連接
接下來詳細說說根據ConnectionRequest獲取HttpClientConnection。即:
managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
首先看ConnectionRequest為何物:
//org.apache.http.impl.conn.PoolingHttpClientConnectionManager
@Override
public ConnectionRequest requestConnection(
final HttpRoute route,
final Object state) {
Args.notNull(route, "HTTP route");
if (this.log.isDebugEnabled()) {
this.log.debug("Connection request: " + format(route, state) + formatStats(route));
}
//從連接池中獲取一個CPoolEntry(Connection的包裝類)
final Future<CPoolEntry> future = this.pool.lease(route, state, null);
return new ConnectionRequest() {
@Override
public boolean cancel() {
return future.cancel(true);
}
// ConnectionRequest的get方法。調用leaseConnection方法,並且傳入future(CPoolEntry的封裝(connection的封裝))
@Override
public HttpClientConnection get(
final long timeout,
final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
return leaseConnection(future, timeout, tunit);
}
};
}
所以,ConnectionRequest的get方法,實際是調用PoolingHttpClientConnectionManager的leaseConnection,返回一個HttpClientConnection。
關於獲取connection的更多詳細信息,可以參考這篇文章,詳細講述了PoolingHttpClientConnectionManager的獲取連接給用戶的方法。
補充
-
關於config.isStaleConnectionCheckEnabled():
如果設置每次請求檢查連接是否可用,會影響性能。4.4版本開始過時,但官方推薦使用org.apache.http.impl.conn.PoolingHttpClientConnectionManager#getValidateAfterInactivity()。詳細了解看這篇最后補充的校驗連接有效的方法,有一個案例分析。 -
最后返回的HttpResponseProxy帶上ConnectionHolder(響應沒有一次讀完),這篇文章有一個案例了解,查看成功日志的最后幾個步驟。RestTemplate讀取擴展字段,第二次讀取數據。
另一篇關於此段源碼的解讀,見這里。