MyBatis源碼分析(5)——內置DataSource實現


@(MyBatis)[DataSource]

MyBatis源碼分析(5)——內置DataSource實現

MyBatis內置了兩個DataSource的實現:UnpooledDataSource,該數據源對於每次獲取請求都簡單的打開和關閉連接。PooledDataSource,該數據源在Unpooled的基礎上構建了連接池。

UnpooledDataSource

配置

UNPOOLED數據源只有5個屬性需要配置:

driver:JDBC具體數據庫驅動
url:JDBC連接
username:用戶名
password:密碼
defaultTransactionIsolationLevel:默認事務隔離級別

除了以上屬性外,還可以配置驅動的連接信息,但是需要加前綴driver.,如:driver.encoding=UTF8,會將該配置作為參數傳入driverManager.getConnection。

如下:

<dataSource type="UNPOOLED">
	<property name="driver" value="${jdbc.driver}" />
	<property name="url" value="${jdbc.url}" />
	<property name="username" value="${jdbc.username}" />
	<property name="password" value="${jdbc.password}" />
</dataSource>

實現

UnpooledDataSource,內部通過調用DriverManager來實現,DriverManager是用於管理低層驅動以及創建連接的。

public class UnpooledDataSource implements DataSource {
  
  private ClassLoader driverClassLoader;
  // 驅動連接屬性
  private Properties driverProperties;
  // 所有已注冊的驅動,僅僅用於識別驅動在DriverManager中是否已經被加載進來了
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();

  // 當前使用的驅動
  private String driver;
  private String url;
  private String username;
  private String password;

  private Boolean autoCommit;
  // 默認事務隔離級別
  private Integer defaultTransactionIsolationLevel;

  static {
	// 靜態代碼塊,當類加載的時候,就從DriverManager中獲取所有的驅動信息,放到當前維護的Map中
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }

  // 省略了部分代碼...	

  public Connection getConnection() throws SQLException {
    // 獲取數據庫連接
    return doGetConnection(username, password);
  }

  public Connection getConnection(String username, String password) throws SQLException {
    return doGetConnection(username, password);
  }

  private Connection doGetConnection(String username, String password) throws SQLException {
    // 這里通過url加Properties來獲取連接,是因為可以在配置文件中配置數據庫連接的信息,比如編碼之類的
    Properties props = new Properties();
    if (driverProperties != null) {
      props.putAll(driverProperties);
    }
    if (username != null) {
      props.setProperty("user", username);
    }
    if (password != null) {
      props.setProperty("password", password);
    }
    return doGetConnection(props);
  }

  private Connection doGetConnection(Properties properties) throws SQLException {
    // 初始化驅動信息
    initializeDriver();
    // 從DriverManager中獲取數據庫連接
    Connection connection = DriverManager.getConnection(url, properties);
    // 配置連接信息,自動提交以及事務隔離級別
    configureConnection(connection);
    return connection;
  }

  private synchronized void initializeDriver() throws SQLException {
    // 如果沒有包含在已注冊Map中,則需要將該驅動加載進來
    if (!registeredDrivers.containsKey(driver)) {
      Class<?> driverType;
      try {
        // 加載數據庫連接驅動
        if (driverClassLoader != null) {
          driverType = Class.forName(driver, true, driverClassLoader);
        } else {
          // Resources為MyBatis內置的資源工具類,該方法依次嘗試從多個ClassLoader中獲取Class類,順序為:配置的classLoader,默認的defaultClassLoader,當前線程的getContextClassLoader,當前類的getClass().getClassLoader(),系統的systemClassLoader
          driverType = Resources.classForName(driver);
        }
        // 創建驅動實例
        Driver driverInstance = (Driver)driverType.newInstance();
        // 注冊到DriverManager中,用於創建數據庫連接
        DriverManager.registerDriver(new DriverProxy(driverInstance));
        // 放到已注冊Map中
        registeredDrivers.put(driver, driverInstance);
      } catch (Exception e) {
        throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
      }
    }
  }

  private void configureConnection(Connection conn) throws SQLException {
    if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
      // 如果已開啟了事務,則可以將自動提交關閉
      conn.setAutoCommit(autoCommit);
    }
    if (defaultTransactionIsolationLevel != null) {
        //設置事務隔離級別 
        conn.setTransactionIsolation(defaultTransactionIsolationLevel);
    }
  }
}

PooledDataSource

配置

除了UNPOOLED的配置外,還可以配置其它的一些屬性,如下:

poolMaximumActiveConnections:最大活動連接數(默認為10)
poolMaximumIdleConnections:最大空閑連接數(默認為5)
poolMaximumCheckoutTime:最大可回收時間,即當達到最大活動鏈接數時,此時如果有程序獲取連接,則檢查最先使用的連接,看其是否超出了該時間,如果超出了該時間,則可以回收該連接。(默認20s)
poolTimeToWait:沒有連接時,重嘗試獲取連接以及打印日志的時間間隔(默認20s)
poolPingQuery:檢查連接正確的語句,默認為"NO PING QUERY SET",即沒有,使用會導致拋異常
poolPingEnabled:是否開啟ping檢測,(默認:false)
poolPingConnectionsNotUsedFor:設置ping檢測時間間隔,通常用於檢測超時連接(默認為0,即當開啟檢測后每次從連接詞中獲取連接以及放回連接池都需要檢測)

示例配置如下:

<dataSource type="POOLED">
	<property name="driver" value="${jdbc.driver}" />
	<property name="url" value="${jdbc.url}" />
	<property name="username" value="${jdbc.username}" />
	<property name="password" value="${jdbc.password}" />
	<property name="poolMaximumActiveConnections" value="20" />
	<property name="poolMaximumIdleConnections" value="10" />
	<property name="poolMaximumCheckoutTime" value="15" />
	<property name="poolTimeToWait" value="10" />
	<property name="poolPingQuery" value="select 1 from dual" />
	<property name="poolPingEnabled" value="true" />
	<property name="poolPingConnectionsNotUsedFor" value="0" />
</dataSource>

當日志開啟DEBUG輸出,使用Mapper操作時,可以看到:

2016-07-31 21:30:45 [DEBUG]-[Thread: main]-[org.apache.ibatis.datasource.pooled.PooledDataSource.popConnection()]: 
Created connection 395084181.

2016-07-31 21:30:45 [DEBUG]-[Thread: main]-[org.apache.ibatis.datasource.pooled.PooledDataSource.pingConnection()]: 
Testing connection 395084181 ...

2016-07-31 21:30:45 [DEBUG]-[Thread: main]-[org.apache.ibatis.datasource.pooled.PooledDataSource.pingConnection()]: 
Connection 395084181 is GOOD!

實現

連接池的實現,核心的思想在於,將連接緩存起來,即可以通過兩個鏈表(/隊列)來實現,一個用於維持活動連接,另一個維持空閑連接。還有另外一點就是關閉連接后返回給連接池或者釋放掉,這里可以通過代理來實現,當客戶端調用close時,並不是直接關閉連接,而是將其緩存起來,放到空閑鏈表中。這里使用代理還有個優點,每次釋放的時候,重新創建新的代理連接來封裝,並且將原有的代理設置為無效,可以使得程序即使持有原有的代理,也不會影響到回收的連接。
在MyBatis中,主要是通過PoolState來維護連接狀態,以及通過PooledConnection代理來實現歸還連接操作。


PooledConnection

這里主要是通過Java Proxy代理來實現的。

// 實現代理接口
class PooledConnection implements InvocationHandler {

  private static final String CLOSE = "close";
  private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
  	
  // 省略了部分代碼...

  private PooledDataSource dataSource;
  private Connection realConnection;
  private Connection proxyConnection;

  public PooledConnection(Connection connection, PooledDataSource dataSource) {
    this.hashCode = connection.hashCode();
    this.realConnection = connection;
    this.dataSource = dataSource;
    this.createdTimestamp = System.currentTimeMillis();
    this.lastUsedTimestamp = System.currentTimeMillis();
    this.valid = true;
    // 創建代理
    this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
  }

  // 代理實現Connection接口的所有方法,只對CLOSE方法特別處理
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      // 如果為CLOSE方法,那么就將其放回連接池中
      dataSource.pushConnection(this);
      return null;
    } else {
      try {
        if (!Object.class.equals(method.getDeclaringClass())) {
          checkConnection();
        }
        // 其他方法則直接調用實際的連接來處理
        return method.invoke(realConnection, args);
      } catch (Throwable t) {
        throw ExceptionUtil.unwrapThrowable(t);
      }
    }
  }
}

PoolState

連接池的狀態,用於維護活動連接,空閑連接,以及統計一些連接信息。

public class PoolState {

  protected PooledDataSource dataSource;

  // 空閑連接
  protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();
  // 當前活動連接
  protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();
  // 請求次數
  protected long requestCount = 0;
  // 請求獲得連接所需時間
  protected long accumulatedRequestTime = 0;
  // 統計連接使用時間
  protected long accumulatedCheckoutTime = 0;
  // 統計過期回收連接數
  protected long claimedOverdueConnectionCount = 0;
  // 統計連接過期使用時間
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
  // 統計獲取連接需要等待的時間
  protected long accumulatedWaitTime = 0;
  // 統計獲取連接需要等待的次數
  protected long hadToWaitCount = 0;
  // 統計無效連接個數
  protected long badConnectionCount = 0;

  public PoolState(PooledDataSource dataSource) {
    this.dataSource = dataSource;
  }
  
  // 省略了部分Getter方法...
}

PooledDataSource獲取連接

連接池主要是通過popConnection來實現連接的創建以及分配的,過程不復雜,當有空閑連接時則直接使用,否則再根據是否達到了設置的峰值,來決定是否需要創建新的連接等。

流程圖如下:

popConnection實現:
  private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;

    while (conn == null) {
      // 加鎖訪問
      synchronized (state) {
        if (state.idleConnections.size() > 0) {
          // 有空閑連接,直接獲取
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
	      // 空閑連接不足
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // 小於最大活動連接數,直接建立新的連接,並封裝代理
            conn = new PooledConnection(dataSource.getConnection(), this);
            Connection realConn = conn.getRealConnection();
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // 超出最大活動連接數,不能創建連接
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            // 獲取使用時間最長的活動連接,並計算使用的時間
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // 超出了最大可回收時間,直接回收該連接,回收過期次數增加
              state.claimedOverdueConnectionCount++;
              // 統計過期回收時間增加
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              // 統計使用時間增加
              state.accumulatedCheckoutTime += longestCheckoutTime;
              // 將連接從活動隊列中移除
              state.activeConnections.remove(oldestActiveConnection);
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                // 如果不是自動提交事務,則將其回滾,因為可能存在一些操作
                oldestActiveConnection.getRealConnection().rollback();
              }
              // 使用新的代理封裝,可以使得不會被原有的影響
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              // 將之前的代理設置為無效
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // Must wait
              try {
                if (!countedWait) {
                  // 增加獲取連接需要等待的次數
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                // 等待
                state.wait(poolTimeToWait);
                // 增加獲取連接的等待時間
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                // 被中斷,退出嘗試以及等待
                break;
              }
            }
          }
        }
        if (conn != null) {
          if (conn.isValid()) {
            
            if (!conn.getRealConnection().getAutoCommit()) {
              // 連接為非自動提交事務,則將其回滾,可能存在一些未提交操作,並且防止影響下一次使用 
              conn.getRealConnection().rollback();
            }
            // 根據URL,用戶名以及密碼計算出一個Hash,用於標識此次連接
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            // 設置當前連接開始使用時間
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            // 設置最后一次使用時間
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            // 加入活動隊列中
            state.activeConnections.add(conn);
            // 統計請求次數
            state.requestCount++;
            // 統計獲取連接所需時間
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            // 無效連接
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            // 統計無效連接個數
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

	// 從上面的循環退出,如果為null,則一定出現異常情況了
    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }

PooledDataSource回收連接

通過使用代理,就可以在用戶調用Close關閉數據庫連接的時候,根據當前狀態來決定放入空閑鏈表中還是釋放掉。

流程圖如下:

pushConnection實現:
  // 省略了部分日志代碼
  protected void pushConnection(PooledConnection conn) throws SQLException {
    synchronized (state) {
      // 從活動鏈表中移除當前連接
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        // 當前連接有效的話判斷是否達到了最大空閑連接數,以及當前的連接是否變更過(即用戶名,密碼,Url等變更)
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          // 統計使用連接時長
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            // 沒有自動提交的話,先回滾,防止影響下一次使用
            conn.getRealConnection().rollback();
          }
          // 重新創建一個代理連接來封裝,可以使得當前使用的連接不會被原有的代理連接影響
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          // 放回空閑鏈表中
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          // 將原有的代理連接設置為無效
          conn.invalidate();
          // 通知等待獲取連接的線程(不去判斷是否真的有線程在等待)
          state.notifyAll();
        } else {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          // 超出空閑連接限制,則直接釋放當前連接
          conn.getRealConnection().close();
          // 將原有的代理連接設置為無效
          conn.invalidate();
        }
      } else {
        // 連接無效,則統計無效連接個數
        state.badConnectionCount++;
      }
    }
  }

連接池調優

// 經驗不足,后續補充

MyBatis DataSource集成

// 后續看完Spring-MyBatis再補充

參考

  1. MyBatis官方文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM