從DataSource.getConnection入手來看一下通過druid獲取連接時的內部邏輯。
首先進入DruidDataSource的這個方法
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { //初始化連接 數量為設置的InitilSize,如果沒有配置則為0,MaxActive最大為8
init(); // 如果有FilterChain,走FilterChain的邏輯 if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(this); return filterChain.dataSource_connect(this, maxWaitMillis); } else {
//真正的獲取連接的方法 return getConnectionDirect(maxWaitMillis); } }
正常情況下通過getConnectionDirect來直接獲取連接,裡面傳了一個最大等待時間,稍後我們分析這個參數的作用。
先看上部分邏輯:
int notFullTimeoutRetryCnt = 0; for (;;) { // handle notFullTimeoutRetry DruidPooledConnection poolableConnection; try { poolableConnection = getConnectionInternal(maxWaitMillis); } catch (GetConnectionTimeoutException ex) { if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { notFullTimeoutRetryCnt++; if (LOG.isWarnEnabled()) { LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt); } continue; } throw ex; }
這一部分很好理解,一個for循環裡面通過getConnectionInternal(maxWaitMillis)方法來獲取一個真實的物理連接,
如果捕捉到獲取連接超時的異常(GetConnectionTimeoutException),去判斷notFullTimeoutRetryCnt 這個計數是否小於等於設定的超時重試參數notFullTimeoutRetryCount,並且
在連接池未滿的情況下繼續進行重試。如果重試次數已經足夠或者此時連接池已經滿了(其它線程獲取到連接)則
將異常拋出去。isFull方法用來判斷存在池子內的連接和活躍連接(已經被線程borrow走了)的數量是否大於等於最大
連接數maxActive。
進入getConnectionInternal內部:
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
//如果當前連接池已經關閉 則拋出異常 if (closed) { connectErrorCountUpdater.incrementAndGet(this); throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis)); } //如果當期連接池狀態為不可用 則拋出異常 if (!enable) { connectErrorCountUpdater.incrementAndGet(this); if (disableException != null) { throw disableException; } throw new DataSourceDisableException(); } final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); final int maxWaitThreadCount = this.maxWaitThreadCount; DruidConnectionHolder holder; for (boolean createDirect = false;;) { if (createDirect) { createStartNanosUpdater.set(this, System.nanoTime()); if (creatingCountUpdater.compareAndSet(this, 0, 1)) { PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection(); holder = new DruidConnectionHolder(this, pyConnInfo); holder.lastActiveTimeMillis = System.currentTimeMillis(); creatingCountUpdater.decrementAndGet(this); directCreateCountUpdater.incrementAndGet(this); if (LOG.isDebugEnabled()) { LOG.debug("conn-direct_create "); } boolean discard = false; lock.lock(); try { if (activeCount < maxActive) { activeCount++; holder.active = true; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } break; } else { discard = true; } } finally { lock.unlock(); } if (discard) { JdbcUtils.close(pyConnInfo.getPhysicalConnection()); } } } try { lock.lockInterruptibly(); } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("interrupt", e); } try { if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); } if (onFatalError && onFatalErrorMaxActive > 0 && activeCount >= onFatalErrorMaxActive) { connectErrorCountUpdater.incrementAndGet(this); StringBuilder errorMsg = new StringBuilder(); 
errorMsg.append("onFatalError, activeCount ") .append(activeCount) .append(", onFatalErrorMaxActive ") .append(onFatalErrorMaxActive); if (lastFatalErrorTimeMillis > 0) { errorMsg.append(", time '") .append(StringUtils.formatDateTime19( lastFatalErrorTimeMillis, TimeZone.getDefault())) .append("'"); } if (lastFatalErrorSql != null) { errorMsg.append(", sql \n") .append(lastFatalErrorSql); } throw new SQLException( errorMsg.toString(), lastFatalError); } connectCount++; if (createScheduler != null && poolingCount == 0 && activeCount < maxActive && creatingCountUpdater.get(this) == 0 && createScheduler instanceof ScheduledThreadPoolExecutor) { ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler; if (executor.getQueue().size() > 0) { createDirect = true; continue; } } if (maxWait > 0) { holder = pollLast(nanos); } else { holder = takeLast(); } if (holder != null) { if (holder.discard) { continue; } activeCount++; holder.active = true; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } } } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException(e.getMessage(), e); } catch (SQLException e) { connectErrorCountUpdater.incrementAndGet(this); throw e; } finally { lock.unlock(); } break; } if (holder == null) { long waitNanos = waitNanosLocal.get(); StringBuilder buf = new StringBuilder(128); buf.append("wait millis ")// .append(waitNanos / (1000 * 1000))// .append(", active ").append(activeCount)// .append(", maxActive ").append(maxActive)// .append(", creating ").append(creatingCount)// ; if (creatingCount > 0 && createStartNanos > 0) { long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000); if (createElapseMillis > 0) { buf.append(", createElapseMillis ").append(createElapseMillis); } } if (createErrorCount > 0) { buf.append(", createErrorCount ").append(createErrorCount); } List<JdbcSqlStatValue> sqlList = 
this.getDataSourceStat().getRuningSqlList(); for (int i = 0; i < sqlList.size(); ++i) { if (i != 0) { buf.append('\n'); } else { buf.append(", "); } JdbcSqlStatValue sql = sqlList.get(i); buf.append("runningSqlCount ").append(sql.getRunningCount()); buf.append(" : "); buf.append(sql.getSql()); } String errorMessage = buf.toString(); if (this.createError != null) { throw new GetConnectionTimeoutException(errorMessage, createError); } else { throw new GetConnectionTimeoutException(errorMessage); } } holder.incrementUseCount(); DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder); return poolalbeConnection;