HttpClient連接池的一些思考


前言

在分布式系統中,http服務調用少不了HttpClient工具類。相信大家使用apache的HttpClient進行http的交互處理已經很長時間了,而httpclient內部其實使用了http連接池,想必大家也沒有關心過連接池的管理。事實上,通過分析httpclient源碼,發現它很優雅地隱藏了所有的連接池管理細節,開發者完全不用花太多時間去思考連接池的問題。但是連接池的一些常用配置和原理都需要我們多少掌握一些,避免錯用導致生產環境出現無法預知的bug。

我之前遇到過生產環境系統使用HttpClient工具類導致系統假死的問題。

情景1:httpclient調用服務時沒有設置超時時間,http線程都處於自身阻塞狀態,拖垮應用服務器。

情景2:httpclient連接池單個路由最大連接數設置不合理,業務高峰期時無法及時從連接池中獲取連接,導致http線程都處於BLOCKED(阻塞於鎖)狀態,逐漸拖垮應用服務器。

從以上兩個場景分析一下,首先對於超時時間的設置,要考慮http請求連接超時、http請求讀取內容超時、從連接池中獲取可用http連接超時,要根據具體的業務場景合理化配置。還有就是最大連接池數和每個路由最大連接數配置,也需要考慮哪些業務是否有業務高峰期然后合理配置,針對特殊的業務設置使用唯一的HttpClient,即為該業務創建指定的線程池。

Apache官網例子

CloseableHttpClient httpclient = HttpClients.createDefault();
HttpGet httpget = new HttpGet("http://localhost/");
CloseableHttpResponse response = httpclient.execute(httpget);
try {
    HttpEntity entity = response.getEntity();
    if (entity != null) {
        long len = entity.getContentLength();
        if (len != -1 && len < 2048) {
            System.out.println(EntityUtils.toString(entity));
        } else {
            // Stream content out
        }
    }
} finally {
    response.close();
}

HttpClient及其連接池配置

  • 整個線程池中最大連接數 MAX_CONNECTION_TOTAL = 800
  • 路由到某台主機最大並發數,是MAX_CONNECTION_TOTAL(整個線程池中最大連接數)的一個細分 ROUTE_MAX_COUNT = 500
  • 重試次數,防止失敗情況 RETRY_COUNT = 3
  • 客戶端和服務器建立連接的超時時間 CONNECTION_TIME_OUT = 5000
  • 客戶端從服務器讀取數據的超時時間 READ_TIME_OUT = 7000
  • 從連接池中獲取連接的超時時間 CONNECTION_REQUEST_TIME_OUT = 5000
  • 連接空閑超時,清楚閑置的連接 CONNECTION_IDLE_TIME_OUT = 5000
  • 連接保持存活時間 DEFAULT_KEEP_ALIVE_TIME_MILLIS = 20 * 1000

MaxtTotal和DefaultMaxPerRoute的區別

  • MaxtTotal是整個池子的大小;
  • DefaultMaxPerRoute是根據連接到的主機對MaxTotal的一個細分;

比如:MaxtTotal=400,DefaultMaxPerRoute=200,而我只連接到http://hjzgg.com時,到這個主機的並發最多只有200;而不是400;而我連接到http://qyxjj.com 和 http://httls.com時,到每個主機的並發最多只有200;即加起來是400(但不能超過400)。所以起作用的設置是DefaultMaxPerRoute。

HttpClient連接池模型

HttpClient從連接池中獲取連接源碼分析

org.apache.http.pool.AbstractConnPool

private E getPoolEntryBlocking(
        final T route, final Object state,
        final long timeout, final TimeUnit tunit,
        final PoolEntryFuture<E> future)
            throws IOException, InterruptedException, TimeoutException {


    Date deadline = null;
    if (timeout > 0) {
        deadline = new Date
            (System.currentTimeMillis() + tunit.toMillis(timeout));
    }


    this.lock.lock();
    try {
        final RouteSpecificPool<T, C, E> pool = getPool(route);//這是每一個路由細分出來的連接池
        E entry = null;
        while (entry == null) {
            Asserts.check(!this.isShutDown, "Connection pool shut down");
            //從池子中獲取一個可用連接並返回
            for (;;) {
                entry = pool.getFree(state);
                if (entry == null) {
                    break;
                }
                if (entry.isExpired(System.currentTimeMillis())) {
                    entry.close();
                } else if (this.validateAfterInactivity > 0) {
                    if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
                        if (!validate(entry)) {
                            entry.close();
                        }
                    }
                }
                if (entry.isClosed()) {
                    this.available.remove(entry);
                    pool.free(entry, false);
                } else {
                    break;
                }
            }
            if (entry != null) {
                this.available.remove(entry);
                this.leased.add(entry);
                onReuse(entry);
                return entry;
            }

            //創建新的連接
            // New connection is needed
            final int maxPerRoute = getMax(route);//獲取當前路由最大並發數
            // Shrink the pool prior to allocating a new connection
            final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
            if (excess > 0) {//如果當前路由對應的連接池的連接超過最大路由並發數,獲取到最后使用的一次連接,釋放掉
                for (int i = 0; i < excess; i++) {
                    final E lastUsed = pool.getLastUsed();
                    if (lastUsed == null) {
                        break;
                    }
                    lastUsed.close();
                    this.available.remove(lastUsed);
                    pool.remove(lastUsed);
                }
            }

            //嘗試創建新的連接 
            if (pool.getAllocatedCount() < maxPerRoute) {//當前路由對應的連接池可用空閑連接數+當前路由對應的連接池已用連接數 < 當前路由對應的連接池最大並發數
                final int totalUsed = this.leased.size();
                final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                if (freeCapacity > 0) {
                    final int totalAvailable = this.available.size();
                    if (totalAvailable > freeCapacity - 1) {//線程池中可用空閑連接數 > (線程池中最大連接數 - 線程池中已用連接數 - 1)
                        if (!this.available.isEmpty()) {
                            final E lastUsed = this.available.removeLast();
                            lastUsed.close();
                            final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                            otherpool.remove(lastUsed);
                        }
                    }
                    final C conn = this.connFactory.create(route);
                    entry = pool.add(conn);
                    this.leased.add(entry);
                    return entry;
                }
            }


            boolean success = false;
            try {
                pool.queue(future);
                this.pending.add(future);
                success = future.await(deadline);
            } finally {
                // In case of 'success', we were woken up by the
                // connection pool and should now have a connection
                // waiting for us, or else we're shutting down.
                // Just continue in the loop, both cases are checked.
                pool.unqueue(future);
                this.pending.remove(future);
            }
            // check for spurious wakeup vs. timeout
            if (!success && (deadline != null) &&
                (deadline.getTime() <= System.currentTimeMillis())) {
                break;
            }
        }
        throw new TimeoutException("Timeout waiting for connection");
    } finally {
        this.lock.unlock();
    }
}

HttpClient從連接池中獲取連接流程圖

連接重用和保持策略

http的長連接復用, 其判定規則主要分兩類。
  1. http協議支持+請求/響應header指定
  2. 一次交互處理的完整性(響應內容消費干凈)
  對於前者, httpclient引入了ConnectionReuseStrategy來處理, 默認的采用如下的約定:

  • HTTP/1.0通過在Header中添加Connection:Keep-Alive來表示支持長連接。
  • HTTP/1.1默認支持長連接, 除非在Header中顯式指定Connection:Close, 才被視為短連接模式。

HttpClientBuilder創建MainClientExec

ConnectionReuseStrategy(連接重用策略) 

org.apache.http.impl.client.DefaultClientConnectionReuseStrategy

MainClientExec處理連接

處理完請求后,獲取到response,通過ConnectionReuseStrategy判斷連接是否可重用,如果是通過ConnectionKeepAliveStrategy獲取到連接最長有效時間,並設置連接可重用標記。

連接重用判斷邏輯

  • request首部中包含Connection:Close,不復用
  • response中Content-Length長度設置不正確,不復用
  • response首部包含Connection:Close,不復用
  • reponse首部包含Connection:Keep-Alive,復用
  • 都沒命中的情況下,如果HTTP版本高於1.0則復用

更多參考:https://www.cnblogs.com/mumuxinfei/p/9121829.html

連接釋放原理分析

HttpClientBuilder會構建一個InternalHttpClient實例,也是CloseableHttpClient實例。InternalHttpClient的doExecute方法來完成一次request的執行。

會繼續調用MainClientExec的execute方法,通過連接池管理者獲取連接(HttpClientConnection)。

構建ConnectionHolder類型對象,傳遞連接池管理者對象和當前連接對象。

請求執行完返回HttpResponse類型對象,然后包裝成HttpResponseProxy對象(是CloseableHttpResponse實例)返回。

CloseableHttpClient類其中一個execute方法如下,finally方法中會調用HttpResponseProxy對象的close方法釋放連接。

最終調用ConnectionHolder的releaseConnection方法釋放連接。

CloseableHttpClient類另一個execute方法如下,返回一個HttpResponseProxy對象(是CloseableHttpResponse實例)。 

這種情況下調用者獲取了HttpResponseProxy對象,可以直接拿到HttpEntity對象。大家關心的就是操作完HttpEntity對象,使用完InputStream到底需不需要手動關閉流呢?

其實調用者不需要手動關閉流,因為HttpResponseProxy構造方法里有增強HttpEntity的處理方法,如下。

調用者最終拿到的HttpEntity對象是ResponseEntityProxy實例。

ResponseEntityProxy重寫了獲取InputStream的方法,返回的是EofSensorInputStream類型的InputStream對象。

EofSensorInputStream對象每次讀取都會調用checkEOF方法,判斷是否已經讀取完畢。

checkEOF方法會調用ResponseEntityProxy(實現了EofSensorWatcher接口)對象的eofDetected方法。

EofSensorWatcher#eofDetected方法中會釋放連接並關閉流。

 

綜上,通過CloseableHttpClient實例處理請求,無需調用者手動釋放連接

HttpClient在Spring中應用

創建ClientHttpRequestFactory

@Bean
public ClientHttpRequestFactory clientHttpRequestFactory()
        throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
    HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
    SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (arg0, arg1) -> true).build();

    httpClientBuilder.setSSLContext(sslContext)
            .setMaxConnTotal(MAX_CONNECTION_TOTAL)
            .setMaxConnPerRoute(ROUTE_MAX_COUNT)
            .evictIdleConnections(CONNECTION_IDLE_TIME_OUT, TimeUnit.MILLISECONDS);

    httpClientBuilder.setRetryHandler(new DefaultHttpRequestRetryHandler(RETRY_COUNT, true));
    httpClientBuilder.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy());
    CloseableHttpClient client = httpClientBuilder.build();

    HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory(client);
    clientHttpRequestFactory.setConnectTimeout(CONNECTION_TIME_OUT);
    clientHttpRequestFactory.setReadTimeout(READ_TIME_OUT);
    clientHttpRequestFactory.setConnectionRequestTimeout(CONNECTION_REQUEST_TIME_OUT);
    clientHttpRequestFactory.setBufferRequestBody(false);
    return clientHttpRequestFactory;
}

創建RestTemplate

@Bean
public RestTemplate restTemplate()  {
   RestTemplate restTemplate = new RestTemplate(clientHttpRequestFactory());
        restTemplate.setErrorHandler(new DefaultResponseErrorHandler());
        // 修改StringHttpMessageConverter內容轉換器
        restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8));
        return restTemplate;
}

 Spring官網例子

@SpringBootApplication
public class Application {

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    public static void main(String args[]) {
        SpringApplication.run(Application.class);
    }

    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();
    }

    @Bean
    public CommandLineRunner run(RestTemplate restTemplate) throws Exception {
        return args -> {
            Quote quote = restTemplate.getForObject( "https://gturnquist-quoters.cfapps.io/api/random", Quote.class);
            log.info(quote.toString());
        };
    }
}

應用場景和效果

項目中主要應用場景

微門戶系統和騰訊、支付寶第三方服務交互,自定義專屬Httpclient連接池。

微門戶內部系統之間交互,通用HttpClient連接池。尤其查詢用戶信息、和微幣、手機號段、自助服務這種核心查詢服務,量級非常大,一定要使用連接池。

微門戶系統和跨團隊系統、跨部門系統使用CSF客戶端,新版CSF客戶端也是使用HttpClient連接池技術。

連接池優點和效果提升

連接池的連接資源重用策略,避免了頻繁創建、釋放連接引起的大量性能開銷,增加了系統運行的平穩性。

連接池統一連接管理策略,避免連接資源泄漏或者大量連接資源創建導致內存溢出。

模擬並發場景,和騰訊使用連接池平均響應時間在50ms~80ms之間。為啥性能提升的這么明顯呢?其主要原因就是省去了大量連接建立與釋放的時間。

總結

Apache的HttpClient組件可謂良心之作,細細的品味一下源碼可以學到很多設計模式和比編碼規范。不過在閱讀源碼之前最好了解一下不同版本的HTTP協議,尤其是HTTP協議的Keep-Alive模式。使用Keep-Alive模式(又稱持久連接、連接重用)時,Keep-Alive功能使客戶端到服 務器端的連接持續有效,當出現對服務器的后繼請求時,Keep-Alive功能避免了建立或者重新建立連接。這里推薦一篇參考鏈接:https://www.jianshu.com/p/49551bda6619。

個人感覺分布式系統項目中都應該封裝一套HttpClient工具類,針對常用參數、連接池信息、服務調用、出網代理、支持Https方式等合理包裝,哪些業務可以共用一個HttpClient,哪些業務只能使用專屬的HttpClient,避免誤用導致生產環境發生無法預知的問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM