renren-fast Rapid Development Platform Source Code Analysis: Multiple Data Sources


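The project can switch data sources inside specific methods, based on the database declared in an annotation. Let's see how it does that (although I don't know what this is useful for).
First, check whether any of the Spring Boot config classes deal with data sources. Starting from RenrenApplication, one turns up almost immediately: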

/**
 * Multi-data-source configuration
 * @author chenshun
 * @email sunlightcs@gmail.com
 * @date 2017/8/19 0:41
 */
@Configuration
public class DynamicDataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.druid.first")
    public DataSource firstDataSource(){
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.druid.second")
    public DataSource secondDataSource(){
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    public DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource) {
        // Lookup key -> target data source; firstDataSource is also the default
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DataSourceNames.FIRST, firstDataSource);
        targetDataSources.put(DataSourceNames.SECOND, secondDataSource);
        return new DynamicDataSource(firstDataSource, targetDataSources);
    }
}

Following the data source configuration, we find a DynamicDataSource class in the datasources package:

/**
 * Dynamic data source
 * @author chenshun
 * @email sunlightcs@gmail.com
 * @date 2017/8/19 1:03
 */
public class DynamicDataSource extends AbstractRoutingDataSource {
    // Per-thread lookup key naming the data source to use
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources) {
        super.setDefaultTargetDataSource(defaultTargetDataSource);
        super.setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return getDataSource();
    }

    public static void setDataSource(String dataSource) {
        contextHolder.set(dataSource);
    }

    public static String getDataSource() {
        return contextHolder.get();
    }

    public static void clearDataSource() {
        contextHolder.remove();
    }
}

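Looking at this class alone doesn't reveal much: all we learn is that it holds a static ThreadLocal whose value can be set through static methods, and that determineCurrentLookupKey() returns that value. So let's look at the parent class, AbstractRoutingDataSource:
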
/**
 * Abstract {@link javax.sql.DataSource} implementation that routes {@link #getConnection()}
 * calls to one of various target DataSources based on a lookup key. The latter is usually
 * (but not necessarily) determined through some thread-bound transaction context.
 *
 * @author Juergen Hoeller
 * @since 2.0.1
 * @see #setTargetDataSources
 * @see #setDefaultTargetDataSource
 * @see #determineCurrentLookupKey()
 */
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {

    @Nullable
    private Map<Object, Object> targetDataSources;

    @Nullable
    private Object defaultTargetDataSource;

    private boolean lenientFallback = true;

    private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();

    @Nullable
    private Map<Object, DataSource> resolvedDataSources;

    @Nullable
    private DataSource resolvedDefaultDataSource;

    /**
     * Specify the map of target DataSources, with the lookup key as key.
     * The mapped value can either be a corresponding {@link javax.sql.DataSource}
     * instance or a data source name String (to be resolved via a
     * {@link #setDataSourceLookup DataSourceLookup}).
     * <p>The key can be of arbitrary type; this class implements the
     * generic lookup process only. The concrete key representation will
     * be handled by {@link #resolveSpecifiedLookupKey(Object)} and
     * {@link #determineCurrentLookupKey()}.
     */
    public void setTargetDataSources(Map<Object, Object> targetDataSources) {
        this.targetDataSources = targetDataSources;
    }

    /**
     * Specify the default target DataSource, if any.
     * <p>The mapped value can either be a corresponding {@link javax.sql.DataSource}
     * instance or a data source name String (to be resolved via a
     * {@link #setDataSourceLookup DataSourceLookup}).
     * <p>This DataSource will be used as target if none of the keyed
     * {@link #setTargetDataSources targetDataSources} match the
     * {@link #determineCurrentLookupKey()} current lookup key.
     */
    public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
        this.defaultTargetDataSource = defaultTargetDataSource;
    }

    /**
     * Specify whether to apply a lenient fallback to the default DataSource
     * if no specific DataSource could be found for the current lookup key.
     * <p>Default is "true", accepting lookup keys without a corresponding entry
     * in the target DataSource map - simply falling back to the default DataSource
     * in that case.
     * <p>Switch this flag to "false" if you would prefer the fallback to only apply
     * if the lookup key was {@code null}. Lookup keys without a DataSource
     * entry will then lead to an IllegalStateException.
     * @see #setTargetDataSources
     * @see #setDefaultTargetDataSource
     * @see #determineCurrentLookupKey()
     */
    public void setLenientFallback(boolean lenientFallback) {
        this.lenientFallback = lenientFallback;
    }

    /**
     * Set the DataSourceLookup implementation to use for resolving data source
     * name Strings in the {@link #setTargetDataSources targetDataSources} map.
     * <p>Default is a {@link JndiDataSourceLookup}, allowing the JNDI names
     * of application server DataSources to be specified directly.
     */
    public void setDataSourceLookup(@Nullable DataSourceLookup dataSourceLookup) {
        this.dataSourceLookup = (dataSourceLookup != null ? dataSourceLookup : new JndiDataSourceLookup());
    }

    @Override
    public void afterPropertiesSet() {
        if (this.targetDataSources == null) {
            throw new IllegalArgumentException("Property 'targetDataSources' is required");
        }
        this.resolvedDataSources = new HashMap<>(this.targetDataSources.size());
        this.targetDataSources.forEach((key, value) -> {
            Object lookupKey = resolveSpecifiedLookupKey(key);
            DataSource dataSource = resolveSpecifiedDataSource(value);
            this.resolvedDataSources.put(lookupKey, dataSource);
        });
        if (this.defaultTargetDataSource != null) {
            this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
        }
    }

    /**
     * Resolve the given lookup key object, as specified in the
     * {@link #setTargetDataSources targetDataSources} map, into
     * the actual lookup key to be used for matching with the
     * {@link #determineCurrentLookupKey() current lookup key}.
     * <p>The default implementation simply returns the given key as-is.
     * @param lookupKey the lookup key object as specified by the user
     * @return the lookup key as needed for matching
     */
    protected Object resolveSpecifiedLookupKey(Object lookupKey) {
        return lookupKey;
    }

    /**
     * Resolve the specified data source object into a DataSource instance.
     * <p>The default implementation handles DataSource instances and data source
     * names (to be resolved via a {@link #setDataSourceLookup DataSourceLookup}).
     * @param dataSource the data source value object as specified in the
     * {@link #setTargetDataSources targetDataSources} map
     * @return the resolved DataSource (never {@code null})
     * @throws IllegalArgumentException in case of an unsupported value type
     */
    protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {
        if (dataSource instanceof DataSource) {
            return (DataSource) dataSource;
        }
        else if (dataSource instanceof String) {
            return this.dataSourceLookup.getDataSource((String) dataSource);
        }
        else {
            throw new IllegalArgumentException(
                    "Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);
        }
    }

    @Override
    public Connection getConnection() throws SQLException {
        return determineTargetDataSource().getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return determineTargetDataSource().getConnection(username, password);
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (iface.isInstance(this)) {
            return (T) this;
        }
        return determineTargetDataSource().unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return (iface.isInstance(this) || determineTargetDataSource().isWrapperFor(iface));
    }

    /**
     * Retrieve the current target DataSource. Determines the
     * {@link #determineCurrentLookupKey() current lookup key}, performs
     * a lookup in the {@link #setTargetDataSources targetDataSources} map,
     * falls back to the specified
     * {@link #setDefaultTargetDataSource default target DataSource} if necessary.
     * @see #determineCurrentLookupKey()
     */
    protected DataSource determineTargetDataSource() {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
        Object lookupKey = determineCurrentLookupKey();
        DataSource dataSource = this.resolvedDataSources.get(lookupKey);
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        }
        return dataSource;
    }

    /**
     * Determine the current lookup key. This will typically be
     * implemented to check a thread-bound transaction context.
     * <p>Allows for arbitrary keys. The returned key needs
     * to match the stored lookup key type, as resolved by the
     * {@link #resolveSpecifiedLookupKey} method.
     */
    @Nullable
    protected abstract Object determineCurrentLookupKey();

}

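From the comments we can see that this class implements getConnection(). That method calls determineTargetDataSource(), which asks determineCurrentLookupKey() for a lookup key, looks the key up in the resolved target map, falls back to the default data source when nothing matches (always when the key is null, and for unknown keys only while lenientFallback is true), and then opens a connection on the result. Combined with the project's own DynamicDataSource, the conclusion is not hard to reach: the project picks its database connection based on the value held in the ThreadLocal. We will confirm this later; first let's see when the project switches databases.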
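
The lookup rule in determineTargetDataSource() can be tried out without Spring. The following is only a minimal sketch under stated assumptions: RoutingSketch and its methods are hypothetical names, and plain strings stand in for real javax.sql.DataSource instances. It is meant to mirror the keyed lookup and default fallback quoted above, not to reproduce Spring's class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, JDK-only stand-in for the lookup in AbstractRoutingDataSource.
// Strings take the place of real DataSource instances.
class RoutingSketch {
    // Per-thread lookup key, playing the role of contextHolder in DynamicDataSource.
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    private final Map<String, String> targets = new HashMap<>();
    private final String defaultTarget;
    private final boolean lenientFallback;

    RoutingSketch(String defaultTarget, boolean lenientFallback) {
        this.defaultTarget = defaultTarget;
        this.lenientFallback = lenientFallback;
    }

    void register(String key, String target) {
        targets.put(key, target);
    }

    static void setKey(String key) {
        KEY.set(key);
    }

    static void clearKey() {
        KEY.remove();
    }

    // Same order of checks as determineTargetDataSource():
    // keyed lookup first, then the default, then failure.
    String determineTarget() {
        String lookupKey = KEY.get();
        String target = targets.get(lookupKey);
        if (target == null && (lenientFallback || lookupKey == null)) {
            target = defaultTarget;
        }
        if (target == null) {
            throw new IllegalStateException(
                    "Cannot determine target for lookup key [" + lookupKey + "]");
        }
        return target;
    }
}
```

With "first" and "second" registered and "first-db" as the default, determineTarget() yields the default when no key is set, the second target after setKey("second"), and the default again for an unregistered key as long as lenientFallback is true.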

There is also an aspect package under datasources, which immediately suggests that the switch is done through AOP. Straight to the code:
DataSourceAspect

/**
 * Multiple data sources: aspect handler class
 * @author chenshun
 * @email sunlightcs@gmail.com
 * @date 2017/9/16 22:20
 */
@Aspect
@Component
public class DataSourceAspect implements Ordered {
    protected Logger logger = LoggerFactory.getLogger(getClass());

    @Pointcut("@annotation(io.renren.datasources.annotation.DataSource)")
    public void dataSourcePointCut() {

    }

    @Around("dataSourcePointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();

        // Pick the data source named by the annotation, defaulting to FIRST
        DataSource ds = method.getAnnotation(DataSource.class);
        if(ds == null){
            DynamicDataSource.setDataSource(DataSourceNames.FIRST);
            logger.debug("set datasource is " + DataSourceNames.FIRST);
        }else {
            DynamicDataSource.setDataSource(ds.name());
            logger.debug("set datasource is " + ds.name());
        }

        try {
            return point.proceed();
        } finally {
            // Always clear the key once the annotated method returns
            DynamicDataSource.clearDataSource();
            logger.debug("clean datasource");
        }
    }

    @Override
    public int getOrder() {
        return 1;
    }
}

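Sure enough, AOP is used to read the data source name from the @DataSource annotation on the invoked method and decide which data source to use. Combined with the code above, we can conclude that when a method annotated with @DataSource is called, the aspect sets the data source key for the current thread, and the finally block clears it as soon as the method returns. A servlet container normally handles one request on one thread, but the switch does not last for the whole request: it covers only the duration of the annotated method call on that thread. Once the key is cleared, routing falls back to the default data source (first).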
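
To make the scope of the switch concrete, here is a small sketch of the same set / proceed / clear pattern using only the JDK. DataSourceScope and withDataSource are hypothetical names introduced for illustration; the real project does this inside DataSourceAspect.around().

```java
import java.util.function.Supplier;

// Hypothetical helper with the same shape as DataSourceAspect.around():
// set the key, run the body, always clear the key afterwards.
class DataSourceScope {
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    // The key currently bound to this thread, or null if none is set.
    static String current() {
        return KEY.get();
    }

    static <T> T withDataSource(String name, Supplier<T> body) {
        KEY.set(name);
        try {
            return body.get();
        } finally {
            // Unconditional clear, exactly like clearDataSource() in the aspect.
            KEY.remove();
        }
    }
}
```

Inside the body, current() returns the chosen name; after withDataSource returns, it is null again. One consequence of the unconditional clear, which follows from the aspect code as quoted: if one wrapped call runs inside another, the inner call's cleanup also wipes the outer call's key, so the rest of the outer body runs against the default data source.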

A global search for @DataSource shows that only one test service uses it:

/**
 * Multi-data-source test
 *
 * @author Mark sunlightcs@gmail.com
 * @since 3.1.0 2018-01-28
 */
@Service
public class DataSourceTestService {
    @Autowired
    private SysUserService sysUserService;

    // No annotation: uses the default data source
    public SysUserEntity queryUser(Long userId){
        return sysUserService.selectById(userId);
    }

    // Routed to the second data source by DataSourceAspect
    @DataSource(name = DataSourceNames.SECOND)
    public SysUserEntity queryUser2(Long userId){
        return sysUserService.selectById(userId);
    }
}
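
The annotation lookup the aspect performs is plain reflection and can be reproduced without Spring or AspectJ. In the sketch below, DemoDataSource, DemoService and AnnotationLookup are hypothetical stand-ins; in particular, the definition of the project's real @DataSource annotation is not shown in this article, so the retention and target used here are assumptions.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for io.renren.datasources.annotation.DataSource.
// RUNTIME retention is required for getAnnotation() to see it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoDataSource {
    String name() default "";
}

// Hypothetical service with one plain and one annotated method.
class DemoService {
    public String queryUser() {
        return "user";
    }

    @DemoDataSource(name = "second")
    public String queryUser2() {
        return "user";
    }
}

class AnnotationLookup {
    // Returns the name declared on the method, or the fallback when
    // the method carries no annotation (the aspect's ds == null branch).
    static String resolveName(Class<?> type, String methodName, String fallback) {
        try {
            Method method = type.getMethod(methodName);
            DemoDataSource ds = method.getAnnotation(DemoDataSource.class);
            return ds == null ? fallback : ds.name();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No public method " + methodName, e);
        }
    }
}
```

Calling resolveName(DemoService.class, "queryUser2", "first") picks up the declared name, while the unannotated queryUser falls back to the supplied default.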

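That is all there is to data source switching in the project's own source.

Next, let's explore at what point MyBatis creates a connection from the data source.

First we need to know how MyBatis works. A read through the official site shows that MyBatis-Spring first creates the SqlSessionFactory instance through SqlSessionFactoryBean and then creates a SqlSession when appropriate. My first guess was that the database connection is already open by the time the SqlSession is created, since transactions have to be managed; as the source quoted later shows, the connection is in fact fetched lazily. Either way, we can go straight to the SqlSessionFactory implementation class, DefaultSqlSessionFactory: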

public class DefaultSqlSessionFactory implements SqlSessionFactory {

    private final Configuration configuration;

    public DefaultSqlSessionFactory(Configuration configuration) {
        this.configuration = configuration;
    }

    @Override
    public SqlSession openSession() {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, false);
    }

    @Override
    public SqlSession openSession(boolean autoCommit) {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), null, autoCommit);
    }

    @Override
    public SqlSession openSession(ExecutorType execType) {
        return openSessionFromDataSource(execType, null, false);
    }

    @Override
    public SqlSession openSession(TransactionIsolationLevel level) {
        return openSessionFromDataSource(configuration.getDefaultExecutorType(), level, false);
    }

    @Override
    public SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level) {
        return openSessionFromDataSource(execType, level, false);
    }

    @Override
    public SqlSession openSession(ExecutorType execType, boolean autoCommit) {
        return openSessionFromDataSource(execType, null, autoCommit);
    }

    @Override
    public SqlSession openSession(Connection connection) {
        return openSessionFromConnection(configuration.getDefaultExecutorType(), connection);
    }

    @Override
    public SqlSession openSession(ExecutorType execType, Connection connection) {
        return openSessionFromConnection(execType, connection);
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }

    private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
        Transaction tx = null;
        try {
            final Environment environment = configuration.getEnvironment();
            final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
            tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
            final Executor executor = configuration.newExecutor(tx, execType);
            return new DefaultSqlSession(configuration, executor, autoCommit);
        } catch (Exception e) {
            closeTransaction(tx); // may have fetched a connection so lets call close()
            throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
        } finally {
            ErrorContext.instance().reset();
        }
    }

    private SqlSession openSessionFromConnection(ExecutorType execType, Connection connection) {
        try {
            boolean autoCommit;
            try {
                autoCommit = connection.getAutoCommit();
            } catch (SQLException e) {
                // Failover to true, as most poor drivers
                // or databases won't support transactions
                autoCommit = true;
            }
            final Environment environment = configuration.getEnvironment();
            final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
            final Transaction tx = transactionFactory.newTransaction(connection);
            final Executor executor = configuration.newExecutor(tx, execType);
            return new DefaultSqlSession(configuration, executor, autoCommit);
        } catch (Exception e) {
            throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
        } finally {
            ErrorContext.instance().reset();
        }
    }

    private TransactionFactory getTransactionFactoryFromEnvironment(Environment environment) {
        if (environment == null || environment.getTransactionFactory() == null) {
            return new ManagedTransactionFactory();
        }
        return environment.getTransactionFactory();
    }

    private void closeTransaction(Transaction tx) {
        if (tx != null) {
            try {
                tx.close();
            } catch (SQLException ignore) {
                // Intentionally ignore. Prefer previous error.
            }
        }
    }

}

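From this factory's source we can see that creating a SqlSession goes through openSessionFromDataSource(). In that method:

  1. First it takes the environment from configuration; anyone familiar with MyBatis knows that the environment is where the data source is configured.
  2. Then it calls getTransactionFactoryFromEnvironment() to obtain a TransactionFactory instance.
  3. It uses the TransactionFactory to create the transaction tx.
  4. It creates the executor from that transaction tx.
  5. Finally it creates a DefaultSqlSession, the class we use to run queries against the database.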

Let's go through these step by step to see when the connection is opened.

Creating the TransactionFactory instance

Look directly at the source of getTransactionFactoryFromEnvironment():

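private TransactionFactory getTransactionFactoryFromEnvironment(Environment environment) {
    if (environment == null || environment.getTransactionFactory() == null) {
        return new ManagedTransactionFactory();
    }
    return environment.getTransactionFactory();
}

If no transactionFactory is configured, a ManagedTransactionFactory is returned. (Under MyBatis-Spring, SqlSessionFactoryBean puts a SpringManagedTransactionFactory into the environment by default, so a Spring application normally does not reach this fallback. Its transaction also fetches the connection from the DataSource lazily, so ManagedTransactionFactory still serves as a simpler example of the same idea.)
ManagedTransactionFactory.java
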
public class ManagedTransactionFactory implements TransactionFactory {

    private boolean closeConnection = true;

    @Override
    public void setProperties(Properties props) {
        if (props != null) {
            String closeConnectionProperty = props.getProperty("closeConnection");
            if (closeConnectionProperty != null) {
                closeConnection = Boolean.valueOf(closeConnectionProperty);
            }
        }
    }

    @Override
    public Transaction newTransaction(Connection conn) {
        return new ManagedTransaction(conn, closeConnection);
    }

    @Override
    public Transaction newTransaction(DataSource ds, TransactionIsolationLevel level, boolean autoCommit) {
        // Silently ignores autocommit and isolation level, as managed transactions are entirely
        // controlled by an external manager.  It's silently ignored so that
        // code remains portable between managed and unmanaged configurations.
        return new ManagedTransaction(ds, level, closeConnection);
    }
}

Creating the transaction from the TransactionFactory

ManagedTransaction.java

/**
 * {@link Transaction} that lets the container manage the full lifecycle of the transaction.
 * Delays connection retrieval until getConnection() is called.
 * Ignores all commit or rollback requests.
 * By default, it closes the connection but can be configured not to do it.
 *
 * @author Clinton Begin
 *
 * @see ManagedTransactionFactory
 */
public class ManagedTransaction implements Transaction {

    private static final Log log = LogFactory.getLog(ManagedTransaction.class);

    private DataSource dataSource;
    private TransactionIsolationLevel level;
    private Connection connection;
    private final boolean closeConnection;

    public ManagedTransaction(Connection connection, boolean closeConnection) {
        this.connection = connection;
        this.closeConnection = closeConnection;
    }

    public ManagedTransaction(DataSource ds, TransactionIsolationLevel level, boolean closeConnection) {
        this.dataSource = ds;
        this.level = level;
        this.closeConnection = closeConnection;
    }

    @Override
    public Connection getConnection() throws SQLException {
        if (this.connection == null) {
            openConnection();
        }
        return this.connection;
    }

    @Override
    public void commit() throws SQLException {
        // Does nothing
    }

    @Override
    public void rollback() throws SQLException {
        // Does nothing
    }

    @Override
    public void close() throws SQLException {
        if (this.closeConnection && this.connection != null) {
            if (log.isDebugEnabled()) {
                log.debug("Closing JDBC Connection [" + this.connection + "]");
            }
            this.connection.close();
        }
    }

    protected void openConnection() throws SQLException {
        if (log.isDebugEnabled()) {
            log.debug("Opening JDBC Connection");
        }
        this.connection = this.dataSource.getConnection();
        if (this.level != null) {
            this.connection.setTransactionIsolation(this.level.getLevel());
        }
    }

    @Override
    public Integer getTimeout() throws SQLException {
        return null;
    }

}

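Here the openConnection() method opens the database connection from dataSource, and it is only called from getConnection() when no connection exists yet. This is where things link up with the dynamic data source we configured at the start; what remains is to see when getConnection() is called.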

Creating the executor from the transaction

public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
    executorType = executorType == null ? defaultExecutorType : executorType;
    executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
    Executor executor;
    if (ExecutorType.BATCH == executorType) {
        executor = new BatchExecutor(this, transaction);
    } else if (ExecutorType.REUSE == executorType) {
        executor = new ReuseExecutor(this, transaction);
    } else {
        executor = new SimpleExecutor(this, transaction);
    }
    if (cacheEnabled) {
        executor = new CachingExecutor(executor);
    }
    executor = (Executor) interceptorChain.pluginAll(executor);
    return executor;
}

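We can see that the executor is also wrapped by interceptorChain with the registered plugins. This is how commonly used pagination plugins hook in, but I won't go into that here.

Creating the DefaultSqlSession

This class has too much code, so I'll paste only the key part: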

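@Override
public Connection getConnection() {
    try {
        return executor.getTransaction().getConnection();
    } catch (SQLException e) {
        throw ExceptionFactory.wrapException("Error getting a new connection.  Cause: " + e, e);
    }
}

This calls getConnection() on the transaction object created earlier, and that method opens a connection on our custom data source. (The executors go through the same Transaction.getConnection() call when they prepare a statement.) In our custom DynamicDataSource, getConnection() in turn picks the target data source from the value in the ThreadLocal. So as long as the custom aspect has set the data source key on the current thread, by the time that thread asks MyBatis to open a connection, it gets a connection to the switched data source.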
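
The point that matters for switching is the timing: the key is read when the connection is first requested, not when the transaction object is created. The sketch below illustrates this with JDK classes only. LazyConnectionSketch, its nested LazyTransaction, and the string "connections" are all hypothetical stand-ins for DynamicDataSource and ManagedTransaction, chosen so the example runs without a database.

```java
import java.util.HashMap;
import java.util.Map;

class LazyConnectionSketch {
    // Per-thread data source key, as in DynamicDataSource.
    static final ThreadLocal<String> KEY = new ThreadLocal<>();

    // Strings stand in for real connections to each database.
    static final Map<String, String> TARGETS = new HashMap<>();
    static {
        TARGETS.put("first", "connection-to-first");
        TARGETS.put("second", "connection-to-second");
    }

    // Stand-in for DynamicDataSource.getConnection(): the key is read at
    // call time, and "first" is used when no key (or an unknown key) is set.
    static String routeConnection() {
        return TARGETS.getOrDefault(KEY.get(), TARGETS.get("first"));
    }

    // Stand-in for ManagedTransaction: nothing is opened in the constructor,
    // and the first connection obtained is cached for later calls.
    static class LazyTransaction {
        private String connection;

        String getConnection() {
            if (connection == null) {
                connection = routeConnection();
            }
            return connection;
        }
    }
}
```

A LazyTransaction created before KEY is set to "second" still ends up on the second target, because routing happens on the first getConnection() call. The reverse also holds: once the connection is cached, changing the key has no effect on that transaction.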

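This article is not long, but reading MyBatis's source along the workflow described above makes it easy to see how its other features are implemented. For example, mapper injection in MyBatis-Spring actually goes through MyBatis's dynamic proxy, which inspects the Method object of the invoked mapper interface method to obtain its annotations, method name, parameters and so on, and then makes the call through a SqlSession.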
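
The dynamic-proxy idea can be shown with java.lang.reflect.Proxy alone. This is only a rough sketch: DemoUserMapper and MapperProxySketch are hypothetical, and where MyBatis would execute a mapped statement through a SqlSession, the handler here just returns a string built from the interface name, method name and arguments.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical mapper interface; no implementation class is ever written.
interface DemoUserMapper {
    String selectById(long id);
}

class MapperProxySketch {
    @SuppressWarnings("unchecked")
    static <T> T newMapper(Class<T> mapperInterface) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // toString/hashCode/equals are answered by the handler itself.
                if (method.getDeclaringClass() == Object.class) {
                    return method.invoke(this, args);
                }
                // Interface name + method name identifies what to run;
                // a real framework would hand this to a session here.
                String statementId = mapperInterface.getName() + "." + method.getName();
                return statementId + Arrays.toString(args);
            }
        };
        return (T) Proxy.newProxyInstance(
                mapperInterface.getClassLoader(),
                new Class<?>[] {mapperInterface},
                handler);
    }
}
```

Calling newMapper(DemoUserMapper.class).selectById(7L) never touches a hand-written implementation: the call lands in invoke(), which has the Method object and arguments available to decide what to execute.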

Source: Jianshu. Link: https://www.jianshu.com/p/6007d5777cf2

