SpringBoot+Mybatis+自定義注解+Atomikos+實現多源數據庫切換和分布式事務


在我們平時的項目開發中,經常會遇到一個系統操作多個數據源的情況。下面介紹一種通過Spring AOP+自定義注解的形式實現多源數據庫切換的方式:

實現原理:

​       jdbc提供了AbstractRoutingDataSource抽象類用來支持多源數據庫切換,通過重寫determineCurrentLookupKey方法,設定要使用的數據源key即可完成數據源的切換。至於何時切換數據源,采用Aop+自定義注解,在需要切換數據源的方法上添加此注解,利用編寫的自定義注解的解析器獲取注解中配置的目標數據源,從而進行動態數據源切換。

protected DataSource determineTargetDataSource() {
		Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
		Object lookupKey = determineCurrentLookupKey();
		DataSource dataSource = this.resolvedDataSources.get(lookupKey);
		if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
			dataSource = this.resolvedDefaultDataSource;
		}
		if (dataSource == null) {
			throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
		}
		return dataSource;
	}

項目結構:

測試數據庫腳本:

# db01
# 創建數據庫
CREATE DATABASE `db01` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';
# 創建數據表
CREATE TABLE `tbl_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '唯一ID',
  `name` varchar(30) DEFAULT NULL COMMENT '姓名',
  `sex` varchar(10) DEFAULT NULL COMMENT '性別',
  `age` int(11) DEFAULT NULL COMMENT '年齡',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=144 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

# db02
# 創建數據庫
CREATE DATABASE `db01` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';
# 創建數據表
CREATE TABLE `tbl_goods` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '唯一ID',
  `name` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '商品名稱',
  `price` decimal(10,0) DEFAULT NULL COMMENT '商品價格',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=52 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

一、創建SpringBoot測試項目

1、編寫application.yml

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    # 數據庫01
    db01:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/db01?useUnicode=true&characterEncoding=utf8&useSSL=false
      username: root
      password: root123mysql
    # 數據庫02
    db02:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/db02?useUnicode=true&characterEncoding=utf8&useSSL=false
      username: root
      password: root123mysql

  profiles:
    active: dev

mybatis:
  config-location: classpath:mybatis/mybatis-config.xml
  mapper-locations: classpath:mybatis/mapper/*.xml

2、編寫mybatis-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <settings>
        <!--允許駝峰命名法-->
        <!--        <setting name="mapUnderscoreToCamelCase" value="true"/>-->
        <!--打印sql語句到控制台-->
        <setting name="logImpl" value="STDOUT_LOGGING"/>
    </settings>
    <typeAliases>
        <!--別名直接指定報名,在bean上沒有注解的情況下,會使用bean的首字母小寫的非限定類名作為它的別名-->
        <package name="com.whw.mdb.pojo"/>
    </typeAliases>
</configuration>

3、在resources文件夾下添加相關mapper文件

GoodsMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.whw.mdb.dao.GoodsDao">
    <!--添加商品-->
    <insert id="add" parameterType="com.whw.mdb.pojo.Goods" useGeneratedKeys="true" keyProperty="id">
        insert into tbl_goods( name, price)
        values ( #{name}, #{price})
    </insert>

    <!--獲取商品列表-->
    <select id="getGoodsList" resultType="com.whw.mdb.pojo.Goods">
        select *
        from tbl_goods;
    </select>
</mapper>

UserMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.whw.mdb.dao.UserDao">
    <!--添加用戶-->
    <insert id="add" parameterType="com.whw.mdb.pojo.User">
        insert into tbl_user(name, sex, age)
        values (#{name}, #{sex}, #{age})
    </insert>

    <!-- 獲取用戶列表 -->
    <select id="getUserList" resultType="com.whw.mdb.pojo.User">
        select *
        from tbl_user;
    </select>
</mapper>

二、在項目啟動項目錄下添加config文件夾,編寫多源數據庫配置和Swagger接口文檔配置

SwaggerConfig.java

package com.whw.mdb.config.swagger;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.env.Profiles;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

import java.util.ArrayList;

@Configuration
@EnableSwagger2//開啟Swagger
public class SwaggerConfig {
    @Bean
    public Docket docket(Environment environment) {
        //設置要顯示的Swagger環境
        Profiles profiles = Profiles.of("dev");
        //獲取項目環境:是生產環境還是發布環境
        boolean flag = environment.acceptsProfiles(profiles);

        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .groupName("大華子")
                .enable(flag)
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.whw.mdb.controller"))
                .build();
    }

    private ApiInfo apiInfo() {
        Contact contact = new Contact("大華子", "", "2483875320@qq.com");
        return new ApiInfo(
                "大華子的SwaggerAPI文檔",
                "不斷學習,不斷進步",
                "1.0",
                "urn:tos",
                contact,
                "Apache 2.0",
                "http://www.apache.org/licenses/LICENSE-2.0",
                new ArrayList());
    }
}

在config文件夾下添加datasource文件夾,數據庫切換相關配置都寫在此包下

1、創建數據源名稱枚舉DataSourceName

package com.whw.mdb.config.datasource;

public enum DataSourceName {

    DB01("db01"),
    DB02("db02");

    private String name;

    DataSourceName(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

2、編寫自定義數據源切換注解SwitchDataSource.java

package com.whw.mdb.config.datasource;

import java.lang.annotation.*;

/**
 * @描述 切換數據源注解
 **/
@Target({ElementType.TYPE, ElementType.METHOD})//指明此自定義注解只能用在方法上
@Retention(RetentionPolicy.RUNTIME)//指明此自定義注解是運行時注解
@Documented
public @interface SwitchDataSource {

    DataSourceName value() default DataSourceName.DB01;

}

3、編寫動態數據源切面DynamicDataSourceAspect.java

package com.whw.mdb.config.datasource;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

@Aspect
@Order(-1)//切面必須要在事務注解@Transactional之前,由於在開始事務之前就需要確定數據源
@Component
public class DynamicDataSourceAspect {

    @Before("@within(SwitchDataSource)||@annotation(SwitchDataSource)")
    public void changedDataSource(JoinPoint joinpoint) {
        //獲取切入點方法上的注解
        Method method = ((MethodSignature) joinpoint.getSignature()).getMethod();
        SwitchDataSource dataSourceAnnotation = method.getAnnotation(SwitchDataSource.class);
        if (dataSourceAnnotation == null) {
            //如果方法上沒有數據源注解,則獲取方法所在類上面的注解
            dataSourceAnnotation = joinpoint.getTarget().getClass().getAnnotation(SwitchDataSource.class);
            //如果方法所在類上面也沒有數據源注解,則使用默認數據源
            if (dataSourceAnnotation == null) {
                return;
            }
        }

        //如果方法上面或者方法所在類上面有數據源注解,則設置當前線程的數據源為數據源注解指定的數據源
        DataSourceName dataSourceName = dataSourceAnnotation.value();
        DataSourceSwitcher.setDataSource(dataSourceName.getName());
    }

    @After("@within(SwitchDataSource)  || @annotation(SwitchDataSource)")
    public void clean() {
        //清理數據源的標簽
        DataSourceSwitcher.setToDefaultSource();
    }
}

4、編寫數據源切換處理器DataSourceSwitcher

package com.whw.mdb.config.datasource;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class DataSourceSwitcher extends AbstractRoutingDataSource {

    private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();

    @Override
    protected Object determineCurrentLookupKey() {
        return threadLocal.get();
    }

    public static void setDataSource(String name) {
        threadLocal.set(name);
    }

    public static void setToDefaultSource() {
        threadLocal.remove();
    }
}

5、編寫多數據源配置文件DataSourceConfig

package com.whw.mdb.config.datasource;

import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * @描述 動態數據源配置類
 **/
@Configuration
public class DataSourceConfig {

    @Bean(name = "dataSourceDB01")
    @ConfigurationProperties(prefix = "spring.datasource.db01")
    public DataSource dataSourceDB01() {
        return DruidDataSourceBuilder.create().build();
    }

    @Bean(name = "dataSourceDB02")
    @ConfigurationProperties(prefix = "spring.datasource.db02")
    public DataSource dataSourceDB02() {
        return DruidDataSourceBuilder.create().build();
    }

    /*
    * @Primary:自動裝配時當出現多個Bean候選者時,被注解為@Primary的Bean將作為首選者,否則將拋出異常
    * */
    @Primary
    @Bean("dynamicDataSource")
    public DataSource dataSource() {

        DataSource db01 = dataSourceDB01();
        DataSource db02 = dataSourceDB02();
        Map<Object, Object> hashMap = new HashMap<>();
        hashMap.put(DataSourceName.DB01.getName(), db01);
        hashMap.put(DataSourceName.DB02.getName(), db02);

        AbstractRoutingDataSource dataSource = new DataSourceSwitcher();
        // 設定目標數據源
        dataSource.setTargetDataSources(hashMap);
        // 設定默認使用的數據源
        dataSource.setDefaultTargetDataSource(db01);
        return dataSource;
    }
}

三、實現數據庫切換,我們只需要在需要切換數據庫的方法上添加自定義注解@SwitchDataSource即可

1、編寫pojo

Goods.java

package com.whw.mdb.pojo;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;

@ApiModel("商品實體類")
public class Goods {
    @ApiModelProperty("唯一id")
    private Integer id;
    @ApiModelProperty("名稱")
    public String name;
    @ApiModelProperty("價格")
    public Double price;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }
}

User.java

package com.whw.mdb.pojo;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;

@ApiModel("用戶實體類")
public class User {
    @ApiModelProperty("唯一id")
    private Integer id;
    @ApiModelProperty("姓名")
    public String name;
    @ApiModelProperty("性別")
    public String sex;
    @ApiModelProperty("年齡")
    public Integer age;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

2、編寫dao層

GoodsDao.java

package com.whw.mdb.dao;

import com.whw.mdb.pojo.Goods;
import java.util.List;

public interface GoodsDao {
    int add(Goods goods);
    List<Goods> getGoodsList();
}

UserDao.java

package com.whw.mdb.dao;

import com.whw.mdb.pojo.User;
import org.springframework.stereotype.Repository;
import java.util.List;

@Repository
public interface UserDao {
    int add(User user);
    List<User> getUserList();
}

3、編寫service及實現類impl,在實現類中需要切換數據源的方法上添加自定義注解即可

GoodsService.java

package com.whw.mdb.service;

import com.whw.mdb.pojo.Goods;
import java.util.List;

public interface GoodsService {
    int add(Goods goods);
    List<Goods> getGoodsList();
}

GoodsServiceImpl.java

package com.whw.mdb.service.impl;

import com.whw.mdb.config.datasource.DataSourceName;
import com.whw.mdb.config.datasource.SwitchDataSource;
import com.whw.mdb.dao.GoodsDao;
import com.whw.mdb.pojo.Goods;
import com.whw.mdb.service.GoodsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;

@Service
public class GoodsServiceImpl implements GoodsService {

    @Autowired
    GoodsDao goodsDao;

    @Override
    @SwitchDataSource(value = DataSourceName.DB02)
    public int add(Goods goods) {
        return goodsDao.add(goods);
    }

    @SwitchDataSource(value = DataSourceName.DB02)
    @Override
    public List<Goods> getGoodsList() {
        return goodsDao.getGoodsList();
    }
}

UserService.java

package com.whw.mdb.service;

import com.whw.mdb.pojo.User;
import java.util.List;

public interface UserService {
    int add(User user);
    List<User> getUserList();
    Integer addAll();
}

UserServiceImpl.java

package com.whw.mdb.service.impl;

import com.whw.mdb.dao.UserDao;
import com.whw.mdb.pojo.Goods;
import com.whw.mdb.pojo.User;
import com.whw.mdb.service.GoodsService;
import com.whw.mdb.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    UserDao userDao;

    @Autowired
    GoodsService goodsService;

    @Override
    public int add(User user) {
        return userDao.add(user);
    }

    @Override
    public List<User> getUserList() {
        return userDao.getUserList();
    }

    @Override
    public Integer addAll() {
        User user = new User();
        user.setName("A");
        user.setSex("男");
        user.setAge(20);
        int count = userDao.add(user);

        Goods goods = new Goods();
        goods.setName("三星");
        goods.setPrice(3000d);
        count += goodsService.add(goods);
        int i = 1 / 0;
        return count;
    }
}

4、編寫controller層

GoodsController.java

package com.whw.mdb.controller;

import com.whw.mdb.pojo.Goods;
import com.whw.mdb.service.GoodsService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;

@Api("商品操作API")
@RestController
@RequestMapping("/goods/")
public class GoodsController {
    @Autowired
    GoodsService goodsService;

    @ApiOperation("獲取商品列表")
    @GetMapping("list")
    public List<Goods> list() {
        List<Goods> goodsList = goodsService.getGoodsList();
        return goodsList;
    }

    @ApiOperation("添加商品")
    @PostMapping("add")
    public String addUser(
            @ApiParam(name = "name", required = true) @RequestParam(name = "name") String name,
            @ApiParam(name = "price", required = true) @RequestParam(name = "price") Double price
    ) {
        Goods goods = new Goods();
        goods.setName(name);
        goods.setPrice(price);
        int count = goodsService.add(goods);
        if (count > 0) {
            return "添加成功!";
        } else {
            return "添加失敗!";
        }
    }
}

UserController.java

package com.whw.mdb.controller;

import com.whw.mdb.pojo.User;
import com.whw.mdb.service.UserService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;

@Api("用戶操作API")
@RestController
@RequestMapping("/user/")
public class UserController {

    @Autowired
    UserService userService;

    @ApiOperation("獲取人員列表")
    @GetMapping("list")
    public List<User> list() {
        List<User> userList = userService.getUserList();
        return userList;
    }

    @ApiOperation("測試事務")
    @PostMapping("test")
    public String test() {
        Integer count = userService.addAll();
        return count > 0 ? "成功" : "失敗";
    }

    @ApiOperation("添加用戶")
    @PostMapping("add")
    public String addUser(
            @ApiParam(name = "name", required = true) @RequestParam(name = "name") String name,
            @ApiParam(name = "sex", required = true) @RequestParam(name = "sex") String sex,
            @ApiParam(name = "age", required = true) @RequestParam(name = "age") Integer age
    ) {
        User user = new User();
        user.setName(name);
        user.setAge(age);
        user.setSex(sex);
        int count = userService.add(user);
        if (count > 0) {
            return "添加成功!";
        } else {
            return "添加失敗!";
        }
    }
}

四、測試

1、運行項目,瀏覽器訪問http://localhost:8080/doc.html進入接口文檔頁面

2、測試獲取人員接口

3、測試獲取商品接口

從上述測試看,系統在訪問不同的功能時可以隨時切換數據源。

五、存在問題

由於數據庫中的事務是針對當前數據庫操作的,數據源切換之后會造成事務功能不可用,如果在方法上添加了事務注解,此方法實現又需要切換數據源,則會發現出現異常,數據庫無法切換

原因:使用了@Transactional注解。為了保證事物的一致性,它需要保證同一個線程的數據庫執行Connection和事物執行的Connection必須保持一致,因此去調用下一個Mapper時仍然保持了上一個Mapper的連接。所以就報錯。

從SpringManagedTransaction類中可以看出,事務開啟的時候就會確定數據庫連接,一個事務中的數據庫連接是唯一的

public Connection getConnection() throws SQLException {
        if (this.connection == null) {
            this.openConnection();
        }

        return this.connection;
    }

    private void openConnection() throws SQLException {
        this.connection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.connection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
        LOGGER.debug(() -> {
            return "JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring";
        });
    }

解決辦法:重寫SpringManagedTransaction的getConnection()方法

第一步:創建MyTransactionsFactory

import org.apache.ibatis.transaction.Transaction;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;

import javax.sql.DataSource;

public class MyTransactionsFactory extends SpringManagedTransactionFactory {
    @Override
    public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
        return new MyManagedTransaction(dataSource);
    }
}

第二步:重寫MyManagedTransaction類中的getConnection()方法

package com.whw.mdbtransaction.config.datasource;

import org.mybatis.spring.transaction.SpringManagedTransaction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;

public class MyManagedTransaction extends SpringManagedTransaction {
    DataSource dataSource;
    ConcurrentHashMap<String, Connection> map = new ConcurrentHashMap<>();

    public MyManagedTransaction(DataSource dataSource) {
        super(dataSource);
        this.dataSource = dataSource;
    }

    @Override
    public Connection getConnection() throws SQLException {
        String key = DataSourceSwitcher.getDataSource();
        if (map.containsKey(key)) {
            return map.get(key);
        }
        Connection con = dataSource.getConnection();
        map.put(key, con);
        return con;
    }
}

第三步:在DataSourceConfig中指定事務工廠為自定的MyTransactionsFactory

@Bean
public SqlSessionFactory sqlSessionFactory(
        @Qualifier("dynamicDataSource") DataSource dynamicDataSource
) throws Exception {
    SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
    sessionFactoryBean.setDataSource(dynamicDataSource);
    // 此處要指定mapper.xml文件所在位置
    sessionFactoryBean.setMapperLocations(
            new PathMatchingResourcePatternResolver().getResources("classpath:mybatis/mapper/*.xml")
    );
    // 指定自定義的事務工廠
    sessionFactoryBean.setTransactionFactory(new MyTransactionsFactory());
    return sessionFactoryBean.getObject();
}

六、利用Atomikos實現多源數據庫事務一致性

1、實現原理

       對於單個數據庫,通常直接使用Mysql的事務進行事務控制,通過begin,commit和rollback等操作進行開啟、回滾或提交事務。但在復雜的應用中,有時會出現同時修改多個數據源數據的情況,為了保證這些數據的能夠受事務的控制,則需要使用分布式事務,而XA協議則是分布式事務協議。主流的數據庫如Mysql、Oracle、Postgresql、SqlServer等都支持XA協議。
       通過xa分布式事務協議可以允許多個數據源加入到一個全局事務中來,加入事務中的資源通常是關系數據庫,也有可能是其他數據資源。在一個全局事務中,包含多個數據操作的動作,這些動作在全局事務中要么全部執行,要么全部不執行,一個使用全局事務的應用包含一個或者多個資源管理器和一個事務管理器。

       資源管理器RM(resource manager):提供連接事務資源的的功能。一個數據庫服務器就是一種資源管理器。資源管理器是事務的參與者,必須要提供提交和回滾事務的功能。
​       事務管理器TM(transaction manager):事務管理器是全局事務的協調者,他通過與資源管理器通信,協調多個事務的運作。Mysql通過實現XA協議,處理XA事務,讓自身成為全局事務中的一個資源管理器。一個連接到Mysql服務器的客戶端則充當一個事務管理器的角色。實現全局事務,需要知道哪些參與者參與到事務中,如何將他們運行到一個可以共同提交,或者回滾的點。作為全局事務,還需要考慮網絡連接等因素導致的失敗。

       執行全局事務的過程分二階段提交(2PC),三階段提交(3PC)兩種實現。后期會進行詳細介紹...

2、實現步驟:

Atomikos是SpringBoot推薦使用的一個分布式事務協調工具,使用時我們只需要將DataSource改為XADataSource即可

第一步:引入依賴

<!-- 分布式事務控制 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    <version>2.5.3</version>
</dependency>

第二步:改寫DataSourceConfig.java即可

package com.whw.mdbtransaction.config.datasource;

import com.atomikos.jdbc.AtomikosDataSourceBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * @描述 動態數據源配置類
 **/
@Configuration
@MapperScan(basePackages = {"com.whw.mdbtransaction.dao"})
public class DataSourceConfig {

    @Value("${spring.datasource.db01.url}")
    private String db01Url;
    @Value("${spring.datasource.db01.username}")
    private String db01UserName;
    @Value("${spring.datasource.db01.password}")
    private String db01Password;

    @Value("${spring.datasource.db02.url}")
    private String db02Url;
    @Value("${spring.datasource.db02.username}")
    private String db02UserName;
    @Value("${spring.datasource.db02.password}")
    private String db02Password;

    /**
     * 配置數據源db01
     **/
    @Bean(name = "dataSourceDB01")
    public AtomikosDataSourceBean dataSourceDB01() {
        MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
        mysqlXADataSource.setUrl(db01Url);
        mysqlXADataSource.setUser(db01UserName);
        mysqlXADataSource.setPassword(db01Password);

        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        atomikosDataSourceBean.setUniqueResourceName("db01");
        atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
        atomikosDataSourceBean.setPoolSize(5);
        atomikosDataSourceBean.setMaxPoolSize(20);
        return atomikosDataSourceBean;
    }

    /**
     * 配置數據源db02
     **/
    @Bean(name = "dataSourceDB02")
    public AtomikosDataSourceBean dataSourceDB02() {
        MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
        mysqlXADataSource.setUrl(db02Url);
        mysqlXADataSource.setUser(db02UserName);
        mysqlXADataSource.setPassword(db02Password);

        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        atomikosDataSourceBean.setUniqueResourceName("db02");
        atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
        atomikosDataSourceBean.setPoolSize(5);
        atomikosDataSourceBean.setMaxPoolSize(20);
        return atomikosDataSourceBean;
    }

    /**
     * 配置動態數據源
     **/
    @Primary
    @Bean("dynamicDataSource")
    public DataSource dataSource(
            @Qualifier("dataSourceDB01") DataSource dataSourceDB01,
            @Qualifier("dataSourceDB02") DataSource dataSourceDB02
    ) {
        Map<Object, Object> hashMap = new HashMap<>();
        hashMap.put(DataSourceName.DB01.getName(), dataSourceDB01);
        hashMap.put(DataSourceName.DB02.getName(), dataSourceDB02);
        AbstractRoutingDataSource dataSource = new DataSourceSwitcher();
        dataSource.setTargetDataSources(hashMap);
        dataSource.setDefaultTargetDataSource(dataSourceDB01);
        return dataSource;
    }

    /**
     * 配置SqlSessionFactory
     **/
    @Bean
    public SqlSessionFactory sqlSessionFactory(
            @Qualifier("dynamicDataSource") DataSource dynamicDataSource
    ) throws Exception {
        SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
        // 此處一定要使用動態數據源
        sessionFactoryBean.setDataSource(dynamicDataSource);
        // 此處要指定mapper.xml文件所在位置
        sessionFactoryBean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath:mybatis/mapper/*.xml")
        );
        sessionFactoryBean.setTransactionFactory(new MyTransactionsFactory());
        return sessionFactoryBean.getObject();
    }

    /*@Bean(value = "xatx")
    public JtaTransactionManager jtaTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransactionImp, userTransactionManager);
        jtaTransactionManager.setAllowCustomIsolationLevels(true);
        return jtaTransactionManager;
    }*/
}

第三步:測試

1、編寫測試接口在Swagger接口文檔中測試,放開注釋,則兩個數據庫都能不能添加數據,取消注釋,則事務正常提交,兩個數據庫中都能成功添加。

    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public Integer addAll() {
        User user = new User();
        user.setName("A");
        user.setSex("男");
        user.setAge(20);
        int count = userDao.add(user);

        Goods goods = new Goods();
        goods.setName("三星");
        goods.setPrice(3000d);
        count += goodsService.add(goods);
        // int i = 1 / 0;
        return count;
    }

寫在最后:Atomikos雖然提供了一種分布式事務的解決方案,但對於高並發場景下存在很大的性能問題,而且相關文檔說明不是很全,在項目中使用的並不是很多,建議研究學習了解下阿里的開源的分布式事務框架 Seata,提供了完備的解決方案。

官方地址:http://seata.io/zh-cn/docs/overview/what-is-seata.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM