httpclient源碼分析之MainClientExec


MainClientExec是HTTP請求處理鏈中最后一個請求執行環節,負責與另一終端的請求/響應交互,也是很重要的類。

源碼版本是4.5.2,主要看execute方法,並在里面添加注釋。接着詳細說下獲取連接的過程。

execute方法

    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        Args.notNull(route, "HTTP route");
        Args.notNull(request, "HTTP request");
        Args.notNull(context, "HTTP context");

        //Auth相關,這里沒關注
        AuthState targetAuthState = context.getTargetAuthState();
        if (targetAuthState == null) {
            targetAuthState = new AuthState();
            context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);
        }
        AuthState proxyAuthState = context.getProxyAuthState();
        if (proxyAuthState == null) {
            proxyAuthState = new AuthState();
            context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);
        }

        if (request instanceof HttpEntityEnclosingRequest) {
            RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);
        }

        //userToken后面作為state,用來從連接池中獲取連接的時候使用,默認是null。
        //如果設置了值,會設置到連接中,再次獲取的時候,則優先取status相等的連接
        Object userToken = context.getUserToken();

        //ConnectionRequest用來獲取HttpClientConnection
        //為每一個route設置一個連接池,大小可以配置,默認為2
        //從route連接池獲取一個連接,優先取status等於userToken的。
        //這里沒有實質的操作,只是創建一個ConnectionRequest,並將獲取連接的操作封裝在ConnectionRequest中。
        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
        if (execAware != null) {
            if (execAware.isAborted()) {
                connRequest.cancel();
                throw new RequestAbortedException("Request aborted");
            } else {
                execAware.setCancellable(connRequest);
            }
        }

        final RequestConfig config = context.getRequestConfig();

        final HttpClientConnection managedConn;
        try {
            final int timeout = config.getConnectionRequestTimeout();

            //獲取連接,這里才執行從連接池中阻塞獲取連接的操作,並設置超時時間。
            //這里返回的connection,不一定是有效的socket連接,長短連接處理方式不同。
            //如果連接沒有打開或者不可用,后面會重新建立socket連接。 
            managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        } catch(final InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RequestAbortedException("Request aborted", interrupted);
        } catch(final ExecutionException ex) {
            Throwable cause = ex.getCause();
            if (cause == null) {
                cause = ex;
            }
            throw new RequestAbortedException("Request execution failed", cause);
        }

        //將連接加入上下文中,暴露連接。 
        //context就是一個大容器,收藏各種東西,如果覺得有什么資源是需要在別的地方用到的,那就放入context吧。
        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);

        //是否檢查連接的有效性。如果檢查不可用,就關閉連接。對於關閉的連接,后面會從三次握手開始,重新建立socket連接。
        //如果配置檢查,就相當於一個悲觀鎖,每次請求都會消耗最多30ms來檢測,影響性能。4.4版本開始就過時了。
        if (config.isStaleConnectionCheckEnabled()) {
            // validate connection,首先判斷連接是否是打開的
            if (managedConn.isOpen()) {
                this.log.debug("Stale connection check");
                //如果是打開的,進一步判斷是否可用
                if (managedConn.isStale()) {
                    this.log.debug("Stale connection detected");
                    //不可用的時候,需要關閉連接,后面再重新建立連接
                    managedConn.close();
                }
            }
        }

        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            if (execAware != null) {
                execAware.setCancellable(connHolder);
            }

            HttpResponse response;
            for (int execCount = 1;; execCount++) {

                //請求是否冪等的,如果不是,則不能retry,拋異常
                if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
                    throw new NonRepeatableRequestException("Cannot retry request " +
                            "with a non-repeatable request entity.");
                }

                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }

                //如果連接沒有打開,即連接使用的socket為null,則重新建立連接。    
                if (!managedConn.isOpen()) {
                    this.log.debug("Opening connection " + route);
                    try {
                        //建立socket連接。
                        //遍歷地址集,成功建立socket連接,就返回,封裝在connection中
                        establishRoute(proxyAuthState, managedConn, route, request, context);
                    } catch (final TunnelRefusedException ex) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug(ex.getMessage());
                        }
                        response = ex.getResponse();
                        break;
                    }
                }
                final int timeout = config.getSocketTimeout();
                if (timeout >= 0) {
                    //設置socketTimeout
                    managedConn.setSocketTimeout(timeout);
                }

                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }

                if (this.log.isDebugEnabled()) {
                    this.log.debug("Executing request " + request.getRequestLine());
                }

                if (!request.containsHeader(AUTH.WWW_AUTH_RESP)) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Target auth state: " + targetAuthState.getState());
                    }
                    this.authenticator.generateAuthResponse(request, targetAuthState, context);
                }
                if (!request.containsHeader(AUTH.PROXY_AUTH_RESP) && !route.isTunnelled()) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Proxy auth state: " + proxyAuthState.getState());
                    }
                    this.authenticator.generateAuthResponse(request, proxyAuthState, context);
                }

                 //和服務器具體交互,發送請求頭,如果有響應,再接收響應。   
                response = requestExecutor.execute(request, managedConn, context);

                //根據配置的策略,判斷是否保持連接,永久還是一段時長
                // The connection is in or can be brought to a re-usable state.
                if (reuseStrategy.keepAlive(response, context)) {
                    // Set the idle duration of this connection
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                        if (duration > 0) {
                            s = "for " + duration + " " + TimeUnit.MILLISECONDS;
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection can be kept alive " + s);
                    }
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    connHolder.markReusable();
                } else {
                    connHolder.markNonReusable();
                }

                //跳過
                if (needAuthentication(
                        targetAuthState, proxyAuthState, route, response, context)) {
                    // Make sure the response body is fully consumed, if present
                    final HttpEntity entity = response.getEntity();
                    if (connHolder.isReusable()) {
                        EntityUtils.consume(entity);
                    } else {
                        managedConn.close();
                        if (proxyAuthState.getState() == AuthProtocolState.SUCCESS
                                && proxyAuthState.getAuthScheme() != null
                                && proxyAuthState.getAuthScheme().isConnectionBased()) {
                            this.log.debug("Resetting proxy auth state");
                            proxyAuthState.reset();
                        }
                        if (targetAuthState.getState() == AuthProtocolState.SUCCESS
                                && targetAuthState.getAuthScheme() != null
                                && targetAuthState.getAuthScheme().isConnectionBased()) {
                            this.log.debug("Resetting target auth state");
                            targetAuthState.reset();
                        }
                    }
                    // discard previous auth headers
                    final HttpRequest original = request.getOriginal();
                    if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {
                        request.removeHeaders(AUTH.WWW_AUTH_RESP);
                    }
                    if (!original.containsHeader(AUTH.PROXY_AUTH_RESP)) {
                        request.removeHeaders(AUTH.PROXY_AUTH_RESP);
                    }
                } else {
                    break;
                }
            }

            if (userToken == null) {
                userToken = userTokenHandler.getUserToken(context);
                context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
            }
            if (userToken != null) {
                connHolder.setState(userToken);
            }

            // check for entity, release connection if possible
            //判斷是否讀取了全部的響應,如果是,則釋放連接回連接池,
            //否則,也要返回連接,以便后面繼續從流中讀取響應。
            final HttpEntity entity = response.getEntity();
            if (entity == null || !entity.isStreaming()) {
                // connection not needed and (assumed to be) in re-usable state
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            } else {
                return new HttpResponseProxy(response, connHolder);
            }
        } catch (final ConnectionShutdownException ex) {
            final InterruptedIOException ioex = new InterruptedIOException(
                    "Connection has been shut down");
            ioex.initCause(ex);
            throw ioex;
        } catch (final HttpException ex) {
            connHolder.abortConnection();
            throw ex;
        } catch (final IOException ex) {
            connHolder.abortConnection();
            throw ex;
        } catch (final RuntimeException ex) {
            connHolder.abortConnection();
            throw ex;
        }
    }

總結一下關心的大致流程:

  • 創建連接請求
  • 根據連接請求的參數,從連接池中獲取一個連接
  • 配置是否需要校驗連接可用性。如果檢查不可用,就關閉連接。
  • 如果連接沒有打開,則創建一個底層的socket連接。
  • 發送請求頭部(如果請求中帶有entity,則發送)
  • 如果有響應,接收響應(先接收頭部,如果有請求主體,則接收)

這里有一點注意一下:
檢測連接有效性的時候,報的是SocketTimeOut異常,而真正讀響應的時候,報的是Connection reset異常。為什么不一樣呢?我還沒找到方法驗證,但這里很可能是檢測的時間很短,只有1ms,首先觸發了SocketTimeOut異常,而實際讀響應的時候,是不會這么短時間的。

獲取連接

接下來詳細說說根據ConnectionRequest獲取HttpClientConnection。即:
managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);

首先看ConnectionRequest為何物:

    //org.apache.http.impl.conn.PoolingHttpClientConnectionManager

    @Override
    public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        Args.notNull(route, "HTTP route");
        if (this.log.isDebugEnabled()) {
            this.log.debug("Connection request: " + format(route, state) + formatStats(route));
        }
        //從連接池中獲取一個CPoolEntry(Connection的包裝類)
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {

            @Override
            public boolean cancel() {
                return future.cancel(true);
            }

            // ConnectionRequest的get方法。調用leaseConnection方法,並且傳入future(CPoolEntry的封裝(connection的封裝))
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                return leaseConnection(future, timeout, tunit);
            }
        };
    }

所以,ConnectionRequest的get方法,實際是調用PoolingHttpClientConnectionManager的leaseConnection,返回一個HttpClientConnection。
關於獲取connection的更多詳細信息,可以參考這篇文章,詳細講述了PoolingHttpClientConnectionManager的獲取連接給用戶的方法。

補充

  • 關於config.isStaleConnectionCheckEnabled():
    如果設置每次請求檢查連接是否可用,會影響性能。4.4版本開始過時,但官方推薦使用org.apache.http.impl.conn.PoolingHttpClientConnectionManager#getValidateAfterInactivity()。詳細了解看這篇最后補充的校驗連接有效的方法,有一個案例分析。

  • 最后返回的HttpResponseProxy帶上ConnectionHolder(響應沒有一次讀完),這篇文章有一個案例了解,查看成功日志的最后幾個步驟。RestTemplate讀取擴展字段,第二次讀取數據。

另一篇關於此段源碼的解讀,見這里


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM