PooledDataSource主要涉及到兩個類PooledConnection以及PoolState
PooledConnection:PooledDataSource中創建的數據庫連接,可以獲得實際的realConnection和proxyConnection鏈接對象,重點說明PooledConnection實現了InvocationHandler(即實現Aop時用到的一種動態代理)接口
PoolState:與PooledDataSource對應,記錄數據源對應連接池的狀態,包括空閑、活動鏈接數量,以及連接池的統計信息。
PooledConnection的屬性說明:
Connection realConnection:真正用於連接數據庫的連接,即各個JDBC驅動實現的Connection,例如org.hsqldb.jdbc.JDBCConnection。
Connection proxyConnection:通過動態代理獲得的連接,創建方式如下:
// Creates a java.lang.reflect.Proxy implementing the interfaces listed in IFACES; every call on the proxy is dispatched to this object, which is passed as the InvocationHandler.
this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
下面主要介紹PooledDataSource類中的方法:
1、構造方法:使用UnpooledDataSource的構造方法創建UnpooledDataSource對象,同時通過assembleConnectionTypeCode生成數據源的hashcode(由url+username+password計算),保存到expectedConnectionTypeCode中。
/**
 * Creates a pooled data source backed by a new {@code UnpooledDataSource}.
 *
 * <p>Also records the expected connection type code, computed by
 * {@code assembleConnectionTypeCode} from the underlying data source's URL,
 * username and password.
 *
 * @param driver   passed through to the {@code UnpooledDataSource} constructor
 * @param url      passed through to the {@code UnpooledDataSource} constructor
 * @param username passed through to the {@code UnpooledDataSource} constructor
 * @param password passed through to the {@code UnpooledDataSource} constructor
 */
public PooledDataSource(String driver, String url, String username, String password) {
  this.dataSource = new UnpooledDataSource(driver, url, username, password);
  this.expectedConnectionTypeCode =
      assembleConnectionTypeCode(
          this.dataSource.getUrl(),
          this.dataSource.getUsername(),
          this.dataSource.getPassword());
}
2、從連接池中獲得鏈接,getConnection()是個中間方法,實際調用的是popConnection方法,下面介紹popConnection即從鏈接池中拿Connection。
/**
 * Checks a connection out of the pool using the underlying data source's
 * username and password, and hands back its proxy connection.
 *
 * @return the proxy connection of the pooled connection that was checked out
 * @throws SQLException if {@code popConnection} fails to provide a connection
 */
@Override
public Connection getConnection() throws SQLException {
  final PooledConnection pooled =
      popConnection(dataSource.getUsername(), dataSource.getPassword());
  return pooled.getProxyConnection();
}
private PooledConnection popConnection(String username, String password) throws SQLException { boolean countedWait = false; PooledConnection conn = null; long t = System.currentTimeMillis(); int localBadConnectionCount = 0; //while + synchronized (state) 用於以同步方式拿出connection while (conn == null) { synchronized (state) { //如果PoolState中的空閑連接數不為空,則直接拿出第一個鏈接 if (!state.idleConnections.isEmpty()) { // Pool has available connection conn = state.idleConnections.remove(0); if (log.isDebugEnabled()) { log.debug("Checked out connection " + conn.getRealHashCode() + " from pool."); } } else { // Pool does not have available connection //如果PoolState中的空閑連接數為空,但是活動的鏈接數小於PooledDataSource設置的最大鏈接數,則可以新建鏈接 if (state.activeConnections.size() < poolMaximumActiveConnections) { // Can create new connection conn = new PooledConnection(dataSource.getConnection(), this); if (log.isDebugEnabled()) { log.debug("Created connection " + conn.getRealHashCode() + "."); } } else { // Cannot create new connection //如果PoolState中的空閑連接數為空,並且活動的鏈接數不小於PooledDataSource設置的最大鏈接數,則先獲得活動鏈接中最先使用鏈接(即最老的),判斷是否超時時間,如果超時了,則重新從活動連接池中去掉該連接,然后在PoolState設置一些信息,返回該連接 PooledConnection oldestActiveConnection = state.activeConnections.get(0); long longestCheckoutTime = oldestActiveConnection.getCheckoutTime(); if (longestCheckoutTime > poolMaximumCheckoutTime) { // Can claim overdue connection state.claimedOverdueConnectionCount++; state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; state.accumulatedCheckoutTime += longestCheckoutTime; state.activeConnections.remove(oldestActiveConnection); if (!oldestActiveConnection.getRealConnection().getAutoCommit()) { oldestActiveConnection.getRealConnection().rollback(); } conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this); oldestActiveConnection.invalidate(); if (log.isDebugEnabled()) { log.debug("Claimed overdue connection " + conn.getRealHashCode() + "."); } } else { // Must wait 
//如果PoolState中的空閑連接數為空,並且活動的鏈接數不小於PooledDataSource設置的最大鏈接數,則先獲得活動鏈接中最先使用鏈接(即最老的),判斷是否超時時間,如果沒有超時,則必須等待 try { if (!countedWait) { state.hadToWaitCount++; countedWait = true; } if (log.isDebugEnabled()) { log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection."); } long wt = System.currentTimeMillis(); state.wait(poolTimeToWait); state.accumulatedWaitTime += System.currentTimeMillis() - wt; } catch (InterruptedException e) { break; } } } } //獲得conn后,需要進一步判斷該連接狀態,包括非空校驗等 if (conn != null) { if (conn.isValid()) { if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password)); conn.setCheckoutTimestamp(System.currentTimeMillis()); conn.setLastUsedTimestamp(System.currentTimeMillis()); state.activeConnections.add(conn); state.requestCount++; state.accumulatedRequestTime += System.currentTimeMillis() - t; } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection."); } state.badConnectionCount++; localBadConnectionCount++; conn = null; if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Could not get a good connection to the database."); } throw new SQLException("PooledDataSource: Could not get a good connection to the database."); } } } } } if (conn == null) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection."); } throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection."); } return conn; }
3、向連接池中push連接。
protected void pushConnection(PooledConnection conn) throws SQLException { synchronized (state) { state.activeConnections.remove(conn); if (conn.isValid()) { if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) { state.accumulatedCheckoutTime += conn.getCheckoutTime(); if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } //主要工作是根據傳入conn重新生成PooledConnection ,然后把con設置失效 PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this); state.idleConnections.add(newConn); newConn.setCreatedTimestamp(conn.getCreatedTimestamp()); newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp()); conn.invalidate(); if (log.isDebugEnabled()) { log.debug("Returned connection " + newConn.getRealHashCode() + " to pool."); } //通知其他等待連接的PooledDataSource,可以pop連接了 state.notifyAll(); } else { state.accumulatedCheckoutTime += conn.getCheckoutTime(); if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.getRealConnection().close(); if (log.isDebugEnabled()) { log.debug("Closed connection " + conn.getRealHashCode() + "."); } conn.invalidate(); } } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection."); } state.badConnectionCount++; } } }
4、設置各種屬性時都會調用forceCloseAll()方法,按照注釋的意思是關閉所有活動的和空閑的連接。
/* * Closes all active and idle connections in the pool */ public void forceCloseAll() { synchronized (state) { expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword()); for (int i = state.activeConnections.size(); i > 0; i--) { try { PooledConnection conn = state.activeConnections.remove(i - 1); conn.invalidate(); Connection realConn = conn.getRealConnection(); if (!realConn.getAutoCommit()) { realConn.rollback(); } realConn.close(); } catch (Exception e) { // ignore } } for (int i = state.idleConnections.size(); i > 0; i--) { try { PooledConnection conn = state.idleConnections.remove(i - 1); conn.invalidate(); Connection realConn = conn.getRealConnection(); if (!realConn.getAutoCommit()) { realConn.rollback(); } realConn.close(); } catch (Exception e) { // ignore } } } if (log.isDebugEnabled()) { log.debug("PooledDataSource forcefully closed/removed all connections."); } }
5、獲得操作數據庫的實際連接,即未包裝過的連接:如果傳入的連接是動態代理對象,並且它的InvocationHandler是PooledConnection,則返回其中的realConnection;否則原樣返回傳入的連接。
/**
 * Unwraps a pooled connection to get to the 'real' connection.
 *
 * @param conn the pooled connection to unwrap
 * @return the real connection when {@code conn} is a dynamic proxy whose invocation handler
 *         is a {@code PooledConnection}; otherwise {@code conn} itself
 */
public static Connection unwrapConnection(Connection conn) {
  if (!Proxy.isProxyClass(conn.getClass())) {
    return conn;
  }
  final InvocationHandler proxyHandler = Proxy.getInvocationHandler(conn);
  return proxyHandler instanceof PooledConnection
      ? ((PooledConnection) proxyHandler).getRealConnection()
      : conn;
}