項目中常常使用mybatis配合spring進行數據庫操作,但是我們知道,數據的操作是要求做到線程安全的,而且按照原來的jdbc的使用方式,每次操作完成之后都要將連接關閉,但是實際使用中我們並沒有這么干。
更讓人疑惑的點是,spring中默認使用單例形式來加載bean,而往往我們也不會改變這種默認,所以,是所有線程共享數據連接?
讓我們來看看真相!
自然是要個栗子的:
我們來看下spring中配置mybatis數據庫操作bean(使用 druid 連接池):
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"> <property name="url" value="${jdbc.url}" /> <property name="driverClassName" value="${jdbc.driver}" /> <property name="username" value="${jdbc.username}" /> <property name="password" value="${jdbc.password}" /> </bean> <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> <property name="dataSource" ref="dataSource" /> <property name="configLocation" value="classpath:mybatis-config.xml" /> </bean> <!-- scope="prototype" 另說,另討論,我們先以mapper形式看一下 --> <bean id="sqlSession" class="org.mybatis.spring.SqlSessionTemplate"> <constructor-arg index="0" ref="sqlSessionFactory" /> </bean> <!-- 事務 --> <bean name="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource"></property> </bean>
而在java代碼中使用則是使用依賴注入直接使用 @resource sqlSession, 如下:
@Resource private SqlSessionTemplate sqlSession; @Override public User getUser(Map<String, String> cond) { // 此句執行db查詢 User result = sqlSession.selectOne(NAME_SPACE + ".getUser", cond); return result; }
這個sqlSession就是直接去操作數據庫了看起來是這樣,是在bean初始化的時候依賴注入的!
所以,難道每次進入該操作的時候,sqlSession 的實例都會變化嗎?答案是否定的。
那么,肯定就是往下使用的時候才發生的變化唄!
再往下走,可以看到,調用了一個代理來進行具體的查詢!
// org/mybatis/spring/SqlSessionTemplate.selectOne() public <T> T selectOne(String statement, Object parameter) { return this.sqlSessionProxy.<T> selectOne(statement, parameter); }
為啥要用代理呢?自己直接查不就行了嗎?其實,用代理是有好處的,那就可以可以進行另外的包裝!
代理是怎么生成的呢?其實只要看一下 SqlSessionTemplate 的構造方法就知道了!
/** * Constructs a Spring managed {@code SqlSession} with the given * {@code SqlSessionFactory} and {@code ExecutorType}. * A custom {@code SQLExceptionTranslator} can be provided as an * argument so any {@code PersistenceException} thrown by MyBatis * can be custom translated to a {@code RuntimeException} * The {@code SQLExceptionTranslator} can also be null and thus no * exception translation will be done and MyBatis exceptions will be * thrown * * @param sqlSessionFactory * @param executorType * @param exceptionTranslator */ public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) { notNull(sqlSessionFactory, "Property 'sqlSessionFactory' is required"); notNull(executorType, "Property 'executorType' is required"); this.sqlSessionFactory = sqlSessionFactory; this.executorType = executorType; this.exceptionTranslator = exceptionTranslator; // 生成代理 SqlSessionInterceptor 為 InvocationHandler this.sqlSessionProxy = (SqlSession) newProxyInstance( SqlSessionFactory.class.getClassLoader(), new Class[] { SqlSession.class }, new SqlSessionInterceptor()); }
從上面的代碼,看不到細節,但是,大致還是知道代理的具體實現了!即使用 SqlSessionInterceptor 去處理具體查詢邏輯!
我們來看下 SqlSessionInterceptor 的實現!
/** * Proxy needed to route MyBatis method calls to the proper SqlSession got * from Spring's Transaction Manager * It also unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to * pass a {@code PersistenceException} to the {@code PersistenceExceptionTranslator}. */ private class SqlSessionInterceptor implements InvocationHandler { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { SqlSession sqlSession = getSqlSession( SqlSessionTemplate.this.sqlSessionFactory, SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator); try { Object result = method.invoke(sqlSession, args); if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) { // force commit even on non-dirty sessions because some databases require // a commit/rollback before calling close() sqlSession.commit(true); } return result; } catch (Throwable t) { Throwable unwrapped = unwrapThrowable(t); if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) { // release the connection to avoid a deadlock if the translator is no loaded. See issue #22 closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory); sqlSession = null; Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped); if (translated != null) { unwrapped = translated; } } throw unwrapped; } finally { if (sqlSession != null) { closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory); } } } }
SqlSessionInterceptor 是 SqlSessionTemplate 的內部類,目的只有一個,就是處理多個 session 的db操作!
所有請求都被 invoke() 攔截,從而做相應處理:
1. 進入請求,先生成一個新的sqlSession,為本次db操作做准備;
2. 通過反射調用請求進來的方法,將 sqlSession 回調,進行復雜查詢及結果映射;
3. 如果需要立即提交事務,do it;
4. 如果出現異常,包裝異常信息,重新拋出;
5. 操作完成后,關閉本次session;
到這里,其實我們好像已經明白了,其實外面的 sqlSession 單例,並不會影響具體的db操作控制,所以不用擔心session的線程安全問題!
不過,還有個點值得考慮下,如果我一次請求里有多次數據庫操作,難道我真的要創建多個sqlSession或者說數據庫連接?不會吧!
如果這個問題得不到解決,可能你並不真正了解session的定義了!
所以我們需要繼續看一下 session 到底是怎么獲取的?
getSqlSession() 方法是在 SqlSessionUtils 中實現的!如下:
/** * Gets an SqlSession from Spring Transaction Manager or creates a new one if needed. * Tries to get a SqlSession out of current transaction. If there is not any, it creates a new one. * Then, it synchronizes the SqlSession with the transaction if Spring TX is active and * <code>SpringManagedTransactionFactory</code> is configured as a transaction manager. * * @param sessionFactory a MyBatis {@code SqlSessionFactory} to create new sessions * @param executorType The executor type of the SqlSession to create * @param exceptionTranslator Optional. Translates SqlSession.commit() exceptions to Spring exceptions. * @throws TransientDataAccessResourceException if a transaction is active and the * {@code SqlSessionFactory} is not using a {@code SpringManagedTransactionFactory} * @see SpringManagedTransactionFactory */ public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) { notNull(sessionFactory, "No SqlSessionFactory specified"); notNull(executorType, "No ExecutorType specified"); SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory); // 如果已經有holder,則直接返回,復用連接 if (holder != null && holder.isSynchronizedWithTransaction()) { if (holder.getExecutorType() != executorType) { throw new TransientDataAccessResourceException("Cannot change the ExecutorType when there is an existing transaction"); } holder.requested(); if (logger.isDebugEnabled()) { logger.debug("Fetched SqlSession [" + holder.getSqlSession() + "] from current transaction"); } return holder.getSqlSession(); } if (logger.isDebugEnabled()) { logger.debug("Creating a new SqlSession"); } SqlSession session = sessionFactory.openSession(executorType); // Register session holder if synchronization is active (i.e. a Spring TX is active) // // Note: The DataSource used by the Environment should be synchronized with the // transaction either through DataSourceTxMgr or another tx synchronization. // Further assume that if an exception is thrown, whatever started the transaction will // handle closing / rolling back the Connection associated with the SqlSession. if (TransactionSynchronizationManager.isSynchronizationActive()) { Environment environment = sessionFactory.getConfiguration().getEnvironment(); if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) { if (logger.isDebugEnabled()) { logger.debug("Registering transaction synchronization for SqlSession [" + session + "]"); } holder = new SqlSessionHolder(session, executorType, exceptionTranslator); TransactionSynchronizationManager.bindResource(sessionFactory, holder); TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory)); holder.setSynchronizedWithTransaction(true); holder.requested(); } else { if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) { if (logger.isDebugEnabled()) { logger.debug("SqlSession [" + session + "] was not registered for synchronization because DataSource is not transactional"); } } else { throw new TransientDataAccessResourceException( "SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization"); } } } else { if (logger.isDebugEnabled()) { logger.debug("SqlSession [" + session + "] was not registered for synchronization because synchronization is not active"); } } return session; }
如上獲取 sqlSession 邏輯,主要分兩種情況!
1. 如果存在holder,則返回原有的sqlSession,到於這個holder我們稍后再說;
2. 如果沒有,則創建一個新連接!
所以,看起來情況還不是太糟,至少有復用的概念了!
那么問題來了,復用?如何做到線程安全?所以我們要看下 SqlSessionHolder 的實現了!
獲取holder是通過 TransactionSynchronizationManager.getResource(sessionFactory); 獲取的:
public static Object getResource(Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); // 實際獲取 Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; } private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = null; } return value; }
咱們忽略對 key 的處理,實際是直接調用 doGetResource() 獲取holder.
而 doGetResource() 中,則使用了 resources 來保存具體的 kv。 resources 明顯是個共享變量,但是看起來這里沒有任何的加鎖操作!這是為何?
只要看一下 resources 的定義就知道了,其實現為 ThreadLocal, 所以是線程安全了!
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
在新的請求進來時,自然是沒有值的,所以直接返回null.而后續進入,則獲取緩存返回!
而對於沒有獲取到 holder 的情況,則需要重新創建一個 session 了!
這里獲取session由DefaultSqlSessionFactory 進行創建!如下:
// org.apache.ibatis.session.defaults.DefaultSqlSessionFactory.openSession() public SqlSession openSession(ExecutorType execType) { return openSessionFromDataSource(execType, null, false); } private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) { Transaction tx = null; try { final Environment environment = configuration.getEnvironment(); // SpringManagedTransactionFactory final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment); tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit); final Executor executor = configuration.newExecutor(tx, execType); return new DefaultSqlSession(configuration, executor, autoCommit); } catch (Exception e) { closeTransaction(tx); // may have fetched a connection so lets call close() throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e); } finally { ErrorContext.instance().reset(); } }
創建 session 幾件事:
1. 根據環境配置,開啟一個新事務,該事務管理器會負責后續jdbc連接管理工作;
2. 根據事務創建一個 Executor,備用;
3. 用DefaultSqlSession 將 executor 包裝后返回,用於后續真正的db操作;
至此,真正的 sqlSession 已經創建成功!返回后,就可以真正使用了!
等等,創建的session好像並沒有保存,那么還是那個問題,每個sql都會創建一個 sqlSession ? 好吧,是這樣的!前面的holder,只是用於存在事務操作的連接!(holder的理解出了偏差哦)
但是有一點,這里雖然創建了多個 sqlSession 實例,但是並不意味着有多個db連接,具體使用db連接時,則一般會會使用連接池來進行優化!如前面提到的 druid 就是個不錯的選擇!
真實的jdbc連接獲取,是在進行真正的 query 時,才進行調用 getConnection() 進行接入!
具體則是在 doQuery() 時,進行st的組裝時調用的 ,如下:
// SimpleExecutor.prepareStatement() private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException { Statement stmt; // 獲取 jdbc 連接,返回 java.sql.Connection Connection connection = getConnection(statementLog); stmt = handler.prepare(connection); handler.parameterize(stmt); return stmt; } // 調用 BaseExecutor.getConnection() protected Connection getConnection(Log statementLog) throws SQLException { // SpringManagedTransaction 管理 connection Connection connection = transaction.getConnection(); if (statementLog.isDebugEnabled()) { return ConnectionLogger.newInstance(connection, statementLog, queryStack); } else { return connection; } }
通過前面通過事務管理工廠創建的 SpringManagedTransaction 進行 connection 獲取!一個事務管理器只會存在一次獲取數據庫連接的操作!
public Connection getConnection() throws SQLException { if (this.connection == null) { openConnection(); } return this.connection; } // 而 SpringManagedTransaction 又將connection交由 DataSourceUtils 進行管理! // org/springframework/jdbc/datasource/DataSourceUtils public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException { try { // 真正的連接獲取 return doGetConnection(dataSource); } catch (SQLException ex) { throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex); } } /** * Actually obtain a JDBC Connection from the given DataSource. * Same as {@link #getConnection}, but throwing the original SQLException. * <p>Is aware of a corresponding Connection bound to the current thread, for example * when using {@link DataSourceTransactionManager}. Will bind a Connection to the thread * if transaction synchronization is active (e.g. if in a JTA transaction). * <p>Directly accessed by {@link TransactionAwareDataSourceProxy}. * @param dataSource the DataSource to obtain Connections from * @return a JDBC Connection from the given DataSource * @throws SQLException if thrown by JDBC methods * @see #doReleaseConnection */ public static Connection doGetConnection(DataSource dataSource) throws SQLException { Assert.notNull(dataSource, "No DataSource specified"); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) { conHolder.requested(); if (!conHolder.hasConnection()) { logger.debug("Fetching resumed JDBC Connection from DataSource"); conHolder.setConnection(dataSource.getConnection()); } return conHolder.getConnection(); } // Else we either got no holder or an empty thread-bound holder here. logger.debug("Fetching JDBC Connection from DataSource"); // 通過接入的dataSource進行連接獲取,這里將會是最終的jdbc連接 Connection con = dataSource.getConnection(); if (TransactionSynchronizationManager.isSynchronizationActive()) { logger.debug("Registering transaction synchronization for JDBC Connection"); // Use same Connection for further JDBC actions within the transaction. // Thread-bound object will get removed by synchronization at transaction completion. ConnectionHolder holderToUse = conHolder; if (holderToUse == null) { holderToUse = new ConnectionHolder(con); } else { holderToUse.setConnection(con); } holderToUse.requested(); TransactionSynchronizationManager.registerSynchronization( new ConnectionSynchronization(holderToUse, dataSource)); holderToUse.setSynchronizedWithTransaction(true); if (holderToUse != conHolder) { TransactionSynchronizationManager.bindResource(dataSource, holderToUse); } } return con; }
上面的實現主要做三件事:
1. 再次確認,是否存在事務處理,holder是否存在,如果有則復用;
2. 如果沒有,那再從數據源處獲取連接;
3. 獲取新連接成功后,檢查如果存在事務,則將新獲取的連接放入holder中保存起來,以備下次使用;
獲取jdbc連接后,就可以真正發起execute()查詢了。
數據庫連接的疑問算是解答了!我們發現,外部的框架並沒有多少為我們節省db連接的動作!而是把最終 getConnection() 交給 datasource 數據源!
而真正解決我們連接復用的問題的,是像 Druid 這樣的連接池組件!所以,咱們可以單獨來看這些中間件了!