對象池技術其實蠻常見的,比如線程池、數據庫連接池
他們的特點是:對象創建代價較高、比較消耗資源、比較耗時;
比如 mysql數據庫連接建立就要先建立 tcp三次握手、發送用戶名/密碼、進行身份校驗、權限校驗等很多步驟才算是 db連接建立成功;要是每次使用的時候才去創建會比較影響性能,而且也不能無限制的創建太多
所以,這種對象使用完后不立即釋放資源,一般是先放到一個池子里暫存起來,下次就能直接從池子里拿出現成可用的對象
對象池需要具備的能力
所以,為了讓這類資源對象的使用方能夠復用資源、快速獲取可用對象,這個池子得具備的能力有哪些?
- 首先有個容器的數據結構,能存放多個對象,也有數量上限
- 維持一定數量的常駐對象,這個數量如果和
qps * rt
匹配的話,業務處理就都能直接獲取可用對象,不需要消耗對象創建的時間了 - 能應對突發流量
- 超時獲取,一定時間沒有獲取成功就拋出異常,不卡死業務線程
- 具有活性檢測機制, 從容器拿出來的對象得是可用的
1 核心流程
1.1對象獲取流程
1.2 活性檢測
2 實現
為了實現前面提到的容器具備的能力,以及對象獲取流程,需要考慮幾個東西:
-
容器的數據結構選擇
用 List、 Map 還是 Queue ?亦或是組合起來用? -
空閑對象要不要單獨用要給集合存一份?方便判斷是否空、阻塞等待?
比如將空閑對象,用一個blockingqueue存一下,就能利用阻塞隊列的能力實現超時等待 -
檢測機制
- 在什么時候檢測:常見的有 testOnBorrow 在申請到的時候檢測、testOnReturn在歸還的時候檢測 這兩個對性能有些影響; 單獨開個檢查線程,定時去掃描檢查,這個是異步的 不會有testOnBorrow和testOnReturn的性能影響
- 檢測哪些對象: 比如空閑超過 500ms 的對象
- 如何檢查:這個需要根據具體對象的類型來,比如db連接的話一般是發送 “select 1” 看是否能正常執行
3 一個通用實現 apache commons pool
通過前面的介紹,可以知道對象池技術的核心過程大同小異,可以將對象獲取流程、活性檢測機制等封裝成一個通用的工具,將對象本身的創建、活性檢測邏輯開放給具體的對象實現來完成; apache commons pool 就是這么個工具, jedis底層的連接池就是直接用的這個
3.1 核心數據結構
LinkedBlockingDeque<PooledObject<T>> idleObjects
空閑對象雙向阻塞隊列Map<IdentityWrapper<T>, PooledObject<T>> allObjects = new ConcurrentHashMap<>();
所有對象的map
apache commons pool 的容器用的 ConcurrentHashMap,並且將空閑的對象用一個雙向阻塞隊列單獨連接起來;
這樣他就能利用這個阻塞隊列本身的特性,達到阻塞獲取的邏輯,如果 idleObjects 是空的,就能 take()/poll(timeout) 阻塞在這里,等待其他線程歸還對象隊列里
3.2 核心對象定義
- PooledObject 可池化的對象:包含真實對象、狀態扭轉及其創建時間、取出時間、空閑時間等指標信息
- PooledObjectFactory 對象工廠,負責對象的創建、銷毀、檢查等邏輯;它有個默認實現
DefaultPooledObject 提供了基本的實現,一般只要繼承它重寫對象創建和驗活邏輯就可以了 - GenericObjectPool 就是對象容器了
3.3 代碼細節
從池子中獲取對象
T borrowObject(final long borrowMaxWaitMillis) {
//省略一些代碼 ...
PooledObject<T> p = null;
// Get local copy of current config so it is consistent for entire
// method execution
final boolean blockWhenExhausted = getBlockWhenExhausted();
boolean create;
final long waitTime = System.currentTimeMillis();
while (p == null) {
create = false;
// 空閑隊列 隊首如果是空的,則創建一個新的對象
// 創建的邏輯里會校驗是否超過最大連接數,然后利用 PooledObjectFactory創建對象
p = idleObjects.pollFirst();
if (p == null) {
p = create();
if (p != null) {
create = true;
}
}
// 阻塞從 idleObject 空閑阻塞隊列獲取對象
if (blockWhenExhausted) {
if (p == null) {
if (borrowMaxWaitMillis < 0) {
p = idleObjects.takeFirst();
} else {
//超時等待
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
if (p == null) {
throw new NoSuchElementException(
"Timeout waiting for idle object");
}
} else {
if (p == null) {
throw new NoSuchElementException("Pool exhausted");
}
}
// 狀態轉換為已分配 ALLOCATE,記錄借出時間等信息
if (!p.allocate()) {
p = null;
}
if (p != null) {
try {
// 允許 PooledObjectFactory 在成功獲取到對象后做一些事,
// 比如jedis連接池獲取到連接后會執行 select db 切換db
factory.activateObject(p);
} catch (final Exception e) {
try {
destroy(p);
} catch (final Exception e1) {
// Ignore - activation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to activate object");
nsee.initCause(e);
throw nsee;
}
}
// 如果 testOnBorrow=true, 或者 testOnCreate=true + 此次對象是新建的
// 則會去校驗對象的有效性 PooledObjectFactory#validateObject()
if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
boolean validate = false;
Throwable validationThrowable = null;
try {
validate = factory.validateObject(p);
} catch (final Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
// 如果對象有效性校驗失敗,則銷毀掉
if (!validate) {
try {
destroy(p);
destroyedByBorrowValidationCount.incrementAndGet();
} catch (final Exception e) {
// Ignore - validation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to validate object");
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
}
updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
return p.getObject();
}
歸還對象
public void returnObject(final T obj) {
// 校驗下對象是否還存在
final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));
if (p == null) {
if (!isAbandonedConfig()) {
throw new IllegalStateException(
"Returned object not currently part of this pool");
}
return; // Object was abandoned and removed
}
// 狀態標記為 “歸還中”
synchronized(p) {
final PooledObjectState state = p.getState();
if (state != PooledObjectState.ALLOCATED) {
throw new IllegalStateException(
"Object has already been returned to this pool or is invalid");
}
p.markReturning(); // Keep from being marked abandoned
}
final long activeTime = p.getActiveTimeMillis();
// 如果 testOnReturn=true,則在歸回時校驗對象是否還有效,如果無效了就銷毀掉
if (getTestOnReturn()) {
if (!factory.validateObject(p)) {
try {
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
try {
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
updateStatsReturn(activeTime);
return;
}
}
try {
factory.passivateObject(p);
} catch (final Exception e1) {
swallowException(e1);
try {
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
try {
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
updateStatsReturn(activeTime);
return;
}
if (!p.deallocate()) {
throw new IllegalStateException(
"Object has already been returned to this pool or is invalid");
}
// 如果此時對象池已經關閉了, 或者當前空閑對象數量大於maxIdle(最大空閑數量)則直接銷毀掉
final int maxIdleSave = getMaxIdle();
if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
try {
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
} else {
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
if (isClosed()) {
// Pool closed while object was being added to idle objects.
// Make sure the returned object is destroyed rather than left
// in the idle object pool (which would effectively be a leak)
clear();
}
}
updateStatsReturn(activeTime);
}
開啟定期檢查任務
final void startEvictor(final long delay) {
synchronized (evictionLock) {
// 關閉前已有的清理任務
if (null != evictor) {
EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS);
evictor = null;
evictionIterator = null;
}
// 間隔時間大於0的話(默認為-1),才創建定時清理任務Evictor
// Evictor 是一個 Runable任務, 它會檢查空閑隊列里的對象數量是否超過 maxIdle,空閑時長是否超過 minEvictableTimeMillis
if (delay > 0) {
evictor = new Evictor();
EvictionTimer.schedule(evictor, delay, delay);
}
}
}
總結
apache commons pool 的對象池實現,比較通用,在性能要求不是太苛刻的情況下可以直接使用;
但是默認的對象實現在狀態扭轉等地方是用 synchronized 加鎖來處理並發的,如果對性能要求比較高的話,需要考慮自定義其他實現方式,比如用 cas + retry 或 threadlocal 等方式減少並發沖突