一.為什么要進行讀寫分離呢?
因為數據庫的“寫操作”操作是比較耗時的(寫上萬條條數據到Mysql可能要1分鍾分鍾)。但是數據庫的“讀操作”卻比“寫操作”耗時要少的多(從Mysql讀幾萬條數據條數據可能只要十秒鍾)。
所以讀寫分離解決的是,數據庫的“寫操作”影響了查詢的效率問題。
如下圖所示:

讀寫分離: 大多數站點的數據庫讀操作比寫操作更加密集,而且查詢條件相對復雜,數據庫的大部分性能消耗在查詢操作上了。為保證數據庫數據的一致性,我們要求所有對於數據庫的更新操作都是針對主數據庫的,但是讀操作是可以針對從數據庫來進行。
如下圖所示:
以下進行一個代碼層面的自動切換數據源進行讀寫分離的例子。
第一。首先搭建一個SSM框架的web工程。省略。
jdb.properties配置如下:
#主數據庫連接 jdbc_url_m=jdbc:mysql://localhost:3306/mama-bike?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull #兩個從數據庫連接 jdbc_url_s_1=jdbc:mysql://localhost:3307/mama-bike?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull jdbc_url_s_2=jdbc:mysql://localhost:3308/mama-bike?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull jdbc_username=root jdbc_password=root
web.xml配置省略
第二。spring-cfg.xml文件中配置一個主數據源,兩個從數據源,具體配置如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd"> <!--掃描注解生成bean--> <context:annotation-config/> <!--包掃描--> <context:component-scan base-package="com.coder520"/> <context:property-placeholder location="classpath:jdbc.properties"/> <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> <property name="dataSource" ref="dataSource"/> <property name="mapperLocations" value="classpath:com/coder520/**/**.xml"/> </bean> <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"> <property name="basePackage" value="com.coder520.*.dao"/> <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/> </bean> <!--聲明事務管理 采用注解方式--> <tx:annotation-driven transaction-manager="transactionManager"/> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource"/> </bean> <!--開啟切面代理--> <aop:aspectj-autoproxy/> <!--切換數據源切面--> <bean id="switchDataSourceAspect" class="com.coder520.common.DataSourceAspect"/> <!--切面配置--> <aop:config> <aop:aspect ref="switchDataSourceAspect"> <aop:pointcut id="tx" expression="execution(* com.coder520.*.service.*.*(..))"/> <aop:before method="before" pointcut-ref="tx"/> </aop:aspect> </aop:config> <!--主數據庫設置--> <bean id="masterdataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close" init-method="init"> <property name="url" value="${jdbc_url_m}"/> <property name="username" value="${jdbc_username}"/> <property name="password" value="${jdbc_password}"/> </bean> <!--從數據庫設置--> <bean id="slavedataSource_1" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close" init-method="init"> <property name="url" value="${jdbc_url_s_1}"/> <property name="username" value="${jdbc_username}"/> <property name="password" value="${jdbc_password}"/> </bean> <!--從數據庫設置--> <bean id="slavedataSource_2" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close" init-method="init"> <property name="url" value="${jdbc_url_s_2}"/> <property name="username" value="${jdbc_username}"/> <property name="password" value="${jdbc_password}"/> </bean> <bean id="dataSource" class="com.coder520.common.DynamicDataSource"> <property name="targetDataSources"> <map> <entry key="master" value-ref="masterdataSource"/> <entry key="slave_1" value-ref="slavedataSource_1"/> <entry key="slave_2" value-ref="slavedataSource_2"/> </map> </property> <!--默認數據源為主數據庫--> <property name="defaultTargetDataSource" ref="masterdataSource"/> </bean> </beans>
spring-mvc.xml配置如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd"> <!--開啟切面編程自動代理--> <mvc:annotation-driven> <mvc:message-converters> <bean class="org.springframework.http.converter.StringHttpMessageConverter"/> <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"> <property name="supportedMediaTypes"> <list> <value>text/html;charset=UTF-8</value> <value>application/json;charset=UTF-8</value> </list> </property> </bean> </mvc:message-converters> </mvc:annotation-driven> <!--包掃描--> <context:component-scan base-package="com.coder520.*.controller"> </context:component-scan> <!--開啟注解掃描--> <mvc:annotation-driven/> <!--處理靜態資源--> <mvc:default-servlet-handler/> <bean id="velocityConfigurer" class="org.springframework.web.servlet.view.velocity.VelocityConfigurer"> <property name="resourceLoaderPath" value="/WEB-INF/views"/> <property name="velocityProperties"> <props> <prop key="input.encoding">utf-8</prop> <prop key="output.encoding">utf-8</prop> <prop key="file.resource.loader.cache">false</prop> <prop key="file.resource.loader.modificationCheckInterval">1</prop> <prop key="velocimacro.library.autoreload">false</prop> </props> </property> </bean> <bean class="org.springframework.web.servlet.handler.SimpleMappingExceptionResolver"> <property name="exceptionMappings"> <props> <prop key="org.apache.shiro.authz.UnauthorizedException">403</prop> </props> </property> </bean> <bean class="org.springframework.web.servlet.view.velocity.VelocityViewResolver"> <property name="suffix" value=".vm"/> <property name="contentType" value="text/html;charset=utf-8"/> <property name="dateToolAttribute" value="date"/><!--日期函數名稱--> </bean> </beans>
Spring提供了一個AbstractRoutingDataSource這個類來幫我們切換數據源。故名思意,Routing,是路由的意思,可以幫我們切換到我們想切換到的數據庫。因此我們需要自己創建一個類來繼承它。
我們再進入看一下AbstractRoutingDataSource源碼是如何實現。
里面的方法到底干嘛用的,都在源碼里面寫明注釋,並且標記執行順序。如下://
// Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.springframework.jdbc.datasource.lookup; import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import javax.sql.DataSource; import org.springframework.beans.factory.InitializingBean; import org.springframework.jdbc.datasource.AbstractDataSource; import org.springframework.util.Assert; public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
//裝載spring-cfg.xml中配置的那三個數據源。 private Map<Object, Object> targetDataSources;
//默認數據源 private Object defaultTargetDataSource;
//出錯回滾 private boolean lenientFallback = true;
//Map中各個數據源對應的key private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();
//裝載Map<Object,Object> targetDataSources,即一個MAP裝載一個舊MAP private Map<Object, DataSource> resolvedDataSources;
//這屬性是為了得到defaultTargetDataSource, private DataSource resolvedDefaultDataSource; public AbstractRoutingDataSource() { }
//1.裝載spring-cfg.xml中配置的那三個數據源 public void setTargetDataSources(Map<Object, Object> targetDataSources) { this.targetDataSources = targetDataSources; } //1.設置默認數據源 public void setDefaultTargetDataSource(Object defaultTargetDataSource) { this.defaultTargetDataSource = defaultTargetDataSource; }
public void setLenientFallback(boolean lenientFallback) { this.lenientFallback = lenientFallback; }
public void setDataSourceLookup(DataSourceLookup dataSourceLookup) { this.dataSourceLookup = (DataSourceLookup)(dataSourceLookup != null?dataSourceLookup:new JndiDataSourceLookup()); }
// 2.根據spring-cfg.xml中配置targetDataSources可以在afterPropertiesSet方法中對targetDataSources進行解析,獲取真正的datasources public void afterPropertiesSet() { if(this.targetDataSources == null) { throw new IllegalArgumentException("Property 'targetDataSources' is required"); } else {
//新建一個跟MAP targetDataSource一樣的MAP this.resolvedDataSources = new HashMap(this.targetDataSources.size());
//遍歷MAP Iterator var1 = this.targetDataSources.entrySet().iterator(); //判斷MAP中是否還有數據源 while(var1.hasNext()) {
//獲取數據源Entry Entry<Object, Object> entry = (Entry)var1.next();
//設置每一個數據源Entry對應的key Object lookupKey = this.resolveSpecifiedLookupKey(entry.getKey());
//設置數據源Entry對應的value,即數據源 DataSource dataSource = this.resolveSpecifiedDataSource(entry.getValue());
//放入到新建的MAP中 this.resolvedDataSources.put(lookupKey, dataSource); } //設置默認數據源 if(this.defaultTargetDataSource != null) { this.resolvedDefaultDataSource = this.resolveSpecifiedDataSource(this.defaultTargetDataSource); } } } protected Object resolveSpecifiedLookupKey(Object lookupKey) { return lookupKey; } protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException { if(dataSource instanceof DataSource) { return (DataSource)dataSource; } else if(dataSource instanceof String) { return this.dataSourceLookup.getDataSource((String)dataSource); } else { throw new IllegalArgumentException("Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource); } } public Connection getConnection() throws SQLException { return this.determineTargetDataSource().getConnection(); } public Connection getConnection(String username, String password) throws SQLException { return this.determineTargetDataSource().getConnection(username, password); } public <T> T unwrap(Class<T> iface) throws SQLException { return iface.isInstance(this)?this:this.determineTargetDataSource().unwrap(iface); } public boolean isWrapperFor(Class<?> iface) throws SQLException { return iface.isInstance(this) || this.determineTargetDataSource().isWrapperFor(iface); }
//3.最關鍵的一個方法。此方法決定選擇哪一個數據源 protected DataSource determineTargetDataSource() { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
//決定選擇數據源的key,即傳進來的那個數據源 Object lookupKey = this.determineCurrentLookupKey();
//獲取相應的數據源 DataSource dataSource = (DataSource)this.resolvedDataSources.get(lookupKey);
//如果為空,就用默認的那個數據源 if(dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } //如果默認數據源還是為空,證明沒配置默認數據源,就會拋異常 if(dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } else { return dataSource; } } //這是最重要的方法,要我們實現改方法的。 protected abstract Object determineCurrentLookupKey(); }
因此實現該determineCurrentLookupKey()方法:首先自己創建的類要繼承AbstractRoutingDataSource
如下代碼
package com.coder520.common;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
/**
* Created by cong on 2018/3/14.
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DynamicDataSourceHolder.getDataSource();
}
}
DynamicDataSourceHolder.getDataSource()是獲取數據源。但是呢,spring中的數據源是唯一,每一個用戶過來都是共用這個數據源的。我們知道高並發的情況下,多個用戶共享一個資源,這是有線程問題的,這樣獲取數據源是不安全的。
因此我們要用到並發編程問題呢,我們要用到並發編程里面的一個類ThreadLocal這個類,這個類用來ThreadLocal類用來提供線程內部的局部變量。這種變量在多線程環境下訪問(通過get或set方法訪問)時能保證各個線程里的變量相對獨立於其他線程內的變量。
ThreadLocal實例通常來說都是private static類型的,用於關聯線程和線程的上下文。
那么我們在兩個從庫中進行讀操作如何公平的分配來讀操作呢?我們自然想到要有輪詢的思維。通過一個計時器來自增求模運算。這個計時器的只從-1開始,這樣得到的結果就只有0和1了,根據0 和 1來分配兩個從庫進行讀操作。
注意這個計時器如果用Inter類型的話,必然會出現線程安全問題的,因為這是共享的數據類型。因此我們可以用並發編程里面的AtomicInterger原子屬性的類。解決線程安全問題。我們知道Integer是有范圍的,我們不能讓
這個計數器一直自增,這樣下去會去問題的。因此還需要來一個計數器重置。
DynamicDataSourceHolder類代碼如下:
package com.coder520.common; import java.util.concurrent.atomic.AtomicInteger; /** * Created by cong on 2018/3/14. */ public class DynamicDataSourceHolder { //綁定本地線程 public static final ThreadLocal<String> holder = new ThreadLocal<>(); //計數器 private static AtomicInteger counter = new AtomicInteger(-1); //寫庫對應的數據源Key private static final String MASTER = "master"; //從庫對應的數據源key private static final String SLAVE_1 = "slave_1"; private static final String SLAVE_2 = "slave_2"; //設置數據源,判斷傳進來的主庫還是從庫的類型 public static void setDataSource(DataSourceType dataSourceType){ if (dataSourceType == DataSourceType.MASTER){ System.out.println("-----MASTER------"); holder.set(MASTER); }else if (dataSourceType == DataSourceType.SLAVE){ holder.set(roundRobinSlaveKey()); } } //獲取數據源 public static String getDataSource(){ return holder.get(); } //輪詢選擇哪一個從數據庫去讀操作 private static String roundRobinSlaveKey() { //計數器模運算 Integer index = counter.getAndIncrement() % 2; //計數器重置 if (counter.get()>9999){ counter.set(-1); } //輪詢判斷 if (index == 0){ System.out.println("----SLAVE_1-----"); return SLAVE_1; }else { System.out.println("----SLAVE_2-----"); return SLAVE_2; } } }
DataSourceType是一個枚舉類型,這些這樣寫是讓代碼美觀一些。
DataSourceType枚舉類型代碼如下:
package com.coder520.common; /** */ public enum DataSourceType { MASTER,SLAVE; }
到這里已經萬事具備了,到了關鍵一步了,那么我們什么時候切換數據源呢?我怎么切換數據源呢?
我們要切換數據源的時候我們手動去控制它,我們希望在業務層打一個注解,比如現在我們需要讀庫了,業務層的方法都是讀庫了,我們只要打一個注解就把它搞定,例如@DataSource(DataSourceType.SLAVE),
然后讓DynamicDataSourceHolder這個類自動幫我們切換一下,用它setDataSource(DataSourceType dataSourceType)方法將數據源設置成SLAVE.這樣讀操作就走讀庫了。
那么問題來了,我們想達到這個效果,那改怎么辦呢?那么首先我們要定義一個注解。
那么又有疑問了,為什么我們不在每一個查詢的方法里面調用DynamicDataSourceHolder.setDataSource(DataSourceType dataSourceType)方法設置一下不就行了嗎?
這樣做也可以,但是這樣做豈不是很蛋疼?因為這樣做代碼就不夠優雅了,要重復寫很多代碼。每一個查詢方法里面都這樣寫,豈不是煩死?
因此我們自定義一個注解,代碼如下:
package com.coder520.common; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * Created by cong on 2018/3/14. */
//運行時影響程序注解 @Retention(RetentionPolicy.RUNTIME)
//這個注解作用於所有方法 @Target({ElementType.METHOD}) public @interface DataSource { //打了這個注解,如果沒設置值,我們就默認用MASTER主庫 DataSourceType value() default DataSourceType.MASTER; }
那么我們到這里就OK了嗎?並不是的,我們只是打了個注解,還沒進行數據源的切換呢。然后做呢?
這時我們就要用切面編程AOP方法來執行所有的切面,我們切哪個方法呢?我們切所有的業務層,service層的方法,然后獲取到它的注解,看一下注解標記的是MASTER,還是SLAVE
然后調用DynamicDataSourceHolder.setDataSource(DataSourceType dataSourceType)方法設置一下就行了。這是正是切面編程大顯身手的時候,切面編程讓我們一段代碼讓我們給每一個方法執行一段業務邏輯,
減少我們的代碼量。
我們都是AOP有前置通知,后置通知,環繞通知,我們在這里一定要用前置通知,因為進入方法前就一定先要切換數據源,方法執行完了,再切換數據源還有個屁用。
DataSourceAspect切面類的代碼如下:
package com.coder520.common; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.reflect.MethodSignature; import java.lang.reflect.Method; /***/ public class DataSourceAspect { public void before(JoinPoint point) throws NoSuchMethodException { //獲取切點 Object target = point.getTarget(); //獲取方法的名字 String method = point.getSignature().getName(); //獲取字節碼對象 Class classz = target.getClass(); //獲取方法上的參數 Class<?>[] parameterTypes = ((MethodSignature)point.getSignature()).getMethod().getParameterTypes(); //獲取方法 Method m = classz.getMethod(method,parameterTypes); //判斷方法是否存在,並且判斷是否有DataSource這個注釋。 if (m != null && m.isAnnotationPresent(DataSource.class)){ //獲取注解 DataSource dataSource = m.getAnnotation(DataSource.class); //設置數據源 DynamicDataSourceHolder.setDataSource(dataSource.value()); } } }
注意:必須在spirng-cfg.xml中聲明切面這個BEAN,並指定切哪里。
如下:
<!--開啟切面代理-->
<aop:aspectj-autoproxy/>
<!--切換數據源切面Bean-->
<bean id="switchDataSourceAspect" class="com.coder520.common.DataSourceAspect"/>
<!--切面配置-->
<aop:config>
<aop:aspect ref="switchDataSourceAspect">
<aop:pointcut id="tx" expression="execution(* com.coder520.*.service.*.*(..))"/>
<aop:before method="before" pointcut-ref="tx"/>
</aop:aspect>
</aop:config>
package com.coder520.user.service; import com.coder520.common.DataSource; import com.coder520.common.DataSourceType; import com.coder520.common.DynamicDataSourceHolder; import com.coder520.user.dao.UserMapper; import com.coder520.user.entity.User; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.io.UnsupportedEncodingException; import java.security.NoSuchAlgorithmException; @Service("userServiceImpl") public class UserServiceImpl implements UserService{ @Autowired private UserMapper userMapper; /** *@Description 根據用戶名查詢用戶 */ @DataSource(DataSourceType.SLAVE) @Override public User findUserByUserId(long id) { User user=null; try { user =userMapper.selectByPrimaryKey(id); }catch (Exception e){ e.printStackTrace(); throw e; } return user; } @Override @Transactional public int insertUser() { User user = new User(); user.setMobile("1234567"); user.setNickname("laowang"); User user1 = new User(); user1.setId(2L); user1.setMobile("11111111"); user1.setNickname("laowang2"); userMapper.insertSelective(user); userMapper.insertSelective(user1); return 0; } @Override public void createUser(User user) { userMapper.insertSelective(user); } }
Controller層代碼:
package com.coder520.user.controller; import com.coder520.user.entity.User; import com.coder520.user.service.UserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.servlet.ModelAndView; import javax.servlet.http.HttpSession; /***/ @Controller @RequestMapping("user") public class UserController { @Autowired private UserService userService; /** *@Description 獲取用戶信息 */ @RequestMapping("/getuser") @ResponseBody public User getUser(){ return userService.findUserByUserId(1); } @RequestMapping("/setuser") @ResponseBody public int setUser(){ return userService.insertUser(); } }
mybatis那部分的代碼省略。
運行結果如下:


可以看到兩個SLVE是輪詢切換的。
接着自己可以測試一下插入,修改數據源,是否切換到主庫中。查看3個數據庫是否同步了,這里就不演示了。
就算中途出錯,事務會回滾的。這里不演示了,自己可以去試一下。
主從復制數據是異步完成的,這就導致主從數據庫中的數據有一定的延遲,在讀寫分離的設計中必須要考慮這一點。
以博客為例,用戶登錄后發表了一篇文章,他需要馬上看到自己的文章,但是對於其它用戶來講可以允許延遲一段時間(1分鍾/5分鍾/30分鍾),不會造成什么問題。
這時對於當前用戶就需要讀主數據庫,對於其他訪問量更大的外部用戶就可以讀從數據庫。
解決辦法:
適當放棄一致性:在一些實時性要求不高的場合,我們適當放棄一致性要求。這樣就可以充分利用多種手段來提高系統吞吐量,例如頁面緩存(cookie,session)、分布式數據緩存(redis)、數據庫讀寫分離、查詢數據搜索索引化。
總結:
我的想法是要使用讀寫分離來實現系統吞吐量的提升就要從業務上想辦法降低一致性的要求。
對必須要有一致性的功能是無法進行讀寫分離的,可以采用多庫不區分讀寫以及redis緩存等技術來實現。
所以主從分離后,去從數據庫讀的話,可能還沒同步過來。
下一篇用中間件來屏蔽掉這些復雜的操作來進行數據源切換
