Http 持久連接與 HttpClient 連接池


一、背景

HTTP協議是無狀態的協議,即每一次請求都是互相獨立的。因此它的最初實現是,每一個http請求都會打開一個tcp socket連接,當交互完畢后會關閉這個連接。

HTTP協議是全雙工的協議,所以建立連接與斷開連接是要經過三次握手與四次揮手的。顯然在這種設計中,每次發送Http請求都會消耗很多的額外資源,即連接的建立與銷毀。

於是,HTTP協議的也進行了發展,通過持久連接的方法來進行socket連接復用。

 

從圖中可以看到:

  1. 在串行連接中,每次交互都要打開關閉連接

  2. 在持久連接中,第一次交互會打開連接,交互結束后連接並不關閉,下次交互就省去了建立連接的過程。

持久連接的實現有兩種:HTTP/1.0+的keep-alive與HTTP/1.1的持久連接。

二、HTTP/1.0+的Keep-Alive

從1996年開始,很多HTTP/1.0瀏覽器與服務器都對協議進行了擴展,那就是“keep-alive”擴展協議。

注意,這個擴展協議是作為1.0的補充的“實驗型持久連接”出現的。keep-alive已經不再使用了,最新的HTTP/1.1規范中也沒有對它進行說明,只是很多應用延續了下來。

使用HTTP/1.0的客戶端在首部中加上”Connection:Keep-Alive”,請求服務端將一條連接保持在打開狀態。服務端如果願意將這條連接保持在打開狀態,就會在響應中包含同樣的首部。如果響應中沒有包含”Connection:Keep-Alive”首部,則客戶端會認為服務端不支持keep-alive,會在發送完響應報文之后關閉掉當前連接。

通過keep-alive補充協議,客戶端與服務器之間完成了持久連接,然而仍然存在着一些問題:

在HTTP/1.0中keep-alive不是標准協議,客戶端必須發送Connection:Keep-Alive來激活keep-alive連接。

代理服務器可能無法支持keep-alive,因為一些代理是”盲中繼”,無法理解首部的含義,只是將首部逐跳轉發。所以可能造成客戶端與服務端都保持了連接,但是代理不接受該連接上的數據。

三、HTTP/1.1的持久連接

HTTP/1.1采取持久連接的方式替代了Keep-Alive。

HTTP/1.1的連接默認情況下都是持久連接。如果要顯式關閉,需要在報文

中加上Connection:Close首部。即在HTTP/1.1中,所有的連接都進行了復用。

然而如同Keep-Alive一樣,空閑的持久連接也可以隨時被客戶端與服務端關閉。不發送Connection:Close不意味着服務器承諾連接永遠保持打開。

四、HttpClient如何生成持久連接

HttpClien中使用了連接池來管理持有連接,同一條TCP鏈路上,連接是可以復用的。HttpClient通過連接池的方式進行連接持久化。

其實“池”技術是一種通用的設計,其設計思想並不復雜:

  1. 當有連接第一次使用的時候建立連接

  2. 結束時對應連接不關閉,歸還到池中

  3. 下次同個目的的連接可從池中獲取一個可用連接

  4. 定期清理過期連接

所有的連接池都是這個思路,不過我們看HttpClient源碼主要關注兩點:

  • 連接池的具體設計方案,以供以后自定義連接池參考

  • 如何與HTTP協議對應上,即理論抽象轉為代碼的實現

4.1 HttpClient連接池的實現

HttpClient關於持久連接的處理在下面的代碼中可以集中體現,下面從MainClientExec摘取了和連接池相關的部分,去掉了其他部分:

MainClientExec摘取了和連接池相關的部分,去掉了其他部分:

 

public class MainClientExec implements ClientExecChain {

 

    @Override

    public CloseableHttpResponse execute(

            final HttpRoute route,

            final HttpRequestWrapper request,

            final HttpClientContext context,

            final HttpExecutionAware execAware) throws IOException, HttpException {

     //從連接管理器HttpClientConnectionManager中獲取一個連接請求ConnectionRequest

        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);final HttpClientConnection managedConn;

        final int timeout = config.getConnectionRequestTimeout();

        //從連接請求ConnectionRequest中獲取一個被管理的連接HttpClientConnection

        managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);

     //將連接管理器HttpClientConnectionManager與被管理的連接HttpClientConnection交給一個ConnectionHolder持有

        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);

        try {

            HttpResponse response;

            if (!managedConn.isOpen()) {

          //如果當前被管理的連接不是出於打開狀態,需要重新建立連接

                establishRoute(proxyAuthState, managedConn, route, request, context);

            }

       //通過連接HttpClientConnection發送請求

            response = requestExecutor.execute(request, managedConn, context);

       //通過連接重用策略判斷是否連接可重用         

            if (reuseStrategy.keepAlive(response, context)) {

                //獲得連接有效期

                final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);

                //設置連接有效期

                connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);

          //將當前連接標記為可重用狀態

                connHolder.markReusable();

            } else {

                connHolder.markNonReusable();

            }

        }

        final HttpEntity entity = response.getEntity();

        if (entity == null || !entity.isStreaming()) {

            //將當前連接釋放到池中,供下次調用

            connHolder.releaseConnection();

            return new HttpResponseProxy(response, null);

        } else {

            return new HttpResponseProxy(response, connHolder);

        }

}

 

這里看到了在Http請求過程中對連接的處理是和協議規范是一致的,這里要展開講一下具體實現。

 

PoolingHttpClientConnectionManager是HttpClient默認的連接管理器,首先通過requestConnection()獲得一個連接的請求,注意這里不是連接。

 

public ConnectionRequest requestConnection(

            final HttpRoute route,

            final Object state) {final Future<CPoolEntry> future = this.pool.lease(route, state, null);

        return new ConnectionRequest() {

            @Override

            public boolean cancel() {

                return future.cancel(true);

            }

            @Override

            public HttpClientConnection get(

                    final long timeout,

                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {

                final HttpClientConnection conn = leaseConnection(future, timeout, tunit);

                if (conn.isOpen()) {

                    final HttpHost host;

                    if (route.getProxyHost() != null) {

                        host = route.getProxyHost();

                    } else {

                        host = route.getTargetHost();

                    }

                    final SocketConfig socketConfig = resolveSocketConfig(host);

                    conn.setSocketTimeout(socketConfig.getSoTimeout());

                }

                return conn;

            }

        };

    }

 

可以看到返回的ConnectionRequest對象實際上是一個持有了Future<CPoolEntry>,CPoolEntry是被連接池管理的真正連接實例。

 

從上面的代碼我們應該關注的是:

 

  • Future<CPoolEntry> future = this.pool.lease(route, state, null)

    如何從連接池CPool中獲得一個異步的連接,Future<CPoolEntry>

  • HttpClientConnection conn = leaseConnection(future, timeout, tunit)

    如何通過異步連接Future<CPoolEntry>獲得一個真正的連接HttpClientConnection

 

4.2 Future<CPoolEntry>

 

看一下CPool是如何釋放一個Future<CPoolEntry>的,AbstractConnPool核心代碼如下:

 

private E getPoolEntryBlocking(

            final T route, final Object state,

            final long timeout, final TimeUnit tunit,

            final Future<E> future) throws IOException, InterruptedException, TimeoutException {

     //首先對當前連接池加鎖,當前鎖是可重入鎖ReentrantLockthis.lock.lock();

        try {

        //獲得一個當前HttpRoute對應的連接池,對於HttpClient的連接池而言,總池有個大小,每個route對應的連接也是個池,所以是“池中池”

            final RouteSpecificPool<T, C, E> pool = getPool(route);

            E entry;

            for (;;) {

                Asserts.check(!this.isShutDown, "Connection pool shut down");

          //死循環獲得連接

                for (;;) {

            //從route對應的池中拿連接,可能是null,也可能是有效連接

                    entry = pool.getFree(state);

            //如果拿到null,就退出循環

                    if (entry == null) {

                        break;

                    }

            //如果拿到過期連接或者已關閉連接,就釋放資源,繼續循環獲取

                    if (entry.isExpired(System.currentTimeMillis())) {

                        entry.close();

                    }

                    if (entry.isClosed()) {

                        this.available.remove(entry);

                        pool.free(entry, false);

                    } else {

              //如果拿到有效連接就退出循環

                        break;

                    }

                }

          //拿到有效連接就退出

                if (entry != null) {

                    this.available.remove(entry);

                    this.leased.add(entry);

                    onReuse(entry);

                    return entry;

                }

          //到這里證明沒有拿到有效連接,需要自己生成一個                

                final int maxPerRoute = getMax(route);

                //每個route對應的連接最大數量是可配置的,如果超過了,就需要通過LRU清理掉一些連接

                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);

                if (excess > 0) {

                    for (int i = 0; i < excess; i++) {

                        final E lastUsed = pool.getLastUsed();

                        if (lastUsed == null) {

                            break;

                        }

                        lastUsed.close();

                        this.available.remove(lastUsed);

                        pool.remove(lastUsed);

                    }

                }

          //當前route池中的連接數,沒有達到上線

                if (pool.getAllocatedCount() < maxPerRoute) {

                    final int totalUsed = this.leased.size();

                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);

            //判斷連接池是否超過上線,如果超過了,需要通過LRU清理掉一些連接

                    if (freeCapacity > 0) {

                        final int totalAvailable = this.available.size();

               //如果空閑連接數已經大於剩余可用空間,則需要清理下空閑連接

                        if (totalAvailable > freeCapacity - 1) {

                            if (!this.available.isEmpty()) {

                                final E lastUsed = this.available.removeLast();

                                lastUsed.close();

                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());

                                otherpool.remove(lastUsed);

                            }

                        }

              //根據route建立一個連接

                        final C conn = this.connFactory.create(route);

              //將這個連接放入route對應的“小池”中

                        entry = pool.add(conn);

              //將這個連接放入“大池”中

                        this.leased.add(entry);

                        return entry;

                    }

                }

         //到這里證明沒有從獲得route池中獲得有效連接,並且想要自己建立連接時當前route連接池已經到達最大值,即已經有連接在使用,但是對當前線程不可用

                boolean success = false;

                try {

                    if (future.isCancelled()) {

                        throw new InterruptedException("Operation interrupted");

                    }

            //將future放入route池中等待

                    pool.queue(future);

            //將future放入大連接池中等待

                    this.pending.add(future);

            //如果等待到了信號量的通知,success為true

                    if (deadline != null) {

                        success = this.condition.awaitUntil(deadline);

                    } else {

                        this.condition.await();

                        success = true;

                    }

                    if (future.isCancelled()) {

                        throw new InterruptedException("Operation interrupted");

                    }

                } finally {

                    //從等待隊列中移除

                    pool.unqueue(future);

                    this.pending.remove(future);

                }

                //如果沒有等到信號量通知並且當前時間已經超時,則退出循環

                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {

                    break;

                }

            }

       //最終也沒有等到信號量通知,沒有拿到可用連接,則拋異常

            throw new TimeoutException("Timeout waiting for connection");

        } finally {

       //釋放對大連接池的鎖

            this.lock.unlock();

        }

    }

 

上面的代碼邏輯有幾個重要點:

  • 連接池有個最大連接數,每個route對應一個小連接池,也有個最大連接數

  • 不論是大連接池還是小連接池,當超過數量的時候,都要通過LRU釋放一些連接

  • 如果拿到了可用連接,則返回給上層使用

  • 如果沒有拿到可用連接,HttpClient會判斷當前route連接池是否已經超過了最大數量,沒有到上限就會新建一個連接,並放入池中

  • 如果到達了上限,就排隊等待,等到了信號量,就重新獲得一次,等待不到就拋超時異常

  • 通過線程池獲取連接要通過ReetrantLock加鎖,保證線程安全

 

到這里為止,程序已經拿到了一個可用的CPoolEntry實例,或者拋異常終止了程序。

4.3 HttpClientConnection

 

protected HttpClientConnection leaseConnection(

            final Future<CPoolEntry> future,

            final long timeout,

            final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {

        final CPoolEntry entry;

        try {

       //從異步操作Future<CPoolEntry>中獲得CPoolEntry

            entry = future.get(timeout, tunit);

            if (entry == null || future.isCancelled()) {

                throw new InterruptedException();

            }

            Asserts.check(entry.getConnection() != null, "Pool entry with no connection");

            if (this.log.isDebugEnabled()) {

                this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));

            }

       //獲得一個CPoolEntry的代理對象,對其操作都是使用同一個底層的HttpClientConnection

            return CPoolProxy.newProxy(entry);

        } catch (final TimeoutException ex) {

            throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");

        }

    }

 

五、HttpClient如何復用持久連接?

 

在上一章中,我們看到了HttpClient通過連接池來獲得連接,當需要使用連接的時候從池中獲得。

 

對應着第三章的問題:

 

  1. 當有連接第一次使用的時候建立連接

  2. 結束時對應連接不關閉,歸還到池中

  3. 下次同個目的的連接可從池中獲取一個可用連接

  4. 定期清理過期連接

 

我們在第四章中看到了HttpClient是如何處理1、3的問題的,那么第2個問題是怎么處理的呢?

 

即HttpClient如何判斷一個連接在使用完畢后是要關閉,還是要放入池中供他人復用?再看一下MainClientExec的代碼

 

//發送Http連接

                response = requestExecutor.execute(request, managedConn, context);

                //根據重用策略判斷當前連接是否要復用

                if (reuseStrategy.keepAlive(response, context)) {

                    //需要復用的連接,獲取連接超時時間,以response中的timeout為准

                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);

                    if (this.log.isDebugEnabled()) {

                        final String s;

               //timeout的是毫秒數,如果沒有設置則為-1,即沒有超時時間

                        if (duration > 0) {

                            s = "for " + duration + " " + TimeUnit.MILLISECONDS;

                        } else {

                            s = "indefinitely";

                        }

                        this.log.debug("Connection can be kept alive " + s);

                    }

            //設置超時時間,當請求結束時連接管理器會根據超時時間決定是關閉還是放回到池中

                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);

                    //將連接標記為可重用

            connHolder.markReusable();

                } else {

            //將連接標記為不可重用

                    connHolder.markNonReusable();

                }

 

可以看到,當使用連接發生過請求之后,有連接重試策略來決定該連接是否要重用,如果要重用就會在結束后交給HttpClientConnectionManager放入池中。

那么連接復用策略的邏輯是怎么樣的呢?

 

public class DefaultClientConnectionReuseStrategy extends DefaultConnectionReuseStrategy {

 

    public static final DefaultClientConnectionReuseStrategy INSTANCE = new DefaultClientConnectionReuseStrategy();

 

    @Override

    public boolean keepAlive(final HttpResponse response, final HttpContext context) {

     //從上下文中拿到request

        final HttpRequest request = (HttpRequest) context.getAttribute(HttpCoreContext.HTTP_REQUEST);

        if (request != null) {

       //獲得Connection的Header

            final Header[] connHeaders = request.getHeaders(HttpHeaders.CONNECTION);

            if (connHeaders.length != 0) {

                final TokenIterator ti = new BasicTokenIterator(new BasicHeaderIterator(connHeaders, null));

                while (ti.hasNext()) {

                    final String token = ti.nextToken();

            //如果包含Connection:Close首部,則代表請求不打算保持連接,會忽略response的意願,該頭部這是HTTP/1.1的規范

                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {

                        return false;

                    }

                }

            }

        }

     //使用父類的的復用策略

        return super.keepAlive(response, context);

    }

}

 

看一下父類的復用策略

 

if (canResponseHaveBody(request, response)) {

                final Header[] clhs = response.getHeaders(HTTP.CONTENT_LEN);

                //如果reponse的Content-Length沒有正確設置,則不復用連接

          //因為對於持久化連接,兩次傳輸之間不需要重新建立連接,則需要根據Content-Length確認內容屬於哪次請求,以正確處理“粘包”現象

                //所以,沒有正確設置Content-Length的response連接不能復用

                if (clhs.length == 1) {

                    final Header clh = clhs[0];

                    try {

                        final int contentLen = Integer.parseInt(clh.getValue());

                        if (contentLen < 0) {

                            return false;

                        }

                    } catch (final NumberFormatException ex) {

                        return false;

                    }

                } else {

                    return false;

                }

            }

        if (headerIterator.hasNext()) {

            try {

                final TokenIterator ti = new BasicTokenIterator(headerIterator);

                boolean keepalive = false;

                while (ti.hasNext()) {

                    final String token = ti.nextToken();

            //如果response有Connection:Close首部,則明確表示要關閉,則不復用

                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {

                        return false;

            //如果response有Connection:Keep-Alive首部,則明確表示要持久化,則復用

                    } else if (HTTP.CONN_KEEP_ALIVE.equalsIgnoreCase(token)) {

                        keepalive = true;

                    }

                }

                if (keepalive) {

                    return true;

                }

            } catch (final ParseException px) {

                return false;

            }

        }

     //如果response中沒有相關的Connection首部說明,則高於HTTP/1.0版本的都復用連接  

        return !ver.lessEquals(HttpVersion.HTTP_1_0);

 

總結一下:

  • 如果request首部中包含Connection:Close,不復用

  • 如果response中Content-Length長度設置不正確,不復用

  • 如果response首部包含Connection:Close,不復用

  • 如果reponse首部包含Connection:Keep-Alive,復用

  • 都沒命中的情況下,如果HTTP版本高於1.0則復用

從代碼中可以看到,其實現策略與我們第二、三章協議層的約束是一致的。

六、HttpClient如何清理過期連接

 

在HttpClient4.4版本之前,在從連接池中獲取重用連接的時候會檢查下是否過期,過期則清理。

 

之后的版本則不同,會有一個單獨的線程來掃描連接池中的連接,發現有離最近一次使用超過設置的時間后,就會清理。默認的超時時間是2秒鍾。

 

public CloseableHttpClient build() {

            //如果指定了要清理過期連接與空閑連接,才會啟動清理線程,默認是不啟動的

            if (evictExpiredConnections || evictIdleConnections) {

          //創造一個連接池的清理線程

                final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(cm,

                        maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS,

                        maxIdleTime, maxIdleTimeUnit);

                closeablesCopy.add(new Closeable() {

                    @Override

                    public void close() throws IOException {

                        connectionEvictor.shutdown();

                        try {

                            connectionEvictor.awaitTermination(1L, TimeUnit.SECONDS);

                        } catch (final InterruptedException interrupted) {

                            Thread.currentThread().interrupt();

                        }

                    }

 

                });

          //執行該清理線程

                connectionEvictor.start();

}

 

可以看到在HttpClientBuilder進行build的時候,如果指定了開啟清理功能,會創建一個連接池清理線程並運行它。


public IdleConnectionEvictor(

            final HttpClientConnectionManager connectionManager,

            final ThreadFactory threadFactory,

            final long sleepTime, final TimeUnit sleepTimeUnit,

            final long maxIdleTime, final TimeUnit maxIdleTimeUnit) {

        this.connectionManager = Args.notNull(connectionManager, "Connection manager");

        this.threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory();

        this.sleepTimeMs = sleepTimeUnit != null ? sleepTimeUnit.toMillis(sleepTime) : sleepTime;

        this.maxIdleTimeMs = maxIdleTimeUnit != null ? maxIdleTimeUnit.toMillis(maxIdleTime) : maxIdleTime;

        this.thread = this.threadFactory.newThread(new Runnable() {

            @Override

            public void run() {

                try {

            //死循環,線程一直執行

                    while (!Thread.currentThread().isInterrupted()) {

              //休息若干秒后執行,默認10秒

                        Thread.sleep(sleepTimeMs);

               //清理過期連接

                        connectionManager.closeExpiredConnections();

               //如果指定了最大空閑時間,則清理空閑連接

                        if (maxIdleTimeMs > 0) {

                            connectionManager.closeIdleConnections(maxIdleTimeMs, TimeUnit.MILLISECONDS);

                        }

                    }

                } catch (final Exception ex) {

                    exception = ex;

                }

 

            }

        });

    }

 

總結一下:

 

    • 只有在HttpClientBuilder手動設置后,才會開啟清理過期與空閑連接

    • 手動設置后,會啟動一個線程死循環執行,每次執行sleep一定時間,調用HttpClientConnectionManager的清理方法清理過期與空閑連接。

七、本文總結

  • HTTP協議通過持久連接的方式,減輕了早期設計中的過多連接問題

  • 持久連接有兩種方式:HTTP/1.0+的Keep-Avlive與HTTP/1.1的默認持久連接

  • HttpClient通過連接池來管理持久連接,連接池分為兩個,一個是總連接池,一個是每個route對應的連接池

  • HttpClient通過異步的Future<CPoolEntry>來獲取一個池化的連接

  • 默認連接重用策略與HTTP協議約束一致,根據response先判斷Connection:Close則關閉,在判斷Connection:Keep-Alive則開啟,最后版本大於1.0則開啟

  • 只有在HttpClientBuilder中手動開啟了清理過期與空閑連接的開關后,才會清理連接池中的連接

  • HttpClient4.4之后的版本通過一個死循環線程清理過期與空閑連接,該線程每次執行都sleep一會,以達到定期執行的效果

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM