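1. Introduction
JTA (Java Transaction API) + Atomikos (a transaction manager). The underlying mechanism is the two-phase commit protocol for distributed transactions.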
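2. Two-phase commit
2.1 Overview
When a transaction spans multiple nodes, preserving its atomicity and consistency requires a coordinator that collects the outcome of every participant's operations and then tells each participant whether to actually commit or to roll back. Here the databases act as the participants.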
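2.2 How it works
Commit-request (voting) phase
- The coordinator sends a prepare request, together with the transaction content, to every participant, asks whether it can prepare to commit, and waits for the responses.
- Each participant executes the operations in the transaction and writes an undo log (for rollback) and a redo log (for replay), but does not actually commit.
- Each participant reports the result to the coordinator: yes if the operations succeeded, no otherwise.
Commit (execution) phase
There are two cases: success and failure.
If every participant returned yes, the transaction can be committed:
- The coordinator sends a commit request to every participant.
- On receiving the commit request, each participant actually commits the transaction, releases the transaction resources it holds, and returns an ack to the coordinator.
- Once the coordinator has received an ack from every participant, the transaction has completed successfully.
If any participant returned no, or did not respond before the timeout, the transaction is aborted and must be rolled back:
- The coordinator sends a rollback request to every participant.
- On receiving the rollback request, each participant uses its undo log to return to the state before the transaction, releases the transaction resources it holds, and returns an ack to the coordinator.
- Once the coordinator has received an ack from every participant, the rollback is complete.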
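The two phases described above can be sketched as a small in-memory simulation. This is only an illustration of the protocol flow, not how Atomikos implements it: the Participant and Coordinator classes, their method names, and the canPrepare flag are all made up for this example, and a real participant would be an XA-capable database rather than a plain object.

```java
import java.util.Arrays;
import java.util.List;

/** Demo entry point: one round where everyone votes yes, one where a participant votes no. */
class TwoPhaseCommitSketch {
    public static void main(String[] args) {
        List<Participant> allYes = Arrays.asList(
                new Participant("db1", true), new Participant("db2", true));
        System.out.println("committed: " + Coordinator.run(allYes));

        List<Participant> oneNo = Arrays.asList(
                new Participant("db1", true), new Participant("db2", false));
        System.out.println("committed: " + Coordinator.run(oneNo));
    }
}

/** Hypothetical participant; canPrepare simulates whether its local work succeeds. */
class Participant {
    final String name;
    final boolean canPrepare;
    String state = "INIT";

    Participant(String name, boolean canPrepare) {
        this.name = name;
        this.canPrepare = canPrepare;
    }

    /** Phase 1: do the work and vote, without committing. */
    boolean prepare() {
        state = canPrepare ? "PREPARED" : "FAILED";
        return canPrepare;
    }

    /** Phase 2, success path: make the prepared work permanent. */
    void commit() {
        state = "COMMITTED";
    }

    /** Phase 2, failure path: undo the work. */
    void rollback() {
        state = "ROLLED_BACK";
    }
}

/** Hypothetical coordinator that drives both phases. */
class Coordinator {
    /** Returns true if the transaction committed, false if it was rolled back. */
    static boolean run(List<Participant> participants) {
        // Phase 1: ask every participant to prepare and collect the votes.
        boolean allVotedYes = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allVotedYes = false;
            }
        }
        // Phase 2: commit everywhere only if every vote was yes, otherwise roll back everywhere.
        for (Participant p : participants) {
            if (allVotedYes) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allVotedYes;
    }
}
```

The sketch leaves out timeouts, acks, and logging, which is exactly where the real protocol gets hard.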
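Drawbacks
1. Synchronous blocking. While the protocol runs, every participant blocks on the transaction, which hurts throughput.
2. Single point of failure. Everything depends on the coordinator; if it fails, the participants stay blocked indefinitely.
3. Inconsistency is still possible. If a network fault or a similar problem means only some participants receive the commit request, those participants commit the transaction while the others do not.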
3. Writing the code
Many of the explanations are given as comments in the code.
3.1 Add the dependencies
<dependencies>
    <!-- enable AOP aspects -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
    <!-- Druid connection pool -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.14</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- distributed transactions -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
</dependencies>
3.2 application.yml
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.jdbc.Driver
    druid:
      # master data source
      master:
        url: jdbc:mysql://localhost:3306/ry?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: 123456
      # slave data source
      slave:
        # whether to enable the slave data source (disabled by default)
        enabled: true
        url: jdbc:mysql://localhost:3306/data_catalog?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: 123456
      # initial number of connections
      initialSize: 5
      # minimum number of connections kept in the pool
      minIdle: 10
      # maximum number of connections in the pool
      maxActive: 20
      # maximum time to wait for a connection, in milliseconds
      maxWait: 60000
      # interval between checks for idle connections that should be closed, in milliseconds
      timeBetweenEvictionRunsMillis: 60000
      # minimum time a connection stays in the pool, in milliseconds
      minEvictableIdleTimeMillis: 300000
      # maximum time a connection stays in the pool, in milliseconds
      maxEvictableIdleTimeMillis: 900000
      # query used to check that a connection is still valid
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      webStatFilter:
        enabled: true
      statViewServlet:
        enabled: true
        # allow list; if left empty, all access is allowed
        allow:
        url-pattern: /druid/*
        # user name and password for the console
        login-username:
        login-password:
      filter:
        stat:
          enabled: true
          # slow SQL logging
          log-slow-sql: true
          slow-sql-millis: 1000
          merge-sql: true
        wall:
          config:
            multi-statement-allow: true
server:
  port: 8000
3.3 Custom annotation
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface MyDataSource {
    /**
     * Name of the data source to switch to
     */
    DataSourceType value() default DataSourceType.MASTER;
}
3.4 Custom enum
public enum DataSourceType {
    /**
     * Master database
     */
    MASTER,
    /**
     * Slave database
     */
    SLAVE
}
3.5 Custom AOP aspect
@Aspect
@Component
@Order(1)
public class MyDataSourceAsp {

    /**
     * Match everything that carries this annotation.
     * - {@code @within} matches all methods of a type that carries the annotation,
     *   so annotating a class is enough to intercept every method in it.
     * - {@code @annotation} matches the executing method when the method itself
     *   carries the annotation; it applies to methods only.
     *
     * No scan path is given here; the aspect presumably applies within the
     * component-scan range of the application's main class.
     */
    @Pointcut("@annotation(com.shw.dynamic.annotation.MyDataSource) " +
            "|| @within(com.shw.dynamic.annotation.MyDataSource)")
    public void doPointCut() {
    }

    @Around("doPointCut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MyDataSource dataSource = getDataSource(joinPoint);
        if (dataSource != null) {
            RoutingDataSourceContext.setDataSourceRoutingKey(dataSource.value().name());
        }
        try {
            // continue with the intercepted method
            return joinPoint.proceed();
        } finally {
            // clear the thread-local routing key once the method has run
            RoutingDataSourceContext.close();
        }
    }

    /**
     * Get the annotation from the method or from the class.
     * The method is checked first and the class second, so a data source set on a
     * method takes precedence over one set on the class.
     * @param joinPoint the join point being executed
     * @return the annotation, or null if neither the method nor the class has one
     */
    private MyDataSource getDataSource(ProceedingJoinPoint joinPoint) {
        MethodSignature method = (MethodSignature) joinPoint.getSignature();
        // annotation on the method
        MyDataSource annotation = method.getMethod().getAnnotation(MyDataSource.class);
        if (annotation != null) {
            return annotation;
        }
        // fall back to the annotation on the class of the target object
        Class<?> aClass = joinPoint.getTarget().getClass();
        return aClass.getAnnotation(MyDataSource.class);
    }
}
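The "method first, then class" lookup used by the aspect can be tried out without Spring. The following is a minimal sketch using plain reflection; the UseSource annotation, SourceType enum, DemoService, PlainService, and Resolver are hypothetical stand-ins invented for this example, not classes from the project.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Demo entry point: prints the source resolved for three methods. */
class AnnotationLookupSketch {
    public static void main(String[] args) {
        System.out.println(Resolver.resolve(DemoService.class, "readReport"));
        System.out.println(Resolver.resolve(DemoService.class, "saveOrder"));
        System.out.println(Resolver.resolve(PlainService.class, "ping"));
    }
}

/** Hypothetical stand-in for DataSourceType. */
enum SourceType { MASTER, SLAVE }

/** Hypothetical stand-in for MyDataSource; usable on methods and types. */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@interface UseSource {
    SourceType value() default SourceType.MASTER;
}

/** Class-level default is MASTER; one method overrides it with SLAVE. */
@UseSource(SourceType.MASTER)
class DemoService {
    @UseSource(SourceType.SLAVE)
    public void readReport() { }

    public void saveOrder() { }
}

/** No annotation anywhere. */
class PlainService {
    public void ping() { }
}

class Resolver {
    /** Returns the method-level value if present, else the class-level value, else null. */
    static SourceType resolve(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            UseSource onMethod = method.getAnnotation(UseSource.class);
            if (onMethod != null) {
                return onMethod.value();
            }
            UseSource onClass = type.getAnnotation(UseSource.class);
            return onClass != null ? onClass.value() : null;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No public no-arg method " + methodName, e);
        }
    }
}
```

A null result corresponds to the aspect leaving the routing key unset, in which case the context falls back to its default.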
3.6 Data source routing context
public class RoutingDataSourceContext {

    private static Logger logger = LoggerFactory.getLogger(RoutingDataSourceContext.class);

    /**
     * The key is kept in a ThreadLocal. ThreadLocal gives every thread that uses the
     * variable its own independent copy, so each thread can change its copy without
     * affecting the copies held by other threads.
     */
    private static final ThreadLocal<String> THREAD_LOCAL_DATA_SOURCE_KEY = new ThreadLocal<>();

    /**
     * Get the name of the current data source
     * @return the key set for this thread, or MASTER if none is set
     */
    static String getDataSourceRoutingKey() {
        String key = THREAD_LOCAL_DATA_SOURCE_KEY.get();
        return key == null ? DataSourceType.MASTER.name() : key;
    }

    /**
     * Set the data source
     * @param key name of the data source to use on this thread
     */
    public static void setDataSourceRoutingKey(String key) {
        logger.info("Switching to data source {}", key);
        THREAD_LOCAL_DATA_SOURCE_KEY.set(key);
    }

    /**
     * Clear the data source setting
     */
    public static void close() {
        THREAD_LOCAL_DATA_SOURCE_KEY.remove();
    }
}
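To see why a ThreadLocal is a safe place for the routing key, here is a small stand-alone sketch. KeyHolder is a hypothetical, simplified copy of the context class written for this example (no logging, plain strings for the keys); it sets a key on one thread and checks what a second thread observes.

```java
/** Demo entry point: shows that a key set on one thread is invisible to another. */
class ThreadLocalKeySketch {
    public static void main(String[] args) {
        System.out.println("other thread sees: " + KeyHolder.seenByOtherThread("SLAVE"));
    }
}

/** Hypothetical, simplified version of the routing context. */
class KeyHolder {
    static final String DEFAULT_KEY = "MASTER";
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    static void set(String key) {
        KEY.set(key);
    }

    /** Returns this thread's key, or the default if the thread never set one. */
    static String get() {
        String key = KEY.get();
        return key == null ? DEFAULT_KEY : key;
    }

    static void clear() {
        KEY.remove();
    }

    /** Sets a key on the calling thread and reports what a different thread reads. */
    static String seenByOtherThread(String key) {
        set(key);
        try {
            String[] seen = new String[1];
            Thread other = new Thread(() -> seen[0] = get());
            other.start();
            other.join();
            return seen[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the other thread", e);
        } finally {
            // Same idea as calling close() in the aspect's finally block.
            clear();
        }
    }
}
```

Clearing the key in a finally block matters in a web application because request threads are pooled, so a key left behind would leak into the next request handled by the same thread.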
3.7 Druid connection pool parameters
@Configuration
public class DruidProperties {

    @Value("${spring.datasource.druid.initialSize}")
    private int initialSize;
    @Value("${spring.datasource.druid.minIdle}")
    private int minIdle;
    @Value("${spring.datasource.druid.maxActive}")
    private int maxActive;
    @Value("${spring.datasource.druid.maxWait}")
    private int maxWait;
    @Value("${spring.datasource.druid.timeBetweenEvictionRunsMillis}")
    private int timeBetweenEvictionRunsMillis;
    @Value("${spring.datasource.druid.minEvictableIdleTimeMillis}")
    private int minEvictableIdleTimeMillis;
    @Value("${spring.datasource.druid.maxEvictableIdleTimeMillis}")
    private int maxEvictableIdleTimeMillis;
    @Value("${spring.datasource.druid.validationQuery}")
    private String validationQuery;
    @Value("${spring.datasource.druid.testWhileIdle}")
    private boolean testWhileIdle;
    @Value("${spring.datasource.druid.testOnBorrow}")
    private boolean testOnBorrow;
    @Value("${spring.datasource.druid.testOnReturn}")
    private boolean testOnReturn;

    public DruidDataSource dataSource(DruidDataSource datasource) {
        // initial, maximum and minimum pool size
        datasource.setInitialSize(initialSize);
        datasource.setMaxActive(maxActive);
        datasource.setMinIdle(minIdle);
        // maximum time to wait for a connection
        datasource.setMaxWait(maxWait);
        // interval between checks for idle connections that should be closed, in milliseconds
        datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        // minimum and maximum time a connection stays in the pool, in milliseconds
        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);
        // SQL used to check whether a connection is valid; it must be a query, commonly select 'x'.
        // If validationQuery is null, testOnBorrow, testOnReturn and testWhileIdle have no effect.
        datasource.setValidationQuery(validationQuery);
        // Recommended to be true: it does not hurt performance and keeps connections safe.
        // When a connection is requested and has been idle longer than
        // timeBetweenEvictionRunsMillis, validationQuery is run to check it.
        datasource.setTestWhileIdle(testWhileIdle);
        // Run validationQuery every time a connection is requested; this reduces performance.
        datasource.setTestOnBorrow(testOnBorrow);
        // Run validationQuery every time a connection is returned; this reduces performance.
        datasource.setTestOnReturn(testOnReturn);
        return datasource;
    }
}
3.8 Data source configuration (key part)
@Configuration
@MapperScan(basePackages = DataSourceConfig.BASE_PACKAGES, sqlSessionTemplateRef = "sqlSessionTemplate")
public class DataSourceConfig {

    static final String BASE_PACKAGES = "com.shw.dynamic.mapper";
    private static final String MAPPER_LOCATION = "classpath:mybatis/mapper/*.xml";

    /**
     * Create the master DruidXADataSource; its properties are bound automatically
     * through @ConfigurationProperties.
     */
    @Bean(name = "druidDataSourceMaster")
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource druidDataSourceMaster(DruidProperties properties) {
        DruidXADataSource druidXADataSource = new DruidXADataSource();
        return properties.dataSource(druidXADataSource);
    }

    /**
     * Create the slave DruidXADataSource
     */
    @Bean(name = "druidDataSourceSlave")
    @ConfigurationProperties("spring.datasource.druid.slave")
    public DataSource druidDataSourceSlave(DruidProperties properties) {
        DruidXADataSource druidXADataSource = new DruidXADataSource();
        return properties.dataSource(druidXADataSource);
    }

    /**
     * Create the XA-capable Atomikos data source for master
     */
    @Bean(name = "dataSourceMaster")
    public DataSource dataSourceMaster(@Qualifier(value = "druidDataSourceMaster") DataSource druidDataSourceMaster) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceMaster);
        sourceBean.setPoolSize(5);
        sourceBean.setTestQuery("SELECT 1");
        // every data source must be given a unique resource name
        sourceBean.setUniqueResourceName("master");
        return sourceBean;
    }

    /**
     * Create the XA-capable Atomikos data source for slave
     */
    @Bean(name = "dataSourceSlave")
    public DataSource dataSourceSlave(@Qualifier(value = "druidDataSourceSlave") DataSource druidDataSourceSlave) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceSlave);
        sourceBean.setPoolSize(5);
        sourceBean.setTestQuery("SELECT 1");
        sourceBean.setUniqueResourceName("slave");
        return sourceBean;
    }

    /**
     * @param dataSourceMaster the master data source
     * @return the session factory for the master data source
     */
    @Bean(name = "sqlSessionFactoryMaster")
    @Primary
    public SqlSessionFactory sqlSessionFactoryMaster(@Qualifier(value = "dataSourceMaster") DataSource dataSourceMaster)
            throws Exception {
        return createSqlSessionFactory(dataSourceMaster);
    }

    /**
     * @param dataSourceSlave the slave data source
     * @return the session factory for the slave data source
     */
    @Bean(name = "sqlSessionFactorySlave")
    public SqlSessionFactory sqlSessionFactorySlave(@Qualifier(value = "dataSourceSlave") DataSource dataSourceSlave)
            throws Exception {
        return createSqlSessionFactory(dataSourceSlave);
    }

    /**
     * The sqlSessionTemplate works together with Spring transaction management to make sure
     * the SqlSession actually used is the one bound to the current Spring transaction.
     * It also manages the session life cycle, closing, committing or rolling back the
     * session as the Spring transaction configuration requires.
     * @param sqlSessionFactoryMaster session factory for the master data source
     * @param sqlSessionFactorySlave session factory for the slave data source
     */
    @Bean(name = "sqlSessionTemplate")
    public MySqlSessionTemplate sqlSessionTemplate(@Qualifier(value = "sqlSessionFactoryMaster") SqlSessionFactory sqlSessionFactoryMaster,
                                                   @Qualifier(value = "sqlSessionFactorySlave") SqlSessionFactory sqlSessionFactorySlave) {
        Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
        sqlSessionFactoryMap.put(DataSourceType.MASTER.name(), sqlSessionFactoryMaster);
        sqlSessionFactoryMap.put(DataSourceType.SLAVE.name(), sqlSessionFactorySlave);
        MySqlSessionTemplate customSqlSessionTemplate = new MySqlSessionTemplate(sqlSessionFactoryMaster);
        customSqlSessionTemplate.setTargetSqlSessionFactories(sqlSessionFactoryMap);
        return customSqlSessionTemplate;
    }

    /**
     * Build a session factory for the given data source
     * @param dataSource the data source
     * @return the configured session factory
     */
    private SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        // map underscore column names to camelCase properties
        configuration.setMapUnderscoreToCamelCase(true);
        // log SQL to standard output
        configuration.setLogImpl(StdOutImpl.class);
        factoryBean.setConfiguration(configuration);
        // type alias package
        factoryBean.setTypeAliasesPackage("com.shw.**");
        // location of the mapper XML files
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources(MAPPER_LOCATION));
        return factoryBean.getObject();
    }
}
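3.9 (Key part) Override SqlSessionTemplate: copy the SqlSessionTemplate class and change what the getSqlSessionFactory method returns.
// static imports needed by the unqualified calls below, as in the original SqlSessionTemplate
import static java.lang.reflect.Proxy.newProxyInstance;
import static org.apache.ibatis.reflection.ExceptionUtil.unwrapThrowable;
import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;
import static org.mybatis.spring.SqlSessionUtils.getSqlSession;
import static org.mybatis.spring.SqlSessionUtils.isSqlSessionTransactional;

public class MySqlSessionTemplate extends SqlSessionTemplate {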
private final SqlSessionFactory sqlSessionFactory;
private final ExecutorType executorType;
private final SqlSession sqlSessionProxy;
private final PersistenceExceptionTranslator exceptionTranslator;
private Map<Object, SqlSessionFactory> targetSqlSessionFactories;
private SqlSessionFactory defaultTargetSqlSessionFactory;
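/**
* Target session factories, passed in as a map keyed by data source name
* @param targetSqlSessionFactories map from routing key to session factory
*/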
public void setTargetSqlSessionFactories(Map<Object, SqlSessionFactory> targetSqlSessionFactories) {
this.targetSqlSessionFactories = targetSqlSessionFactories;
}
public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory) {
this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;
}
public MySqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
}
public MySqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration()
.getEnvironment().getDataSource(), true));
}
public MySqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
super(sqlSessionFactory, executorType, exceptionTranslator);
this.sqlSessionFactory = sqlSessionFactory;
this.executorType = executorType;
this.exceptionTranslator = exceptionTranslator;
this.sqlSessionProxy = (SqlSession) newProxyInstance(
SqlSessionFactory.class.getClassLoader(),
new Class[] { SqlSession.class },
new SqlSessionInterceptor());
this.defaultTargetSqlSessionFactory = sqlSessionFactory;
}
// Pick the session factory for the routing key currently held in RoutingDataSourceContext
@Override
public SqlSessionFactory getSqlSessionFactory() {
String dataSourceKey = RoutingDataSourceContext.getDataSourceRoutingKey();
SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactories.get(dataSourceKey);
if (targetSqlSessionFactory != null) {
return targetSqlSessionFactory;
} else if (defaultTargetSqlSessionFactory != null) {
return defaultTargetSqlSessionFactory;
} else {
Assert.notNull(targetSqlSessionFactories, "Property 'targetSqlSessionFactories' or 'defaultTargetSqlSessionFactory' are required");
Assert.notNull(defaultTargetSqlSessionFactory, "Property 'defaultTargetSqlSessionFactory' or 'targetSqlSessionFactories' are required");
}
return this.sqlSessionFactory;
}
@Override
public Configuration getConfiguration() {
return this.getSqlSessionFactory().getConfiguration();
}
@Override
public ExecutorType getExecutorType() {
return this.executorType;
}
@Override
public PersistenceExceptionTranslator getPersistenceExceptionTranslator() {
return this.exceptionTranslator;
}
/**
* {@inheritDoc}
*/
@Override
public <T> T selectOne(String statement) {
return this.sqlSessionProxy.<T> selectOne(statement);
}
/**
* {@inheritDoc}
*/
@Override
public <T> T selectOne(String statement, Object parameter) {
return this.sqlSessionProxy.<T> selectOne(statement, parameter);
}
/**
* {@inheritDoc}
*/
@Override
public <K, V> Map<K, V> selectMap(String statement, String mapKey) {
return this.sqlSessionProxy.<K, V> selectMap(statement, mapKey);
}
/**
* {@inheritDoc}
*/
@Override
public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey) {
return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey);
}
/**
* {@inheritDoc}
*/
@Override
public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds) {
return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey, rowBounds);
}
/**
* {@inheritDoc}
*/
@Override
public <E> List<E> selectList(String statement) {
return this.sqlSessionProxy.<E> selectList(statement);
}
/**
* {@inheritDoc}
*/
@Override
public <E> List<E> selectList(String statement, Object parameter) {
return this.sqlSessionProxy.<E> selectList(statement, parameter);
}
/**
* {@inheritDoc}
*/
@Override
public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
return this.sqlSessionProxy.<E> selectList(statement, parameter, rowBounds);
}
/**
* {@inheritDoc}
*/
@Override
public void select(String statement, ResultHandler handler) {
this.sqlSessionProxy.select(statement, handler);
}
/**
* {@inheritDoc}
*/
@Override
public void select(String statement, Object parameter, ResultHandler handler) {
this.sqlSessionProxy.select(statement, parameter, handler);
}
/**
* {@inheritDoc}
*/
@Override
public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);
}
/**
* {@inheritDoc}
*/
@Override
public int insert(String statement) {
return this.sqlSessionProxy.insert(statement);
}
/**
* {@inheritDoc}
*/
@Override
public int insert(String statement, Object parameter) {
return this.sqlSessionProxy.insert(statement, parameter);
}
/**
* {@inheritDoc}
*/
@Override
public int update(String statement) {
return this.sqlSessionProxy.update(statement);
}
/**
* {@inheritDoc}
*/
@Override
public int update(String statement, Object parameter) {
return this.sqlSessionProxy.update(statement, parameter);
}
/**
* {@inheritDoc}
*/
@Override
public int delete(String statement) {
return this.sqlSessionProxy.delete(statement);
}
/**
* {@inheritDoc}
*/
@Override
public int delete(String statement, Object parameter) {
return this.sqlSessionProxy.delete(statement, parameter);
}
/**
* {@inheritDoc}
*/
@Override
public <T> T getMapper(Class<T> type) {
return getConfiguration().getMapper(type, this);
}
/**
* {@inheritDoc}
*/
@Override
public void commit() {
throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
}
/**
* {@inheritDoc}
*/
@Override
public void commit(boolean force) {
throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
}
/**
* {@inheritDoc}
*/
@Override
public void rollback() {
throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
}
/**
* {@inheritDoc}
*/
@Override
public void rollback(boolean force) {
throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
}
/**
* {@inheritDoc}
*/
@Override
public void close() {
throw new UnsupportedOperationException("Manual close is not allowed over a Spring managed SqlSession");
}
/**
* {@inheritDoc}
*/
@Override
public void clearCache() {
this.sqlSessionProxy.clearCache();
}
/**
* {@inheritDoc}
*/
@Override
public Connection getConnection() {
return this.sqlSessionProxy.getConnection();
}
/**
* {@inheritDoc}
* @since 1.0.2
*/
@Override
public List<BatchResult> flushStatements() {
return this.sqlSessionProxy.flushStatements();
}
/**
* Proxy needed to route MyBatis method calls to the proper SqlSession got from Spring's Transaction Manager It also
* unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to pass a {@code PersistenceException} to
* the {@code PersistenceExceptionTranslator}.
*/
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final SqlSession sqlSession = getSqlSession(
MySqlSessionTemplate.this.getSqlSessionFactory(),
MySqlSessionTemplate.this.executorType,
MySqlSessionTemplate.this.exceptionTranslator);
try {
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, MySqlSessionTemplate.this.getSqlSessionFactory())) {
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
Throwable unwrapped = unwrapThrowable(t);
if (MySqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
Throwable translated = MySqlSessionTemplate.this.exceptionTranslator
.translateExceptionIfPossible((PersistenceException) unwrapped);
if (translated != null) {
unwrapped = translated;
}
}
throw unwrapped;
} finally {
closeSqlSession(sqlSession, MySqlSessionTemplate.this.getSqlSessionFactory());
}
}
}
}
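Stripped of the MyBatis details, the only behavior the custom template adds is "look up a target by the current key, otherwise use the default". A minimal sketch of that lookup follows; KeyedRouter is a hypothetical helper written for this example, and plain strings stand in for the SqlSessionFactory instances.

```java
import java.util.HashMap;
import java.util.Map;

/** Demo entry point: resolves a known key and an unknown key. */
class RoutingLookupSketch {
    public static void main(String[] args) {
        Map<String, String> targets = new HashMap<>();
        targets.put("MASTER", "masterFactory");
        targets.put("SLAVE", "slaveFactory");
        KeyedRouter<String> router = new KeyedRouter<>(targets, "masterFactory");

        System.out.println(router.resolve("SLAVE"));
        System.out.println(router.resolve("UNKNOWN"));
    }
}

/** Hypothetical helper: picks a target by key and falls back to a default. */
class KeyedRouter<T> {
    private final Map<String, T> targets;
    private final T defaultTarget;

    KeyedRouter(Map<String, T> targets, T defaultTarget) {
        this.targets = targets;
        this.defaultTarget = defaultTarget;
    }

    T resolve(String key) {
        T target = targets.get(key);
        if (target != null) {
            return target;
        }
        if (defaultTarget != null) {
            return defaultTarget;
        }
        throw new IllegalStateException("No target for key '" + key + "' and no default configured");
    }
}
```

Because the template resolves the factory on every call, the data source can change from one mapper call to the next inside the same transactional method.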
3.10 Transaction manager configuration
@Configuration
@EnableTransactionManagement
public class XATransactionManagerConfig {

    @Bean
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        // the JTA API takes this timeout in seconds
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    // TransactionManager here is the JTA interface (javax.transaction), not Spring's
    @Bean
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }

    @Bean
    public PlatformTransactionManager transactionManager(UserTransaction userTransaction,
                                                         TransactionManager transactionManager) {
        return new JtaTransactionManager(userTransaction, transactionManager);
    }
}
4. Testing
@RestController
public class Hello {

    @Autowired
    private HelloMapper helloMapper;

    @GetMapping("/hello")
    @Transactional(rollbackFor = Exception.class)
    public List<Map> hello() {
        // default (master) data source
        List<Map> school = helloMapper.getSchool();
        System.out.println(school);
        // slave data source, selected by @MyDataSource on the mapper method
        List<Map> catalog = helloMapper.getCatalog();
        System.out.println(catalog);
        return null;
    }

    @GetMapping("/hi")
    @Transactional(rollbackFor = Exception.class)
    public List<Map> hi() {
        helloMapper.insertCatalog();
        // deliberately throws ArithmeticException so the transaction is rolled back
        int i = 1 / 0;
        helloMapper.insertSchool();
        return null;
    }
}

public interface HelloMapper {

    @MyDataSource(DataSourceType.SLAVE)
    List<Map> getCatalog();

    List<Map> getSchool();

    @MyDataSource(DataSourceType.SLAVE)
    void insertCatalog();

    void insertSchool();
}
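Conclusion
With the code above, adding @MyDataSource(DataSourceType.SLAVE) to an interface or a method switches to the slave data source whenever a switch is needed. The switch still works when the calling method is transactional, and if an exception is thrown inside the transactional method, the whole method is rolled back. That solves the problem of combining multi-data-source switching with distributed transactions.
Reference article:
https://blog.csdn.net/qq_35387940/article/details/103474353
Git repository:
https://github.com/sunhuawei0517/dynamicDataSource/tree/jta
The master branch covers multi-data-source switching; the jta branch covers multiple data sources plus distributed transactions.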