Java 數據持久化系列之池化技術


在上一篇文章《Java 數據持久化系列之JDBC》中,我們了解到使用 JDBC 創建 Connection 可以執行對應的SQL,但是創建 Connection 會消耗很多資源,所以 Java 持久化框架中往往不直接使用 JDBC,而是在其上建立數據庫連接池層。

今天我們就先來了解一下池化技術的必要性、原理;然后使用 Apache-common-Pool2實現一個簡單的數據庫連接池;接着通過實驗,對比簡單連接池、HikariCP、Druid 等數據庫連接池的性能數據,分析實現高性能數據庫連接池的關鍵;最后分析 Pool2 的具體源代碼實現。

對象不是你想要,想要就能要

你我單身狗們經常調侃可以隨便 New 出一個對象,用完就丟。但是有些對象創建的代價比較大,比如線程、tcp連接、數據庫連接等對象。對於這些創建耗時較長,或者資源占用較大(占據操作系統資源,比如說線程,網絡連接等)的對象,往往會引入池化來管理,減少頻繁創建對象的次數,避免創建對象時的耗時,提高性能。

我們就以數據庫連接 Connection 對象為例,詳細說明一下創建該對象花費的時間和資源。下面是MySQL Driver 創建 Connection 對象的方法,在調用 connect 方法創建 Connection 時,會與 MySQL 進行網絡通訊,建立 TCP 連接,這是極其消耗時間的。

connection = driver.connect(URL, props);

使用 Apache-Common-Pool2實現簡易數據庫連接池

下面,我們以 Apache-Common-Pool2為例來看一下池化技術相關的抽象結構。

首先了解一下Pool2中三元一體的 ObjectPool,PooledObject 和 PooledObjectFactory,對他們的解釋如下:

  • ObjectPool 就是對象池,提供了 borrowObjectreturnObject 等一系列函數。
  • PooledObject 是池化對象的封裝類,負責記錄額外信息,比如說對象狀態,對象創建時間,對象空閑時間,對象上次使用時間等。
  • PooledObjectFactory 是負責管理池化對象生命周期的工廠類,提供 makeObjectdestroyObjectactivateObjectvalidateObject 等一系列函數。

上述三者都有其基礎的實現類,分別是 GenericObjectPool,DefaultPooledObject 和 BasePooledObjectFactory。上一節實驗中的 SimpleDatasource 就是使用上述類實現的。

首先,你要實現一個繼承 BasePooledObjectFactory 的工廠類,提供管理池化對象生命周期的具體方法:

  • makeObject:創建池化對象實例,並且使用 PooledObject 將其封裝。
  • validateObject:驗證對象實例是否安全或者可用,比如說 Connection 是否還保存連接狀態。
  • activateObject:將池返回的對象實例進行重新初始化,比如說設置 Connection是否默認AutoCommit等。
  • passivateObject:將返回給池的對象實例進行反初始化,比如說 Connection 中未提交的事務進行 Rollback等。
  • destroyObject:銷毀不再被池需要的對象實例,比如說 Connection不再被需要時,調用其 close 方法。

具體的實現源碼如下所示,每個方法都有詳細的注釋。

public class SimpleJdbcConnectionFactory extends BasePooledObjectFactory<Connection> {
    ....
    @Override
    public Connection create() throws Exception {
        // 用於創建池化對象
        Properties props = new Properties();
        props.put("user", USER_NAME);
        props.put("password", PASSWORD);
        Connection connection = driver.connect(URL, props);
        return connection;
    }

    @Override
    public PooledObject<Connection> wrap(Connection connection) {
        // 將池化對象進行封裝,返回DefaultPooledObject,這里也可以返回自己實現的PooledObject
        return new DefaultPooledObject<>(connection);
    }

    @Override
    public PooledObject<Connection> makeObject() throws Exception {
        return super.makeObject();
    }

    @Override
    public void destroyObject(PooledObject<Connection> p) throws Exception {
        p.getObject().close();
    }

    @Override
    public boolean validateObject(PooledObject<Connection> p) {
        // 使用 SELECT 1 或者其他sql語句驗證Connection是否可用,ConnUtils代碼詳見Github中的項目
        try {
            ConnUtils.validateConnection(p.getObject(), this.validationQuery);
        } catch (Exception e) {
            return false;
        }
        return true;
    }


    @Override
    public void activateObject(PooledObject<Connection> p) throws Exception {
        Connection conn = p.getObject();
        // 對象被借出,需要進行初始化,將其 autoCommit進行設置
        if (conn.getAutoCommit() != defaultAutoCommit) {
            conn.setAutoCommit(defaultAutoCommit);
        }
    }

    @Override
    public void passivateObject(PooledObject<Connection> p) throws Exception {
        // 對象被歸還,進行回收或者處理,比如將未提交的事務進行回滾
        Connection conn = p.getObject();
        if(!conn.getAutoCommit() && !conn.isReadOnly()) {
            conn.rollback();
        }
        conn.clearWarnings();
        if(!conn.getAutoCommit()) {
            conn.setAutoCommit(true);
        }

    }
}

接着,你就可以使用 BasePool 來從池中獲取對象,使用后歸還給池。

Connection connection = pool.borrowObject(); // 從池中獲取連接對象實例
Statement statement = connection.createStatement();
statement.executeQuery("SELECT * FROM activity");
statement.close();
pool.returnObject(connection); // 使用后歸還連接對象實例到池中

如上,我們就使用 Apache Common Pool2 實現了一個簡易的數據庫連接池。下面,我們先來使用 benchmark 驗證一下這個簡易數據庫連接池的性能,再分析 Pool2 的具體源碼實現,

性能試驗

至此,我們分析完了 Pool2的相關原理和實現,下面就修改 Hikari-benchmark 對我們編寫的建議數據庫連接池進行性能測試。修改后的 benchmark 的地址為 https://github.com/ztelur/HikariCP-benchmark。

可以看到 hikari 和 Druid 兩個數據庫連接池的性能是最優的,而我們的簡易數據庫連接池性能排在末尾。在后續系列文章中會對比我們的簡易數據庫分析 Hikari 和 Druid 高性能的原因。下面我們先來看一下簡易數據庫連接池的具體實現。

Apache Common Pool2 源碼分析

我們來簡要分析 Pool2 的源碼( 2.8.0版本 )實現,了解池化技術的基本原理,為后續了解和分析 HikariCP 和 Druid 打下基礎,三者在設計思路具有互通之處。

通過前邊的實例,我們知道通過 borrowObjectreturnObject 從對象池中接取或者歸還對象,進行這些操作時,封裝實例 PooledObject 的狀態也會發生變化,下面就沿着 PooledObject 狀態機的狀態變化路線來講解相關的代碼實現。

上圖是 PooledObject 的狀態機示意圖,藍色元素代表狀態,紅色代表 ObjectPool的相關方法。PooledObject 的狀態有 IDLE、ALLOCATED、RETURNING、ABANDONED、INVALID、EVICTION 和 EVICTION_RETURN_TO_HEAD(所有狀態定義在 PooledObjectState 類中,有些狀態暫時未被使用,這里不進行說明)。

主要涉及三部分的狀態變化,分別是 1、2、3的借出歸還狀態變化,4,5的標記拋棄刪除狀態變化以及6,7,8的檢測驅除狀態變化。后續會分小節詳細介紹這三部分的狀態變化。

在這些狀態變化過程中,不僅涉及 ObjectPool 的方法,也會調用 PooledObjectFactory 的方法進行相關操作。

上圖表明了在 PooledObject 狀態變化過程中涉及的 PooledObjectFactory 的方法。按照前文對 PooledObjectFactory 方法的描述,可以很容易的對應起來。比如說,在編號 1 的對象被借出過程中,先調用 invalidateObject 判斷對象可用性,然后調用 activeObject 將對象默認配置初始化。

借出歸還狀態變化

我們從 GenericObjectPool 的 borrowObject 方法開始了解。該方法可以傳入最大等待時間為參數,如果不傳則使用配置的默認最大等待時間,borrowObject 的源碼如下所示(為了可讀性,對代碼進行刪減)。

public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
    // 1 根據 abandonedConfig 和其他檢測判斷是否要調用 removeAbandoned 方法進行標記刪除操作
    ....
    PooledObject<T> p = null;
    // 當暫時無法獲取對象時是否阻塞
    final boolean blockWhenExhausted = getBlockWhenExhausted();
    while (p == null) {
        create = false;
        // 2 先從 idleObjects 隊列中獲取, pollFisrt 是非阻塞的
        p = idleObjects.pollFirst();
        // 3 沒有則調用 create 方法創建一個新的對象
        if (p == null) {
            p = create();
        }
        // 4 blockWhenExhausted 為true,則根據 borrowMaxWaitMillis 進行阻塞操作
        if (blockWhenExhausted) {
            if (p == null) {
                if (borrowMaxWaitMillis < 0) {
                    p = idleObjects.takeFirst(); // 阻塞到獲取對象為止
                } else {
                    p = idleObjects.pollFirst(borrowMaxWaitMillis,
                            TimeUnit.MILLISECONDS); // 阻塞到最大等待時間或者獲取到對象
                }
            }
        }
        // 5 調用 allocate 進行狀態變化
        if (!p.allocate()) {
            p = null;
        }
        if (p != null) {
            // 6 調用 activateObject 進行對象默認初始化,如果出現問題則調用 destroy 
            factory.activateObject(p);
            // 7 如果配置了 TestOnBorrow,則調用 validateObject 進行可用性校驗,如果不通過則調用 destroy
            if (getTestOnBorrow()) {
                validate = factory.validateObject(p);
            }
        }
    }
    return p.getObject();
}

borrowObject 方法主要做了五步操作:

  • 第一步是根據配置判斷是否要調用 removeAbandoned 方法進行標記刪除操作,這個后續小節再細講。
  • 第二步是嘗試獲取或創建對象,由源碼中2,3,4 步驟組成。
  • 第三步是調用 allocate 進行狀態變更,轉換為 ALLOCATED 狀態,如源碼中的 5 步驟。
  • 第四步是調用 factory 的 activateObject 進行對象的初始化,如果出錯則調用 destroy 方法銷毀對象,如源碼中的 6 步驟。
  • 第五步是根據 TestOnBorrow 配置調用 factory 的 validateObject 進行對象可用性分析,如果不可用,則調用 destroy 方法銷毀對象,如源碼中的 7 步驟。

我們對第二步進行一下細致的分析。idleObjects 是存儲着所有 IDLE狀態 (也有可能是 EVICTION 狀態) PooledObject 的 LinkedBlockingDeque 對象。第二步中先調用其 pollFirst 方法從隊列頭獲取 PooledObject,如果未獲取到則調用 create 方法創建一個新的。

create 也可能未創建成功,則當 blockWhenExhausted 為 true 時,未獲取到對象需要一直阻塞,所以根據最大等待時間 borrowMaxWaitMillis 來調用 takeFirst 或者 pollFirst(time) 方法進行阻塞式獲取;當 blockWhenExhausted 為 false 時,則直接拋出異常返回。

create 方法會判斷當前狀況下是否應該創建新的對象,主要是要防止創建的對象數量超過最大池對象數量。如果可以創建新對象,則調用 PooledObjectFactory 的 makeObject 方法進行新對象創建,然后根據 testOnCreate 配置來判斷是否調用 validateObject 方法進行校驗,源碼如下所示。

private PooledObject<T> create() throws Exception {
    int localMaxTotal = getMaxTotal(); // 獲取池對象最大數量
    final long localStartTimeMillis = System.currentTimeMillis();
    final long localMaxWaitTimeMillis = Math.max(getMaxWaitMillis(), 0); // 獲取最大等待時間
    Boolean create = null;
    // 一直等待到 create 被賦值,true代表要創建新對象,false代表不能創建
    while (create == null) {
        synchronized (makeObjectCountLock) {
            final long newCreateCount = createCount.incrementAndGet();
            if (newCreateCount > localMaxTotal) {
                // pool已經滿或者正在創建的足夠達到最大數量的對象。
                createCount.decrementAndGet();
                if (makeObjectCount == 0) {
                    // 目前沒有其他的 makeObject 方法被調用,直接返回false
                    create = Boolean.FALSE;
                } else {
                    // 目前有其他的 makeObject 方法被調用,但是可能失敗,所以等待一段時間再試試
                    makeObjectCountLock.wait(localMaxWaitTimeMillis);
                }
            } else {
                // pool未滿 可以創建對象。
                makeObjectCount++;
                create = Boolean.TRUE;
            }
        }

        // 執行超過 maxWaitTimeMillis 則返回false
        if (create == null &&
            (localMaxWaitTimeMillis > 0 &&
                System.currentTimeMillis() - localStartTimeMillis >= localMaxWaitTimeMillis)) {
            create = Boolean.FALSE;
        }
    }
    // 如果 create 為false,返回 NULL
    if (!create.booleanValue()) {
        return null;
    }

    final PooledObject<T> p;
    try {
        // 調用 factory 的 makeObject 進行對象創建,並且按照 testOnCreate 配置調用 validateObject 方法
        p = factory.makeObject();
        if (getTestOnCreate() && !factory.validateObject(p)) {
            // 這里代碼有問題,校驗不通過的對象沒有進行銷毀?
            createCount.decrementAndGet();
            return null;
        }
    } catch (final Throwable e) {
        createCount.decrementAndGet();
        throw e;
    } finally {
        synchronized (makeObjectCountLock) {
            // 減少 makeObjectCount
            makeObjectCount--;
            makeObjectCountLock.notifyAll();
        }
    }
    allObjects.put(new IdentityWrapper<>(p.getObject()), p);
    return p;
}

需要注意的是 create 方法創建的對象並沒有第一時間加入到 idleObjects 隊列中,該對象將會在后續使用完畢調用 returnObject 方法時才會加入到隊列中。

接下來,我們看一下 returnObject 方法的實現。該方法主要做了六步操作:

  • 第一步是調用 markReturningState 方法將狀態變更為 RETURNING。
  • 第二步是根據 testOnReturn 配置調用 PooledObjectFactory 的 validateObject 方法進行可用性校驗。如果未通過校驗,則調用 destroy 消耗該對象,然后調用 ensureIdle 確保池中有 IDLE 狀態對象可用,如果沒有會調用 create 方法創建新的對象。
  • 第三步是調用 PooledObjectFactory 的 passivateObject 方法進行反初始化操作。
  • 第四步是調用 deallocate 將狀態變更為 IDLE。
  • 第五步是檢測是否超過了最大 IDLE 對象數量,如果超過了則銷毀當前對象。
  • 第六步是根據 LIFO (last in, first out) 配置將對象放置到隊列的首部或者尾部。
public void returnObject(final T obj) {
    final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));
    // 1 將狀態轉換為 RETURNING
    markReturningState(p);

    final long activeTime = p.getActiveTimeMillis();
    // 2 根據配置,對實例進行可用性校驗
    if (getTestOnReturn() && !factory.validateObject(p)) {
        destroy(p);
        // 因為刪除了一個對象,需要確保池內還有對象,如果沒有改方法會創建新對象
        ensureIdle(1, false); 
        updateStatsReturn(activeTime);
        return;
    }
    // 3 調用 passivateObject 將對象反初始化。
    try {
        factory.passivateObject(p);
    } catch (final Exception e1) {
         .... // 和上邊 validateObject 校驗失敗相同操作。
    }
    // 4 將狀態變更為 IDLE
    if (!p.deallocate()) {
        throw new IllegalStateException(
                "Object has already been returned to this pool or is invalid");
    }

    final int maxIdleSave = getMaxIdle();
    // 5 如果超過最大 IDLE 數量,則進行銷毀
    if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
        .... // 同上邊 validateObject 校驗失敗相同操作。
    } else {
        // 6 根據 LIFO 配置,將歸還的對象放置在隊列首部或者尾部。 這邊源碼拼錯了。
        if (getLifo()) {
            idleObjects.addFirst(p);
        } else {
            idleObjects.addLast(p);
        }
    }
    updateStatsReturn(activeTime);
}

下圖介紹了第六步兩種入隊列的場景,LIFO 為 true 時防止在隊列頭部;LIFO 為 false 時,防止在隊列尾部。要根據不同的池化對象選擇不同的場景。但是放置在尾部可以避免並發熱點,因為借對象和還對象都需要操作隊列頭,需要進行並發控制。

標記刪除狀態變化

標記刪除狀態變化操作主要通過 removeAbandoned 實現,它主要是檢查已經借出的對象是否需要刪除,防止對象被借出長時間未使用或者歸還所導致的池對象被耗盡的情況。

removeAbandoned 根據 AbandonedConfig 可能會在 borrowObject 或者 檢測驅除對象的 evict 方法執行時被調用。

public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
    
    final AbandonedConfig ac = this.abandonedConfig;
    // 當配置了 removeAbandonedOnBorrow 並且 當前 idle 對象數量少於2,活躍對象數量只比最大對象數量少3.
    if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
            (getNumIdle() < 2) &&
            (getNumActive() > getMaxTotal() - 3) ) {
        removeAbandoned(ac);
    }
    ....
}

public void evict() throws Exception {
    ....
    final AbandonedConfig ac = this.abandonedConfig;
        // 設置了 removeAbandonedOnMaintenance
        if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
            removeAbandoned(ac);
        }
}

removeAbandoned 使用典型的標記刪除策略:標記階段是先對所有的對象進行遍歷,如果該對象是 ALLOCATED 並且上次使用時間已經超過超時時間,則將其狀態變更為 ABANDONED 狀態,並加入到刪除隊列中;刪除階段則遍歷刪除隊列,依次調用 invalidateObject 方法刪除並銷毀對象。

private void removeAbandoned(final AbandonedConfig ac) {
    // 收集需要 abandoned 的對象
    final long now = System.currentTimeMillis();
    // 1 根據配置的時間計算超時時間
    final long timeout =
            now - (ac.getRemoveAbandonedTimeout() * 1000L);
    final ArrayList<PooledObject<T>> remove = new ArrayList<>();
    final Iterator<PooledObject<T>> it = allObjects.values().iterator();
    while (it.hasNext()) {
        final PooledObject<T> pooledObject = it.next();
        // 2 遍歷所有的對象,如果它是已經分配狀態,並且該對象的最近一次使用時間小於超時時間
        synchronized (pooledObject) {
            if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
                    pooledObject.getLastUsedTime() <= timeout) {
                // 3 將對象狀態更改為 ABANDONED,並加入到刪除隊列
                pooledObject.markAbandoned();
                remove.add(pooledObject);
            }
        }
    }

    // 4 遍歷刪除隊列
    final Iterator<PooledObject<T>> itr = remove.iterator();
    while (itr.hasNext()) {
        final PooledObject<T> pooledObject = itr.next();
        // 5 調用 invalidateObject 方法刪除對象
        invalidateObject(pooledObject.getObject());
    }
}

invalidateObject 方法直接調用了 destroy 方法,destroy 方法在上邊的源碼分析中也反復出現,它主要進行了四步操作:

  • 1 將對象狀態變更為 INVALID。
  • 2 將對象從隊列和集合中刪除。
  • 3 調用 PooledObjectFactory 的 destroyObject 方法銷毀對象。
  • 4 更新統計數據
private void destroy(final PooledObject<T> toDestroy) throws Exception {
    // 1 將狀態變更為 INVALID
    toDestroy.invalidate();
    // 2 從隊列和池中刪除
    idleObjects.remove(toDestroy);
    allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
    // 3 調用 destroyObject 回收對象
    try {
        factory.destroyObject(toDestroy);
    } finally {
        // 4 更新統計數據
        destroyedCount.incrementAndGet();
        createCount.decrementAndGet();
    }
}

檢測驅除狀態變化

檢測驅除狀態變化主要由 evict 方法操作,在后台線程中獨立完成,主要檢測池中的 IDLE 狀態的空閑對象是否需要驅除,超時時間通過 EvictionConfig 進行配置。

驅逐者 Evictor,在 BaseGenericObjectPool 中定義,本質是由 java.util.TimerTask 定義的定時任務。

final void startEvictor(final long delay) {
    synchronized (evictionLock) {
        if (delay > 0) {
            // 定時執行 evictor 線程
            evictor = new Evictor();
            EvictionTimer.schedule(evictor, delay, delay);
        }
    }
}

在 Evictor 線程中會調用 evict 方法,該方法主要是遍歷所有的 IDLE 對象,然后對每個對象執行檢測驅除操作,具體源碼如下所示:

  • 調用 startEvictionTest 方法將狀態更改為 EVICTED。
  • 根據驅除策略和對象超時時間判斷是否要驅除。
  • 如果需要被驅除則調用 destroy 方法銷毀對象。
  • 如果設置了 testWhileIdle 則調用 PooledObject 的 validateObject 進行可用性校驗。
  • 調用 endEvictionTest 將狀態更改為 IDLE。
public void evict() throws Exception {
    if (idleObjects.size() > 0) {
        ....
        final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();
        synchronized (evictionLock) {
            for (int i = 0, m = getNumTests(); i < m; i++) {
                // 1 遍歷所有 idle 的對象
                try {
                    underTest = evictionIterator.next();
                } catch (final NoSuchElementException nsee) {
                }
                // 2 調用 startEvictionTest 將狀態變更為 EVICTED
                if (!underTest.startEvictionTest()) {
                    continue;
                }
                // 3 根據驅除策略判斷是否要驅除
                boolean evict = evictionPolicy.evict(evictionConfig, underTest,
                        idleObjects.size());

                if (evict) {
                    // 4 進行驅除
                    destroy(underTest);
                    destroyedByEvictorCount.incrementAndGet();
                } else {
                    // 5 如果需要檢測,則進行可用性檢測
                    if (testWhileIdle) {
                        factory.activateObject(underTest);
                        factory.validateObject(underTest));
                        factory.passivateObject(underTest);
                        }
                    // 5 變更狀態為 IDLE
                    if (!underTest.endEvictionTest(idleObjects)) {
                    }
                }
            }
        }
    }
    .... // abandoned 相關的操作
}

后記

后續會分析 Hikari 和 Druid 數據庫連接池的實現,請大家多多關注。

個人博客,歡迎來玩

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM