Hikari連接池目前公認是性能最高的數據庫連接池,同時也是SpringBoot2.0以后默認使用的數據庫連接池。
一、Hikari的使用
1.1、Hikari相關配置
由於Springboot2.0默認就是使用的Hikari連接池,所以無需額外添加Hikari相關的maven依賴。只需要在application.yml添加對應的配置即可,如下:
spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull spring.datasource.username=admin spring.datasource.password=admin spring.datasource.type=com.zaxxer.hikari.HikariDataSource spring.datasource.hikari.minimum-idle=5 spring.datasource.hikari.maximum-pool-size=15 spring.datasource.hikari.auto-commit=true spring.datasource.hikari.idle-timeout=30000 spring.datasource.hikari.pool-name=DatebookHikariCP spring.datasource.hikari.max-lifetime=1800000 spring.datasource.hikari.connection-timeout=30000 spring.datasource.hikari.connection-test-query=SELECT 1
1.2、Hikari配置詳解
配置項 | 案例值 | 描述 |
autoCommit | true | 是否自動提交 |
connectionTimeout | 30000 | 客戶端創建連接等待超時時間,如果30秒內沒有獲取連接則拋異常,不再繼續等待 |
idleTimeout | 60000 | 連接允許最長空閑時間,如果連接空閑時間超過1分鍾,則會被關閉 |
maxLifetime | 1800000 | 連接最長生命周期,當連接存活時間達到30分鍾之后會被關閉作退休處理 |
minimumIdle | 1 | 連接池中最小空閑連接數 |
maximumPoolSize | 10 | 連接池中最大連接數 |
readOnly | false | 從池中獲取的連接是否是只讀模式 |
validationTimeout | 5000 | 測試連接是否空閑的間隔 |
leadDetectionThreshold | 60000 | 連接被占用的超時時間,超過1分鍾客戶端沒有釋放連接則強制回收該連接,防止連接泄漏 |
二、Hikari源碼解析
2.1、獲取連接
1、Hikari中的核心類為HikariDataSource,表示Hikari連接池中的數據源,實現了DataSource接口的getConnection方法,getConnection方法源碼如下:
1 /** 連接池對象 2 * fastPathPool 會在初始化時創建 3 * pool 是在獲取連接數創建 4 * volatile修飾pool導致每次讀pool都要從主存加載,每次寫也要寫回主存,性能不如沒volatile修飾的fastPathPool 5 * */ 6 private final HikariPool fastPathPool; 7 private volatile HikariPool pool; 8 9 /** 獲取連接*/ 10 public Connection getConnection() throws SQLException 11 { 12 if (isClosed()) { 13 throw new SQLException("HikariDataSource " + this + " has been closed."); 14 } 15 /** 如果fastPathPool存在則直接獲取連接 */ 16 if (fastPathPool != null) { 17 return fastPathPool.getConnection(); 18 } 19 /** 如果沒有fastPathPool 則創建HikariPool對象 */ 20 HikariPool result = pool; 21 if (result == null) { 22 synchronized (this) { 23 result = pool; 24 if (result == null) { 25 validate(); 26 LOGGER.info("{} - Starting...", getPoolName()); 27 try { 28 /** 初始化創建HikariPool對象*/ 29 pool = result = new HikariPool(this); 30 this.seal(); 31 } 32 catch (PoolInitializationException pie) { 33 // 34 } 35 } 36 } 37 } 38 /** 調用pool的getConnection()方法獲取連接*/ 39 return result.getConnection(); 40 }
1 public HikariDataSource(HikariConfig configuration) 2 { 3 configuration.validate(); 4 configuration.copyStateTo(this); 5 6 LOGGER.info("{} - Starting...", configuration.getPoolName()); 7 pool = fastPathPool = new HikariPool(this); 8 LOGGER.info("{} - Start completed.", configuration.getPoolName()); 9 10 this.seal(); 11 }
getConnection方法邏輯不多,主要是調用了HikariPool的getConnection()方法,而HikariDataSource中有兩個HikariPool對象,一個是fastPathPool是在HikariPool有參構造函數中創建, 如果沒有創建fastPathPool,那么就會在getConnection方法時創建pool對象。
很顯然pool對象是由volatile關鍵字修飾的,而fastPathPool是final類型的,所以fastPathPool的效率會比pool要高,所以推薦使用HikariDataSource有參構造函數進行初始化。
2、由上可知獲取連接的邏輯是在HikariPool的getConnection方法中,繼續分析HikariPool的getConnection方法,源碼如下:
1 /** 獲取連接*/ 2 public Connection getConnection(final long hardTimeout) throws SQLException 3 { 4 /** 獲取鎖*/ 5 suspendResumeLock.acquire(); 6 final long startTime = currentTime(); 7 8 try { 9 long timeout = hardTimeout; 10 do { 11 /** 從ConcurrentBag中借出一個PoolEntry對象 */ 12 PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS); 13 if (poolEntry == null) { 14 break; // We timed out... break and throw exception 15 } 16 17 final long now = currentTime(); 18 /** 判斷連接是否被標記為拋棄 或者 空閑時間過長, 是的話就關閉連接*/ 19 if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) { 20 closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE); 21 timeout = hardTimeout - elapsedMillis(startTime); 22 } 23 else { 24 metricsTracker.recordBorrowStats(poolEntry, startTime); 25 /** 通過Javassist創建代理連接*/ 26 return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now); 27 } 28 } while (timeout > 0L); 29 metricsTracker.recordBorrowTimeoutStats(startTime); 30 throw createTimeoutException(startTime); 31 } 32 catch (InterruptedException e) { 33 Thread.currentThread().interrupt(); 34 throw new SQLException(poolName + " - Interrupted during connection acquisition", e); 35 } 36 finally { 37 /** 釋放鎖*/ 38 suspendResumeLock.release(); 39 } 40 }
核心步驟只有兩步,一個是調用ConcurrentBag的borrow方法借用一個PoolEntry對象,第二步調用調用PoolEntry的createProxyConnection方法動態生成代理connection對象。
這里涉及到了兩個核心的類,分別是ConcurrentBag和PoolEntry
3、PoolEntry
PoolEntry顧名思義是連接池的節點,實際也可以看作是一個Connection對象的封裝,連接池中存儲的連接就是以PoolEntry的方式進行存儲。
PoolEntry內部屬性如下:
屬性 | 類型 | 描述 |
connection | Connection | 數據庫連接 |
lastAccessed | long | 上一次訪問時間 |
lastBorrowed | long | 上一次借出時間 |
state | volatile int | 當前狀態 |
evict | volatile boolean | 是否該丟棄 |
openStatements | FastList | 打開的statement集合 |
hikariPool | HikariPool | 關聯的HikariPool對象 |
isReadOnly | boolean | 是否只讀 |
isAutoCommit | boolean | 是否自動提交 |
4、ConcurrentBag
ConcurrentBag直意就是並發包,本質就是連接池的主體,存儲連接的封裝對象PoolEntry,另外做了並發控制來解決連接池的並發問題。
ConcurrentBag的內部屬性如下:
屬性 | 類型 | 描述 |
sharedList | CopyOnWriteArrayList | 存放着狀態為未使用、使用中和保留中三種狀態的PoolEntry對象 |
weakThreadLocals | boolean | 是否使用弱引用 |
threadList | ThreadLocal<List<Object>> | 存放當前線程的PoolEntry對象,如果當前線程再次借用則優先會從該列表中獲取,但是也可能會被其他線程借走 |
listener | IBagStateListener | 添加元素的監聽器 |
waiters | AtomicInteger | 當前等待的線程數 |
closed | volatile boolean | 是否關閉標識 |
handOffQueue | SynchronousQueue | 無容量阻塞隊列,插入操作需要等待刪除操作,刪除操作無需等待插入操作 |
5、從ConcurrentBag借出一個元素
ConcurrentBag實現了borrow方法,意思是從並發集合中借出一個元素,對於連接池而言實際就是從連接池中獲取一個連接,源碼如下:
1 /** 借出一個對象 */ 2 public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException 3 { 4 /** 1.從ThreadLocal中獲取當前線程綁定的對象集合 */ 5 final List<Object> list = threadList.get(); 6 /** 1.1.如果當前線程變量中存在就直接從list中返回一個*/ 7 for (int i = list.size() - 1; i >= 0; i--) { 8 final Object entry = list.remove(i); 9 @SuppressWarnings("unchecked") 10 final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry; 11 if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { 12 return bagEntry; 13 } 14 } 15 16 /** 2.當前等待對象數量自增1 */ 17 final int waiting = waiters.incrementAndGet(); 18 try { 19 /** 3.遍歷當前緩存的sharedList, 如果當前狀態為未使用,則通過CAS修改為已使用*/ 20 for (T bagEntry : sharedList) { 21 if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { 22 /** 4.如果當前等待線程不止1個,則給監聽中添加一個任務 */ 23 if (waiting > 1) { 24 listener.addBagItem(waiting - 1); 25 } 26 return bagEntry; 27 } 28 } 29 30 /** 4.如果當前緩存的sharedList為空或者都在使用中,那么給listener添加一個任務*/ 31 listener.addBagItem(waiting); 32 33 timeout = timeUnit.toNanos(timeout); 34 do { 35 final long start = currentTime(); 36 /** 5.從阻塞隊列中等待超時獲取元素 */ 37 final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); 38 /** 6.如果獲取元素失敗或者獲取元素且使用成功則均返回 */ 39 if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { 40 return bagEntry; 41 } 42 43 timeout -= elapsedNanos(start); 44 } while (timeout > 10_000); 45 46 return null; 47 } 48 finally { 49 /** 6.等待線程數自減1 */ 50 waiters.decrementAndGet(); 51 } 52 }
從源碼中可以發現方法中共有三個地方出現了return bagEntry,所以可以看出ConcurrentBag借出元素的地方是有三個來源的
第一步:從ThreadLocal中獲取
每個線程從ConcurrentBag中借出連接時都會創建一個ThreadLocal對象,值為一個List,默認大小為16,每次客戶端從ConcurrentBag中獲取連接時都會優先從ThreadLocal中嘗試獲取連接,獲取失敗才會走下一步獲取。
當客戶端將連接歸還給ConcurrentBag時,首先判斷當前是否有其他客戶端等待連接,如果有其他客戶端等待那么就將連接給其他客戶端,如果沒有客戶端等待那么將連接存入ThreadLocal中,每個ThreadLocal最多會存儲50個連接
Tips:使用ThreadLocal可能會存在內存泄露的風險,所以ConcurrentBag內部有一個屬性為booleal weakThreadLocals,當值為true時則ThreadLocal中的引用均是弱引用,在內存不足時GC的時候會被回收,避免了出現內存泄露的問題。
第二步:從sharedList中獲取
從ThreadLocal中獲取連接失敗之后,會再次嘗試從sharedList中獲取,sharedList集合存在初始化的PoolEntry。在ConcurrentBag初始化的,會初始化指定數量的PoolEntry對象存入sharedList,源碼如下:
ConcurrentBag構造函數如下:
1 /** ConcurrentHand 2 * IBagStateListener bag狀態監聽器,HikariPool實現了IBagStateListener接口 3 * 所以構造器傳入的listener實際就是HikariPool對象 4 * */ 5 public ConcurrentBag(final IBagStateListener listener) 6 { 7 this.listener = listener; 8 //是否使用弱引用 9 this.weakThreadLocals = useWeakThreadLocals(); 10 //初始化阻塞隊列 11 this.handoffQueue = new SynchronousQueue<>(true); 12 //初始化等待連接數 13 this.waiters = new AtomicInteger(); 14 //初始化sharedList 15 this.sharedList = new CopyOnWriteArrayList<>(); 16 if (weakThreadLocals) { 17 this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16)); 18 } 19 else { 20 this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16)); 21 } 22 }
HikariPool內部屬性包含了ConcurrentBag對象,在HikariPool初始化時會創建ConcurrentBag對象,所以ConcurrentBag的構造函數是在HikariPool初始化時調用,HikariPool構造函數如下:
1 public HikariPool(final HikariConfig config) 2 { 3 super(config); 4 5 //初始化ConcurrentBag對象 6 this.connectionBag = new ConcurrentBag<>(this); 7 //創建SuspendResumeLock對象 8 this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK; 9 /** 初始化線程池,houseKeeping可以理解為保持空間充足的意思,空間也就是連接池,該線程池的作用就是保持連接池中合適的連接數的作用 */ 10 this.houseKeepingExecutorService = initializeHouseKeepingExecutorService(); 11 12 /** 設置屬性*/ 13 checkFailFast(); 14 15 if (config.getMetricsTrackerFactory() != null) { 16 setMetricsTrackerFactory(config.getMetricsTrackerFactory()); 17 } 18 else { 19 setMetricRegistry(config.getMetricRegistry()); 20 } 21 22 setHealthCheckRegistry(config.getHealthCheckRegistry()); 23 24 handleMBeans(this, true); 25 26 ThreadFactory threadFactory = config.getThreadFactory(); 27 /** 根據配置的最大連接數,創建鏈表類型阻塞隊列 */ 28 LinkedBlockingQueue<Runnable> addQueue = new LinkedBlockingQueue<>(config.getMaximumPoolSize()); 29 this.addConnectionQueue = unmodifiableCollection(addQueue); 30 /** 初始化創建連接線程池*/ 31 this.addConnectionExecutor = createThreadPoolExecutor(addQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy()); 32 /** 初始化關閉連接線程池*/ 33 this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); 34 35 this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService); 36 /** 創建保持連接池連接數量的任務*/ 37 this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS); 38 39 if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) { 40 addConnectionExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); 41 addConnectionExecutor.setMaximumPoolSize(Runtime.getRuntime().availableProcessors()); 42 43 final long startTime = currentTime(); 44 while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) { 45 quietlySleep(MILLISECONDS.toMillis(100)); 46 } 47 48 addConnectionExecutor.setCorePoolSize(1); 49 addConnectionExecutor.setMaximumPoolSize(1); 50 } 51 }
這里有一個定時任務houseKeeperTask,該定時任務的作用是定時檢測連接池中連接的數量,執行的內容就是HouseKeep的run方法,邏輯如下:
1 private final class HouseKeeper implements Runnable 2 { 3 private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs); 4 5 @Override 6 public void run() 7 { 8 try { 9 /** 讀取連接池配置 */ 10 connectionTimeout = config.getConnectionTimeout(); 11 validationTimeout = config.getValidationTimeout(); 12 leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold()); 13 catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog; 14 15 final long idleTimeout = config.getIdleTimeout(); 16 final long now = currentTime(); 17 18 // Detect retrograde time, allowing +128ms as per NTP spec. 19 if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) { 20 logger.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.", 21 poolName, elapsedDisplayString(previous, now)); 22 previous = now; 23 /** 關閉連接池中需要被丟棄的連接 */ 24 softEvictConnections(); 25 return; 26 } 27 else if (now > plusMillis(previous, (3 * housekeepingPeriodMs) / 2)) { 28 // No point evicting for forward clock motion, this merely accelerates connection retirement anyway 29 logger.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now)); 30 } 31 32 previous = now; 33 34 String afterPrefix = "Pool "; 35 if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) { 36 logPoolState("Before cleanup "); 37 afterPrefix = "After cleanup "; 38 39 /** 獲取當前連接池中已經不是使用中的連接集合 */ 40 final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE); 41 int toRemove = notInUse.size() - config.getMinimumIdle(); 42 for (PoolEntry entry : notInUse) { 43 /** 當前空閑的連接如果超過最大空閑時間idleTimeout則關閉空閑連接 */ 44 if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) { 45 closeConnection(entry, "(connection has passed idleTimeout)"); 46 toRemove--; 47 } 48 } 49 } 50 51 logPoolState(afterPrefix); 52 /** 填充連接池,保持連接池數量至少保持minimum個連接數量 */ 53 fillPool(); // Try to maintain minimum connections 54 } 55 catch (Exception e) { 56 logger.error("Unexpected exception in housekeeping task", e); 57 } 58 } 59 }
該定時任務主要是為了維護連接池中連接的數量,首先需要將被標記為需要丟棄的連接進行關閉,然后將空閑超時的連接進行關閉,最后當連接池中的連接少於最小值時就需要對連接池進行補充連接的操作。所以在初始化連接池時,初始化連接的操作就是在fillPool方法中實現的。fillPool方法源碼如下:
1 /** 填充連接池 */ 2 private synchronized void fillPool() 3 { 4 /** 5 * 計算需要添加的連接數量 6 * config.getMaximumPoolSize - getTotalConnections() 表示連接池最大值-當前連接的數量=最多還可以創建的連接數 7 * config.getMinimumIdle() - getIdleConnections() 表示連接池最小值 - 當前空閑的連接數= 當前可以連接數 8 * Math.min計算得到最少需要的連接數 - addConnectionQueue.size() = 還需要創建連接的任務數量 9 * */ 10 final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections()) 11 - addConnectionQueue.size(); 12 for (int i = 0; i < connectionsToAdd; i++) { 13 /** 向創建連接線程池中提交創建連接的任務 */ 14 addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator); 15 } 16 }
先計算需要創建的連接數量,向創建連接的線程池中提交任務 poolEntryCreator,創建最后一個任務時創建的是postFillPoolEntryCreator, 兩者沒有本質的區別,只是打印的日志不一樣而已.
PoolEntryCreator創建PoolEntry對象的邏輯如下:
1 /** 創建PoolEntry對象線程 */ 2 private final class PoolEntryCreator implements Callable<Boolean> { 3 /** 4 * 日志前綴 5 */ 6 private final String loggingPrefix; 7 8 PoolEntryCreator(String loggingPrefix) { 9 this.loggingPrefix = loggingPrefix; 10 } 11 12 @Override 13 public Boolean call() { 14 long sleepBackoff = 250L; 15 /** 1.當前連接池狀態正常並且需求創建連接時 */ 16 while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) { 17 /** 2.創建PoolEntry對象 */ 18 final PoolEntry poolEntry = createPoolEntry(); 19 if (poolEntry != null) { 20 /** 3.將PoolEntry對象添加到ConcurrentBag對象中的sharedList中 */ 21 connectionBag.add(poolEntry); 22 logger.debug("{} - Added connection {}", poolName, poolEntry.connection); 23 if (loggingPrefix != null) { 24 logPoolState(loggingPrefix); 25 } 26 return Boolean.TRUE; 27 } 28 /** 睡眠指定時間*/ 29 quietlySleep(sleepBackoff); 30 sleepBackoff = Math.min(SECONDS.toMillis(10), Math.min(connectionTimeout, (long) (sleepBackoff * 1.5))); 31 } 32 // Pool is suspended or shutdown or at max size 33 return Boolean.FALSE; 34 } 35 }
createPoolEntry方法邏輯如下:
1 /** 創建PoolEntry對象 */ 2 private PoolEntry createPoolEntry() 3 { 4 try { 5 /** 1.初始化PoolEntry對象,會先創建Connection對象傳入PoolEntry的構造函數中 */ 6 final PoolEntry poolEntry = newPoolEntry(); 7 /** 2.獲取連接最大生命周期時長 */ 8 final long maxLifetime = config.getMaxLifetime(); 9 if (maxLifetime > 0) { 10 /** 3.獲取一個隨機值,防止PoolEntry同時創建同時被銷毀,添加隨機值錯開時間差 */ 11 final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0; 12 final long lifetime = maxLifetime - variance; 13 /** 4.給PoolEntry添加定時任務,當PoolEntry對象達到最大生命周期時間后觸發定時任務將連接標記為被拋棄 */ 14 poolEntry.setFutureEol(houseKeepingExecutorService.schedule( 15 () -> { 16 /** 5.達到最大生命周期,拋棄連接 */ 17 if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) { 18 /** 6.丟棄一個連接之后,調用addBagItem補充新的PoolEntry對象 */ 19 addBagItem(connectionBag.getWaitingThreadCount()); 20 } 21 }, 22 lifetime, MILLISECONDS)); 23 } 24 25 return poolEntry; 26 } 27 /** 異常捕獲*/ 28 catch (ConnectionSetupException e) { 29 if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently 30 logger.error("{} - Error thrown while acquiring connection from data source", poolName, e.getCause()); 31 lastConnectionFailure.set(e); 32 } 33 return null; 34 } 35 catch (SQLException e) { 36 if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently 37 logger.debug("{} - Cannot acquire connection from data source", poolName, e); 38 lastConnectionFailure.set(new ConnectionSetupException(e)); 39 } 40 return null; 41 } 42 catch (Exception e) { 43 if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently 44 logger.error("{} - Error thrown while acquiring connection from data source", poolName, e); 45 lastConnectionFailure.set(new ConnectionSetupException(e)); 46 } 47 return null; 48 } 49 }
首先創建一個新的PoolEntry對象,PoolEntry構造時會創建Connection對象,另外如果連接設置了最大生命周期時長,那么需要給每個PoolEntry添加定時任務,為了防止多個PoolEntry同時創建同時被關閉,所以每個PoolEntry的最大生命周期時間都不一樣。當PoolEntry達到最大生命周期后會觸發softEvictConnection方法,將PoolEntry標記為需要被丟棄,另外由於拋棄了PoolEntry對象,所以需要重新調用addBagItem方法對PoolEntry對象進行補充。
第三步:通過IBagStateListener創建新的元素
由於第二步可知,IBagStateListener主要有一個addBagItem方法,HikariPool實現了addBagItem方法,方法源碼如下:
1 public void addBagItem(final int waiting) 2 { 3 /** 判斷是否需要創建連接 */ 4 final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional. 5 if (shouldAdd) { 6 /** 向創建連接線程池中提交創建連接的任務 */ 7 addConnectionExecutor.submit(poolEntryCreator); 8 } 9 }
總結:
從ConcurrentBag中獲取連接一共分成三步,首先從當前線程的ThreadLocal中獲取,如果有直接返回一個連接,如果ThreadLocal中沒有則從sharedList中獲取,sharedList可以理解為ConcurrentBag緩存的連接池,每當創建了一個PoolEntry對象之后都會添加到sharedList中去,如果sharedList中的連接狀態都不是可用狀態,此時就需要通過IBagStateListener提交一個創建連接的任務,交給創建連接的線程池去執行,創建新的連接。
新的連接創建成功之后會將PoolEntry對象添加到無容量的阻塞隊列handoffQueue中,等待連接的線程不斷嘗試從handoffQueue隊列中獲取連接直到成功獲取或者超時返回。
2.2、釋放連接
當客戶端釋放連接時會調用collection的close方法,Hikari中的Connection使用的是代理連接ProxyConnection對象,調用close方法時會調用關聯的PoolEntry對象的回收方法recycle方法,PoolEntry的recycle方法源碼如下:
1 void recycle(final long lastAccessed) 2 { 3 if (connection != null) { 4 this.lastAccessed = lastAccessed; 5 /** 調用HikariPool的recycle方法,回收當前PoolEntry對象 */ 6 hikariPool.recycle(this); 7 } 8 }
1 void recycle(final PoolEntry poolEntry) 2 { 3 metricsTracker.recordConnectionUsage(poolEntry); 4 /** 調用ConcurrentBag的回收方法 */ 5 connectionBag.requite(poolEntry); 6 }
1 /** 回收元素方法 */ 2 public void requite(final T bagEntry) 3 { 4 /** 1.設置狀態為未使用 */ 5 bagEntry.setState(STATE_NOT_IN_USE); 6 7 /** 2.如果當前存在等待線程,則優先將元素給等待線程 */ 8 for (int i = 0; waiters.get() > 0; i++) { 9 /** 2.1.將元素添加到無界阻塞隊列中,等待其他線程獲取 */ 10 if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) { 11 return; 12 } 13 else if ((i & 0xff) == 0xff) { 14 parkNanos(MICROSECONDS.toNanos(10)); 15 } 16 else { 17 /** 當前線程不再繼續執行 */ 18 yield(); 19 } 20 } 21 /** 3.如果當前連接沒有被其他線程使用,則添加到當前線程的ThreadLocal中 */ 22 final List<Object> threadLocalList = threadList.get(); 23 if (threadLocalList.size() < 50) { 24 threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry); 25 } 26 }
回收連接最終會調用ConcurrentBag的requite方法,方法邏輯不復雜,首先將PoolEntry元素狀態設置為未使用,然后判斷當前是否存在等待連接的線程,如果存在則將連接加入到無界阻塞隊列中去,由等待連接的線程從阻塞隊列中去獲取;
如果當前沒有等待連接的線程,則將連接添加到本地線程變量ThreadLocal中,等待當前線程下次獲取連接時直接從ThreadLocal中獲取。
三、Hikari連接池高性能的原因?
1、采用自定義的FastList替代了ArrayList,FastList的get方法去除了范圍檢查rangeCheck邏輯,並且remove方法是從尾部開始掃描的,而並不是從頭部開始掃描的。因為Connection的打開和關閉順序通常是相反的
2、初始化時創建了兩個HikariPool對象,一個采用final類型定義,避免在獲取連接時才初始化,因為獲取連接時才初始化就需要做同步處理
3、Hikari創建連接是通過javassist動態字節碼生成技術創建的,性能更好
4、從連接池中獲取連接時對於同一個線程在threadLocal中添加了緩存,同一線程獲取連接時沒有並發操作
5、Hikari最大的特點是在高並發的情況下盡量的減少鎖競爭