池化技術(二)HikariCP是如何管理數據庫連接的?


基於依賴程序的版本信息:HikariCP:3.3.1               驅動程序mysql-connector-java:8.0.17

上一篇:Druid是如何管理數據庫連接的

零、類圖和流程圖

開始前先來了解下HikariCP獲取一個連接時類間的交互流程,方便下面詳細流程的閱讀。

獲取連接時的類間交互:

圖1

一、主流程1:獲取連接流程

HikariCP獲取連接時的入口是HikariDataSource里的getConnection方法,現在來看下該方法的具體流程:

主流程1

上述為HikariCP獲取連接時的流程圖,由圖1可知,每個datasource對象里都會持有一個HikariPool對象,記為pool,初始化后的datasource對象pool是空的,所以第一次getConnection的時候會進行實例化pool屬性(參考主流程1),初始化的時候需要將當前datasource里的config屬性傳過去,用於pool的初始化,最終標記sealed,然后根據pool對象調用getConnection方法(參考流程1.1),獲取成功后返回連接對象。

二、主流程2:初始化池對象

主流程2

 該流程用於初始化整個連接池,這個流程會給連接池內所有的屬性做初始化的工作,其中比較主要的幾個流程上圖已經指出,簡單概括一下:

  1. 利用config初始化各種連接池屬性,並且產生一個用於生產物理連接的數據源DriverDataSource
  2. 初始化存放連接對象的核心類connectionBag
  3. 初始化一個延時任務線程池類型的對象houseKeepingExecutorService,用於后續執行一些延時/定時類任務(比如連接泄漏檢查延時任務,參考流程2.2以及主流程4,除此之外maxLifeTime后主動回收關閉連接也是交由該對象來執行的,參考主流程3
  4. 預熱連接池,HikariCP會在該流程的checkFailFast里初始化好一個連接對象放進池子內,當然觸發該流程得保證initializationTimeout > 0時(默認值1),這個配置屬性表示留給預熱操作的時間(默認值1在預熱失敗時不會發生重試)。與Druid通過initialSize控制預熱連接對象數不一樣的是,HikariCP僅預熱進池一個連接對象。
  5. 初始化一個線程池對象addConnectionExecutor,用於后續擴充連接對象
  6. 初始化一個線程池對象closeConnectionExecutor,用於關閉一些連接對象,怎么觸發關閉任務呢?可以參考流程1.1.2

三、流程1.1:通過HikariPool獲取連接對象

流程1.1

從最開始的結構圖可知,每個HikariPool里都維護一個ConcurrentBag對象,用於存放連接對象,由上圖可以看到,實際上HikariPoolgetConnection就是從ConcurrentBag里獲取連接的(調用其borrow方法獲得,對應ConnectionBag主流程),在長連接檢查這塊,與之前說的Druid不同,這里的長連接判活檢查在連接對象沒有被標記為“已丟棄”時,只要距離上次使用超過500ms每次取出都會進行檢查(500ms是默認值,可通過配置com.zaxxer.hikari.aliveBypassWindowMs的系統參數來控制),emmmm,也就是說HikariCP對長連接的活性檢查很頻繁,但是其並發性能依舊優於Druid,說明頻繁的長連接檢查並不是導致連接池性能高低的關鍵所在。

這個其實是由於HikariCP無鎖實現,在高並發時對CPU的負載沒有其他連接池那么高而產生的並發性能差異,后面會說HikariCP的具體做法,即使是Druid,在獲取連接、生成連接、歸還連接時都進行了鎖控制,因為通過上篇文章可以知道,Druid里的連接池資源是多線程共享的,不可避免的會有鎖競爭,有鎖競爭意味着線程狀態的變化會很頻繁,線程狀態變化頻繁意味着CPU上下文切換也將會很頻繁。

回到流程1.1,如果拿到的連接為空,直接報錯,不為空則進行相應的檢查,如果檢查通過,則包裝成ConnectionProxy對象返回給業務方,不通過則調用closeConnection方法關閉連接(對應流程1.1.2,該流程會觸發ConcurrentBagremove方法丟棄該連接,然后把實際的驅動連接交給closeConnectionExecutor線程池,異步關閉驅動連接)。

四、流程1.1.1:連接判活

流程1.1.1

承接上面的流程1.1里的判活流程,來看下判活是如何做的,首先說驗證方法(注意這里該方法接受的這個connection對象不是poolEntry,而是poolEntry持有的實際驅動的連接對象),在之前介紹Druid的時候就知道,Druid是根據驅動程序里是否存在ping方法來判斷是否啟用ping的方式判斷連接是否存活,但是到了HikariCP則更加簡單粗暴,僅根據是否配置了connectionTestQuery覺定是否啟用ping:


this.isUseJdbc4Validation = config.getConnectionTestQuery() == null;

所以一般驅動如果不是特別低的版本,不建議配置該項,否則便會走createStatement+excute的方式,相比ping簡單發送心跳數據,這種方式顯然更低效。

此外,這里在剛進來還會通過驅動的連接對象重新給它設置一遍networkTimeout的值,使之變成validationTimeout,表示一次驗證的超時時間,為啥這里要重新設置這個屬性呢?因為在使用ping方法校驗時,是沒辦法通過類似statement那樣可以setQueryTimeout的,所以只能由網絡通信的超時時間來控制,這個時間可以通過jdbc的連接參數socketTimeout來控制:


jdbc:mysql://127.0.0.1:3306/xxx?socketTimeout=250

這個值最終會被賦值給HikariCP的networkTimeout字段,這就是為什么最后那一步使用這個字段來還原驅動連接超時屬性的原因;說到這里,最后那里為啥要再次還原呢?這就很容易理解了,因為驗證結束了,連接對象還存活的情況下,它的networkTimeout的值這時仍然等於validationTimeout(不合預期),顯然在拿出去用之前,需要恢復成本來的值,也就是HikariCP里的networkTimeout屬性。

五、流程1.1.2:關閉連接對象

流程1.1.2

這個流程簡單來說就是把流程1.1.1中驗證不通過的死連接,主動關閉的一個流程,首先會把這個連接對象從ConnectionBag里移除,然后把實際的物理連接交給一個線程池去異步執行,這個線程池就是在主流程2里初始化池的時候初始化的線程池closeConnectionExecutor,然后異步任務內開始實際的關連接操作,因為主動關閉了一個連接相當於少了一個連接,所以還會觸發一次擴充連接池(參考主流程5)操作。 

六、流程2.1:HikariCP監控設置

不同於Druid那樣監控指標那么多,HikariCP會把我們非常關心的幾項指標暴露給我們,比如當前連接池內閑置連接數、總連接數、一個連接被用了多久歸還、創建一個物理連接花費多久等,HikariCP的連接池的監控我們這一節專門詳細的分解一下,首先找到HikariCP下面的metrics文件夾,這下面放置了一些規范實現的監控接口等,還有一些現成的實現(比如HikariCP自帶對prometheus、micrometer、dropwizard的支持,不太了解后面兩個,prometheus下文直接稱為普羅米修斯):

圖2

下面,來着重看下接口的定義:


//這個接口的實現主要負責收集一些動作的耗時
public interface IMetricsTracker extends AutoCloseable
{
    //這個方法觸發點在創建實際的物理連接時(主流程3),用於記錄一個實際的物理連接創建所耗費的時間
    default void recordConnectionCreatedMillis(long connectionCreatedMillis) {}

    //這個方法觸發點在getConnection時(主流程1),用於記錄獲取一個連接時實際的耗時
    default void recordConnectionAcquiredNanos(final long elapsedAcquiredNanos) {}

    //這個方法觸發點在回收連接時(主流程6),用於記錄一個連接從被獲取到被回收時所消耗的時間
    default void recordConnectionUsageMillis(final long elapsedBorrowedMillis) {}

    //這個方法觸發點也在getConnection時(主流程1),用於記錄獲取連接超時的次數,每發生一次獲取連接超時,就會觸發一次該方法的調用
    default void recordConnectionTimeout() {}

    @Override
    default void close() {}
}

觸發點都了解清楚后,再來看看MetricsTrackerFactory的接口定義:


//用於創建IMetricsTracker實例,並且按需記錄PoolStats對象里的屬性(這個對象里的屬性就是類似連接池當前閑置連接數之類的線程池狀態類指標)
public interface MetricsTrackerFactory
{
    //返回一個IMetricsTracker對象,並且把PoolStats傳了過去
    IMetricsTracker create(String poolName, PoolStats poolStats);
}

上面的接口用法見注釋,針對新出現的PoolStats類,我們來看看它做了什么:


public abstract class PoolStats {
    private final AtomicLong reloadAt; //觸發下次刷新的時間(時間戳)
    private final long timeoutMs; //刷新下面的各項屬性值的頻率,默認1s,無法改變

    // 總連接數
    protected volatile int totalConnections;
    // 閑置連接數
    protected volatile int idleConnections;
    // 活動連接數
    protected volatile int activeConnections;
    // 由於無法獲取到可用連接而阻塞的業務線程數
    protected volatile int pendingThreads;
    // 最大連接數
    protected volatile int maxConnections;
    // 最小連接數
    protected volatile int minConnections;

    public PoolStats(final long timeoutMs) {
        this.timeoutMs = timeoutMs;
        this.reloadAt = new AtomicLong();
    }

    //這里以獲取最大連接數為例,其他的跟這個差不多
    public int getMaxConnections() {
        if (shouldLoad()) { //是否應該刷新
            update(); //刷新屬性值,注意這個update的實現在HikariPool里,因為這些屬性值的直接或間接來源都是HikariPool
        }

        return maxConnections;
    }
    
    protected abstract void update(); //實現在↑上面已經說了

    private boolean shouldLoad() { //按照更新頻率來決定是否刷新屬性值
        for (; ; ) {
            final long now = currentTime();
            final long reloadTime = reloadAt.get();
            if (reloadTime > now) {
                return false;
            } else if (reloadAt.compareAndSet(reloadTime, plusMillis(now, timeoutMs))) {
                return true;
            }
        }
    }
}

實際上這里就是這些屬性獲取和觸發刷新的地方,那么這個對象是在哪里被生成並且丟給MetricsTrackerFactorycreate方法的呢?這就是本節所需要講述的要點:主流程2里的設置監控器的流程,來看看那里發生了什么事吧:


//監控器設置方法(此方法在HikariPool中,metricsTracker屬性就是HikariPool用來觸發IMetricsTracker里方法調用的)
public void setMetricsTrackerFactory(MetricsTrackerFactory metricsTrackerFactory) {
    if (metricsTrackerFactory != null) {
        //MetricsTrackerDelegate是包裝類,是HikariPool的一個靜態內部類,是實際持有IMetricsTracker對象的類,也是實際觸發IMetricsTracker里方法調用的類
        //這里首先會觸發MetricsTrackerFactory類的create方法拿到IMetricsTracker對象,然后利用getPoolStats初始化PoolStat對象,然后也一並傳給MetricsTrackerFactory
        this.metricsTracker = new MetricsTrackerDelegate(metricsTrackerFactory.create(config.getPoolName(), getPoolStats()));
    } else {
        //不啟用監控,直接等於一個沒有實現方法的空類
        this.metricsTracker = new NopMetricsTrackerDelegate();
    }
}

private PoolStats getPoolStats() {
    //初始化PoolStats對象,並且規定1s觸發一次屬性值刷新的update方法
    return new PoolStats(SECONDS.toMillis(1)) {
        @Override
        protected void update() {
            //實現了PoolStat的update方法,刷新各個屬性的值
            this.pendingThreads = HikariPool.this.getThreadsAwaitingConnection();
            this.idleConnections = HikariPool.this.getIdleConnections();
            this.totalConnections = HikariPool.this.getTotalConnections();
            this.activeConnections = HikariPool.this.getActiveConnections();
            this.maxConnections = config.getMaximumPoolSize();
            this.minConnections = config.getMinimumIdle();
        }
    };
}

到這里HikariCP的監控器就算是注冊進去了,所以要想實現自己的監控器拿到上面的指標,要經過如下步驟:

  1. 新建一個類實現IMetricsTracker接口,我們這里將該類記為IMetricsTrackerImpl
  2. 新建一個類實現MetricsTrackerFactory接口,我們這里將該類記為MetricsTrackerFactoryImpl,並且將上面的IMetricsTrackerImpl在其create方法內實例化
  3. 將MetricsTrackerFactoryImpl實例化后調用HikariPoolsetMetricsTrackerFactory方法注冊到Hikari連接池。

上面沒有提到PoolStats里的屬性怎么監控,這里來說下,由於create方法是調用一次就沒了,create方法只是接收了PoolStats對象的實例,如果不處理,那么隨着create調用的結束,這個實例針對監控模塊來說就失去持有了,所以這里如果想要拿到PoolStats里的屬性,就需要開啟一個守護線程,讓其持有PoolStats對象實例,並且定時獲取其內部屬性值,然后push給監控系統,如果是普羅米修斯等使用pull方式獲取監控數據的監控系統,可以效仿HikariCP原生普羅米修斯監控的實現,自定義一個Collector對象來接收PoolStats實例,這樣普羅米修斯就可以定期拉取了,比如HikariCP根據普羅米修斯監控系統自己定義的MetricsTrackerFactory實現(對應圖2里的PrometheusMetricsTrackerFactory類):


@Override
public IMetricsTracker create(String poolName, PoolStats poolStats) {
    getCollector().add(poolName, poolStats); //將接收到的PoolStats對象直接交給Collector,這樣普羅米修斯服務端每觸發一次采集接口的調用,PoolStats都會跟着執行一遍內部屬性獲取流程
    return new PrometheusMetricsTracker(poolName, this.collectorRegistry); //返回IMetricsTracker接口的實現類
}

//自定義的Collector
private HikariCPCollector getCollector() {
    if (collector == null) {
        //注冊到普羅米修斯收集中心
        collector = new HikariCPCollector().register(this.collectorRegistry);
    }
    return collector;
}

通過上面的解釋可以知道在HikariCP中如何自定義一個自己的監控器,以及相比Druid的監控,有什么區別。

工作中很多時候都是需要自定義的,我司雖然也是用的普羅米修斯監控,但是因為HikariCP原生的普羅米修斯收集器里面對監控指標的命名並不符合我司的規范,所以就自定義了一個,有類似問題的不妨也試一試。

這一節沒有畫圖,純代碼,因為畫圖不太好解釋這部分的東西,這部分內容與連接池整體流程關系也不大,充其量獲取了連接池本身的一些屬性,在連接池里的觸發點也在上面代碼段的注釋里說清楚了,看代碼定義可能更好理解一些。

七、流程2.2:連接泄漏的檢測與告警

本節對應主流程2里的子流程2.2,在初始化池對象時,初始化了一個叫做leakTaskFactory的屬性,本節來看下它具體是用來做什么的。

7.1:它是做什么的?

一個連接被拿出去使用時間超過leakDetectionThreshold(可配置,默認0)未歸還的,會觸發一個連接泄漏警告,通知業務方目前存在連接泄漏的問題。

7.2:過程詳解

該屬性是ProxyLeakTaskFactory類型對象,且它還會持有houseKeepingExecutorService這個線程池對象,用於生產ProxyLeakTask對象,然后利用上面的houseKeepingExecutorService延時運行該對象里的run方法。該流程的觸發點在上面的流程1.1最后包裝成ProxyConnection對象的那一步,來看看具體的流程圖:

流程2.2

每次在流程1.1那里生成ProxyConnection對象時,都會觸發上面的流程,由流程圖可以知道,ProxyConnection對象持有PoolEntryProxyLeakTask的對象,其中初始化ProxyLeakTask對象時就用到了leakTaskFactory對象,通過其schedule方法可以進行ProxyLeakTask的初始化,並將其實例傳遞給ProxyConnection進行初始化賦值(ps:由圖知ProxyConnection在觸發回收事件時,會主動取消這個泄漏檢查任務,這也是ProxyConnection需要持有ProxyLeakTask對象的原因)。

在上面的流程圖中可以知道,只有在leakDetectionThreshold不等於0的時候才會生成一個帶有實際延時任務的ProxyLeakTask對象,否則返回無實際意義的空對象。所以要想啟用連接泄漏檢查,首先要把leakDetectionThreshold配置設置上,這個屬性表示經過該時間后借出去的連接仍未歸還,則觸發連接泄漏告警。

ProxyConnection之所以要持有ProxyLeakTask對象,是因為它可以監聽到連接是否觸發歸還操作,如果觸發,則調用cancel方法取消延時任務,防止誤告。

由此流程可以知道,跟Druid一樣,HikariCP也有連接對象泄漏檢查,與Druid主動回收連接相比,HikariCP實現更加簡單,僅僅是在觸發時打印警告日志,不會采取具體的強制回收的措施。

與Druid一樣,默認也是關閉這個流程的,因為實際開發中一般使用第三方框架,框架本身會保證及時的close連接,防止連接對象泄漏,開啟與否還是取決於業務是否需要,如果一定要開啟,如何設置leakDetectionThreshold的大小也是需要考慮的一件事。

八、主流程3:生成連接對象

本節來講下主流程2里的createEntry方法,這個方法利用PoolBase里的DriverDataSource對象生成一個實際的連接對象(如果忘記DriverDatasource是哪里初始化的了,可以看下主流程2PoolBaseinitializeDataSource方法的作用),然后用PoolEntry類包裝成PoolEntry對象,現在來看下這個包裝類有哪些主要屬性:


final class PoolEntry implements IConcurrentBagEntry {
    private static final Logger LOGGER = LoggerFactory.getLogger(PoolEntry.class);
    //通過cas來修改state屬性
    private static final AtomicIntegerFieldUpdater stateUpdater;

    Connection connection; //實際的物理連接對象
    long lastAccessed; //觸發回收時刷新該時間,表示“最近一次使用時間”
    long lastBorrowed; //getConnection里borrow成功后刷新該時間,表示“最近一次借出的時間”

    @SuppressWarnings("FieldCanBeLocal")
    private volatile int state = 0; //連接狀態,枚舉值:IN_USE(使用中)、NOT_IN_USE(閑置中)、REMOVED(已移除)、RESERVED(標記為保留中)
    private volatile boolean evict; //是否被標記為廢棄,很多地方用到(比如流程1.1靠這個判斷連接是否已被廢棄,再比如主流程4里時鍾回撥時觸發的直接廢棄邏輯)

    private volatile ScheduledFuture<?> endOfLife; //用於在超過連接生命周期(maxLifeTime)時廢棄連接的延時任務,這里poolEntry要持有該對象,主要是因為在對象主動被關閉時(意味着不需要在超過maxLifeTime時主動失效了),需要cancel掉該任務

    private final FastList openStatements; //當前該連接對象上生成的所有的statement對象,用於在回收連接時主動關閉這些對象,防止存在漏關的statement
    private final HikariPool hikariPool; //持有pool對象

    private final boolean isReadOnly; //是否為只讀
    private final boolean isAutoCommit; //是否存在事務
}

上面就是整個PoolEntry對象里所有的屬性,這里再說下endOfLife對象,它是一個利用houseKeepingExecutorService這個線程池對象做的延時任務,這個延時任務一般在創建好連接對象后maxLifeTime左右的時間觸發,具體來看下createEntry代碼:


private PoolEntry createPoolEntry() {

        final PoolEntry poolEntry = newPoolEntry(); //生成實際的連接對象

        final long maxLifetime = config.getMaxLifetime(); //拿到配置好的maxLifetime
        if (maxLifetime > 0) { //<=0的時候不啟用主動過期策略
            // 計算需要減去的隨機數
            // 源注釋:variance up to 2.5% of the maxlifetime
            final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong(maxLifetime / 40) : 0;
            final long lifetime = maxLifetime - variance; //生成實際的延時時間
            poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
                    () -> { //實際的延時任務,這里直接觸發softEvictConnection,而softEvictConnection內則會標記該連接對象為廢棄狀態,然后嘗試修改其狀態為STATE_RESERVED,若成功,則觸發closeConnection(對應流程1.1.2)
                        if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) {
                            addBagItem(connectionBag.getWaitingThreadCount()); //回收完畢后,連接池內少了一個連接,就會嘗試新增一個連接對象
                        }
                    },
                    lifetime, MILLISECONDS)); //給endOfLife賦值,並且提交延時任務,lifetime后觸發
        }

        return poolEntry;
    }

    //觸發新增連接任務
    public void addBagItem(final int waiting) {
        //前排提示:addConnectionQueue和addConnectionExecutor的關系和初始化參考主流程2

        //當添加連接的隊列里已提交的任務超過那些因為獲取不到連接而發生阻塞的線程個數時,就進行提交連接新增連接的任務
        final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional.
        if (shouldAdd) {
            //提交任務給addConnectionExecutor這個線程池,PoolEntryCreator是一個實現了Callable接口的類,下面將通過流程圖的方式介紹該類的call方法
            addConnectionExecutor.submit(poolEntryCreator);
        }
    }

通過上面的流程,可以知道,HikariCP一般通過createEntry方法來新增一個連接入池,每個連接被包裝成PoolEntry對象,在創建好對象時,同時會提交一個延時任務來關閉廢棄該連接,這個時間就是我們配置的maxLifeTime,為了保證不在同一時間失效,HikariCP還會利用maxLifeTime減去一個隨機數作為最終的延時任務延遲時間,然后在觸發廢棄任務時,還會觸發addBagItem,進行連接添加任務(因為廢棄了一個連接,需要往池子里補充一個),該任務則交給由主流程2里定義好的addConnectionExecutor線程池執行,那么,現在來看下這個異步添加連接對象的任務流程:

 

addConnectionExecutor的call流程

這個流程就是往連接池里加連接用的,跟createEntry結合起來說是因為這倆流程是緊密相關的,除此之外,主流程5(fillPool,擴充連接池)也會觸發該任務。

 九、主流程4:連接池縮容

HikariCP會按照minIdle定時清理閑置過久的連接,這個定時任務在主流程2初始化連接池對象時被啟用,跟上面的流程一樣,也是利用houseKeepingExecutorService這個線程池對象做該定時任務的執行器。

來看下主流程2里是怎么啟用該任務的:


//housekeepingPeriodMs的默認值是30s,所以定時任務的間隔為30s
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);

那么本節主要來說下HouseKeeper這個類,該類實現了Runnable接口,回收邏輯主要在其run方法內,來看看run方法的邏輯流程圖:

主流程4:連接池縮容

上面的流程就是HouseKeeper的run方法里具體做的事情,由於系統時間回撥會導致該定時任務回收一些連接時產生誤差,因此存在如下判斷:


//now就是當前系統時間,previous就是上次觸發該任務時的時間,housekeepingPeriodMs就是隔多久觸發該任務一次
//也就是說plusMillis(previous, housekeepingPeriodMs)表示當前時間
//如果系統時間沒被回撥,那么plusMillis(now, 128)一定是大於當前時間的,如果被系統時間被回撥
//回撥的時間超過128ms,那么下面的判斷就成立,否則永遠不會成立
if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs))

這是hikariCP在解決系統時鍾被回撥時做出的一種措施,通過流程圖可以看到,它是直接把池子里所有的連接對象取出來挨個兒的標記成廢棄,並且嘗試把狀態值修改為STATE_RESERVED(后面會說明這些狀態,這里先不深究)。如果系統時鍾沒有發生改變(絕大多數情況會命中這一塊的邏輯),由圖知,會把當前池內所有處於閑置狀態(STATE_NOT_IN_USE)的連接拿出來,然后計算需要檢查的范圍,然后循環着修改連接的狀態:


//拿到所有處於閑置狀態的連接
final List notInUse = connectionBag.values(STATE_NOT_IN_USE);
//計算出需要被檢查閑置時間的數量,簡單來說,池內需要保證最小minIdle個連接活着,所以需要計算出超出這個范圍的閑置對象進行檢查
int toRemove = notInUse.size() - config.getMinIdle();
for (PoolEntry entry : notInUse) {
  //在檢查范圍內,且閑置時間超出idleTimeout,然后嘗試將連接對象狀態由STATE_NOT_IN_USE變為STATE_RESERVED成功
  if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) {
    closeConnection(entry, "(connection has passed idleTimeout)"); //滿足上述條件,進行連接關閉
    toRemove--;
  }
}
fillPool(); //因為可能回收了一些連接,所以要再次觸發連接池擴充流程檢查下是否需要新增連接。

上面的代碼就是流程圖里對應的沒有回撥系統時間時的流程邏輯。該流程在idleTimeout大於0(默認等於0)並且minIdle小於maxPoolSize的時候才會啟用,默認是不啟用的,若需要啟用,可以按照條件來配置。

十、主流程5:擴充連接池

這個流程主要依附HikariPool里的fillPool方法,這個方法已經在上面很多流程里出現過了,它的作用就是在觸發連接廢棄、連接池連接不夠用時,發起擴充連接數的操作,這是個很簡單的過程,下面看下源碼(為了使代碼結構更加清晰,對源碼做了細微改動):


// PoolEntryCreator關於call方法的實現流程在主流程3里已經看過了,但是這里卻有倆PoolEntryCreator對象,
// 這是個較細節的地方,用於打日志用,不再說這部分,為了便於理解,只需要知道這倆對象執行的是同一塊call方法即可
private final PoolEntryCreator poolEntryCreator = new PoolEntryCreator(null);
private final PoolEntryCreator postFillPoolEntryCreator = new PoolEntryCreator("After adding ");

private synchronized void fillPool() {
  // 這個判斷就是根據當前池子里相關數據,推算出需要擴充的連接數,
  // 判斷方式就是利用最大連接數跟當前連接總數的差值,與最小連接數與當前池內閑置的連接數的差值,取其最小的那一個得到
  int needAdd = Math.min(maxPoolSize - connectionBag.size(),
  minIdle - connectionBag.getCount(STATE_NOT_IN_USE));

  //減去當前排隊的任務,就是最終需要新增的連接數
  final int connectionsToAdd = needAdd - addConnectionQueue.size();
  for (int i = 0; i < connectionsToAdd; i++) {
    //一般循環的最后一次會命中postFillPoolEntryCreator任務,其實就是在最后一次會打印一次日志而已(可以忽略該干擾邏輯)
    addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator);
  }
}

由該過程可以知道,最終這個新增連接的任務也是交由addConnectionExecutor線程池來處理的,而任務的主題也是PoolEntryCreator,這個流程可以參考主流程3.

然后needAdd的推算:

Math.min(最大連接數 - 池內當前連接總數, 最小連接數 - 池內閑置的連接數)

根據這個方式判斷,可以保證池內的連接數永遠不會超過maxPoolSize,也永遠不會低於minIdle。在連接吃緊的時候,可以保證每次觸發都以minIdle的數量擴容。因此如果在maxPoolSizeminIdle配置的值一樣的話,在池內連接吃緊的時候,就不會發生任何擴容了。

十一、主流程6:連接回收

 最開始說過,最終真實的物理連接對象會被包裝成PoolEntry對象,存放進ConcurrentBag,然后獲取時,PoolEntry對象又會被再次包裝成ProxyConnection對象暴露給使用方的,那么觸發連接回收,實際上就是觸發ProxyConnection里的close方法:


public final void close() throws SQLException {
  // 原注釋:Closing statements can cause connection eviction, so this must run before the conditional below
  closeStatements(); //此連接對象在業務方使用過程中產生的所有statement對象,進行統一close,防止漏close的情況
  if (delegate != ClosedConnection.CLOSED_CONNECTION) {
    leakTask.cancel(); //取消連接泄漏檢查任務,參考流程2.2
    try {
      if (isCommitStateDirty && !isAutoCommit) { //在存在執行語句后並且還打開了事務,調用close時需要主動回滾事務
        delegate.rollback(); //回滾
        lastAccess = currentTime(); //刷新"最后一次使用時間"
      }
    } finally {
      delegate = ClosedConnection.CLOSED_CONNECTION;
      poolEntry.recycle(lastAccess); //觸發回收
    }
  }
}

這個就是ProxyConnection里的close方法,可以看到它最終會調用PoolEntry的recycle方法進行回收,除此之外,連接對象的最后一次使用時間也是在這個時候刷新的,該時間是個很重要的屬性,可以用來判斷一個連接對象的閑置時間,來看下PoolEntry的recycle方法:


void recycle(final long lastAccessed) {
  if (connection != null) {
    this.lastAccessed = lastAccessed; //刷新最后使用時間
    hikariPool.recycle(this); //觸發HikariPool的回收方法,把自己傳過去
  }
}

之前有說過,每個PoolEntry對象都持有HikariPool的對象,方便觸發連接池的一些操作,由上述代碼可以看到,最終還是會觸發HikariPool里的recycle方法,再來看下HikariPool的recycle方法:


void recycle(final PoolEntry poolEntry) {
  metricsTracker.recordConnectionUsage(poolEntry); //監控指標相關,忽略
  connectionBag.requite(poolEntry); //最終觸發connectionBag的requite方法歸還連接,該流程參考ConnectionBag主流程里的requite方法部分
}

以上就是連接回收部分的邏輯,相比其他流程,還是比較簡潔的。

十二、ConcurrentBag主流程

 這個類用來存放最終的PoolEntry類型的連接對象,提供了基本的增刪查的功能,被HikariPool持有,上面那么多的操作,幾乎都是在HikariPool中完成的,HikariPool用來管理實際的連接生產動作和回收動作,實際操作的卻是ConcurrentBag類,梳理下上面所有流程的觸發點:

  • 主流程2:初始化HikariPool時初始化ConcurrentBag(構造方法),預熱時通過createEntry拿到連接對象,調用ConcurrentBag.add添加連接到ConcurrentBag。
  • 流程1.1:通過HikariPool獲取連接時,通過調用ConcurrentBag.borrow拿到一個連接對象。
  • 主流程6:通過ConcurrentBag.requite歸還一個連接。
  • 流程1.1.2:觸發關閉連接時,會通過ConcurrentBag.remove移除連接對象,由前面的流程可知關閉連接觸發點為:連接超過最大生命周期maxLifeTime主動廢棄、健康檢查不通過主動廢棄、連接池縮容。
  • 主流程3:通過異步添加連接時,通過調用ConcurrentBag.add添加連接到ConcurrentBag,由前面的流程可知添加連接觸發點為:連接超過最大生命周期maxLifeTime主動廢棄連接后、連接池擴容。
  • 主流程4:連接池縮容任務,通過調用ConcurrentBag.values篩選出需要的做操作的連接對象,然后再通過ConcurrentBag.reserve完成對連接對象狀態的修改,然后會通過流程1.1.2觸發關閉和移除連接操作。

通過觸發點整理,可以知道該結構里的主要方法,就是上面觸發點里標記為橙色的部分,然后來具體看下該類的基本定義和主要方法:


public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable {

    private final CopyOnWriteArrayList<T> sharedList; //最終存放PoolEntry對象的地方,它是一個CopyOnWriteArrayList
    private final boolean weakThreadLocals; //默認false,為true時可以讓一個連接對象在下方threadList里的list內處於弱引用狀態,防止內存泄漏(參見備注1)

    private final ThreadLocal<List<Object>> threadList; //線程級的緩存,從sharedList拿到的連接對象,會被緩存進當前線程內,borrow時會先從緩存中拿,從而達到池內無鎖實現
    private final IBagStateListener listener; //內部接口,HikariPool實現了該接口,主要用於ConcurrentBag主動通知HikariPool觸發添加連接對象的異步操作(也就是主流程3里的addConnectionExecutor所觸發的流程)
    private final AtomicInteger waiters; //當前因為獲取不到連接而發生阻塞的業務線程數,這個在之前的流程里也出現過,比如主流程3里addBagItem就會根據該指標進行判斷是否需要新增連接
    private volatile boolean closed; //標記當前ConcurrentBag是否已被關閉

    private final SynchronousQueue<T> handoffQueue; //這是個即產即銷的隊列,用於在連接不夠用時,及時獲取到add方法里新創建的連接對象,詳情可以參考下面borrow和add的代碼

    //內部接口,PoolEntry類實現了該接口
    public interface IConcurrentBagEntry {

        //連接對象的狀態,前面的流程很多地方都已經涉及到了,比如主流程4的縮容
        int STATE_NOT_IN_USE = 0; //閑置
        int STATE_IN_USE = 1; //使用中
        int STATE_REMOVED = -1; //已廢棄
        int STATE_RESERVED = -2; //標記保留,介於閑置和廢棄之間的中間狀態,主要由縮容那里觸發修改

        boolean compareAndSet(int expectState, int newState); //嘗試利用cas修改連接對象的狀態值

        void setState(int newState); //設置狀態值

        int getState(); //獲取狀態值
    }

    //參考上面listener屬性的解釋
    public interface IBagStateListener {
        void addBagItem(int waiting);
    }

    //獲取連接方法
    public T borrow(long timeout, final TimeUnit timeUnit) {
        // 省略...
    }

    //回收連接方法
    public void requite(final T bagEntry) {
        //省略...
    }

    //添加連接方法
    public void add(final T bagEntry) {
        //省略...
    }

    //移除連接方法
    public boolean remove(final T bagEntry) {
        //省略...
    }

    //根據連接狀態值獲取當前池子內所有符合條件的連接集合
    public List values(final int state) {
        //省略...
    }

    //獲取當前池子內所有的連接
    public List values() {
        //省略...
    }

    //利用cas把傳入的連接對象的state從 STATE_NOT_IN_USE 變為 STATE_RESERVED
    public boolean reserve(final T bagEntry) {
        //省略...
    }

    //獲取當前池子內符合傳入狀態值的連接數量
    public int getCount(final int state) {
        //省略...
    }
}

從這個基本結構就可以稍微看出HikariCP是如何優化傳統連接池實現的了,相比Druid來說,HikariCP更加偏向無鎖實現,盡量避免鎖競爭的發生。

12.1:borrow

這個方法用來獲取一個可用的連接對象,觸發點為流程1.1,HikariPool就是利用該方法獲取連接的,下面來看下該方法做了什么:


    public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
        // 源注釋:Try the thread-local list first
        final List<Object> list = threadList.get(); //首先從當前線程的緩存里拿到之前被緩存進來的連接對象集合
        for (int i = list.size() - 1; i >= 0; i--) {
            final Object entry = list.remove(i); //先移除,回收方法那里會再次add進來
            final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry; //默認不啟用弱引用
            // 獲取到對象后,通過cas嘗試把其狀態從STATE_NOT_IN_USE 變為 STATE_IN_USE,注意,這里如果其他線程也在使用這個連接對象,
            // 並且成功修改屬性,那么當前線程的cas會失敗,那么就會繼續循環嘗試獲取下一個連接對象
            if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                return bagEntry; //cas設置成功后,表示當前線程繞過其他線程干擾,成功獲取到該連接對象,直接返回
            }
        }

        // 源注釋:Otherwise, scan the shared list ... then poll the handoff queue
        final int waiting = waiters.incrementAndGet(); //如果緩存內找不到一個可用的連接對象,則認為需要“回源”,waiters+1
        try {
            for (T bagEntry : sharedList) {
                //循環sharedList,嘗試把連接狀態值從STATE_NOT_IN_USE 變為 STATE_IN_USE
                if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                    // 源注釋:If we may have stolen another waiter's connection, request another bag add.
                    if (waiting > 1) { //阻塞線程數大於1時,需要觸發HikariPool的addBagItem方法來進行添加連接入池,這個方法的實現參考主流程3
                        listener.addBagItem(waiting - 1);
                    }
                    return bagEntry; //cas設置成功,跟上面的邏輯一樣,表示當前線程繞過其他線程干擾,成功獲取到該連接對象,直接返回
                }
            }

            //走到這里說明不光線程緩存里的列表競爭不到連接對象,連sharedList里也找不到可用的連接,這時則認為需要通知HikariPool,該觸發添加連接操作了
            listener.addBagItem(waiting);

            timeout = timeUnit.toNanos(timeout); //這時候開始利用timeout控制獲取時間
            do {
                final long start = currentTime();
                //嘗試從handoffQueue隊列里獲取最新被加進來的連接對象(一般新入的連接對象除了加進sharedList之外,還會被offer進該隊列)
                final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
                //如果超出指定時間后仍然沒有獲取到可用的連接對象,或者獲取到對象后通過cas設置成功,這兩種情況都不需要重試,直接返回對象
                if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                    return bagEntry;
                }
                //走到這里說明從隊列內獲取到了連接對象,但是cas設置失敗,說明又該對象又被其他線程率先拿去用了,若時間還夠,則再次嘗試獲取
                timeout -= elapsedNanos(start); //timeout減去消耗的時間,表示下次循環可用時間
            } while (timeout > 10_000); //剩余時間大於10s時才繼續進行,一般情況下,這個循環只會走一次,因為timeout很少會配的比10s還大

            return null; //超時,仍然返回null
        } finally {
            waiters.decrementAndGet(); //這一步出去后,HikariPool收到borrow的結果,算是走出阻塞,所以waiters-1
        }
    }

仔細看下注釋,該過程大致分成三個主要步驟:

  1. 從線程緩存獲取連接
  2. 獲取不到再從sharedList里獲取
  3. 都獲取不到則觸發添加連接邏輯,並嘗試從隊列里獲取新生成的連接對象

12.2:add

這個流程會添加一個連接對象進入bag,通常由主流程3里的addBagItem方法通過addConnectionExecutor異步任務觸發添加操作,該方法主流程如下:


    public void add(final T bagEntry) {

        sharedList.add(bagEntry); //直接加到sharedList里去

        // 源注釋:spin until a thread takes it or none are waiting
        // 參考borrow流程,當存在線程等待獲取可用連接,並且當前新入的這個連接狀態仍然是閑置狀態,且隊列里無消費者等待獲取時,發起一次線程調度
        while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) { //注意這里會offer一個連接對象入隊列
            yield();
        }
    }

結合borrow來理解的話,這里在存在等待線程時會添加一個連接對象入隊列,可以讓borrow里發生等待的地方更容易poll到這個連接對象。

12.3:requite

這個流程會回收一個連接,該方法的觸發點在主流程6,具體代碼如下:


    public void requite(final T bagEntry) {
        bagEntry.setState(STATE_NOT_IN_USE); //回收意味着使用完畢,更改state為STATE_NOT_IN_USE狀態

        for (int i = 0; waiters.get() > 0; i++) { //如果存在等待線程的話,嘗試傳給隊列,讓borrow獲取
            if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
                return;
            }
            else if ((i & 0xff) == 0xff) {
                parkNanos(MICROSECONDS.toNanos(10));
            }
            else {
                yield();
            }
        }

        final List<Object> threadLocalList = threadList.get();
        if (threadLocalList.size() < 50) { //線程內連接集合的緩存最多50個,這里回收連接時會再次加進當前線程的緩存里,方便下次borrow獲取
            threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry); //默認不啟用弱引用,若啟用的話,則緩存集合里的連接對象沒有內存泄露的風險
        }
    }

12.4:remove

這個負責從池子里移除一個連接對象,觸發點在流程1.1.2,代碼如下:


    public boolean remove(final T bagEntry) {
        // 下面兩個cas操作,都是從其他狀態變為移除狀態,任意一個成功,都不會走到下面的warn log
        if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
            LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
            return false;
        }

        // 直接從sharedList移除掉
        final boolean removed = sharedList.remove(bagEntry);
        if (!removed && !closed) {
            LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
        }

        return removed;
    }

這里需要注意的是,移除時僅僅移除了sharedList里的對象,各個線程內緩存的那一份集合里對應的對象並沒有被移除,這個時候會不會存在該連接再次從緩存里拿到呢?會的,但是不會返回出去,而是直接remove掉了,仔細看borrow的代碼發現狀態不是閑置狀態的時候,取出來時就會remove掉,然后也拿不出去,自然也不會觸發回收方法。

12.5:values

該方法存在重載方法,用於返回當前池子內連接對象的集合,觸發點在主流程4,代碼如下:


    public List values(final int state) {
        //過濾出來符合狀態值的對象集合逆序后返回出去
        final List list = sharedList.stream().filter(e -> e.getState() == state).collect(Collectors.toList());
        Collections.reverse(list);
        return list;
    }

    public List values() {
        //返回全部連接對象(注意下方clone為淺拷貝)
        return (List) sharedList.clone();
    }

12.6:reserve

該方法單純將連接對象的狀態值由STATE_NOT_IN_USE修改為STATE_RESERVED,觸發點仍然是主流程4,縮容時使用,代碼如下:


   public boolean reserve(final T bagEntry){
      return bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
   }

12.7:getCount

該方法用於返回池內符合某個狀態值的連接的總數量,觸發點為主流程5,擴充連接池時用於獲取閑置連接總數,代碼如下:


   public int getCount(final int state){
      int count = 0;
      for (IConcurrentBagEntry e : sharedList) {
         if (e.getState() == state) {
            count++;
         }
      }
      return count;
   }

以上就是ConcurrentBag的主要方法和處理連接對象的主要流程。

十三、總結

到這里基本上一個連接的生產到獲取到回收到廢棄一整個生命周期在HikariCP內是如何管理的就說完了,相比之前的Druid的實現,有很大的不同,主要是HikariCP的無鎖獲取連接,本篇沒有涉及FastList的說明,因為從連接管理這個角度確實很少用到該結構,用到FastList的地方主要在存儲連接對象生成的statement對象以及用於存儲線程內緩存起來的連接對象;

除此之外HikariCP還利用javassist技術編譯期生成了ProxyConnection的初始化,這里也沒有相關說明,網上有關HikariCP的優化有很多文章,大多數都提到了字節碼優化、fastList、concurrentBag的實現,本篇主要通過深入解析HikariPoolConcurrentBag的實現,來說明HikariCP相比Druid具體做了哪些不一樣的操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM