數據庫讀寫分離,主從同步實現方法


https://blog.csdn.net/starlh35/article/details/78735510

前言

眾所周知,隨着用戶量的增多,數據庫操作往往會成為一個系統的瓶頸所在,而且一般的系統“讀”的壓力遠遠大於“寫”,因此我們可以通過實現數據庫的讀寫分離來提高系統的性能。
實現思路

通過設置主從數據庫實現讀寫分離,主數據庫負責“寫操作”,從數據庫負責“讀操作”,根據壓力情況,從數據庫可以部署多個提高“讀”的速度,借此來提高系統總體的性能。
基礎知識

要實現讀寫分離,就要解決主從數據庫數據同步的問題,在主數據庫寫入數據后要保證從數據庫的數據也要更新。

    1

主從數據庫同步的實現思路如圖:
主從同步

主服務器master記錄數據庫操作日志到Binary log,從服務器開啟i/o線程將二進制日志記錄的操作同步到relay log(存在從服務器的緩存中),另外sql線程將relay log日志記錄的操作在從服務器執行。
記住這張圖,接下來基於這個圖實際設置主從數據庫。
主從數據庫設置的具體步驟

首先要有兩個數據庫服務器master、slave(也可以用一個服務器安裝兩套數據庫環境運行在不同端口,slave也可以舉一反三設置多個),我們窮人就買虛擬雲服務器玩玩就行 0.0。以下操作假設你的兩台服務器上都已經安裝好了mysql服務。
1.打開mysql數據庫配置文件

vim /etc/my.cnf

    1

2.在主服務器master上配置開啟Binary log,主要是在[mysqld]下面添加:

server-id=1
log-bin=master-bin
log-bin-index=master-bin.index

    1
    2
    3

如圖:
master my.cnf
3.重啟mysql服務

service mysql restart

    1

ps:重啟方式隨意
4.檢查配置效果,進入主數據庫並執行

mysql> SHOW MASTER STATUS;

    1

可以看到下圖表示配置沒問題,這里面的File名:master-bin.000001 我們接下來在從數據庫的配置會使用:
5.配置從服務器的 my.cnf

在[mysqld]節點下面添加:
master status

server-id=2
relay-log-index=slave-relay-bin.index
relay-log=slave-relay-bin

    1
    2
    3

這里面的server-id 一定要和主庫的不同,如圖:
slave my.cnf
配置完成后同樣重啟從數據庫一下

service mysql restart

    1

6.接下來配置兩個數據庫的關聯

首先我們先建立一個操作主從同步的數據庫用戶,切換到主數據庫執行:

mysql> create user repl;
mysql> GRANT REPLICATION SLAVE ON *.* TO 'repl'@'從xxx.xxx.xxx.xx' IDENTIFIED BY 'mysql';
mysql> flush privileges;

    1
    2
    3

這個配置的含義就是創建了一個數據庫用戶repl,密碼是mysql, 在從服務器使用repl這個賬號和主服務器連接的時候,就賦予其REPLICATION SLAVE的權限, *.* 表面這個權限是針對主庫的所有表的,其中xxx就是從服務器的ip地址。
進入從數據庫后執行:

mysql> change master to master_host='主xxx.xxx.xxx.xx',master_port=3306,master_user='repl',master_password='mysql',master_log_file='master-bin.000001',master_log_pos=0;

    1

這里面的xxx是主服務器ip,同時配置端口,repl代表訪問主數據庫的用戶,上述步驟執行完畢后執行start slave啟動配置:

mysql> start slave;

    1

start slave
停止主從同步的命令為:

mysql> stop slave;

    1

查看狀態命令,\G表示換行查看

mysql> show slave status \G;

    1

可以看到狀態如下:
slave status
這里看到從數據庫已經在等待主庫的消息了,接下來在主庫的操作,在從庫都會執行了。我們可以主庫負責寫,從庫負責讀(不要在從庫進行寫操作),達到讀寫分離的效果。
我們可以簡單測試:

在主數據庫中創建一個新的數據庫:

mysql> create database testsplit;

    1

在從數據庫查看數據庫:

mysql> show databases;

    1

可以看到從數據庫也有testsplit這張表了,這里就不上圖了,親測可用。在主數據庫插入數據,從數據庫也可以查到。
至此已經實現了數據庫主從同步
代碼層面實現讀寫分離

上面我們已經有了兩個數據庫而且已經實現了主從數據庫同步,接下來的問題就是在我們的業務代碼里面實現讀寫分離,假設我們使用的是主流的ssm的框架開發的web項目,這里面我們需要多個數據源。

在此之前,我們在項目中一般會使用一個數據庫用戶遠程操作數據庫(避免直接使用root用戶),因此我們需要在主從數據庫里面都創建一個用戶mysqluser,賦予其增刪改查的權限:

    1

mysql> GRANT select,insert,update,delete ON *.* TO 'mysqluser'@'%' IDENTIFIED BY 'mysqlpassword' WITH GRANT OPTION;

    1

然后我們的程序里就用mysqluser這個用戶操作數據庫:
1.編寫jdbc.propreties

#mysql驅動
jdbc.driver=com.mysql.jdbc.Driver
#主數據庫地址
jdbc.master.url=jdbc:mysql://xxx.xxx.xxx.xx:3306/testsplit?useUnicode=true&characterEncoding=utf8
#從數據庫地址
jdbc.slave.url=jdbc:mysql://xxx.xxx.xxx.xx:3306/testsplit?useUnicode=true&characterEncoding=utf8
#數據庫賬號
jdbc.username=mysqluser
jdbc.password=mysqlpassword

    1
    2
    3
    4
    5
    6
    7
    8
    9

這里我們指定了兩個數據庫地址,其中的xxx分別是我們的主從數據庫的ip地址,端口都是使用默認的3306
2.配置數據源

在spring-dao.xml中配置數據源(這里就不累贅介紹spring的配置了,假設大家都已經配置好運行環境),配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- 配置整合mybatis過程 -->
    <!-- 1.配置數據庫相關參數properties的屬性:${url} -->
    <context:property-placeholder location="classpath:jdbc.properties" />
    <!-- 掃描dao包下所有使用注解的類型 -->
    <context:component-scan base-package="c n.xzchain.testsplit.dao" />
    <!-- 2.數據庫連接池 -->
    <bean id="abstractDataSource" abstract="true" class="com.mchange.v2.c3p0.ComboPooledDataSource"
    destroy-method="close">
        <!-- c3p0連接池的私有屬性 -->
        <property name="maxPoolSize" value="30" />
        <property name="minPoolSize" value="10" />
        <!-- 關閉連接后不自動commit -->
        <property name="autoCommitOnClose" value="false" />
        <!-- 獲取連接超時時間 -->
        <property name="checkoutTimeout" value="10000" />
        <!-- 當獲取連接失敗重試次數 -->
        <property name="acquireRetryAttempts" value="2" />
    </bean>
    <!--主庫配置-->
    <bean id="master" parent="abstractDataSource">
        <!-- 配置連接池屬性 -->
        <property name="driverClass" value="${jdbc.driver}" />
        <property name="jdbcUrl" value="${jdbc.master.url}" />
        <property name="user" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
    </bean>
    <!--從庫配置-->
    <bean id="slave" parent="abstractDataSource">
        <!-- 配置連接池屬性 -->
        <property name="driverClass" value="${jdbc.driver}" />
        <property name="jdbcUrl" value="${jdbc.slave.url}" />
        <property name="user" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
    </bean>
    <!--配置動態數據源,這里的targetDataSource就是路由數據源所對應的名稱-->
    <bean id="dataSourceSelector" class="cn.xzchain.testsplit.dao.split.DataSourceSelector">
        <property name="targetDataSources">
            <map>
                <entry value-ref="master" key="master"></entry>
                <entry value-ref="slave" key="slave"></entry>
            </map>
        </property>
    </bean>
    <!--配置數據源懶加載-->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy">
        <property name="targetDataSource">
            <ref bean="dataSourceSelector"></ref>
        </property>
    </bean>

    <!-- 3.配置SqlSessionFactory對象 -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <!-- 注入數據庫連接池 -->
        <property name="dataSource" ref="dataSource" />
        <!-- 配置MyBaties全局配置文件:mybatis-config.xml -->
        <property name="configLocation" value="classpath:mybatis-config.xml" />
        <!-- 掃描entity包 使用別名 -->
        <property name="typeAliasesPackage" value="cn.xzchain.testsplit.entity" />
        <!-- 掃描sql配置文件:mapper需要的xml文件 -->
        <property name="mapperLocations" value="classpath:mapper/*.xml" />
    </bean>

    <!-- 4.配置掃描Dao接口包,動態實現Dao接口,注入到spring容器中 -->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <!-- 注入sqlSessionFactory -->
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
        <!-- 給出需要掃描Dao接口包 -->
        <property name="basePackage" value="cn.xzchain.testsplit.dao" />
    </bean>
</beans>

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75

說明:
首先讀取配置文件jdbc.properties,然后在我們定義了一個基於c3p0連接池的父類“抽象”數據源,然后配置了兩個具體的數據源master、slave,繼承了abstractDataSource,這里面就配置了數據庫連接的具體屬性,然后我們配置了動態數據源,他將決定使用哪個具體的數據源,這里面的關鍵就是DataSourceSelector,接下來我們會實現這個bean。下一步設置了數據源的懶加載,保證在數據源加載的時候其他依賴的bean已經加載好了。接着就是常規的配置了,我們的mybatis全局配置文件如下
3.mybatis全局配置文件

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
  PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
  "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <!-- 配置全局屬性 -->
    <settings>
        <!-- 使用jdbc的getGeneratedKeys獲取數據庫自增主鍵值 -->
        <setting name="useGeneratedKeys" value="true" />

        <!-- 使用列別名替換列名 默認:true -->
        <setting name="useColumnLabel" value="true" />

        <!-- 開啟駝峰命名轉換:Table{create_time} -> Entity{createTime} -->
        <setting name="mapUnderscoreToCamelCase" value="true" />
        <!-- 打印查詢語句 -->
        <setting name="logImpl" value="STDOUT_LOGGING" />
    </settings>
    <plugins>
        <plugin interceptor="cn.xzchain.testsplit.dao.split.DateSourceSelectInterceptor"></plugin>
    </plugins>
</configuration>

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22

這里面的關鍵就是DateSourceSelectInterceptor這個攔截器,它會攔截所有的數據庫操作,然后分析sql語句判斷是“讀”操作還是“寫”操作,我們接下來就來實現上述的DataSourceSelector和DateSourceSelectInterceptor
4.編寫DataSourceSelector

DataSourceSelector就是我們在spring-dao.xml配置的,用於動態配置數據源。代碼如下:

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

/**
 * @author lihang
 * @date 2017/12/6.
 * @description 繼承了AbstractRoutingDataSource,動態選擇數據源
 */
public class DataSourceSelector extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        return DynamicDataSourceHolder.getDataSourceType();
    }
}

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

我們只要繼承AbstractRoutingDataSource並且重寫determineCurrentLookupKey()方法就可以動態配置我們的數據源。
編寫DynamicDataSourceHolder,代碼如下:

/**
 * @author lihang
 * @date 2017/12/6.
 * @description
 */
public class DynamicDataSourceHolder {

    /**用來存取key,ThreadLocal保證了線程安全*/
    private static ThreadLocal<String> contextHolder = new ThreadLocal<String>();
    /**主庫*/
    public static final String DB_MASTER = "master";
    /**從庫*/
    public static final String DB_SLAVE = "slave";

    /**
     * 獲取線程的數據源
     * @return
     */
    public static String getDataSourceType() {
        String db = contextHolder.get();
        if (db == null){
            //如果db為空則默認使用主庫(因為主庫支持讀和寫)
            db = DB_MASTER;
        }
        return db;
    }

    /**
     * 設置線程的數據源
     * @param s
     */
    public static void setDataSourceType(String s) {
        contextHolder.set(s);
    }

    /**
     * 清理連接類型
     */
    public static void clearDataSource(){
        contextHolder.remove();
    }
}

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43

這個類決定返回的數據源是master還是slave,這個類的初始化我們就需要借助DateSourceSelectInterceptor了,我們攔截所有的數據庫操作請求,通過分析sql語句來判斷是讀還是寫操作,讀操作就給DynamicDataSourceHolder設置slave源,寫操作就給其設置master源,代碼如下:

import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.executor.keygen.SelectKeyGenerator;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import java.util.Locale;
import java.util.Properties;

/**
 * @author lihang
 * @date 2017/12/6.
 * @description 攔截數據庫操作,根據sql判斷是讀還是寫,選擇不同的數據源
 */
@Intercepts({@Signature(type = Executor.class,method = "update",args = {MappedStatement.class,Object.class}),
@Signature(type = Executor.class,method = "query",args = {MappedStatement.class,Object.class, RowBounds.class, ResultHandler.class})})
public class DateSourceSelectInterceptor implements Interceptor{

    /**正則匹配 insert、delete、update操作*/
    private static final String REGEX = ".*insert\\\\u0020.*|.*delete\\\\u0020.*|.*update\\\\u0020.*";

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        //判斷當前操作是否有事務
        boolean synchonizationActive = TransactionSynchronizationManager.isSynchronizationActive();
        //獲取執行參數
        Object[] objects = invocation.getArgs();
        MappedStatement ms = (MappedStatement) objects[0];
        //默認設置使用主庫
        String lookupKey = DynamicDataSourceHolder.DB_MASTER;;
        if (!synchonizationActive){
            //讀方法
            if (ms.getSqlCommandType().equals(SqlCommandType.SELECT)){
                //selectKey為自增主鍵(SELECT LAST_INSERT_ID())方法,使用主庫
                if (ms.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)){
                    lookupKey = DynamicDataSourceHolder.DB_MASTER;
                }else {
                    BoundSql boundSql = ms.getSqlSource().getBoundSql(objects[1]);
                    String sql = boundSql.getSql().toLowerCase(Locale.CHINA).replace("[\\t\\n\\r]"," ");
                    //如果是insert、delete、update操作 使用主庫
                    if (sql.matches(REGEX)){
                        lookupKey = DynamicDataSourceHolder.DB_MASTER;
                    }else {
                        //使用從庫
                        lookupKey = DynamicDataSourceHolder.DB_SLAVE;
                    }
                }
            }
        }else {
            //一般使用事務的都是寫操作,直接使用主庫
            lookupKey = DynamicDataSourceHolder.DB_MASTER;
        }
        //設置數據源
        DynamicDataSourceHolder.setDataSourceType(lookupKey);
        return invocation.proceed();
    }

    @Override
    public Object plugin(Object target) {
        if (target instanceof Executor){
            //如果是Executor(執行增刪改查操作),則攔截下來
            return Plugin.wrap(target,this);
        }else {
            return target;
        }
    }

    @Override
    public void setProperties(Properties properties) {

    }
}

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76

通過這個攔截器,所有的insert、delete、update操作設置使用master源,select會使用slave源。

接下來就是測試了,我這是生產環境的代碼,直接打印日志,小伙伴可以加上日志后測試使用的是哪個數據源,結果和預期一樣,這樣我們就實現了讀寫分離~

ps:我們可以配置多個slave用於負載均衡,只需要在spring-dao.xml中添加slave1、slave2、slave3……然后修改dataSourceSelector這個bean,

<bean id="dataSourceSelector" class="cn.xzchain.o2o.dao.split.DataSourceSelector">
        <property name="targetDataSources">
            <map>
                <entry value-ref="master" key="master"></entry>
                <entry value-ref="slave1" key="slave1"></entry>
                <entry value-ref="slave2" key="slave2"></entry>
                <entry value-ref="slave3" key="slave3"></entry>
            </map>
        </property>

    1
    2
    3
    4
    5
    6
    7
    8
    9

在map標簽中添加slave1、slave2、slave3……即可,具體的負載均衡策略我們在DynamicDataSourceHolder、DateSourceSelectInterceptor中實現即可。

最后整理一下整個流程:
1.項目啟動后,在依賴的bean加載完成后,我們的數據源通過LazyConnectionDataSourceProxy開始加載,他會引用dataSourceSelector加載數據源。
2.DataSourceSelector會選擇一個數據源,我們在代碼里設置了默認數據源為master,在初始化的時候我們就默認使用master源。
3.在數據庫操作執行時,DateSourceSelectInterceptor攔截器攔截了請求,通過分析sql決定使用哪個數據源,“讀操作”使用slave源,“寫操作”使用master源。
寫在后面

現在很多讀寫分離中間件已經大大簡化了我們的工作,但是自己實現一個小體量的讀寫分離有助於我們進一步理解數據庫讀寫分離在業務上的實現,呼~  
---------------------  
作者:小小程序汪  
來源:CSDN  
原文:https://blog.csdn.net/starlh35/article/details/78735510  
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM