httpclient源碼分析之 PoolingHttpClientConnectionManager 獲取連接


PoolingHttpClientConnectionManager是一個HttpClientConnection的連接池,可以為多線程提供並發請求服務。主要作用就是分配連接,回收連接等。同一個route的請求,會優先使用連接池提供的空閑長連接。

源碼版本4.5.2,因為代碼太多,很多不是自己關心的,為免看起來費力,這里代碼貼的不全。省略代碼的地方用省略號標注。

配置說明

    <bean id="ky.pollingConnectionManager" class="org.apache.http.impl.conn.PoolingHttpClientConnectionManager">
        <!--整個連接池的最大連接數 -->
        <property name="maxTotal" value="1000" />
        <!--每個route默認的連接數-->
        <property name="defaultMaxPerRoute" value="32" />
    </bean>
  • maxTotal 是整個連接池的最大連接數
  • defaultMaxPerRoute 是每個route默認的最大連接數
  • setMaxPerRoute(final HttpRoute route, final int max) route的最大連接數,優先於defaultMaxPerRoute。
    public PoolingHttpClientConnectionManager(
        final HttpClientConnectionOperator httpClientConnectionOperator,
        final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
        final long timeToLive, final TimeUnit tunit) {
        super();
        this.configData = new ConfigData();
        //defaultMaxPerRoute默認為2,maxTotal默認為20
        this.pool = new CPool(new InternalConnectionFactory(
                this.configData, connFactory), 2, 20, timeToLive, tunit);
        //validateAfterInactivity 空閑永久連接檢查間隔,這個牽扯的還比較多
        //官方推薦使用這個來檢查永久鏈接的可用性,而不推薦每次請求的時候才去檢查        
        this.pool.setValidateAfterInactivity(2000);
        this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
        this.isShutDown = new AtomicBoolean(false);
    }

獲取連接

獲取連接分兩步,首先新建一個ConnectionRequest,在通過request.get得到HttpClientConnection。

    //org.apache.http.impl.conn.PoolingHttpClientConnectionManager

    @Override
    public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        ......
        
        //從連接池中獲取一個CPoolEntry(Connection的包裝類)
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {
			......

            // ConnectionRequest的get方法。調用leaseConnection方法,並且傳入future(CPoolEntry的封裝(connection的封裝))
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                return leaseConnection(future, timeout, tunit);
            }
        };
    }


    protected HttpClientConnection leaseConnection(
            final Future<CPoolEntry> future,
            final long timeout,
            final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
        final CPoolEntry entry;
        try {
            //從future中get
            entry = future.get(timeout, tunit);
            if (entry == null || future.isCancelled()) {
                throw new InterruptedException();
            }
            Asserts.check(entry.getConnection() != null, "Pool entry with no connection");

            //封裝返回
            return CPoolProxy.newProxy(entry);
        } catch (final TimeoutException ex) {
            throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
        }
    }

所以,CPoolEntry(ManagedHttpClientConnection的封裝),實際是調用PoolingHttpClientConnectionManager的leaseConnection,通過future的get獲得。

這里的future是Future future = this.pool.lease(route, state, null);得到的。

    //org.apache.http.pool.AbstractConnPool
    @Override
    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
        ......

        return new PoolEntryFuture<E>(this.lock, callback) {
            @Override
            public E getPoolEntry(
                    final long timeout,
                    final TimeUnit tunit)
                        throws InterruptedException, TimeoutException, IOException {
                //阻塞獲取CPoolEntry        
                final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
                onLease(entry);
                return entry;
            }

        };
    }

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final PoolEntryFuture<E> future)
                throws IOException, InterruptedException, TimeoutException {
        //設置超時時間點        
        ......

        //串行操作
        this.lock.lock();
        try {
            //每一個route都有一個連接池,這里獲取指定route的連接池
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry = null;
            //循環取,直到超時
            while (entry == null) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                for (;;) {
                    //從連接池中去一個空閑的連接,優先取state相同的。state默認是null
                    entry = pool.getFree(state);
                    //如果沒有符合的連接,則調出,創建一個新連接
                    if (entry == null) {
                        break;
                    }
                    //如果連接超時,則關閉
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    //如果是永久連接,且最近周期內沒有檢驗,則校驗連接是否可用。不可用的連接需要關閉    
                    } else if (this.validateAfterInactivity > 0) {
                        if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
                            if (!validate(entry)) {
                                entry.close();
                            }
                        }
                    }
                    //如果連接已經關閉了,則釋放掉,繼續從池子中取符合條件的連接
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                //entry不為空,則修改連接池的參數,並返回。
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // New connection is needed
                //獲取池子的最大連接數,如果池子已經超過容量了,需要把超過的資源回收
                //如果池子中連接數沒有超,空閑的連接還比較多,就先從別人的池子里借一個來用
                 ......

                //不能借,就自己動手了。新建並返回。
                final C conn = this.connFactory.create(route);
                entry = pool.add(conn);
                this.leased.add(entry);
                return entry;  
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

讀到這里,看起來拿到的entry要么是剛剛創建的熱乎的,要么是沒有過期的連接,要么是復用的池子中有效的永久連接。是這樣的嗎?再看一下復用的永久連接的情況:

//如果往前validateAfterInactivity ms之內沒有校驗,則校驗entry。校驗不通過則關閉並釋放,繼續從連接池中獲取entry。
//如果往前validateAfterInactivity ms之內有過校驗,則無需再次校驗
 if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
     if (!validate(entry)) {
         entry.close();
     }
 }

判斷連接是否可用:

	//org.apache.http.impl.conn.CPool
    protected boolean validate(final CPoolEntry entry) {
        return !entry.getConnection().isStale();
    }

	//org.apache.http.impl.BHttpConnectionBase
	//判斷連接是否不可用(go down)
    public boolean isStale() {
	    //沒有打開,即socket為空,則不可用
        if (!isOpen()) {
            return true;
        }
        try {
        //socket鏈路有了,測試鏈路是否可用
        //這里的測試方法是查看很短的時間內(這里是1ms),是否可以從輸入流中讀到數據
        //如果測試結果返回-1說明不可用
            final int bytesRead = fillInputBuffer(1);
            return bytesRead < 0;
        } catch (final SocketTimeoutException ex) {
	        //注意這里SocketTimeoutException時,認為是可用的
            return false;
        } catch (final IOException ex) {
            //有I/O異常,不可用
            return true;
        }
    }

了解下測試連接是否可用的過程,梳理一下調用鏈路:

  • org.apache.http.impl.BHttpConnectionBase
    private int fillInputBuffer(final int timeout) throws IOException 不處理異常

  • org.apache.http.impl.io.SessionInputBufferImpl
    public int fillBuffer() throws IOException 不處理異常

  • org.apache.http.impl.conn.LoggingInputStream
    打印日志,不處理異常

    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        try {
            final int bytesRead = in.read(b, off, len);
            if (bytesRead == -1) {
                wire.input("end of stream");
            } else if (bytesRead > 0) {
                wire.input(b, off, bytesRead);
            }
            return bytesRead;
        } catch (final IOException ex) {
            wire.input("[read] I/O error: " + ex.getMessage());
            throw ex;
        }
    } 
  • sun.security.ssl.AppInputStream
    public synchronized int read(byte[] var1, int var2, int var3) throws IOException 不處理異常

  • sun.security.ssl.SSLSocketImpl
    void readDataRecord(InputRecord var1) throws IOException 不處理異常

  • sun.security.ssl.InputRecord
    void read(InputStream var1, OutputStream var2) throws IOException

  • java.net.SocketInputStream
    int read(byte b[], int off, int length, int timeout) throws IOException
    通過socket讀取數據,如果發生ConnectionResetException異常,則throw new SocketException("Connection reset");

以上,是對PoolingHttpClientConnectionManager從連接池中獲取一個連接給用戶的過程。用戶拿到的連接有三種:新創建的;未過期的短連接;間隔檢查的永久鏈接。
需要注意,間隔檢查的永久鏈接 如果在間隔時間(這里是2s)內,socket連接出現什么問題,是不知道的,因為沒有進行檢測。另外,檢查鏈接是否可用的方法 isStale ,並不是100%靠譜的,即檢測時出現SocketTimeoutException時,認為是可用的。而這時候,很有可能連接不可用,比如服務端關閉鏈接的情況。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM