PoolingHttpClientConnectionManager是一個HttpClientConnection的連接池,可以為多線程提供並發請求服務。主要作用就是分配連接,回收連接等。同一個route的請求,會優先使用連接池提供的空閑長連接。
源碼版本4.5.2,因為代碼太多,很多不是自己關心的,為免看起來費力,這里代碼貼的不全。省略代碼的地方用省略號標注。
配置說明
<bean id="ky.pollingConnectionManager" class="org.apache.http.impl.conn.PoolingHttpClientConnectionManager">
<!--整個連接池的最大連接數 -->
<property name="maxTotal" value="1000" />
<!--每個route默認的連接數-->
<property name="defaultMaxPerRoute" value="32" />
</bean>
- maxTotal 是整個連接池的最大連接數
- defaultMaxPerRoute 是每個route默認的最大連接數
- setMaxPerRoute(final HttpRoute route, final int max) route的最大連接數,優先於defaultMaxPerRoute。
public PoolingHttpClientConnectionManager(
final HttpClientConnectionOperator httpClientConnectionOperator,
final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
final long timeToLive, final TimeUnit tunit) {
super();
this.configData = new ConfigData();
//defaultMaxPerRoute默認為2,maxTotal默認為20
this.pool = new CPool(new InternalConnectionFactory(
this.configData, connFactory), 2, 20, timeToLive, tunit);
//validateAfterInactivity 空閑永久連接檢查間隔,這個牽扯的還比較多
//官方推薦使用這個來檢查永久鏈接的可用性,而不推薦每次請求的時候才去檢查
this.pool.setValidateAfterInactivity(2000);
this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
this.isShutDown = new AtomicBoolean(false);
}
獲取連接
獲取連接分兩步,首先新建一個ConnectionRequest,在通過request.get得到HttpClientConnection。
//org.apache.http.impl.conn.PoolingHttpClientConnectionManager
@Override
public ConnectionRequest requestConnection(
final HttpRoute route,
final Object state) {
......
//從連接池中獲取一個CPoolEntry(Connection的包裝類)
final Future<CPoolEntry> future = this.pool.lease(route, state, null);
return new ConnectionRequest() {
......
// ConnectionRequest的get方法。調用leaseConnection方法,並且傳入future(CPoolEntry的封裝(connection的封裝))
@Override
public HttpClientConnection get(
final long timeout,
final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
return leaseConnection(future, timeout, tunit);
}
};
}
protected HttpClientConnection leaseConnection(
final Future<CPoolEntry> future,
final long timeout,
final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
final CPoolEntry entry;
try {
//從future中get
entry = future.get(timeout, tunit);
if (entry == null || future.isCancelled()) {
throw new InterruptedException();
}
Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
//封裝返回
return CPoolProxy.newProxy(entry);
} catch (final TimeoutException ex) {
throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
}
}
所以,CPoolEntry(ManagedHttpClientConnection的封裝),實際是調用PoolingHttpClientConnectionManager的leaseConnection,通過future的get獲得。
這里的future是Future
//org.apache.http.pool.AbstractConnPool
@Override
public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
......
return new PoolEntryFuture<E>(this.lock, callback) {
@Override
public E getPoolEntry(
final long timeout,
final TimeUnit tunit)
throws InterruptedException, TimeoutException, IOException {
//阻塞獲取CPoolEntry
final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
onLease(entry);
return entry;
}
};
}
private E getPoolEntryBlocking(
final T route, final Object state,
final long timeout, final TimeUnit tunit,
final PoolEntryFuture<E> future)
throws IOException, InterruptedException, TimeoutException {
//設置超時時間點
......
//串行操作
this.lock.lock();
try {
//每一個route都有一個連接池,這里獲取指定route的連接池
final RouteSpecificPool<T, C, E> pool = getPool(route);
E entry = null;
//循環取,直到超時
while (entry == null) {
Asserts.check(!this.isShutDown, "Connection pool shut down");
for (;;) {
//從連接池中去一個空閑的連接,優先取state相同的。state默認是null
entry = pool.getFree(state);
//如果沒有符合的連接,則調出,創建一個新連接
if (entry == null) {
break;
}
//如果連接超時,則關閉
if (entry.isExpired(System.currentTimeMillis())) {
entry.close();
//如果是永久連接,且最近周期內沒有檢驗,則校驗連接是否可用。不可用的連接需要關閉
} else if (this.validateAfterInactivity > 0) {
if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
if (!validate(entry)) {
entry.close();
}
}
}
//如果連接已經關閉了,則釋放掉,繼續從池子中取符合條件的連接
if (entry.isClosed()) {
this.available.remove(entry);
pool.free(entry, false);
} else {
break;
}
}
//entry不為空,則修改連接池的參數,並返回。
if (entry != null) {
this.available.remove(entry);
this.leased.add(entry);
onReuse(entry);
return entry;
}
// New connection is needed
//獲取池子的最大連接數,如果池子已經超過容量了,需要把超過的資源回收
//如果池子中連接數沒有超,空閑的連接還比較多,就先從別人的池子里借一個來用
......
//不能借,就自己動手了。新建並返回。
final C conn = this.connFactory.create(route);
entry = pool.add(conn);
this.leased.add(entry);
return entry;
}
throw new TimeoutException("Timeout waiting for connection");
} finally {
this.lock.unlock();
}
}
讀到這里,看起來拿到的entry要么是剛剛創建的熱乎的,要么是沒有過期的連接,要么是復用的池子中有效的永久連接。是這樣的嗎?再看一下復用的永久連接的情況:
//如果往前validateAfterInactivity ms之內沒有校驗,則校驗entry。校驗不通過則關閉並釋放,繼續從連接池中獲取entry。
//如果往前validateAfterInactivity ms之內有過校驗,則無需再次校驗
if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
if (!validate(entry)) {
entry.close();
}
}
判斷連接是否可用:
//org.apache.http.impl.conn.CPool
protected boolean validate(final CPoolEntry entry) {
return !entry.getConnection().isStale();
}
//org.apache.http.impl.BHttpConnectionBase
//判斷連接是否不可用(go down)
public boolean isStale() {
//沒有打開,即socket為空,則不可用
if (!isOpen()) {
return true;
}
try {
//socket鏈路有了,測試鏈路是否可用
//這里的測試方法是查看很短的時間內(這里是1ms),是否可以從輸入流中讀到數據
//如果測試結果返回-1說明不可用
final int bytesRead = fillInputBuffer(1);
return bytesRead < 0;
} catch (final SocketTimeoutException ex) {
//注意這里SocketTimeoutException時,認為是可用的
return false;
} catch (final IOException ex) {
//有I/O異常,不可用
return true;
}
}
了解下測試連接是否可用的過程,梳理一下調用鏈路:
-
org.apache.http.impl.BHttpConnectionBase
private int fillInputBuffer(final int timeout) throws IOException 不處理異常 -
org.apache.http.impl.io.SessionInputBufferImpl
public int fillBuffer() throws IOException 不處理異常 -
org.apache.http.impl.conn.LoggingInputStream
打印日志,不處理異常
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
try {
final int bytesRead = in.read(b, off, len);
if (bytesRead == -1) {
wire.input("end of stream");
} else if (bytesRead > 0) {
wire.input(b, off, bytesRead);
}
return bytesRead;
} catch (final IOException ex) {
wire.input("[read] I/O error: " + ex.getMessage());
throw ex;
}
}
-
sun.security.ssl.AppInputStream
public synchronized int read(byte[] var1, int var2, int var3) throws IOException 不處理異常 -
sun.security.ssl.SSLSocketImpl
void readDataRecord(InputRecord var1) throws IOException 不處理異常 -
sun.security.ssl.InputRecord
void read(InputStream var1, OutputStream var2) throws IOException -
java.net.SocketInputStream
int read(byte b[], int off, int length, int timeout) throws IOException
通過socket讀取數據,如果發生ConnectionResetException異常,則throw new SocketException("Connection reset");
以上,是對PoolingHttpClientConnectionManager從連接池中獲取一個連接給用戶的過程。用戶拿到的連接有三種:新創建的;未過期的短連接;間隔檢查的永久鏈接。
需要注意,間隔檢查的永久鏈接 如果在間隔時間(這里是2s)內,socket連接出現什么問題,是不知道的,因為沒有進行檢測。另外,檢查鏈接是否可用的方法 isStale
,並不是100%靠譜的,即檢測時出現SocketTimeoutException時,認為是可用的。而這時候,很有可能連接不可用,比如服務端關閉鏈接的情況。