前言
在我們平時開發中,使用數據庫連接池時使用阿里的Druid連接池已經比較常見了,但是我們在集成到Springboot時似乎非常簡單,只需要簡單的配置即可使用,那么Druid是怎么加載的呢,本文就從源碼層面進行揭秘
使用
首先簡單的介紹下如何使用
1、pom.xml加載jar包,直接使用集成springboot的jar
<dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.10</version> </dependency>
2、application.properties進行配置
spring.datasource.url=jdbc:mysql://localhost:3306/mynote spring.datasource.username=root spring.datasource.password=root # 使用阿里的DruidDataSource數據源 spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver # 初始化連接數,默認為0 spring.datasource.druid.initial-size=0 # 最大連接數,默認為8 spring.datasource.druid.max-active=8
主要配置參數就是初始化連接數和最大連接數,最大連接數一般不需要配置的太大,一般8核cpu使用8個線程就可以了,原因是8核cpu同時可以處理的線程數只有8,設置的太大反而會造成CPU時間片的頻繁切換
源碼
首先我們沒有做任何代碼上的配置,為什么druid可以加載呢?那么就很容易聯想到springboot的自動裝配機制,所以我們看druid-spring-boot-starter jar包,這是一個start組件,所以我們直接看他的spring.factories文件,自動裝配的機制這里不做介紹,可以看這篇文章
1 @Configuration 2 @ConditionalOnClass(DruidDataSource.class) 3 @AutoConfigureBefore(DataSourceAutoConfiguration.class) 4 @EnableConfigurationProperties({DruidStatProperties.class, DataSourceProperties.class}) 5 @Import({DruidSpringAopConfiguration.class, 6 DruidStatViewServletConfiguration.class, 7 DruidWebStatFilterConfiguration.class, 8 DruidFilterConfiguration.class}) 9 public class DruidDataSourceAutoConfigure { 10 11 private static final Logger LOGGER = LoggerFactory.getLogger(DruidDataSourceAutoConfigure.class); 12 13 @Bean(initMethod = "init") 14 @ConditionalOnMissingBean 15 public DataSource dataSource() { 16 LOGGER.info("Init DruidDataSource"); 17 return new DruidDataSourceWrapper(); 18 } 19 }
初始化了一個DataSource,實現類是DruidDataSourceWrapper,這個DataSource就是我們jdk提供jdbc操作的一個很重要的接口
到這里DataSource已經初始化完成了
我們開始從使用的地方入手,我的項目是基於Mybatis查詢數據庫的,這里從Mybatis查詢開始入手
我們都知道Mybatis查詢最終必定會從mybatis的Executor的query開始執行
所以我們在BaseExecutor的query方法打上斷點,果然進來了,然后我們繼續看
1 @Override 2 public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { 3 ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId()); 4 if (closed) { 5 throw new ExecutorException("Executor was closed."); 6 } 7 if (queryStack == 0 && ms.isFlushCacheRequired()) { 8 clearLocalCache(); 9 } 10 List<E> list; 11 try { 12 queryStack++; 13 list = resultHandler == null ? (List<E>) localCache.getObject(key) : null; 14 if (list != null) { 15 handleLocallyCachedOutputParameters(ms, key, parameter, boundSql); 16 } else { 17 // 核心代碼 18 list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql); 19 } 20 } finally { 21 queryStack--; 22 } 23 ...... 24 return list; 25 }
我們只看核心代碼,進入queryFromDatabase
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException { List<E> list; localCache.putObject(key, EXECUTION_PLACEHOLDER); try { // 核心代碼 list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql); } finally { localCache.removeObject(key); } localCache.putObject(key, list); if (ms.getStatementType() == StatementType.CALLABLE) { localOutputParameterCache.putObject(key, parameter); } return list; }
繼續跟
1 @Override 2 public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException { 3 Statement stmt = null; 4 try { 5 Configuration configuration = ms.getConfiguration(); 6 StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql); 7 // 核心代碼 8 stmt = prepareStatement(handler, ms.getStatementLog()); 9 return handler.query(stmt, resultHandler); 10 } finally { 11 closeStatement(stmt); 12 } 13 }
這里我們看到獲取了一個Statement ,這個Statement 是我們java原生操作數據庫的一個很重要的類,這個Statement 應該是需要從一個數據庫連接(Connection)上獲取的,這里就很重要了,所以我們就需要看在里面是怎么獲取Connection的就可以了
1 private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException { 2 Statement stmt; 3 // 核心 4 Connection connection = getConnection(statementLog); 5 stmt = handler.prepare(connection, transaction.getTimeout()); 6 handler.parameterize(stmt); 7 return stmt; 8 }
繼續
1 protected Connection getConnection(Log statementLog) throws SQLException { 2 // 核心代碼 3 Connection connection = transaction.getConnection(); 4 if (statementLog.isDebugEnabled()) { 5 return ConnectionLogger.newInstance(connection, statementLog, queryStack); 6 } else { 7 return connection; 8 } 9 }
核心代碼,獲取Connection,進入了SpringManagedTransaction的getConnection方法
1 @Override 2 public Connection getConnection() throws SQLException { 3 if (this.connection == null) { 4 // 核心代碼 5 openConnection(); 6 } 7 return this.connection; 8 }
繼續
private void openConnection() throws SQLException { // 核心代碼 this.connection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.connection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource); LOGGER.debug(() -> "JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring"); }
核心代碼處,這個this.dataSource就是我們一開始通過自動裝配初始化的。
DataSourceUtils這個類是spring提供的,也就是最終數據源的策略是通過spring提供的擴展機制,實現不同的dataSource來實現不同功能的
繼續
public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException { try { // 核心代碼 return doGetConnection(dataSource); } catch (SQLException ex) { throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex); } catch (IllegalStateException ex) { throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage()); } }
繼續
public static Connection doGetConnection(DataSource dataSource) throws SQLException { Assert.notNull(dataSource, "No DataSource specified"); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) { conHolder.requested(); if (!conHolder.hasConnection()) { logger.debug("Fetching resumed JDBC Connection from DataSource"); conHolder.setConnection(fetchConnection(dataSource)); } return conHolder.getConnection(); } // Else we either got no holder or an empty thread-bound holder here. logger.debug("Fetching JDBC Connection from DataSource"); // 核心代碼 Connection con = fetchConnection(dataSource); ...... return con; }
private static Connection fetchConnection(DataSource dataSource) throws SQLException { // 核心代碼 Connection con = dataSource.getConnection(); if (con == null) { throw new IllegalStateException("DataSource returned null from getConnection(): " + dataSource); } return con; }
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { // 核心代碼1 init(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(this); // 核心代碼2 return filterChain.dataSource_connect(this, maxWaitMillis); } else { return getConnectionDirect(maxWaitMillis); } }
這里的核心代碼1也很重要的,這里我們后續再看
繼續看dataSource_connect
@Override public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException { if (this.pos < filterSize) { // 核心代碼 DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis); return conn; } return dataSource.getConnectionDirect(maxWaitMillis); }
繼續,進入了StatFilter的dataSource_getConnection
@Override public DruidPooledConnection dataSource_getConnection(FilterChain chain, DruidDataSource dataSource, long maxWaitMillis) throws SQLException { // 核心代碼 DruidPooledConnection conn = chain.dataSource_connect(dataSource, maxWaitMillis); if (conn != null) { conn.setConnectedTimeNano(); StatFilterContext.getInstance().pool_connection_open(); } return conn; }
繼續,然后又回到了FilterChainImpl的dataSource_connect
@Override public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException { if (this.pos < filterSize) { DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis); return conn; } // 核心代碼 return dataSource.getConnectionDirect(maxWaitMillis); }
這個時候走了下面這個方法
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { int notFullTimeoutRetryCnt = 0; for (;;) { // handle notFullTimeoutRetry DruidPooledConnection poolableConnection; try { // 核心代碼 poolableConnection = getConnectionInternal(maxWaitMillis); } catch (GetConnectionTimeoutException ex) { if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { notFullTimeoutRetryCnt++; if (LOG.isWarnEnabled()) { LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt); } continue; } throw ex; } ...... }
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException { DruidConnectionHolder holder; ...... // 上面做了各種邏輯判斷,此處不關注 if (maxWait > 0) { holder = pollLast(nanos); } else { // 核心代碼1 holder = takeLast(); } ...... holder.incrementUseCount(); // 核心代碼2 DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder); return poolalbeConnection; }
核心代碼1處獲取了一個DruidConnectionHolder,DruidConnectionHolder里面有個關鍵的成員變量,就是我們的連接Connection
DruidConnectionHolder takeLast() throws InterruptedException, SQLException { try { while (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection if (failFast && failContinuous.get()) { throw new DataSourceNotAvailableException(createError); } notEmptyWaitThreadCount++; if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { notEmptyWaitThreadPeak = notEmptyWaitThreadCount; } try { notEmpty.await(); // signal by recycle or creator } finally { notEmptyWaitThreadCount--; } notEmptyWaitCount++; if (!enable) { connectErrorCountUpdater.incrementAndGet(this); throw new DataSourceDisableException(); } } } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread notEmptySignalCount++; throw ie; } // 核心代碼1 decrementPoolingCount(); // 核心代碼2 DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null; return last; }
這里的decrementPoolingCount就是把一個int的變量poolingCount-1,然后在connections數組里面取某一個Connection
這里就已經看到核心代碼了,connections就是我們的線程池了,是一個數組類型,里面存放了我們需要的連接,依靠一個指針poolingCount來控制當前應該可以取哪一個下標的Connection
查看斷點,可以看到里面有8個Connection,也就是我們初始線程池數量
接下來再看下之前沒看的init
public void init() throws SQLException { ...... // 核心代碼1 connections = new DruidConnectionHolder[maxActive]; evictConnections = new DruidConnectionHolder[maxActive]; keepAliveConnections = new DruidConnectionHolder[maxActive]; SQLException connectError = null; if (createScheduler != null) { for (int i = 0; i < initialSize; ++i) { createTaskCount++; CreateConnectionTask task = new CreateConnectionTask(true); this.createSchedulerFuture = createScheduler.submit(task); } } else if (!asyncInit) { try { // init connections for (int i = 0; i < initialSize; ++i) { // 核心代碼2 PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection(); DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo); connections[poolingCount] = holder; incrementPoolingCount(); } if (poolingCount > 0) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } } catch (SQLException ex) { LOG.error("init datasource error, url: " + this.getUrl(), ex); connectError = ex; } } ...... } }
核心代碼1,初始化了一個最大連接數的數組
核心代碼2,初始化初始連接數數量的線程池連接
到這里,核心代碼就全部看完了,本文是從Mybatis查詢開始看代碼的,實際上核心代碼可以直接從DataSource的getConnection方法開始看
總結
Druid連接池的核心功能主要就是注冊一個DataSource的bean,連接池、獲取連接等都依賴於DataSource的實現類DruidDataSourceWrapper,連接池功能主要是維護了一個數組,在項目啟動時提前創建了一些數據庫連接放到了里面復用
參考:https://blog.csdn.net/qq_31086797/article/details/114631032