前言
之前使用的讀寫分離的方案是在mybatis中配置兩個數據源,然后生成兩個不同的SqlSessionTemplate然后手動去識別執行sql語句是操作主庫還是從庫。如下圖所示:
好處是,你可以人為的去控制操作的數據庫。缺點也顯而易見,就是代碼非常麻煩,總是需要去判斷使用什么庫,而且遇到事務的時候還必須特別小心。
這次我們利用spring抽象路由數據源+MyBatis攔截器來實現自動的讀寫分離,並且保證在使用事務的情況下也能正確。結構如下圖所示

我們還是按照老套路,首先我會直接進行代碼的實現,然后根據源碼進行分析,最后做一個總結。
代碼實現
我們一共需要5個類和兩個配置文件
首先來說類
/**
* 全局動態數據源實體
* @author LinkinStar
*
*/
public enum DynamicDataSourceGlobal {
READ, WRITE;
}
這是一個枚舉的實體,后面會用到
/**
* 動態數據源線程持有者
* @author LinkinStar
*
*/
public final class DynamicDataSourceHolder {
private static final ThreadLocal<DynamicDataSourceGlobal> holder = new ThreadLocal<DynamicDataSourceGlobal>();
/**
* 設置當前線程使用的數據源
*/
public static void putDataSource(DynamicDataSourceGlobal dataSource){
holder.set(dataSource);
}
/**
* 獲取當前線程需要使用的數據源
*/
public static DynamicDataSourceGlobal getDataSource(){
return holder.get();
}
/**
* 清空使用的數據源
*/
public static void clearDataSource() {
holder.remove();
}
}
以上是兩個工具,下面就是重點了
一個是我們的主角,動態數據源,它繼承自spring的抽象動態路由數據源
/**
* 動態數據源(繼承自spring抽象動態路由數據源)
* @author LinkinStar
*
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
private Object writeDataSource; //寫數據源
private Object readDataSource; //讀數據源
/**
* 在初始化之前被調用,設置默認數據源,以及數據源資源(這里的寫法是參考源碼中的)
*/
@Override
public void afterPropertiesSet() {
//如果寫數據源不存在,則拋出非法異常
if (this.writeDataSource == null) {
throw new IllegalArgumentException("Property ‘writeDataSource‘ is required");
}
//設置默認目標數據源為主庫
setDefaultTargetDataSource(writeDataSource);
//設置所有數據源資源,有從庫添加,沒有就添加
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DynamicDataSourceGlobal.WRITE.name(), writeDataSource);
if(readDataSource != null) {
targetDataSources.put(DynamicDataSourceGlobal.READ.name(), readDataSource);
}
setTargetDataSources(targetDataSources);
super.afterPropertiesSet();
}
/**
* 這是AbstractRoutingDataSource類中的一個抽象方法,而它的返回值是你所要用的數據源dataSource的key值
*/
@Override
protected Object determineCurrentLookupKey() {
//根據當前線程所使用的數據源進行切換
DynamicDataSourceGlobal dynamicDataSourceGlobal = DynamicDataSourceHolder.getDataSource();
//如果沒有被賦值,那么默認使用主庫
if(dynamicDataSourceGlobal == null
|| dynamicDataSourceGlobal == DynamicDataSourceGlobal.WRITE) {
return DynamicDataSourceGlobal.WRITE.name();
}
//其他情況使用從庫
return DynamicDataSourceGlobal.READ.name();
}
public void setWriteDataSource(Object writeDataSource) {
this.writeDataSource = writeDataSource;
}
public Object getWriteDataSource() {
return writeDataSource;
}
public Object getReadDataSource() {
return readDataSource;
}
public void setReadDataSource(Object readDataSource) {
this.readDataSource = readDataSource;
}
}
然后是我們的另一個主角,動態數據源插件,實現MyBatis攔截器接口
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.executor.keygen.SelectKeyGenerator;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.plugin.Interceptor;
import org.apache.ibatis.plugin.Intercepts;
import org.apache.ibatis.plugin.Invocation;
import org.apache.ibatis.plugin.Plugin;
import org.apache.ibatis.plugin.Signature;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* 動態數據源插件,實現MyBatis攔截器接口
* @author LinkinStar
*
*/
@Intercepts({
@Signature(type = Executor.class, method = "update", args = {
MappedStatement.class, Object.class }),
@Signature(type = Executor.class, method = "query", args = {
MappedStatement.class, Object.class, RowBounds.class,
ResultHandler.class }) })
public class DynamicPlugin implements Interceptor {
/**
* 匹配SQL語句的正則表達式
*/
private static final String REGEX = ".*insert\\u0020.*|.*delete\\u0020.*|.*update\\u0020.*";
/**
* 這個map用於存放已經執行過的sql語句所對應的數據源
*/
private static final Map<String, DynamicDataSourceGlobal> cacheMap = new ConcurrentHashMap<>();
@Override
public Object intercept(Invocation invocation) throws Throwable {
boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
if (!actualTransactionActive) {
return invocation.proceed();
}
//從代理類參數中獲取參數
Object[] objects = invocation.getArgs();
//其中參數的第一個值為執行的sql語句
MappedStatement ms = (MappedStatement) objects[0];
//當前sql語句所應該使用的數據源,通過sql語句的id從map中獲取,如果獲取到,則之前已經執行過直接取,
DynamicDataSourceGlobal dynamicDataSourceGlobal = cacheMap.get(ms.getId());
if (dynamicDataSourceGlobal != null) {
DynamicDataSourceHolder.putDataSource(dynamicDataSourceGlobal);
return invocation.proceed();
}
//如果沒有,則重新進行存放
//ms中獲取方法,如果是查詢方法
if (ms.getSqlCommandType().equals(SqlCommandType.SELECT)) {
//!selectKey 為自增id查詢主鍵(SELECT LAST_INSERT_ID() )方法,使用主庫
if(ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)) {
dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
} else {
BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);
//通過正則表達式匹配,確定使用那一個數據源
String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replaceAll("[\\t\\n\\r]", " ");
if(sql.matches(REGEX)) {
dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
} else {
dynamicDataSourceGlobal = DynamicDataSourceGlobal.READ;
}
}
} else {
dynamicDataSourceGlobal = DynamicDataSourceGlobal.WRITE;
}
//將sql對應使用的數據源放進map中存放
cacheMap.put(ms.getId(), dynamicDataSourceGlobal);
//最后設置使用的數據源
DynamicDataSourceHolder.putDataSource(dynamicDataSourceGlobal);
//執行代理之后的方法
return invocation.proceed();
}
@Override
public Object plugin(Object target) {
if (target instanceof Executor) {
return Plugin.wrap(target, this);
} else {
return target;
}
}
@Override
public void setProperties(Properties properties) {
}
}
最后是我們的配角,動態數據源的事務管理器
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
/**
* 動態數據源事務管理器
* @author LinkinStar
*
*/
public class DynamicDataSourceTransactionManager extends DataSourceTransactionManager {
private static final long serialVersionUID = 1L;
/**
* 只讀事務到讀庫,讀寫事務到寫庫
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
//根據事務可讀性進行判斷
boolean readOnly = definition.isReadOnly();
//只讀類型事務可以只用從庫
if(readOnly) {
DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.READ);
} else {
DynamicDataSourceHolder.putDataSource(DynamicDataSourceGlobal.WRITE);
}
super.doBegin(transaction, definition);
}
/**
* 清理本地線程的數據源(會被默認調用,調用時清除相應數據源)
*/
@Override
protected void doCleanupAfterCompletion(Object transaction) {
super.doCleanupAfterCompletion(transaction);
DynamicDataSourceHolder.clearDataSource();
}
}
然后是兩個配置文件,根據你自己的需要進行修改
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd">
<context:property-placeholder location="classpath:resources/jdbc.properties"/>
<bean id="abstractDataSource" abstract="true" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="minIdle" value="${jdbc.minIdle}"></property>
<property name="maxIdle" value="${jdbc.maxIdle}"></property>
<property name="maxWait" value="${jdbc.maxWait}"></property>
<property name="maxActive" value="${jdbc.maxActive}"></property>
<property name="initialSize" value="${jdbc.initialSize}"></property>
<property name="testWhileIdle"><value>true</value></property>
<property name="testOnBorrow"><value>true</value></property>
<property name="testOnReturn"><value>false</value></property>
<property name="validationQuery"><value>SELECT 1 FROM DUAL</value></property>
<property name="validationQueryTimeout"><value>1</value></property>
<property name="timeBetweenEvictionRunsMillis"><value>3000</value></property>
<property name="numTestsPerEvictionRun"><value>2</value></property>
</bean>
<bean id="dataSourceRead" parent="abstractDataSource">
<property name="url" value="${jdbc.url.read}" />
<property name="username" value="${jdbc.username.read}"/>
<property name="password" value="${jdbc.password.read}"/>
</bean>
<bean id="dataSourceWrite" parent="abstractDataSource">
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}"/>
<property name="password" value="${jdbc.password}"/>
</bean>
<bean id="dataSource" class="com.ssm.dao.data.DynamicDataSource">
<property name="writeDataSource" ref="dataSourceWrite"></property>
<property name="readDataSource" ref="dataSourceRead"></property>
</bean>
<!--配置基於注解的聲明式事務,默認使用注解來管理事務行為-->
<tx:annotation-driven transaction-manager="transactionManager"/>
<!--配置事務管理器(mybatis采用的是JDBC的事務管理器)-->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"></property>
</bean>
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<!--注入數據庫連接池-->
<property name="dataSource" ref="dataSource" />
<!--掃描entity包,使用別名,多個用;隔開-->
<property name="typeAliasesPackage" value="com/ssm/entity" />
<!--掃描sql配置文件:mapper需要的xml文件-->
<property name="mapperLocations" value="classpath*:com/ssm/dao/sqlxml/*.xml"></property>
<property name="plugins">
<array>
<bean class="com.ssm.dao.data.DynamicPlugin" />
</array>
</property>
</bean>
<bean id="sqlSessionTemplate" class="org.mybatis.spring.SqlSessionTemplate">
<constructor-arg name="sqlSessionFactory" ref="sqlSessionFactory" />
</bean>
<!--配置掃描Dao接口包,動態實現DAO接口,注入到spring容器-->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<!--注入SqlSessionFactory-->
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>
<!-- 給出需要掃描的Dao接口-->
<property name="basePackage" value="com.ssm.dao"/>
</bean>
</beans>
另外就是jdbc的配置文件,也需要根據自己進行修改,這邊使用兩個
jdbc.driverClassName=com.mysql.jdbc.Driver jdbc.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8 jdbc.username=root jdbc.password=123456 jdbc.url.read=jdbc:mysql://localhost:3306/xxx?useUnicode=true&characterEncoding=UTF-8 jdbc.username.read=root jdbc.password.read=123456 jdbc.maxActive = 2 jdbc.maxIdle =5 jdbc.minIdle=1 jdbc.initialSize =3 jdbc.maxWait =3000
至此所有的配置都已經完成,現在你已經可以進行測試,看看在查詢和新增的時候是否使用的是不同的數據庫。
看看在使用事務的情況下,是否使用相同的數據庫。
實現分析
首先我們來分析兩個主角
動態數據源(繼承自spring抽象動態路由數據源)
先看一下源碼中父類的說明
/**
* Abstract {@link javax.sql.DataSource} implementation that routes {@link #getConnection()}
* calls to one of various target DataSources based on a lookup key. The latter is usually
* (but not necessarily) determined through some thread-bound transaction context.
*
* @author Juergen Hoeller
* @since 2.0.1
* @see #setTargetDataSources
* @see #setDefaultTargetDataSource
* @see #determineCurrentLookupKey()
*/
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
我們寫的這個類中重寫了父類兩個重要的方法
1、afterPropertiesSet
首先源碼中是這樣的:
@Override
public void afterPropertiesSet() {
if (this.targetDataSources == null) {
throw new IllegalArgumentException("Property ‘targetDataSources‘ is required");
}
this.resolvedDataSources = new HashMap<Object, DataSource>(this.targetDataSources.size());
for (Map.Entry<Object, Object> entry : this.targetDataSources.entrySet()) {
Object lookupKey = resolveSpecifiedLookupKey(entry.getKey());
DataSource dataSource = resolveSpecifiedDataSource(entry.getValue());
this.resolvedDataSources.put(lookupKey, dataSource);
}
if (this.defaultTargetDataSource != null) {
this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
}
}
而我們重寫的目的就是為了設置默認我們的主庫和從庫
2、determineCurrentLookupKey
這是AbstractRoutingDataSource類中的一個抽象方法,而它的返回值是你所要用的數據源dataSource的key值
在這個方法中我們通過DynamicDataSourceHolder獲取當前線程所應該使用的數據源,然后將數據源的名字返回。也就是dataSource的key值。
然后是下一個主角,動態數據源插件,實現MyBatis攔截器接口,這個類一共干了下面幾個事情
(當我們實現了MyBatis攔截器接口之后就能在數據庫執行sql之前做操作,具體請參考別的博客,這里不細說)
1、通過當前是否使用事務確定數據源,如果使用事務,那么默認使用主庫
2、從sql語句中獲取sql執行的類型,根據具體的類型確定使用的數據源
3、利用cacheMap緩存已經進行判斷過的sql和對應執行時使用的數據源
4、通過DynamicDataSourceHolder保存當前線程所需要使用的數據源
最后一個是動態數據源事務管理器
這個類主要是保證,當一些事務是只讀類型的事務時,使用的數據源是從庫。
然后保存到DynamicDataSourceHolder中
總結
1、使用此種方式實現數據庫讀寫分離,對於代碼來說不會對現有代碼造成影響,沒有入侵性,容易剝離和加入。
2、對於事務使用同一個數據庫能保證讀寫的一致性。
3、不需要人為去判斷使用哪一個數據庫,不用擔心會出現人物問題。
4、擴展性上面,當有多個從庫的時候,不要想着配置多個從庫數據源解決問題,而是應該配置數據庫負載均衡然后實現多個從數據庫的訪問。

