一、Druid的使用
1.1、Springboot項目集成druid
1.1.1、配置maven
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.15</version
</dependency>
1.1.2、添加數據源相關配置
1 spring: 2 datasource:
druid: 3 url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&zeroDataTimeBehavior=convertToNull&useSSL=false 4 username: root 5 password: root 6 type: com.alibaba.druid.pool.DruidDataSource 7 initialSize: 5 8 maxInactive: 10 9 minIdle: 5 10 timeBetweenEvictionRunsMillis: 5000 11 minEvictableIdleTimeMillis: 10000 12 filters: stat,wall 13 testOnBorrow: false
1.1.3、定義DruidConfig配置文件
1 package com.lucky.test.config; 2 3 import com.alibaba.druid.pool.DruidDataSource; 4 import org.springframework.boot.context.properties.ConfigurationProperties; 5 import org.springframework.context.annotation.Configuration; 6 7 import javax.sql.DataSource; 8 9 /** 10 * @Auther: Lucky 11 * @Date: 2020/12/14 下午8:16 12 * @Desc: 13 */ 14 @Configuration 15 public class DruidConfig { 16 17 @ConfigurationProperties(prefix = "spring.datasource") 18 public DataSource druidDataSource(){ 19 return new DruidDataSource(); 20 } 21 }
定義了DruidDataSource數據源的bean之后,項目中使用的就是數據源就是DruidDataSource了
1.2、Druid數據源的配置
配置項 | 案例值 | 描述 |
url | jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&zeroDataTimeBehavior=convertToNull&useSSL=false |
數據庫連接地址 |
username
|
root | 數據庫連接用戶名 |
password | 123456 | 數據庫連接用戶密碼 |
initialSize | 10 | 連接池初始化連接數 |
minIdle | 1 | 連接池最小連接數 |
maxActive | 20 | 連接池最大活躍連接數 |
maxWait | 60000 | 客戶端獲取連接等待超時時間,單位為毫秒,此處的超時時間和創建連接超時時間是不一樣的,客戶端獲取連接超時有可能是創建連接超時,也有可能是當前連接數達到最大值並且其他客戶端正在使用,客戶端一直排隊等待可用連接超時了,所以盡量避免慢SQL,否則一旦可用連接被占用了且都在執行慢SQL,就會導致其他客戶端長時間無法獲取連接而超時 |
timeBetweenEvictionRunsMillis | 60000 | 連接空閑檢測間隔時長,單位為毫秒,當連接長時間空閑時,有定時任務會間隔間隔一段時間檢測一次,如果發現連接空閑時間足夠長,則關閉連接 |
minEvictableIdleTimeMillis | 60000 | 連接最小生成時間,雖然空閑連接會被關閉,但是並非所有空閑的連接都會關閉,而是要看連接空閑了多長時間,比如配置了60000毫秒,那么當連接空閑超過1分鍾時才會被關閉,否則可以繼續空閑等待客戶端 |
validationQuery | select 'X' | 檢測SQL |
testwhileIdle | false | 空閑的時候檢測執行validtionQuery嚴重連接是否有效,開啟會消耗性能 |
testOnReturn | false | 歸還連接時執行validationQuery驗證連接是否有效,開啟會消耗性能 |
poolPreparedStatements | true | 是否開啟Prepared緩存,開啟會提高重復查詢的效率,但是會消耗一定的內存 |
maxOpenPreparedStatements | 20 | 每個Connection的prepared緩存語句數量 |
二、Druid源碼解析
連接池的主要作用是提供連接給應用程序,所以需要實現數據源DataSource接口,Druid提供的數據源為DruidDataSource實現了DataSource接口,核心邏輯實際就是實現了DataSource接口的getConnection方法,在分析getConnecction方法實現邏輯之前,首先需要了解DruidDataSource的主要屬性,分別如下:
1. /** 初始化連接數,默認為0 */ 2 protected volatile int initialSize = DEFAULT_INITIAL_SIZE; 3 /** 最大連接數,默認是8 */ 4 protected volatile int maxActive = DEFAULT_MAX_ACTIVE_SIZE; 5 /** 最小空閑連接數,默認是0 */ 6 protected volatile int minIdle = DEFAULT_MIN_IDLE; 7 /** 最大空閑連接數,默認數8 */ 8 protected volatile int maxIdle = DEFAULT_MAX_IDLE; 9 /** 最大等待超時時間, 默認為-1,表示不會超時 */ 10 protected volatile long maxWait = DEFAULT_MAX_WAIT;
DruidDataSource的getConnection方法邏輯如下:
1 /** 獲取數據庫連接*/ 2 public DruidPooledConnection getConnection() throws SQLException { 3 return getConnection(maxWait); 4 } 5 6 public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { 7 /** 初始化*/ 8 init(); 9 /** 初始化過濾器*/ 10 if (filters.size() > 0) { 11 FilterChainImpl filterChain = new FilterChainImpl(this); 12 return filterChain.dataSource_connect(this, maxWaitMillis); 13 } else { 14 /** 直接獲取連接*/ 15 return getConnectionDirect(maxWaitMillis); 16 } 17 }
獲取數據庫連接時首先需要對連接池進行初始化,然后才能從連接池中獲取連接,分別對應了方法init方法和getConnectionDirect方法,兩個方法邏輯分別如下
2.1、連接池的初始化
init方法邏輯如下:
1 /** 連接池初始化 */ 2 public void init() throws SQLException { 3 /** 如果已經初始化直接返回 */ 4 if (inited) { 5 return; 6 } 7 8 final ReentrantLock lock = this.lock; 9 try { 10 /*** 加鎖處理 */ 11 lock.lockInterruptibly(); 12 } catch (InterruptedException e) { 13 throw new SQLException("interrupt", e); 14 } 15 try { 16 /** 1.創建數據源ID */ 17 this.id = DruidDriver.createDataSourceId(); 18 19 /** 2.初始化過濾器 */ 20 for (Filter filter : filters) { 21 filter.init(this); 22 } 23 24 /** 25 * 3.maxActive、maxActive、minIdle、initialSize等參數校驗以及JDBC等對象初始化 26 * */ 27 28 /** 4.初始化連接數組,數組大小為最大連接數*/ 29 connections = new DruidConnectionHolder[maxActive]; 30 31 SQLException connectError = null; 32 33 /** 5.根據初始化大小,初始化數據庫連接*/ 34 for (int i = 0, size = getInitialSize(); i < size; ++i) { 35 //1.創建連接 36 Connection conn = createPhysicalConnection(); 37 //2.將連接封裝成DruidConnectionHolder對象 38 DruidConnectionHolder holder = new DruidConnectionHolder(this, conn); 39 //3.將連接添加到連接數組中 40 connections[poolingCount] = holder; 41 incrementPoolingCount();//連接池中連接數自增+1 42 } 43 44 /** 創建並開啟日志線程 */ 45 createAndLogThread(); 46 /** 創建並開啟創建連接線程*/ 47 createAndStartCreatorThread(); 48 /** 創建並開啟銷毀連接線程*/ 49 createAndStartDestroyThread(); 50 /** 等待 創建連接線程 和 銷毀連接線程 全部開啟才算初始化完成 */ 51 initedLatch.await(); 52 53 }finally { 54 /** 標記已經初始化*/ 55 inited = true; 56 /** 釋放鎖*/ 57 lock.unlock(); 58 } 59 }
連接池初始化的邏輯主要如下:
1、判斷是否已經初始化,如果已經初始化直接跳出;如果沒有初始化則繼續初始化
2、防止並發初始化需要加鎖處理
3、初始化過濾器並進行初始化參數校驗
4、初始化連接數組,並根據配置的初始化大小創建指定數量的連接存入數組中,初始化的連接數就是傳入的參數值initialSIze的值
5、創建並開啟創建連接和銷毀連接的線程
6、標記初始化完成並釋放鎖
連接池初始化時會創建執指定數量的連接,並存入數組中。但是通常情況下連接池中的連接數量不是固定不變的,通常需要隨着並發量提高要增加,隨着並發量小而減少。所以在初始化的時候分別創建了創建連接的線程和銷毀連接的線程,用於動態的創建連接和銷毀連接,從而達到連接池動態的增刪連接的效果。
2.1.1、創建連接的線程createAndStartCreatorThread方法源碼解析
1 protected void createAndStartCreatorThread() { 2 if (createScheduler == null) { 3 String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this); 4 createConnectionThread = new CreateConnectionThread(threadName); 5 createConnectionThread.start(); 6 return; 7 } 8 9 initedLatch.countDown(); 10 }
該方法的主要作用就是創建了一個創建連接的線程CreateConnectionThread對象,並且啟動了線程,所以核心邏輯就是需要分析該線程主要的流程,邏輯如下:
1 /** 創建連接線程*/ 2 public class CreateConnectionThread extends Thread { 3 4 public CreateConnectionThread(String name){ 5 super(name); 6 this.setDaemon(true); 7 } 8 9 public void run() { 10 initedLatch.countDown(); 11 for (;;) { 12 try { 13 lock.lockInterruptibly(); 14 } catch (InterruptedException e2) { 15 break; 16 } 17 18 try { 19 /** 20 * poolingCount:連接池中的空閑連接數量 21 * notEmptyWaitThreadCount:等待連接的線程數量 22 * 當連接足夠時,睡眠線程 23 * */ 24 // 必須存在線程等待,才創建連接 25 if (poolingCount >= notEmptyWaitThreadCount) { 26 empty.await(); 27 } 28 29 // 防止創建超過maxActive數量的連接 30 /** 31 * activeCount: 活躍的連接數 32 * maxActive: 最大線程數 33 * 當活躍連接數 + 空閑連接數 >= 最大連接數時 睡眠線程 34 * */ 35 if (activeCount + poolingCount >= maxActive) { 36 empty.await(); 37 continue; 38 } 39 40 } catch (InterruptedException e) { 41 break; 42 } finally { 43 lock.unlock(); 44 } 45 46 /** 47 * 當等待連接的線程超過空閑線程;並且總連接數沒有超過最大連接數時,創建新連接 48 * */ 49 Connection connection = null; 50 51 try { 52 /** 1.創建新連接 */ 53 connection = createPhysicalConnection(); 54 } catch (SQLException e) { 55 LOG.error("create connection error", e); 56 break; 57 } 58 59 if (connection == null) { 60 continue; 61 } 62 /** 2.將連接放入連接池中 */ 63 put(connection); 64 } 65 } 66 }
邏輯並不復雜,就是在一個死循環中不斷判斷當前連接數是否夠用,並且是否超過最大上限,如果滿足條件就創建新的連接,並且將連接添加到連接池中。
在這里有一個Condition對象empty,該對象主要用於監視當前的連接池是否需要創建連接了,如果不需要創建連接則調用await進行等待,等待連接不足時進行喚醒。
當連接創建之后,會調用put方法將連接放到連接數組中,邏輯如下:
1 protected void put(Connection connection) { 2 DruidConnectionHolder holder = null; 3 try { 4 /** 1.將連接封裝成功DruidConnectionHolder對象 */ 5 holder = new DruidConnectionHolder(DruidDataSource.this, connection); 6 } catch (SQLException ex) { 7 lock.lock(); 8 try { 9 /** 10 * createScheduler 是創建連接的線程池 11 * createTaskCount 是當前需要創建連接的任務個數 12 * 當線程池不為空時,任務個數減1 13 * */ 14 if (createScheduler != null) { 15 createTaskCount--; 16 } 17 } finally { 18 lock.unlock(); 19 } 20 LOG.error("create connection holder error", ex); 21 return; 22 } 23 24 lock.lock(); 25 try { 26 /** 2.存入連接池數組中 */ 27 connections[poolingCount] = holder; 28 /** 3.空閑連接數poolingCount自增*/ 29 incrementPoolingCount(); 30 if (poolingCount > poolingPeak) { 31 /** 4.超過峰值則記錄連接數量的峰值 */ 32 poolingPeak = poolingCount; 33 poolingPeakTime = System.currentTimeMillis(); 34 } 35 /** 3.喚醒notEmpty,因為該連接是非初始化創建而是動態額外添加的,所以需要喚醒銷毀線程准備銷毀該連接 */ 36 notEmpty.signal(); 37 notEmptySignalCount++; 38 39 if (createScheduler != null) { 40 createTaskCount--; 41 42 if (poolingCount + createTaskCount < notEmptyWaitThreadCount // 43 && activeCount + poolingCount + createTaskCount < maxActive) { 44 /** 如果連接數還是不足,則繼續喚醒empty */ 45 emptySignal(); 46 } 47 } 48 } finally { 49 lock.unlock(); 50 } 51 } 52 53 /** 喚醒empty */ 54 private void emptySignal() { 55 if (createScheduler == null) { 56 empty.signal(); 57 return; 58 } 59 60 if (createTaskCount >= maxCreateTaskCount) { 61 return; 62 } 63 64 if (activeCount + poolingCount + createTaskCount >= maxActive) { 65 return; 66 } 67 68 createTaskCount++; 69 CreateConnectionTask task = new CreateConnectionTask();//創建連接的Task邏輯和CreateConneactionThread線程的邏輯完全一致 70 createScheduler.submit(task); 71 }
2.2.1.2、銷毀連接的線程createAndStartDestroyThread方法源碼解析
銷毀連接的線程方法邏輯基本和創建連接的邏輯相反,主要邏輯如下:
1 /** 銷毀連接線程 */ 2 public class DestroyConnectionThread extends Thread { 3 4 public DestroyConnectionThread(String name){ 5 super(name); 6 this.setDaemon(true); 7 } 8 9 public void run() { 10 initedLatch.countDown(); 11 12 for (;;) { 13 // 從前面開始刪除 14 try { 15 if (closed) { 16 break; 17 } 18 /** 如果設置了檢查間隔,則睡眠線程指定時長,否則就默認睡眠1秒*/ 19 if (timeBetweenEvictionRunsMillis > 0) { 20 Thread.sleep(timeBetweenEvictionRunsMillis); 21 } else { 22 Thread.sleep(1000); // 23 } 24 25 if (Thread.interrupted()) { 26 break; 27 } 28 /** 執行銷毀連接的任務 */ 29 destoryTask.run(); 30 } catch (InterruptedException e) { 31 break; 32 } 33 } 34 } 35 36 }
銷毀連接的任務交給了DestroyTask來實現,邏輯如下:
1 /** 銷毀連接任務*/ 2 public class DestroyTask implements Runnable { 3 4 @Override 5 public void run() { 6 /** 1.銷毀超過最大空閑時間的連接 */ 7 shrink(true); 8 9 /** 2.強制回收超過超時時間的連接*/ 10 if (isRemoveAbandoned()) { 11 removeAbandoned(); 12 } 13 } 14 }
銷毀連接的任務主要有兩個核心邏輯:
1、銷毀空閑連接
當一個連接長時間沒有被使用,如果不及時清理就會造成資源浪費,所以需要定時檢查空閑時間過長的連接進行斷開連接銷毀
2、回收超時連接
當一個連接被一個線程長時間占有沒有被歸還,有可能是程序出故障了或是有漏洞導致吃吃沒有歸還連接,這樣就可能會導致連接池中的連接不夠用,所以需要定時檢查霸占連接時間過長的線程,如果超過規定時間沒有歸還連接,則強制回收該連接。
銷毀空閑連接邏輯如下:
1 /** 連接空閑時間,默認為30分鍾 */ 2 public static final long DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 1000L * 60L * 30L; 3 4 /** 銷毀空閑連接 */ 5 public void shrink(boolean checkTime) { 6 /** 1.需要從連接池中去除的連接列表 */ 7 final List<DruidConnectionHolder> evictList = new ArrayList<DruidConnectionHolder>(); 8 try { 9 lock.lockInterruptibly(); 10 } catch (InterruptedException e) { 11 return; 12 } 13 14 try { 15 /** 2.獲取需要去除的個數 */ 16 final int checkCount = poolingCount - minIdle; 17 final long currentTimeMillis = System.currentTimeMillis(); 18 for (int i = 0; i < checkCount; ++i) { 19 DruidConnectionHolder connection = connections[i]; 20 /** 是否校驗連接的空閑時間*/ 21 if (checkTime) { 22 long idleMillis = currentTimeMillis - connection.getLastActiveTimeMillis(); 23 /** 3.1.如果連接空閑時間超過設置的值,則去除*/ 24 if (idleMillis >= minEvictableIdleTimeMillis) { 25 evictList.add(connection); 26 } else { 27 break; 28 } 29 } else { 30 /** 3.2.如果不校驗時間,則按順序去除*/ 31 evictList.add(connection); 32 } 33 } 34 35 int removeCount = evictList.size(); 36 /** 4.從數組中將多余的連接移除*/ 37 if (removeCount > 0) { 38 System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount); 39 Arrays.fill(connections, poolingCount - removeCount, poolingCount, null); 40 poolingCount -= removeCount; 41 } 42 } finally { 43 lock.unlock(); 44 } 45 46 /** 5.依次斷開被移除的連接 */ 47 for (DruidConnectionHolder item : evictList) { 48 Connection connection = item.getConnection(); 49 JdbcUtils.close(connection); 50 destroyCount.incrementAndGet(); 51 } 52 }
回收超時連接邏輯如下:
1 /** 強制回收連接 */ 2 public int removeAbandoned() { 3 int removeCount = 0; 4 5 long currrentNanos = System.nanoTime(); 6 7 /** 1.定義需要回收的連接列表*/ 8 List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>(); 9 10 synchronized (activeConnections) { 11 Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator(); 12 13 for (; iter.hasNext();) { 14 DruidPooledConnection pooledConnection = iter.next(); 15 if (pooledConnection.isRunning()) { 16 continue; 17 } 18 long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000); 19 /** 2.遍歷判斷超時未回收的連接,並加入列表中 */ 20 if (timeMillis >= removeAbandonedTimeoutMillis) { 21 iter.remove(); 22 pooledConnection.setTraceEnable(false); 23 abandonedList.add(pooledConnection); 24 } 25 } 26 } 27 28 if (abandonedList.size() > 0) { 29 /** 3.遍歷回收連接列表,進行連接回收*/ 30 for (DruidPooledConnection pooledConnection : abandonedList) { 31 synchronized (pooledConnection) { 32 if (pooledConnection.isDisable()) { 33 continue; 34 } 35 } 36 /** 3.1.強制斷開連接*/ 37 JdbcUtils.close(pooledConnection); 38 pooledConnection.abandond(); 39 removeAbandonedCount++; 40 removeCount++; 41 42 /** 3.2.日志打印*/ 43 if (isLogAbandoned()) { 44 StringBuilder buf = new StringBuilder(); 45 buf.append("abandon connection, owner thread: "); 46 buf.append(pooledConnection.getOwnerThread().getName()); 47 buf.append(", connected time nano: "); 48 buf.append(pooledConnection.getConnectedTimeNano()); 49 buf.append(", open stackTrace\n"); 50 51 StackTraceElement[] trace = pooledConnection.getConnectStackTrace(); 52 for (int i = 0; i < trace.length; i++) { 53 buf.append("\tat "); 54 buf.append(trace[i].toString()); 55 buf.append("\n"); 56 } 57 58 LOG.error(buf.toString()); 59 } 60 } 61 } 62 return removeCount; 63 }
2.2、連接池中獲取連接
1 /** 從連接池中獲取連接 */ 2 public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { 3 int notFullTimeoutRetryCnt = 0; 4 for (;;) { 5 // handle notFullTimeoutRetry 6 DruidPooledConnection poolableConnection; 7 try { 8 /** 1.獲取連接 */ 9 poolableConnection = getConnectionInternal(maxWaitMillis); 10 } catch (GetConnectionTimeoutException ex) { 11 if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { 12 notFullTimeoutRetryCnt++; 13 if (LOG.isWarnEnabled()) { 14 LOG.warn("not full timeout retry : " + notFullTimeoutRetryCnt); 15 } 16 continue; 17 } 18 throw ex; 19 } 20 21 /** 2.判斷獲取的連接是否有效 */ 22 if (isTestOnBorrow()) { 23 boolean validate = testConnectionInternal(poolableConnection.getConnection()); 24 if (!validate) { 25 if (LOG.isDebugEnabled()) { 26 LOG.debug("skip not validate connection."); 27 } 28 /** 2.1 連接無效則拋棄連接 */ 29 Connection realConnection = poolableConnection.getConnection(); 30 discardConnection(realConnection); 31 continue; 32 } 33 } else { 34 Connection realConnection = poolableConnection.getConnection(); 35 if (realConnection.isClosed()) { 36 discardConnection(null); // 傳入null,避免重復關閉 37 continue; 38 } 39 /** 3.如果沒有判斷連接有效性,則判斷該連接是否空閑*/ 40 if (isTestWhileIdle()) { 41 final long currentTimeMillis = System.currentTimeMillis(); 42 final long lastActiveTimeMillis = poolableConnection.getConnectionHolder().getLastActiveTimeMillis(); 43 final long idleMillis = currentTimeMillis - lastActiveTimeMillis; 44 long timeBetweenEvictionRunsMillis = this.getTimeBetweenEvictionRunsMillis(); 45 if (timeBetweenEvictionRunsMillis <= 0) { 46 timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS; 47 } 48 /** 4.如連接空閑時間過長,則強制校驗連接的有效性 */ 49 if (idleMillis >= timeBetweenEvictionRunsMillis) { 50 boolean validate = testConnectionInternal(poolableConnection.getConnection()); 51 if (!validate) { 52 if (LOG.isDebugEnabled()) { 53 LOG.debug("skip not validate connection."); 54 } 55 discardConnection(realConnection); 56 continue; 57 } 58 } 59 } 60 } 61 /** 4.給連接添加監聽,超過時間未歸還,則強制回收該連接*/ 62 if (isRemoveAbandoned()) { 63 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); 64 poolableConnection.setConnectStackTrace(stackTrace); 65 poolableConnection.setConnectedTimeNano(); 66 poolableConnection.setTraceEnable(true); 67 68 synchronized (activeConnections) { 69 activeConnections.put(poolableConnection, PRESENT); 70 } 71 } 72 /** 5.設置是否自動提交 */ 73 if (!this.isDefaultAutoCommit()) { 74 poolableConnection.setAutoCommit(false); 75 } 76 return poolableConnection; 77 } 78 }
獲取連接的邏輯步驟不多,首先是從連接池中獲取連接,獲取到連接之后根據配置項判斷是否需要對連接進行有效性檢測,防止獲取到了一個無效的連接。
獲取連接的方法getConnectionInternal方法邏輯如下:
1 /** 獲取連接*/ 2 private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException { 3 /** 1.連接池狀態判斷*/ 4 if (closed) { 5 connectErrorCount.incrementAndGet(); 6 throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis)); 7 } 8 9 if (!enable) { 10 connectErrorCount.incrementAndGet(); 11 throw new DataSourceDisableException(); 12 } 13 14 final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); 15 final int maxWaitThreadCount = getMaxWaitThreadCount(); 16 17 DruidConnectionHolder holder; 18 try { 19 lock.lockInterruptibly(); 20 } catch (InterruptedException e) { 21 connectErrorCount.incrementAndGet(); 22 throw new SQLException("interrupt", e); 23 } 24 25 try { 26 /** 2.判斷等待獲取連接的線程是否超過最大值 */ 27 if (maxWaitThreadCount > 0) { 28 if (notEmptyWaitThreadCount >= maxWaitThreadCount) { 29 connectErrorCount.incrementAndGet(); 30 throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " 31 + lock.getQueueLength()); 32 } 33 } 34 35 connectCount++; 36 if (maxWait > 0) { 37 /** 3.1 如果設置超時時間,則堵塞指定時長獲取連接*/ 38 holder = pollLast(nanos); 39 } else { 40 /** 3.2 如果沒有設置超時時間,則堵塞獲取連接*/ 41 holder = takeLast(); 42 } 43 44 if (holder != null) { 45 activeCount++; 46 if (activeCount > activePeak) { 47 activePeak = activeCount; 48 activePeakTime = System.currentTimeMillis(); 49 } 50 } 51 } catch (InterruptedException e) { 52 connectErrorCount.incrementAndGet(); 53 throw new SQLException(e.getMessage(), e); 54 } catch (SQLException e) { 55 connectErrorCount.incrementAndGet(); 56 throw e; 57 } finally { 58 lock.unlock(); 59 } 60 61 /** 4.如果獲取不到連接,則打印錯誤日志並拋異常 */ 62 if (holder == null) { 63 long waitNanos = waitNanosLocal.get(); 64 65 StringBuilder buf = new StringBuilder(); 66 buf.append("wait millis ")// 67 .append(waitNanos / (1000 * 1000))// 68 .append(", active " + activeCount)// 69 .append(", maxActive " + maxActive)// 70 ; 71 72 List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList(); 73 for (int i = 0; i < sqlList.size(); ++i) { 74 if (i != 0) { 75 buf.append('\n'); 76 } else { 77 buf.append(", "); 78 } 79 JdbcSqlStatValue sql = sqlList.get(i); 80 buf.append("runningSqlCount "); 81 buf.append(sql.getRunningCount()); 82 buf.append(" : "); 83 buf.append(sql.getSql()); 84 } 85 86 String errorMessage = buf.toString(); 87 88 if (this.createError != null) { 89 throw new GetConnectionTimeoutException(errorMessage, createError); 90 } else { 91 throw new GetConnectionTimeoutException(errorMessage); 92 } 93 } 94 95 holder.incrementUseCount(); 96 /** 5.構造連接對象並返回 */ 97 DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder); 98 return poolalbeConnection; 99 }
獲取連接主要看有沒有設置超時時間,如果設置了超時時間則調用pollLast方法進行嘗試獲取連接,超時沒有獲取則返回空;takeLast方法是一直堵塞當前線程直到獲取連接成功才會返回。
1 /** 一直堵塞獲取連接*/ 2 DruidConnectionHolder takeLast() throws InterruptedException, SQLException { 3 try { 4 /** 1.如果當前空閑連接數為0 */ 5 while (poolingCount == 0) { 6 /** 2.發送信號等待創建連接*/ 7 emptySignal(); // send signal to CreateThread create connection 8 notEmptyWaitThreadCount++; 9 if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { 10 notEmptyWaitThreadPeak = notEmptyWaitThreadCount; 11 } 12 try { 13 /** 等待信號*/ 14 notEmpty.await(); // signal by recycle or creator 15 } finally { 16 notEmptyWaitThreadCount--; 17 } 18 notEmptyWaitCount++; 19 20 if (!enable) { 21 connectErrorCount.incrementAndGet(); 22 throw new DataSourceDisableException(); 23 } 24 } 25 } catch (InterruptedException ie) { 26 notEmpty.signal(); // propagate to non-interrupted thread 27 notEmptySignalCount++; 28 throw ie; 29 } 30 31 decrementPoolingCount(); 32 /** 獲取連接池最后一位的連接*/ 33 DruidConnectionHolder last = connections[poolingCount]; 34 /** 將數組對應位置置為空*/ 35 connections[poolingCount] = null; 36 return last; 37 }
1 private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException { 2 long estimate = nanos; 3 4 for (;;) { 5 if (poolingCount == 0) { 6 emptySignal(); // send signal to CreateThread create connection 7 8 if (estimate <= 0) { 9 waitNanosLocal.set(nanos - estimate); 10 return null; 11 } 12 13 notEmptyWaitThreadCount++; 14 if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { 15 notEmptyWaitThreadPeak = notEmptyWaitThreadCount; 16 } 17 18 try { 19 long startEstimate = estimate; 20 /** 等待信號指定時長*/ 21 estimate = notEmpty.awaitNanos(estimate); // signal by 22 // recycle or 23 // creator 24 notEmptyWaitCount++; 25 notEmptyWaitNanos += (startEstimate - estimate); 26 27 if (!enable) { 28 connectErrorCount.incrementAndGet(); 29 throw new DataSourceDisableException(); 30 } 31 } catch (InterruptedException ie) { 32 notEmpty.signal(); // propagate to non-interrupted thread 33 notEmptySignalCount++; 34 throw ie; 35 } finally { 36 notEmptyWaitThreadCount--; 37 } 38 39 if (poolingCount == 0) { 40 if (estimate > 0) { 41 continue; 42 } 43 44 waitNanosLocal.set(nanos - estimate); 45 return null; 46 } 47 } 48 49 decrementPoolingCount(); 50 DruidConnectionHolder last = connections[poolingCount]; 51 connections[poolingCount] = null; 52 53 return last; 54 } 55 }
takeLast和pollLast的邏輯基本上一直,主要是看等待連接時是一直等待還是超時等待,一般都會設置超時時間,防止程序一直堵塞着。這里又使用到了emptySignal和notEmptySignal, 后面仔細分析。
2.3、連接歸還到連接池
當程序使用完連接需要將連接歸還到線程池,通過會執行connection.close方法進行關閉,Druid中的連接對象為DruidPooledConnection,close方法中執行了回收的方法recycle(),該方法會將連接回收到連接池中,邏輯如下:
1 public void recycle() throws SQLException { 2 if (this.disable) { 3 return; 4 } 5 6 DruidConnectionHolder holder = this.holder; 7 if (holder == null) { 8 if (dupCloseLogEnable) { 9 LOG.error("dup close"); 10 } 11 return; 12 } 13 14 if (!this.abandoned) { 15 /** 獲取數據源頭像 */ 16 DruidAbstractDataSource dataSource = holder.getDataSource(); 17 /** 執行數據源的recycle方法進行連接回收*/ 18 dataSource.recycle(this); 19 } 20 21 this.holder = null; 22 conn = null; 23 transactionInfo = null; 24 closed = true; 25 }
主要步驟為先獲取該連接所屬的數據源對象,然后直接執行DataSource對象的recycle方法進行連接的回收,DruidDataSource中的recycle方法邏輯如下:
1 /** 2 * 回收連接 3 */ 4 protected void recycle(DruidPooledConnection pooledConnection) throws SQLException { 5 final DruidConnectionHolder holder = pooledConnection.getConnectionHolder(); 6 7 if (holder == null) { 8 LOG.warn("connectionHolder is null"); 9 return; 10 } 11 12 final Connection physicalConnection = holder.getConnection(); 13 14 if (pooledConnection.isTraceEnable()) { 15 synchronized (activeConnections) { 16 if (pooledConnection.isTraceEnable()) { 17 Object oldInfo = activeConnections.remove(pooledConnection); 18 if (oldInfo == null) { 19 if (LOG.isWarnEnabled()) { 20 LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size()); 21 } 22 } 23 pooledConnection.setTraceEnable(false); 24 } 25 } 26 } 27 28 final boolean isAutoCommit = holder.isUnderlyingAutoCommit(); 29 final boolean isReadOnly = holder.isUnderlyingReadOnly(); 30 final boolean testOnReturn = this.isTestOnReturn(); 31 32 try { 33 // check need to rollback? 34 if ((!isAutoCommit) && (!isReadOnly)) { 35 pooledConnection.rollback(); 36 } 37 38 //校驗回收線程和獲取線程是否一致,並對連接持有對象進行重置 39 boolean isSameThread = pooledConnection.getOwnerThread() == Thread.currentThread(); 40 if (!isSameThread) { 41 synchronized (pooledConnection) { 42 holder.reset(); 43 } 44 } else { 45 holder.reset(); 46 } 47 48 if (holder.isDiscard()) { 49 return; 50 } 51 52 /** 校驗連接*/ 53 if (testOnReturn) { 54 boolean validate = testConnectionInternal(physicalConnection); 55 if (!validate) { 56 JdbcUtils.close(physicalConnection); 57 58 destroyCount.incrementAndGet(); 59 60 lock.lock(); 61 try { 62 activeCount--; 63 closeCount++; 64 } finally { 65 lock.unlock(); 66 } 67 return; 68 } 69 } 70 71 if (!enable) { 72 /** 如果連接池不可用則丟棄連接*/ 73 discardConnection(holder.getConnection()); 74 return; 75 } 76 77 final long lastActiveTimeMillis = System.currentTimeMillis(); 78 lock.lockInterruptibly(); 79 try { 80 /** 統計數據修改*/ 81 activeCount--; 82 closeCount++; 83 /** 將連接添加到連接池數組的尾部 */ 84 putLast(holder, lastActiveTimeMillis); 85 recycleCount++; 86 } finally { 87 lock.unlock(); 88 } 89 } catch (Throwable e) { 90 holder.clearStatementCache(); 91 92 if (!holder.isDiscard()) { 93 this.discardConnection(physicalConnection); 94 holder.setDiscard(true); 95 } 96 97 LOG.error("recyle error", e); 98 recycleErrorCount.incrementAndGet(); 99 } 100 }
1 void putLast(DruidConnectionHolder e, long lastActiveTimeMillis) { 2 e.setLastActiveTimeMillis(lastActiveTimeMillis); 3 /** 將連接放到連接池數組的尾部 */ 4 connections[poolingCount] = e; 5 incrementPoolingCount(); 6 7 if (poolingCount > poolingPeak) { 8 poolingPeak = poolingCount; 9 poolingPeakTime = lastActiveTimeMillis; 10 } 11 /** 喚醒notEmpty */ 12 notEmpty.signal(); 13 notEmptySignalCount++; 14 }
三、Druid的實現細節
3.1、核心類
3.1.1、DruidDataSource類
DruidDataSource是Druid提供的數據類,實現了DataSource接口,實現了獲取連接的getConnection方法,用於應用程序使用的數據對象,內部持有連接的數組DruidConnectionHolder[] connections表示連接池
3.1.2、DruidPooledConnection類
DruidPooledConnection表示數據庫連接對象,應用程序獲取DruidPooledConnection執行SQL,使用完畢調用close方法釋放連接
3.1.3、DruidConnectionHolder類型
DruidConnectionHolder是DruidPooledConnection類的封裝,表示連接池中持有的連接對象,連接池添加連接時實際是創建DruidConnectionHolder對象放入數組中,獲取連接就是從數組尾部獲取DruidConnectionHolder對象
3.2、ReentrantLock和Condition的使用
DruidDataSource內部有一個ReentrantLock lock對象和兩個Condition對象,分別為empty和notEmpty,主要用於連接的創建和銷毀的等待和通知。
數據庫連接池初始化的時候會初始化固定數量的連接,但是隨着應用程序的運行,數據庫連接的需求往往是動態變化的,比如初始化時創建了10個連接,但是高峰期的時候需要15個連接才可以滿足需求,此時連接池就需要動態的對連接池進行擴容,而等到高峰期過了之后,數據庫連接池還需要將多余創建的5個連接進行釋放,不然在空閑時間也會占據着連接造成資源的浪費。連接池中連接的動態增刪就是依靠了empty和notEmpty這兩個Condition對象。
empty用於通知創建連接,notEmpty用於通知應用獲取應用
初始化時啟動創建連接的線程,判斷當前是否需要創建連接,如果不需要創建則調用empty.await()方法進行等待,等待empty被喚醒之后進行創建連接,一旦empty被喚醒就會創建連接,創建完成之后通知notEmpty,讓用戶不再阻塞,拿到連接對象。
客戶端調用getConnection方法獲取連接時,如果當前沒有可用連接,則調用empty.signal()方法喚醒empty,並調用notEmpty.await()睡眠等待連接創建完成
3.3、Druid工作流程圖
1、 getConnection方法流程如下:
2、創建連接線程和銷毀連接線程流程如下: