數據庫寫入效率要低於讀取效率,一般系統中數據讀取頻率高於寫入頻率,單個數據庫實例在寫入的時候會影響讀取性能,這是做讀寫分離的原因。
實現方式主要基於mysql的主從復制,通過路由的方式使應用對數據庫的寫請求只在master上進行,讀請求在slave上進行。
mysql主從復制:https://www.jianshu.com/p/a68551347d7d
路由的方式主要有兩種:
1.代理
在應用和數據庫之間增加代理層,代理層接收應用對數據庫的請求,根據不同請求類型轉發到不同的實例,在實現讀寫分離的同時可以實現負載均衡。

目前常用的mysql的讀寫分離中間件有amoeba,MySQL-Proxy
2.應用內路由
在應用程序中實現,針對不同的請求類型去不同的實例執行sql

本文主要介紹第二種方式。基於springboot、 mybatis實現。
思路:之前在做項目的時候實現過mybatis數據源的動態切換。基於原來的方案,用aop來攔截dao層方法,根據方法名稱就可以判斷要執行的sql類型,動態切換主從數據源。
1.mybatis和數據源配置

2.數據源切換
切換數據源需要用到類AbstractRoutingDataSource

targetDataSources用一個map來存儲配置的數據源,defaultTargetDataSource默認的數據源

項目啟動時targetDataSources中的值會放到resolvedDataSources,key默認為targetDataSources中的key,可以實現resolveSpecifiedLookupKey()方法處理。
resolvedDefaultDataSource會被賦值給defaultTargetDataSource,因此如果defaultTargetDataSource沒有配啟動會報錯 。

在需要與mysql交互時檢索resolvedDataSources中的數據源,通過抽象determineCurrentLookupKey()獲取當前數據源的key,因此實現這個方法可以實現數據源的切換。
數據源加載:
/** * Title:MybatisConfiguration * * @author angla **/ @Configuration public class MybatisConfiguration { @Autowired private Environment env; /** * 創建數據源(數據源的名稱:方法名可以取為XXXDataSource(),XXX為數據庫名稱,該名稱也就是數據源的名稱) */ @Bean public DataSource masterDataSource() throws Exception { Properties props = new Properties(); props.put("driverClassName", env.getProperty("spring.mastersource.driver-class-name")); props.put("url", env.getProperty("spring.mastersource.url")); props.put("username", env.getProperty("spring.mastersource.username")); props.put("password", env.getProperty("spring.mastersource.password")); return DruidDataSourceFactory.createDataSource(props); } @Bean public DataSource slaveDataSource() throws Exception { Properties props = new Properties(); props.put("driverClassName", env.getProperty("spring.slavesource1.driver-class-name")); props.put("url", env.getProperty("spring.slavesource1.url")); props.put("username", env.getProperty("spring.slavesource1.username")); props.put("password", env.getProperty("spring.slavesource1.password")); return DruidDataSourceFactory.createDataSource(props); } /** * @Primary 該注解表示在同一個接口有多個實現類可以注入的時候,默認選擇哪一個,而不是讓@autowire注解報錯 * @Qualifier 根據名稱進行注入,通常是在具有相同的多個類型的實例的一個注入(例如有多個DataSource類型的實例) */ @Bean @Primary @DependsOn({"masterDataSource","slaveDataSource"}) public DynamicDataSource dataSource(DataSource masterDataSource, DataSource slaveDataSource) { Map<Object, Object> targetDataSources = new HashMap<>(); targetDataSources.put(DataSourceTypeEnum.DATA_SOURCE_MASTER.getName(), masterDataSource); targetDataSources.put(DataSourceTypeEnum.DATA_SOURCE_SLAVE.getName(),slaveDataSource); DynamicDataSource dataSource = new DynamicDataSource(); dataSource.setTargetDataSources(targetDataSources);// 該方法是AbstractRoutingDataSource的方法 dataSource.setDefaultTargetDataSource(slaveDataSource);// 默認的datasource設置為myTestDbDataSource return dataSource; } /** * 根據數據源創建SqlSessionFactory */ @Bean public SqlSessionFactory sqlSessionFactory(DynamicDataSource ds) throws Exception { SqlSessionFactoryBean fb = new SqlSessionFactoryBean(); fb.setDataSource(ds);// 指定數據源 fb.setTypeAliasesPackage(env.getProperty("mybatis.typeAliasesPackage"));// 指定基包 fb.setMapperLocations( new PathMatchingResourcePatternResolver().getResources(Objects.requireNonNull(env.getProperty( "mybatis.mapperLocations")))); return fb.getObject(); } /** * 配置事務管理器 */ @Bean public DataSourceTransactionManager transactionManager(DynamicDataSource dataSource) throws Exception { return new DataSourceTransactionManager(dataSource); } } 數據源枚舉: /** * Title:DataSourceTypeEnum * * @author angla **/ public enum DataSourceTypeEnum { DATA_SOURCE_MASTER(1,"master"), DATA_SOURCE_SLAVE(2,"slave"); DataSourceTypeEnum(Integer code, String name) { this.code = code; this.name = name; } private Integer code; private String name; public Integer getCode() { return code; } public String getName() { return name; } }
定義ThreadLocal存儲
/** * Title:DataSourceContextHolder * * @author angla **/ public class DataSourceContextHolder { private static final ThreadLocal<DataSourceTypeEnum> contextHolder = new ThreadLocal<>(); public static void setDatabaseType(DataSourceTypeEnum databaseType) { contextHolder.set(databaseType); } public static DataSourceTypeEnum getDatabaseType() { return contextHolder.get(); } } 實現determineCurrentLookupKey方法 /** * Title:DynamicDataSource * * @author angla **/ public class DynamicDataSource extends AbstractRoutingDataSource { protected Object determineCurrentLookupKey() { return DataSourceContextHolder.getDatabaseType(); } }
定義aop攔截dao層方法:
@Component @Aspect @Slf4j public class DataSourceAspect { private static final String[] queryStrs = {"query", "select", "get"}; /** * 定義切入點,切入點為com.angla.demo.dao下的所有方法 */ @Pointcut("execution(* com.angla.demo.dao.*.*(..))") public void executeSql() { } /** * 前置通知:在連接點之前執行的通知 * * @param joinPoint * @throws Throwable */ @Before("executeSql()") public void doBefore(JoinPoint joinPoint) throws Throwable { MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature(); String mName = methodSignature.getMethod().getName(); log.info("攔截sql方法:{}", mName); DataSourceContextHolder.setDatabaseType(DataSourceTypeEnum.DATA_SOURCE_MASTER); for (String name : queryStrs) { if (mName.startsWith(name)) { log.info("查詢語句,設置數據源為slave"); DataSourceContextHolder.setDatabaseType(DataSourceTypeEnum.DATA_SOURCE_SLAVE); break; } } log.info("當前數據源:{}",DataSourceContextHolder.getDatabaseType().getName()); } }
至此,一個簡單的讀寫分離實現就完成了,測試下結果:

停掉master實例,寫數據報錯,可以正常讀取數據,停掉slave實例可以正常寫數據,不能讀取數據,結果是沒問題的。但是這樣還不夠,現在加載數據源只能加載一主一從,不能適用一主多從或者多主多從的情況,后面需要改下數據源加載和獲取方式。
多主多從配置:

加載數據源配置:
@Data @Component @ConfigurationProperties(prefix = "spring") public class DataSourceProperties { private List<Map<String,String>> mastersources; private List<Map<String,String>> slavesources; } @Autowired private DataSourceProperties dataSourceProperties; /** * 創建數據源(數據源的名稱:方法名可以取為XXXDataSource(),XXX為數據庫名稱,該名稱也就是數據源的名稱) */ @Bean public List<DataSource> masterDataSources() throws Exception { List<Map<String, String>> mastersources = dataSourceProperties.getMastersources(); if (CollectionUtils.isEmpty(mastersources)) { throw new IllegalArgumentException("需要至少一個主數據源"); } List<DataSource> dataSources = new ArrayList<>(); for (Map map : mastersources) { dataSources.add(DruidDataSourceFactory.createDataSource(map)); } return dataSources; } @Bean public List<DataSource> slaveDataSources() throws Exception { List<Map<String, String>> slavesources = dataSourceProperties.getSlavesources(); if (CollectionUtils.isEmpty(slavesources)) { throw new IllegalArgumentException("需要至少一個從數據源"); } List<DataSource> dataSources = new ArrayList<>(); for (Map map : slavesources) { dataSources.add(DruidDataSourceFactory.createDataSource(map)); } return dataSources; } /** * @Primary 該注解表示在同一個接口有多個實現類可以注入的時候,默認選擇哪一個,而不是讓@autowire注解報錯 * @Qualifier 根據名稱進行注入,通常是在具有相同的多個類型的實例的一個注入(例如有多個DataSource類型的實例) */ @Bean @Primary @DependsOn({"masterDataSources", "slaveDataSources"}) public DynamicDataSource dataSource(List<DataSource> masterDataSources, List<DataSource> slaveDataSources) { Map<Object, Object> targetDataSources = new HashMap<>(); for (int i = 0; i < masterDataSources.size(); i++) { targetDataSources.put(DataSourceTypeEnum.DATA_SOURCE_MASTER.getName() + i, masterDataSources.get(i)); } for (int i = 0; i < slaveDataSources.size(); i++) { targetDataSources.put(DataSourceTypeEnum.DATA_SOURCE_SLAVE.getName() + i, slaveDataSources.get(i)); } DynamicDataSource dataSource = new DynamicDataSource(); dataSource.setTargetDataSources(targetDataSources);// 該方法是AbstractRoutingDataSource的方法 dataSource.setDefaultTargetDataSource(slaveDataSources.get(0));// 默認的datasource設置為myTestDbDataSource return dataSource; }
用隨機的方式獲取數據源:
@Slf4j public class DynamicDataSource extends AbstractRoutingDataSource { @Autowired private DataSourceProperties dataSourceProperties; protected Object determineCurrentLookupKey() { DataSourceTypeEnum dataSourceType = DataSourceContextHolder.getDatabaseType(); int i; List masterSources = dataSourceProperties.getMastersources(); List slaveSources = dataSourceProperties.getSlavesources(); if (dataSourceType.equals(DataSourceTypeEnum.DATA_SOURCE_MASTER)) { i = ThreadLocalRandom.current().nextInt(masterSources.size()) % masterSources.size(); } else { i = ThreadLocalRandom.current().nextInt(slaveSources.size()) % slaveSources.size(); } return dataSourceType.getName() + i; } }
當然數據源加載完成后也可以用其他方式來做多數據源的負載均衡,只需要重寫determineCurrentLookupKey()方法就行。