簡介
DBCP
用於創建和管理連接,利用“池”的方式復用連接減少資源開銷,和其他連接池一樣,也具有連接數控制、連接有效性檢測、連接泄露控制、緩存語句等功能。目前,tomcat
自帶的連接池就是DBCP
,Spring開發組也推薦使用DBCP
,阿里的druid
也是參照DBCP
開發出來的。
DBCP
除了我們熟知的使用方式外,還支持通過JNDI
獲取數據源,並支持獲取JTA
或XA
事務中用於2PC
(兩階段提交)的連接對象,本文也將以例子說明。
本文將包含以下內容(因為篇幅較長,可根據需要選擇閱讀):
DBCP
的使用方法(入門案例說明);DBCP
的配置參數詳解;DBCP
主要源碼分析;DBCP
其他特性的使用方法,如JNDI
和JTA
支持。
使用例子
需求
使用DBCP
連接池獲取連接對象,對用戶數據進行簡單的增刪改查。
工程環境
JDK
:1.8.0_201
maven
:3.6.1
IDE
:eclipse 4.12
mysql-connector-java
:8.0.15
mysql
:5.7.28
DBCP
:2.6.0
主要步驟
-
編寫
dbcp.properties
,設置數據庫連接參數和連接池基本參數等。 -
通過
BasicDataSourceFactory
加載dbcp.properties
,並獲得BasicDataDource
對象。 -
通過
BasicDataDource
對象獲取Connection
對象。 -
使用
Connection
對象對用戶表進行增刪改查。
創建項目
項目類型Maven Project,打包方式war(其實jar也可以,之所以使用war是為了測試JNDI
)。
引入依賴
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- dbcp -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.6.0</version>
</dependency>
<!-- log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- mysql驅動的jar包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
編寫dbcp.prperties
路徑resources
目錄下,因為是入門例子,這里僅給出數據庫連接參數和連接池基本參數,后面源碼會對配置參數進行詳細說明。另外,數據庫sql
腳本也在該目錄下。
#連接基本屬性
driverClassName=com.mysql.cj.jdbc.Driver
url=jdbc:mysql://localhost:3306/github_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=true
username=root
password=root
#-------------連接池大小和連接超時參數--------------------------------
#初始化連接數量:連接池啟動時創建的初始化連接數量
#默認為0
initialSize=0
#最大活動連接數量:連接池在同一時間能夠分配的最大活動連接的數量, 如果設置為負數則表示不限制
#默認為8
maxTotal=8
#最大空閑連接:連接池中容許保持空閑狀態的最大連接數量,超過的空閑連接將被釋放,如果設置為負數表示不限制
#默認為8
maxIdle=8
#最小空閑連接:連接池中容許保持空閑狀態的最小連接數量,低於這個數量將創建新的連接,如果設置為0則不創建
#注意:timeBetweenEvictionRunsMillis為正數時,這個參數才能生效。
#默認為0
minIdle=0
#最大等待時間
#當沒有可用連接時,連接池等待連接被歸還的最大時間(以毫秒計數),超過時間則拋出異常,如果設置為<=0表示無限等待
#默認-1
maxWaitMillis=-1
獲取連接池和獲取連接
項目中編寫了JDBCUtils
來初始化連接池、獲取連接和釋放資源等,具體參見項目源碼。
路徑:cn.zzs.dbcp
// 導入配置文件
Properties properties = new Properties();
InputStream in = JDBCUtil.class.getClassLoader().getResourceAsStream("dbcp.properties");
properties.load(in);
// 根據配置文件內容獲得數據源對象
DataSource dataSource = BasicDataSourceFactory.createDataSource(properties);
// 獲得連接
Connection conn = dataSource.getConnection();
編寫測試類
這里以保存用戶為例,路徑test目錄下的cn.zzs.dbcp
。
@Test
public void save() throws SQLException {
// 創建sql
String sql = "insert into demo_user values(null,?,?,?,?,?)";
Connection connection = null;
PreparedStatement statement = null;
try {
// 獲得連接
connection = JDBCUtils.getConnection();
// 開啟事務設置非自動提交
connection.setAutoCommit(false);
// 獲得Statement對象
statement = connection.prepareStatement(sql);
// 設置參數
statement.setString(1, "zzf003");
statement.setInt(2, 18);
statement.setDate(3, new Date(System.currentTimeMillis()));
statement.setDate(4, new Date(System.currentTimeMillis()));
statement.setBoolean(5, false);
// 執行
statement.executeUpdate();
// 提交事務
connection.commit();
} finally {
// 釋放資源
JDBCUtils.release(connection, statement, null);
}
}
配置文件詳解
這部分內容從網上參照過來,同樣的內容發的到處都是,暫時沒找到出處。因為內容太過雜亂,而且最新版本更新了不少內容,所以我花了好大功夫才改好,后面找到出處再補上參考資料吧。
基本連接屬性
注意,這里在url
后面拼接了多個參數用於避免亂碼、時區報錯問題。 補充下,如果不想加入時區的參數,可以在mysql
命令窗口執行如下命令:set global time_zone='+8:00'
。
driverClassName=com.mysql.cj.jdbc.Driver
url=jdbc:mysql://localhost:3306/github_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=true
username=root
password=root
連接池大小參數
這幾個參數都比較常用,具體設置多少需根據項目調整。
#-------------連接池大小和連接超時參數--------------------------------
#初始化連接數量:連接池啟動時創建的初始化連接數量
#默認為0
initialSize=0
#最大活動連接數量:連接池在同一時間能夠分配的最大活動連接的數量, 如果設置為負數則表示不限制
#默認為8
maxTotal=8
#最大空閑連接:連接池中容許保持空閑狀態的最大連接數量,超過的空閑連接將被釋放,如果設置為負數表示不限制
#默認為8
maxIdle=8
#最小空閑連接:連接池中容許保持空閑狀態的最小連接數量,低於這個數量將創建新的連接,如果設置為0則不創建
#注意:timeBetweenEvictionRunsMillis為正數時,這個參數才能生效。
#默認為0
minIdle=0
#最大等待時間
#當沒有可用連接時,連接池等待連接被歸還的最大時間(以毫秒計數),超過時間則拋出異常,如果設置為<=0表示無限等待
#默認-1
maxWaitMillis=-1
#連接池創建的連接的默認的數據庫名,如果是使用DBCP的XA連接必須設置,不然注冊不了多個資源管理器
#defaultCatalog=github_demo
#連接池創建的連接的默認的schema。如果是mysql,這個設置沒什么用。
#defaultSchema=github_demo
緩存語句
緩存語句在mysql
下建議關閉。
#-------------緩存語句--------------------------------
#是否緩存preparedStatement,也就是PSCache。
#PSCache對支持游標的數據庫性能提升巨大,比如說oracle。在mysql下建議關閉
#默認為false
poolPreparedStatements=false
#緩存PreparedStatements的最大個數
#默認為-1
#注意:poolPreparedStatements為true時,這個參數才有效
maxOpenPreparedStatements=-1
#緩存read-only和auto-commit狀態。設置為true的話,所有連接的狀態都會是一樣的。
#默認是true
cacheState=true
連接檢查參數
針對連接失效和連接泄露的問題,建議開啟testWhileIdle
,而不是開啟testOnReturn
或testOnBorrow
(從性能考慮)。
#-------------連接檢查情況--------------------------------
#通過SQL查詢檢測連接,注意必須返回至少一行記錄
#默認為空。即會調用Connection的isValid和isClosed進行檢測
#注意:如果是oracle數據庫的話,應該改為select 1 from dual
validationQuery=select 1 from dual
#SQL檢驗超時時間
validationQueryTimeout=-1
#是否從池中取出連接前進行檢驗。
#默認為true
testOnBorrow=true
#是否在歸還到池中前進行檢驗
#默認為false
testOnReturn=false
#是否開啟空閑資源回收器。
#默認為false
testWhileIdle=false
#空閑資源的檢測周期(單位為毫秒)。
#默認-1。即空閑資源回收器不工作。
timeBetweenEvictionRunsMillis=-1
#做空閑資源回收器時,每次的采樣數。
#默認3,單位毫秒。如果設置為-1,就是對所有連接做空閑監測。
numTestsPerEvictionRun=3
#資源池中資源最小空閑時間(單位為毫秒),達到此值后將被移除。
#默認值1000*60*30 = 30分鍾
minEvictableIdleTimeMillis=1800000
#資源池中資源最小空閑時間(單位為毫秒),達到此值后將被移除。但是會保證minIdle
#默認值-1
#softMinEvictableIdleTimeMillis=-1
#空閑資源回收策略
#默認org.apache.commons.pool2.impl.DefaultEvictionPolicy
#如果要自定義的話,需要實現EvictionPolicy重寫evict方法
evictionPolicyClassName=org.apache.commons.pool2.impl.DefaultEvictionPolicy
#連接最大存活時間。非正數表示不限制
#默認-1
maxConnLifetimeMillis=-1
#當達到maxConnLifetimeMillis被關閉時,是否打印相關消息
#默認true
#注意:maxConnLifetimeMillis設置為正數時,這個參數才有效
logExpiredConnections=true
事務相關參數
這里的參數主要和事務相關,一般默認就行。
#-------------事務相關的屬性--------------------------------
#連接池創建的連接的默認的auto-commit狀態
#默認為空,由驅動決定
defaultAutoCommit=true
#連接池創建的連接的默認的read-only狀態。
#默認值為空,由驅動決定
defaultReadOnly=false
#連接池創建的連接的默認的TransactionIsolation狀態
#可用值為下列之一:NONE,READ_UNCOMMITTED, READ_COMMITTED, REPEATABLE_READ, SERIALIZABLE
#默認值為空,由驅動決定
defaultTransactionIsolation=REPEATABLE_READ
#歸還連接時是否設置自動提交為true
#默認true
autoCommitOnReturn=true
#歸還連接時是否設置回滾事務
#默認true
rollbackOnReturn=true
連接泄漏回收參數
當我們從連接池獲得了連接對象,但因為疏忽或其他原因沒有close
,這個時候這個連接對象就是一個泄露資源。通過配置以下參數可以回收這部分對象。
#-------------連接泄漏回收參數--------------------------------
#當未使用的時間超過removeAbandonedTimeout時,是否視該連接為泄露連接並刪除(當getConnection()被調用時檢測)
#默認為false
#注意:這個機制在(getNumIdle() < 2) and (getNumActive() > (getMaxActive() - 3))時被觸發
removeAbandonedOnBorrow=false
#當未使用的時間超過removeAbandonedTimeout時,是否視該連接為泄露連接並刪除(空閑evictor檢測)
#默認為false
#注意:當空閑資源回收器開啟才生效
removeAbandonedOnMaintenance=false
#泄露的連接可以被刪除的超時值, 單位秒
#默認為300
removeAbandonedTimeout=300
#標記當Statement或連接被泄露時是否打印程序的stack traces日志。
#默認為false
logAbandoned=true
#這個不是很懂
#默認為false
abandonedUsageTracking=false
其他
這部分參數比較少用。
#-------------其他--------------------------------
#是否使用快速失敗機制
#默認為空,由驅動決定
fastFailValidation=false
#當使用快速失敗機制時,設置觸發的異常碼
#多個code用","隔開
#disconnectionSqlCodes
#borrow連接的順序
#默認true
lifo=true
#每個連接創建時執行的語句
#connectionInitSqls=
#連接參數:例如username、password、characterEncoding等都可以在這里設置
#多個參數用";"隔開
#connectionProperties=
#指定數據源的jmx名。注意,配置了才能注冊MBean
jmxName=cn.zzs.jmx:type=BasicDataSource,name=zzs001
#查詢超時時間
#默認為空,即根據驅動設置
#defaultQueryTimeout=
#控制PoolGuard是否容許獲取底層連接
#默認為false
accessToUnderlyingConnectionAllowed=false
#如果容許則可以使用下面的方式來獲取底層物理連接:
# Connection conn = ds.getConnection();
# Connection dconn = ((DelegatingConnection) conn).getInnermostDelegate();
# ...
# conn.close();
源碼分析
注意:考慮篇幅和可讀性,以下代碼經過刪減,僅保留所需部分。
創建數據源和連接池
研究之前,先來看下BasicDataSource
的UML
圖:
這里介紹下這幾個類的作用:
類名 | 描述 |
---|---|
BasicDataSource |
用於滿足基本數據庫操作需求的數據源 |
BasicManagedDataSource |
BasicDataSource 的子類,用於創建支持XA 事務或JTA 事務的連接 |
PoolingDataSource |
BasicDataSource 中實際調用的數據源,可以說BasicDataSource 只是封裝了PoolingDataSource |
ManagedDataSource |
PoolingDataSource 的子類,用於支持XA 事務或JTA 事務的連接。是BasicManagedDataSource 中實際調用的數據源,可以說BasicManagedDataSource 只是封裝了ManagedDataSource |
另外,為了支持JNDI
,DBCP
也提供了相應的類。
類名 | 描述 |
---|---|
InstanceKeyDataSource |
用於支持JDNI 環境的數據源 |
PerUserPoolDataSource |
InstanceKeyDataSource 的子類,針對每個用戶會單獨分配一個連接池,每個連接池可以設置不同屬性。例如以下需求,相比user,admin 可以創建更多地連接以保證 |
SharedPoolDataSource |
InstanceKeyDataSource 的子類,不同用戶共享一個連接池 |
本文的源碼分析僅會涉及到BasicDataSource
(包含它封裝的PoolingDataSource
),其他的數據源暫時不擴展。
BasicDataSource.getConnection()
BasicDataSourceFactory.createDataSource(Properties)
只是簡單地new
了一個BasicDataSource
對象並初始化配置參數,此時真正的數據源(PoolingDataSource
)以及連接池(GenericObjectPool
)並沒有創建,而創建的時機為我們第一次調用getConnection()
的時候,如下:
public Connection getConnection() throws SQLException {
return createDataSource().getConnection();
}
但是,當我們設置了 initialSize > 0,則在BasicDataSourceFactory.createDataSource(Properties)
時就會完成數據源和連接池的初始化。感謝moranshouwang的指正。
當然,過程都是相同的,只是時機不一樣。下面從BasicDataSource
的createDataSource()
方法開始分析。
BasicDataSource.createDataSource()
這個方法會創建數據源和連接池,整個過程可以概括為以下幾步:
- 注冊
MBean
,用於支持JMX
; - 創建連接池對象
GenericObjectPool<PoolableConnection>
; - 創建數據源對象
PoolingDataSource<PoolableConnection>
; - 初始化連接數;
- 開啟空閑資源回收線程(如果設置
timeBetweenEvictionRunsMillis
為正數)。
protected DataSource createDataSource() throws SQLException {
if(closed) {
throw new SQLException("Data source is closed");
}
if(dataSource != null) {
return dataSource;
}
synchronized(this) {
if(dataSource != null) {
return dataSource;
}
// 注冊MBean,用於支持JMX,這方面的內容不在這里擴展
jmxRegister();
// 創建原生Connection工廠:本質就是持有數據庫驅動對象和幾個連接參數
final ConnectionFactory driverConnectionFactory = createConnectionFactory();
// 將driverConnectionFactory包裝成池化Connection工廠
PoolableConnectionFactory poolableConnectionFactory = createPoolableConnectionFactory(driverConnectionFactory);
// 設置PreparedStatements緩存(其實在這里可以發現,上面創建池化工廠時就設置了緩存,這里沒必要再設置一遍)
poolableConnectionFactory.setPoolStatements(poolPreparedStatements);
poolableConnectionFactory.setMaxOpenPreparedStatements(maxOpenPreparedStatements);
// 創建數據庫連接池對象GenericObjectPool,用於管理連接
// BasicDataSource將持有GenericObjectPool對象
createConnectionPool(poolableConnectionFactory);
// 創建PoolingDataSource對象
// 該對象持有GenericObjectPool對象的引用
DataSource newDataSource = createDataSourceInstance();
newDataSource.setLogWriter(logWriter);
// 根據我們設置的initialSize創建初始連接
for(int i = 0; i < initialSize; i++) {
connectionPool.addObject();
}
// 開啟連接池的evictor線程
startPoolMaintenance();
// 最后BasicDataSource將持有上面創建的PoolingDataSource對象
dataSource = newDataSource;
return dataSource;
}
}
以上方法涉及到幾個類,這里再補充下UML
圖。
類名 | 描述 |
---|---|
DriverConnectionFactory |
用於生成原生的Connection對象 |
PoolableConnectionFactory |
用於生成池化的Connection對象,持有ConnectionFactory 對象的引用 |
GenericObjectPool |
數據庫連接池,用於管理連接。持有PoolableConnectionFactory 對象的引用 |
獲取連接對象
上面已經大致分析了數據源和連接池對象的獲取過程,接下來研究下連接對象的獲取。在此之前先了解下DBCP
中幾個Connection
實現類。
類名 | 描述 |
---|---|
DelegatingConnection |
Connection 實現類,是以下幾個類的父類 |
PoolingConnection |
用於包裝原生的Connection ,支持緩存prepareStatement 和prepareCall |
PoolableConnection |
用於包裝原生的PoolingConnection (如果沒有開啟poolPreparedStatements ,則包裝的只是原生Connection ),調用close() 時只是將連接還給連接池 |
PoolableManagedConnection |
PoolableConnection 的子類,用於包裝ManagedConnection ,支持JTA 和XA 事務 |
ManagedConnection |
用於包裝原生的Connection ,支持JTA 和XA 事務 |
PoolGuardConnectionWrapper |
用於包裝PoolableConnection ,當accessToUnderlyingConnectionAllowed 才能獲取底層連接對象。我們獲取到的就是這個對象 |
另外,這里先概括下獲得連接的整個過程:
- 如果設置了
removeAbandonedOnBorrow
,達到條件會進行檢測; - 從連接池中獲取連接,如果沒有就通過工廠創建(通過
DriverConnectionFactory
創建原生對象,再通過PoolableConnectionFactory
包裝為池化對象); - 通過工廠重新初始化連接對象;
- 如果設置了
testOnBorrow
或者testOnCreate
,會通過工廠校驗連接有效性; - 使用
PoolGuardConnectionWrapper
包裝連接對象,並返回給客戶端
PoolingDataSource.getConnection()
前面已經說過,BasicDataSource
本質上是調用PoolingDataSource
的方法來獲取連接,所以這里從PoolingDataSource.getConnection()
開始研究。
以下代碼可知,該方法會從連接池中“借出”連接。
public Connection getConnection() throws SQLException {
// 這個泛型C指的是PoolableConnection對象
// 調用的是GenericObjectPool的方法返回PoolableConnection對象,這個方法后面會展開
final C conn = pool.borrowObject();
if (conn == null) {
return null;
}
// 包裝PoolableConnection對象,當accessToUnderlyingConnectionAllowed為true時,可以使用底層連接
return new PoolGuardConnectionWrapper<>(conn);
}
GenericObjectPool.borrowObject()
GenericObjectPool
是一個很簡練的類,里面涉及到的屬性設置和鎖機制都涉及得非常巧妙。
// 存放着連接池所有的連接對象(但不包含已經釋放的)
private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects =
new ConcurrentHashMap<>();
// 存放着空閑連接對象的阻塞隊列
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
// 為n>1表示當前有n個線程正在創建新連接對象
private long makeObjectCount = 0;
// 創建連接對象時所用的鎖
private final Object makeObjectCountLock = new Object();
// 連接對象創建總數量
private final AtomicLong createCount = new AtomicLong(0);
public T borrowObject() throws Exception {
// 如果我們設置了連接獲取等待時間,“借出”過程就必須在指定時間內完成
return borrowObject(getMaxWaitMillis());
}
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
// 校驗連接池是否打開狀態
assertOpen();
// 如果設置了removeAbandonedOnBorrow,達到觸發條件是會遍歷所有連接,未使用時長超過removeAbandonedTimeout的將被釋放掉(一般可以檢測出泄露連接)
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
(getNumIdle() < 2) &&
(getNumActive() > getMaxTotal() - 3) ) {
removeAbandoned(ac);
}
PooledObject<T> p = null;
// 連接數達到maxTotal是否阻塞等待
final boolean blockWhenExhausted = getBlockWhenExhausted();
boolean create;
final long waitTime = System.currentTimeMillis();
// 如果獲取的連接對象為空,會再次進入獲取
while (p == null) {
create = false;
// 獲取空閑隊列的第一個元素,如果為空就試圖創建新連接
p = idleObjects.pollFirst();
if (p == null) {
// 后面分析這個方法
p = create();
if (p != null) {
create = true;
}
}
// 連接數達到maxTotal且暫時沒有空閑連接,這時需要阻塞等待,直到獲得空閑隊列中的連接或等待超時
if (blockWhenExhausted) {
if (p == null) {
if (borrowMaxWaitMillis < 0) {
// 無限等待
p = idleObjects.takeFirst();
} else {
// 等待maxWaitMillis
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
// 這個時候還是沒有就只能拋出異常
if (p == null) {
throw new NoSuchElementException(
"Timeout waiting for idle object");
}
} else {
if (p == null) {
throw new NoSuchElementException("Pool exhausted");
}
}
// 如果連接處於空閑狀態,會修改連接的state、lastBorrowTime、lastUseTime、borrowedCount等,並返回true
if (!p.allocate()) {
p = null;
}
if (p != null) {
// 利用工廠重新初始化連接對象,這里會去校驗連接存活時間、設置lastUsedTime、及其他初始參數
try {
factory.activateObject(p);
} catch (final Exception e) {
try {
destroy(p);
} catch (final Exception e1) {
// Ignore - activation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to activate object");
nsee.initCause(e);
throw nsee;
}
}
// 根據設置的參數,判斷是否檢測連接有效性
if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
boolean validate = false;
Throwable validationThrowable = null;
try {
// 這里會去校驗連接的存活時間是否超過maxConnLifetimeMillis,以及通過SQL去校驗執行時間
validate = factory.validateObject(p);
} catch (final Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
// 如果校驗不通過,會釋放該對象
if (!validate) {
try {
destroy(p);
destroyedByBorrowValidationCount.incrementAndGet();
} catch (final Exception e) {
// Ignore - validation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to validate object");
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
}
// 更新borrowedCount、idleTimes和waitTimes
updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
return p.getObject();
}
GenericObjectPool.create()
這里在創建連接對象時采用的鎖機制非常值得學習,簡練且高效。
private PooledObject<T> create() throws Exception {
int localMaxTotal = getMaxTotal();
if (localMaxTotal < 0) {
localMaxTotal = Integer.MAX_VALUE;
}
final long localStartTimeMillis = System.currentTimeMillis();
final long localMaxWaitTimeMillis = Math.max(getMaxWaitMillis(), 0);
// 創建標識:
// - TRUE: 調用工廠創建返回對象
// - FALSE: 直接返回null
// - null: 繼續循環
Boolean create = null;
while (create == null) {
synchronized (makeObjectCountLock) {
final long newCreateCount = createCount.incrementAndGet();
if (newCreateCount > localMaxTotal) {
// 當前池已經達到maxTotal,或者有另外一個線程正在試圖創建一個新的連接使之達到容量極限
createCount.decrementAndGet();
if (makeObjectCount == 0) {
// 連接池確實已達到容量極限
create = Boolean.FALSE;
} else {
// 當前另外一個線程正在試圖創建一個新的連接使之達到容量極限,此時需要等待
makeObjectCountLock.wait(localMaxWaitTimeMillis);
}
} else {
// 當前連接池容量未到達極限,可以繼續創建連接對象
makeObjectCount++;
create = Boolean.TRUE;
}
}
// 當達到maxWaitTimeMillis時不創建連接對象,直接退出循環
if (create == null &&
(localMaxWaitTimeMillis > 0 &&
System.currentTimeMillis() - localStartTimeMillis >= localMaxWaitTimeMillis)) {
create = Boolean.FALSE;
}
}
if (!create.booleanValue()) {
return null;
}
final PooledObject<T> p;
try {
// 調用工廠創建對象,后面對這個方法展開分析
p = factory.makeObject();
} catch (final Throwable e) {
createCount.decrementAndGet();
throw e;
} finally {
synchronized (makeObjectCountLock) {
// 創建標識-1
makeObjectCount--;
// 喚醒makeObjectCountLock鎖住的對象
makeObjectCountLock.notifyAll();
}
}
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getLogAbandoned()) {
p.setLogAbandoned(true);
// TODO: in 3.0, this can use the method defined on PooledObject
if (p instanceof DefaultPooledObject<?>) {
((DefaultPooledObject<T>) p).setRequireFullStackTrace(ac.getRequireFullStackTrace());
}
}
// 連接數量+1
createdCount.incrementAndGet();
// 將創建的對象放入allObjects
allObjects.put(new IdentityWrapper<>(p.getObject()), p);
return p;
}
PoolableConnectionFactory.makeObject()
public PooledObject<PoolableConnection> makeObject() throws Exception {
// 創建原生的Connection對象
Connection conn = connectionFactory.createConnection();
if (conn == null) {
throw new IllegalStateException("Connection factory returned null from createConnection");
}
try {
// 執行我們設置的connectionInitSqls
initializeConnection(conn);
} catch (final SQLException sqle) {
// Make sure the connection is closed
try {
conn.close();
} catch (final SQLException ignore) {
// ignore
}
// Rethrow original exception so it is visible to caller
throw sqle;
}
// 連接索引+1
final long connIndex = connectionIndex.getAndIncrement();
// 如果設置了poolPreparedStatements,則創建包裝連接為PoolingConnection對象
if (poolStatements) {
conn = new PoolingConnection(conn);
final GenericKeyedObjectPoolConfig<DelegatingPreparedStatement> config = new GenericKeyedObjectPoolConfig<>();
config.setMaxTotalPerKey(-1);
config.setBlockWhenExhausted(false);
config.setMaxWaitMillis(0);
config.setMaxIdlePerKey(1);
config.setMaxTotal(maxOpenPreparedStatements);
if (dataSourceJmxObjectName != null) {
final StringBuilder base = new StringBuilder(dataSourceJmxObjectName.toString());
base.append(Constants.JMX_CONNECTION_BASE_EXT);
base.append(Long.toString(connIndex));
config.setJmxNameBase(base.toString());
config.setJmxNamePrefix(Constants.JMX_STATEMENT_POOL_PREFIX);
} else {
config.setJmxEnabled(false);
}
final PoolingConnection poolingConn = (PoolingConnection) conn;
final KeyedObjectPool<PStmtKey, DelegatingPreparedStatement> stmtPool = new GenericKeyedObjectPool<>(
poolingConn, config);
poolingConn.setStatementPool(stmtPool);
poolingConn.setCacheState(cacheState);
}
// 用於注冊連接到JMX
ObjectName connJmxName;
if (dataSourceJmxObjectName == null) {
connJmxName = null;
} else {
connJmxName = new ObjectName(
dataSourceJmxObjectName.toString() + Constants.JMX_CONNECTION_BASE_EXT + connIndex);
}
// 創建PoolableConnection對象
final PoolableConnection pc = new PoolableConnection(conn, pool, connJmxName, disconnectionSqlCodes,
fastFailValidation);
pc.setCacheState(cacheState);
// 包裝成連接池所需的對象
return new DefaultPooledObject<>(pc);
}
空閑對象回收器Evictor
以上基本已分析完連接對象的獲取過程,下面再研究下空閑對象回收器。前面已經講到當創建完數據源對象時會開啟連接池的evictor
線程,所以我們從BasicDataSource.startPoolMaintenance()
開始分析。
BasicDataSource.startPoolMaintenance()
前面說過timeBetweenEvictionRunsMillis
為非正數時不會開啟開啟空閑對象回收器,從以下代碼可以理解具體邏輯。
protected void startPoolMaintenance() {
// 只有timeBetweenEvictionRunsMillis為正數,才會開啟空閑對象回收器
if (connectionPool != null && timeBetweenEvictionRunsMillis > 0) {
connectionPool.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
}
}
BaseGenericObjectPool.setTimeBetweenEvictionRunsMillis(long)
這個BaseGenericObjectPool
是上面說到的GenericObjectPool
的父類。
public final void setTimeBetweenEvictionRunsMillis(
final long timeBetweenEvictionRunsMillis) {
// 設置回收線程運行間隔時間
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
// 繼續調用本類的方法,下面繼續進入方法分析
startEvictor(timeBetweenEvictionRunsMillis);
}
BaseGenericObjectPool.startEvictor(long)
這里會去定義一個Evictor
對象,這個其實是一個Runnable
對象,后面會講到。
final void startEvictor(final long delay) {
synchronized (evictionLock) {
if (null != evictor) {
EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS);
evictor = null;
evictionIterator = null;
}
// 創建回收器任務,並執行定時調度
if (delay > 0) {
evictor = new Evictor();
EvictionTimer.schedule(evictor, delay, delay);
}
}
}
EvictionTimer.schedule(Evictor, long, long)
DBCP
是使用ScheduledThreadPoolExecutor
來實現回收器的定時檢測。 涉及到ThreadPoolExecutor
為JDK
自帶的api
,這里不再深入分析線程池如何實現定時調度。感興趣的朋友可以復習下常用的幾款線程池。
static synchronized void schedule(
final BaseGenericObjectPool<?>.Evictor task, final long delay, final long period)
if (null == executor) {
// 創建線程池,隊列為DelayedWorkQueue,corePoolSize為1,maximumPoolSize為無限大
executor = new ScheduledThreadPoolExecutor(1, new EvictorThreadFactory());
// 當任務被取消的同時從等待隊列中移除
executor.setRemoveOnCancelPolicy(true);
}
// 設置任務定時調度
final ScheduledFuture<?> scheduledFuture =
executor.scheduleWithFixedDelay(task, delay, period, TimeUnit.MILLISECONDS);
task.setScheduledFuture(scheduledFuture);
}
BaseGenericObjectPool.Evictor
Evictor
是BaseGenericObjectPool
的內部類,實現了Runnable
接口,這里看下它的run方法。
class Evictor implements Runnable {
private ScheduledFuture<?> scheduledFuture;
@Override
public void run() {
final ClassLoader savedClassLoader =
Thread.currentThread().getContextClassLoader();
try {
// 確保回收器使用的類加載器和工廠對象的一樣
if (factoryClassLoader != null) {
final ClassLoader cl = factoryClassLoader.get();
if (cl == null) {
cancel();
return;
}
Thread.currentThread().setContextClassLoader(cl);
}
try {
// 回收符合條件的對象,后面繼續擴展
evict();
} catch(final Exception e) {
swallowException(e);
} catch(final OutOfMemoryError oome) {
// Log problem but give evictor thread a chance to continue
// in case error is recoverable
oome.printStackTrace(System.err);
}
try {
// 確保最小空閑對象
ensureMinIdle();
} catch (final Exception e) {
swallowException(e);
}
} finally {
Thread.currentThread().setContextClassLoader(savedClassLoader);
}
}
void setScheduledFuture(final ScheduledFuture<?> scheduledFuture) {
this.scheduledFuture = scheduledFuture;
}
void cancel() {
scheduledFuture.cancel(false);
}
}
GenericObjectPool.evict()
這里的回收過程包括以下四道校驗:
-
按照
evictionPolicy
校驗idleSoftEvictTime
、idleEvictTime
; -
利用工廠重新初始化樣本,這里會校驗
maxConnLifetimeMillis
(testWhileIdle
為true); -
校驗
maxConnLifetimeMillis
和validationQueryTimeout
(testWhileIdle
為true); -
校驗所有連接的未使用時間是否超過r
emoveAbandonedTimeout
(removeAbandonedOnMaintenance
為true)。
public void evict() throws Exception {
// 校驗當前連接池是否關閉
assertOpen();
if (idleObjects.size() > 0) {
PooledObject<T> underTest = null;
// 介紹參數時已經講到,這個evictionPolicy我們可以自定義
final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();
synchronized (evictionLock) {
final EvictionConfig evictionConfig = new EvictionConfig(
getMinEvictableIdleTimeMillis(),
getSoftMinEvictableIdleTimeMillis(),
getMinIdle());
final boolean testWhileIdle = getTestWhileIdle();
// 獲取我們指定的樣本數,並開始遍歷
for (int i = 0, m = getNumTests(); i < m; i++) {
if (evictionIterator == null || !evictionIterator.hasNext()) {
evictionIterator = new EvictionIterator(idleObjects);
}
if (!evictionIterator.hasNext()) {
// Pool exhausted, nothing to do here
return;
}
try {
underTest = evictionIterator.next();
} catch (final NoSuchElementException nsee) {
// 當前樣本正被另一個線程借出
i--;
evictionIterator = null;
continue;
}
// 判斷如果樣本是空閑狀態,設置為EVICTION狀態
// 如果不是,說明另一個線程已經借出了這個樣本
if (!underTest.startEvictionTest()) {
i--;
continue;
}
boolean evict;
try {
// 調用回收策略來判斷是否回收該樣本,按照默認策略,以下情況都會返回true:
// 1. 樣本空閑時間大於我們設置的idleSoftEvictTime,且當前池中空閑連接數量>minIdle
// 2. 樣本空閑時間大於我們設置的idleEvictTime
evict = evictionPolicy.evict(evictionConfig, underTest,
idleObjects.size());
} catch (final Throwable t) {
PoolUtils.checkRethrow(t);
swallowException(new Exception(t));
evict = false;
}
// 如果需要回收,則釋放這個樣本
if (evict) {
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
} else {
// 如果設置了testWhileIdle,會
if (testWhileIdle) {
boolean active = false;
try {
// 利用工廠重新初始化樣本,這里會校驗maxConnLifetimeMillis
factory.activateObject(underTest);
active = true;
} catch (final Exception e) {
// 拋出異常標識校驗不通過,釋放樣本
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
}
if (active) {
// 接下來會校驗maxConnLifetimeMillis和validationQueryTimeout
if (!factory.validateObject(underTest)) {
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
} else {
try {
// 這里會將樣本rollbackOnReturn、autoCommitOnReturn等
factory.passivateObject(underTest);
} catch (final Exception e) {
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
}
}
}
}
// 如果狀態為EVICTION或EVICTION_RETURN_TO_HEAD,修改為IDLE
if (!underTest.endEvictionTest(idleObjects)) {
//空
}
}
}
}
}
// 校驗所有連接的未使用時間是否超過removeAbandonedTimeout
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
removeAbandoned(ac);
}
}
以上已基本研究完數據源創建、連接對象獲取和空閑資源回收器,后續有空再做補充。
通過JNDI獲取數據源對象
需求
本文測試使用JNDI
獲取PerUserPoolDataSource
和SharedPoolDataSource
對象,選擇使用tomcat 9.0.21
作容器。
如果之前沒有接觸過JNDI
,並不會影響下面例子的理解,其實可以理解為像spring
的bean
配置和獲取。
源碼分析時已經講到,除了我們熟知的BasicDataSource
,DBCP
還提供了通過JDNI
獲取數據源,如下表。
類名 | 描述 |
---|---|
InstanceKeyDataSource |
用於支持JDNI 環境的數據源,是以下兩個類的父類 |
PerUserPoolDataSource |
InstanceKeyDataSource 的子類,針對每個用戶會單獨分配一個連接池,每個連接池可以設置不同屬性。例如以下需求,相比user,admin 可以創建更多地連接以保證 |
SharedPoolDataSource |
InstanceKeyDataSource 的子類,不同用戶共享一個連接池 |
引入依賴
本文在前面例子的基礎上增加以下依賴,因為是web項目,所以打包方式為war
:
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>javax.servlet.jsp-api</artifactId>
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
編寫context.xml
在webapp
文件下創建目錄META-INF
,並創建context.xml
文件。這里面的每個resource
節點都是我們配置的對象,類似於spring
的bean
節點。其中bean/DriverAdapterCPDS
這個對象需要被另外兩個使用到。
<?xml version="1.0" encoding="UTF-8"?>
<Context>
<Resource
name="bean/SharedPoolDataSourceFactory"
auth="Container"
type="org.apache.commons.dbcp2.datasources.SharedPoolDataSource"
factory="org.apache.commons.dbcp2.datasources.SharedPoolDataSourceFactory"
singleton="false"
driverClassName="com.mysql.cj.jdbc.Driver"
url="jdbc:mysql://localhost:3306/github_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=true"
username="root"
password="root"
maxTotal="8"
maxIdle="10"
dataSourceName="java:comp/env/bean/DriverAdapterCPDS"
/>
<Resource
name="bean/PerUserPoolDataSourceFactory"
auth="Container"
type="org.apache.commons.dbcp2.datasources.PerUserPoolDataSource"
factory="org.apache.commons.dbcp2.datasources.PerUserPoolDataSourceFactory"
singleton="false"
driverClassName="com.mysql.cj.jdbc.Driver"
url="jdbc:mysql://localhost:3306/github_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=true"
username="root"
password="root"
maxTotal="8"
maxIdle="10"
dataSourceName="java:comp/env/bean/DriverAdapterCPDS"
/>
<Resource
name="bean/DriverAdapterCPDS"
auth="Container"
type="org.apache.commons.dbcp2.cpdsadapter.DriverAdapterCPDS"
factory="org.apache.commons.dbcp2.cpdsadapter.DriverAdapterCPDS"
singleton="false"
driverClassName="com.mysql.cj.jdbc.Driver"
url="jdbc:mysql://localhost:3306/github_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=true"
userName="root"
userPassword="root"
maxIdle="10"
/>
</Context>
編寫web.xml
在web-app
節點下配置資源引用,每個resource-env-ref
指向了我們配置好的對象。
<resource-env-ref>
<description>Test DriverAdapterCPDS</description>
<resource-env-ref-name>bean/DriverAdapterCPDS</resource-env-ref-name>
<resource-env-ref-type>org.apache.commons.dbcp2.cpdsadapter.DriverAdapterCPDS</resource-env-ref-type>
</resource-env-ref>
<resource-env-ref>
<description>Test SharedPoolDataSource</description>
<resource-env-ref-name>bean/SharedPoolDataSourceFactory</resource-env-ref-name>
<resource-env-ref-type>org.apache.commons.dbcp2.datasources.SharedPoolDataSource</resource-env-ref-type>
</resource-env-ref>
<resource-env-ref>
<description>Test erUserPoolDataSource</description>
<resource-env-ref-name>bean/erUserPoolDataSourceFactory</resource-env-ref-name>
<resource-env-ref-type>org.apache.commons.dbcp2.datasources.erUserPoolDataSource</resource-env-ref-type>
</resource-env-ref>
編寫jsp
因為需要在web
環境中使用,如果直接建類寫個main
方法測試,會一直報錯的,目前沒找到好的辦法。這里就簡單地使用jsp
來測試吧(這是從tomcat官網參照的例子)。
<body>
<%
// 獲得名稱服務的上下文對象
Context initCtx = new InitialContext();
Context envCtx = (Context)initCtx.lookup("java:comp/env/");
// 查找指定名字的對象
DataSource ds = (DataSource)envCtx.lookup("bean/SharedPoolDataSourceFactory");
DataSource ds2 = (DataSource)envCtx.lookup("bean/PerUserPoolDataSourceFactory");
// 獲取連接
Connection conn = ds.getConnection("root","root");
System.out.println("conn" + conn);
Connection conn2 = ds2.getConnection("zzf","zzf");
System.out.println("conn2" + conn2);
// ... 使用連接操作數據庫,以及釋放資源 ...
conn.close();
conn2.close();
%>
</body>
測試結果
打包項目在tomcat9
上運行,訪問 http://localhost:8080/DBCP-demo/testInstanceKeyDataSource.jsp ,控制台打印如下內容:
conn=1971654708, URL=jdbc:mysql://localhost:3306/github_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=true, UserName=root@localhost, MySQL Connector/J
conn2=128868782, URL=jdbc:mysql://localhost:3306/github_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=true, UserName=zzf@localhost, MySQL Connector/J
使用DBCP測試兩階段提交
前面源碼分析已經講到,以下類用於支持JTA
事務。本文將介紹如何使用DBCP
來實現JTA
事務兩階段提交(當然,實際項目並不支持使用2PC
,因為性能開銷太大)。
類名 | 描述 |
---|---|
BasicManagedDataSource |
BasicDataSource 的子類,用於創建支持XA 事務或JTA 事務的連接 |
ManagedDataSource |
PoolingDataSource 的子類,用於支持XA 事務或JTA 事務的連接。是BasicManagedDataSource 中實際調用的數據源,可以說BasicManagedDataSource 只是封裝了ManagedDataSource |
准備工作
因為測試例子使用的是mysql
,使用XA
事務需要開啟支持。注意,mysql
只有innoDB
引擎才支持(另外,XA
事務和常規事務是互斥的,如果開啟了XA
事務,其他線程進來即使只讀也是不行的)。
SHOW VARIABLES LIKE '%xa%' -- 查看XA事務是否開啟
SET innodb_support_xa = ON -- 開啟XA事務
除了原來的github_demo
數據庫,我另外建了一個test
數據庫,簡單地模擬兩個數據庫。
mysql的XA事務使用
測試之前,這里簡單回顧下直接使用sql
操作XA
事務的過程,將有助於對以下內容的理解:
XA START 'my_test_xa'; -- 啟動一個xid為my_test_xa的事務,並使之為active狀態
UPDATE github_demo.demo_user SET deleted = 1 WHERE id = '1'; -- 事務中的語句
XA END 'my_test_xa'; -- 把事務置為idle狀態
XA PREPARE 'my_test_xa'; -- 把事務置為prepare狀態
XA COMMIT 'my_test_xa'; -- 提交事務
XA ROLLBACK 'my_test_xa'; -- 回滾事務
XA RECOVER; -- 查看處於prepare狀態的事務列表
引入依賴
在入門例子的基礎上,增加以下依賴,本文采用第三方atomikos
的實現。
<!-- jta:用於測試DBCP對JTA事務的支持 -->
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jdbc</artifactId>
<version>3.9.3</version>
</dependency>
獲取BasicManagedDataSource
這里千萬記得要設置DefaultCatalog
,否則當前事務中注冊不同資源管理器時,可能都會被當成同一個資源管理器而拒絕注冊並報錯,因為這個問題,花了我好長時間才解決。
public BasicManagedDataSource getBasicManagedDataSource(
TransactionManager transactionManager,
String url,
String username,
String password) {
BasicManagedDataSource basicManagedDataSource = new BasicManagedDataSource();
basicManagedDataSource.setTransactionManager(transactionManager);
basicManagedDataSource.setUrl(url);
basicManagedDataSource.setUsername(username);
basicManagedDataSource.setPassword(password);
basicManagedDataSource.setDefaultAutoCommit(false);
basicManagedDataSource.setXADataSource("com.mysql.cj.jdbc.MysqlXADataSource");
return basicManagedDataSource;
}
@Test
public void test01() throws Exception {
// 獲得事務管理器
TransactionManager transactionManager = new UserTransactionManager();
// 獲取第一個數據庫的數據源
BasicManagedDataSource basicManagedDataSource1 = getBasicManagedDataSource(
transactionManager,
"jdbc:mysql://localhost:3306/github_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=true",
"root",
"root");
// 注意,這一步非常重要
basicManagedDataSource1.setDefaultCatalog("github_demo");
// 獲取第二個數據庫的數據源
BasicManagedDataSource basicManagedDataSource2 = getBasicManagedDataSource(
transactionManager,
"jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=true",
"zzf",
"zzf");
// 注意,這一步非常重要
basicManagedDataSource1.setDefaultCatalog("test");
}
編寫兩階段提交的代碼
通過運行代碼可以發現,當數據庫1和2的操作都成功,才會提交,只要其中一個數據庫執行失敗,兩個操作都會回滾。
@Test
public void test01() throws Exception {
Connection connection1 = null;
Statement statement1 = null;
Connection connection2 = null;
Statement statement2 = null;
transactionManager.begin();
try {
// 獲取連接並進行數據庫操作,這里會將會將XAResource注冊到當前線程的XA事務對象
/**
* XA START xid1;-- 啟動一個事務,並使之為active狀態
*/
connection1 = basicManagedDataSource1.getConnection();
statement1 = connection1.createStatement();
/**
* update github_demo.demo_user set deleted = 1 where id = '1'; -- 事務中的語句
*/
boolean result1 = statement1.execute("update github_demo.demo_user set deleted = 1 where id = '1'");
System.out.println(result1);
/**
* XA START xid2;-- 啟動一個事務,並使之為active狀態
*/
connection2 = basicManagedDataSource2.getConnection();
statement2 = connection2.createStatement();
/**
* update test.demo_user set deleted = 1 where id = '1'; -- 事務中的語句
*/
boolean result2 = statement2.execute("update test.demo_user set deleted = 1 where id = '1'");
System.out.println(result2);
/**
* 當這執行以下語句:
* XA END xid1; -- 把事務置為idle狀態
* XA PREPARE xid1; -- 把事務置為prepare狀態
* XA END xid2; -- 把事務置為idle狀態
* XA PREPARE xid2; -- 把事務置為prepare狀態
* XA COMMIT xid1; -- 提交事務
* XA COMMIT xid2; -- 提交事務
*/
transactionManager.commit();
} catch(Exception e) {
e.printStackTrace();
} finally {
statement1.close();
statement2.close();
connection1.close();
connection2.close();
}
}
本文為原創文章,轉載請附上原文出處鏈接:https://www.cnblogs.com/ZhangZiSheng001/p/12003922.html