AT 模式下,把每个数据库被当做是一个 Resource,Seata 里称为 DataSource Resource。业务通过 JDBC 标准接口访问数据库资源时,Seata 框架会对所有请求进行拦截,做一些操作。每个本地事务提交时,Seata RM(Resource Manager,资源管理器) 都会向 TC(Transaction Coordinator,事务协调器) 注册一个分支事务。当请求链路调用完成后,发起方通知 TC 提交或回滚分布式事务,进入二阶段调用流程。此时,TC 会根据之前注册的分支事务回调到对应参与者去执行对应资源的第二阶段。TC 是怎么找到分支事务与资源的对应关系呢?每个资源都有一个全局唯一的资源 ID,并且在初始化时用该 ID 向 TC 注册资源。在运行时,每个分支事务的注册都会带上其资源 ID。这样 TC 就能在二阶段调用时正确找到对应的资源。
Seata 中有三大模块,分别是 TM、RM 和 TC。 其中 TM 和 RM 是作为 Seata 的客户端与业务系统集成在一起,TC 作为 Seata 的服务端独立部署。

在 Seata 中,分布式事务的执行流程:
- TM 开启分布式事务(TM 向 TC 注册全局事务记录);
- 按业务场景,编排数据库、服务等事务内资源(RM 向 TC 汇报资源准备状态 );
- TM 结束分布式事务,事务一阶段结束(TM 通知 TC 提交/回滚分布式事务);
- TC 汇总事务信息,决定分布式事务是提交还是回滚;
- TC 通知所有 RM 提交/回滚 资源,事务二阶段结束。
Seata 会有 4 种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式,本文主要介绍AT模式。
Seata类图
TC类图
TC在Seata中其实就是Server端,Client端其实属于RM和TM,但是为了看起来完整,把Client端的RpcCient和Listener也放在了这里,类图如下图所示:

CLient listenner监听服务端发过来的消息,从而进行相应的操作。
RM类图
RM相关类图如下图所示:

TM类图
TM相关类图如下图所示:

AT模式
AT 模式是一种无侵入的分布式事务解决方案。在 AT 模式下,用户只需关注自己的“业务 SQL”,用户的 “业务 SQL” 作为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操作。

在分析AT模式前,我们先提问几个问题:
1、分布式Seata,怎么在系统中集成使用?
2、开始事务前,做了什么初始化工作?
3、AT模式具体怎么实现的?
初始化工作
Seata在GlobalTransactionScanner中进行了TM和RM初始化:
private void initClient() { //init TM TMClient.init(applicationId, txServiceGroup); //init RM RMClient.init(applicationId, txServiceGroup); //Register Spring ShutdownHook registerSpringShutdownHook(); }
其中TMClient的初始化如下:
public static void init(String applicationId, String transactionServiceGroup) { TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup); tmRpcClient.init(); }
其中初始化了一个TmRpcClient对象,它的init方法中,会启动与Server的重连接线程(每5秒),以及消息发送线程和超时线程。重连接事,按照配置方式,寻找Server信息,比如采用file形式,那么首先从file.conf中根据分组名称(service_group)找到集群名称(cluster_name),再根据集群名称找到fescar-server集群ip端口列表,然后从ip列表中选择一个用netty进行连接。
RmClient的初始化如下:
public static void init(String applicationId, String transactionServiceGroup) { RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup); rmRpcClient.setResourceManager(DefaultResourceManager.get()); rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get())); rmRpcClient.init(); }
其中,主要工作为:
- 创建一个RmRpcClient对象
- RmRpcClient对象设置ResourceManager;
- RmRpcClient对象设置消息Listener;
- RmRpcClient对象初始化
初始化阶段总结:
1、Spring启动时,初始化了2个客户端TmClient、RmClient
2、TmClient与Server通过Netty建立连接并发送消息
3、RmClient与Server通过Netty建立连接,负责接收二阶段提交、回滚消息并在回调器(RmHandler)中做处理
因此,使用Seata,需要初始化GlobalTransactionScanner进行初始化工作!
TM全局事务控制
那些方法需要进行全局事务控制了?Seata定义了注解GlobalTransactional,只要添加了注解的方法,就代表需要分布式事务。
注解对应着拦截器,Seata定义的拦截器为GlobalTransactionalInterceptor。其核心处理逻辑如下:
@Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class); if (globalTransactionalAnnotation != null) { return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); } else if (globalLockAnnotation != null) { return handleGlobalLock(methodInvocation); } else { return methodInvocation.proceed(); } }
上面方法的逻辑为:如果业务方法上有全局事务GlobalTransactional注解时,就执行全局事务处理handleGlobalTransaction;如果业务方法上有全局锁GlobalLock注解时,就执行全局锁处理handleGlobalLock;否则,就按照普通方法执行。
handleGlobalTransaction中,会调用TransactionalTemplate的execute方法,execute方法如下:
public Object execute(TransactionalExecutor business) throws Throwable { // 1. get or create a transaction GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // 1.1 get transactionInfo TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } try { // 2. begin transaction beginTransaction(txInfo, tx); Object rs = null; try { // 3. Do Your Business rs = business.execute(); } catch (Throwable ex) { // 4.the needed business exception to rollback. completeTransactionAfterThrowing(txInfo,tx,ex); throw ex; } // 5. everything is fine, commit. commitTransaction(tx); return rs; } finally { //6. clear triggerAfterCompletion(); cleanUp(); } }
1、创建或创建一个GlobalTransaction,并根据业务TransactionalExecutor获取到事务需要的相关信息;
2、开始全局事务
开始全局事务的方法为beginTransaction方法,方法内部调用了DefaultGlobalTransaction.begin方法,而DefaultGlobalTransaction.begin方法的内部又调用了TransactionManager的begin方法,
xid = transactionManager.begin(null, null, name, timeout);
TransactionManager接口类的默认实现为DefaultTransactionManager,其begin方法为:
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request); return response.getXid(); }
方法中,向服务器发起开始全局事务请求,服务会返回一个全局事务标示XID。OK,此时,由TM发起的一个全局事务就开始了!
3、执行业务逻辑
4、回滚全局事务
如果业务都执行没有全部成功,那么就执行全局事务回滚,方法为completeTransactionAfterThrowing。方法执行逻辑与开始事务方法内部逻辑基本一致:内部调用了DefaultGlobalTransaction.rollback方法,而DefaultGlobalTransaction.rollback方法的内部又调用了TransactionManager的rollback方法,向服务器发起rollback,服务会返回GlobalStatus.
5、提交全局事务
如果业务都执行成功,那么就全局commit,方法为commitTransaction。方法执行逻辑与开始事务方法和回滚事务内部逻辑基本一致:内部调用了DefaultGlobalTransaction.commit方法,而DefaultGlobalTransaction.commit方法的内部又调用了TransactionManager的commit方法,向服务器发起Commit,服务会返回GlobalStatus.
6、清理
完成整个事务后,进行相关资源变量清理。
AT模式一阶段
在一阶段,Seata 会拦截“业务 SQL”,首先解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,然后执行“业务 SQL”更新业务数据,在业务数据更新之后,再将其保存成“after image”,最后生成行锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

实现上,Seata对数据源做了封装代理,然后对于数据源的操作处理,就由Seata内部逻辑完成了。看一个Demo中的数据源加载配置:
@Bean(name = "order") public DataSourceProxy masterDataSourceProxy(@Qualifier("originOrder") DataSource dataSource) { return new DataSourceProxy(dataSource); }
从Demo中可以看到,我们使用的是Seata封装的代理数据源DataSourceProxy。DataSourceProxy初始化时,会进行Resouce注册:
private void init(DataSource dataSource, String resourceGroupId) { this.resourceGroupId = resourceGroupId; Connection connection = dataSource.getConnection() jdbcUrl = connection.getMetaData().getURL(); dbType = JdbcUtils.getDbType(jdbcUrl, null); DefaultResourceManager.get().registerResource(this); }
Seata除了对数据库的DataSource进行了封装,同样也对Connection,Statement进行了封装代理,分别为ConnectionProxy和StatementProxy。
DataSourceProxy中重写了DataSource的getConnection方法,以此来获得ConnectionProxy,方法如下:
@Override public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy(this, targetConnection); }
ConnectionProxy代理了Sql connection。我们重点关于一下其createStatement方法,prepareStatement方法,commit方法,rollback方法以及setAutoCommit方法。
ConnectionProxy的createStatement会返回一个代理的StatementProxy,如下:
@Override public Statement createStatement() throws SQLException { Statement targetStatement = getTargetConnection().createStatement(); return new StatementProxy(this, targetStatement); }
ConnectionProxy的prepareStatement同样会返回一个代理的PreparedStatementProxy,如下:
@Override public PreparedStatement prepareStatement(String sql) throws SQLException { PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql); return new PreparedStatementProxy(this, targetPreparedStatement, sql); }
StatementProxy和PreparedStatementProxy是对执行Sql的Statement的一个封装,两者基本一样,我们看看StatementProxy的execute方法:
@Override
public boolean execute(String sql) throws SQLException { this.targetSQL = sql; return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() { @Override public Boolean execute(T statement, Object... args) throws SQLException { return statement.execute((String) args[0]); } }, sql); }
可以看到,内部交给了ExecuteTemplate执行,ExecuteTemplate的execute方法如下:
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) { // Just work as original statement return statementCallback.execute(statementProxy.getTargetStatement(), args); } if (sqlRecognizer == null) { sqlRecognizer = SQLVisitorFactory.get( statementProxy.getTargetSQL(), statementProxy.getConnectionProxy().getDbType()); } Executor<T> executor = null; if (sqlRecognizer == null) { executor = new PlainExecutor<T, S>(statementProxy, statementCallback); } else { switch (sqlRecognizer.getSQLType()) { case INSERT: executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; case UPDATE: executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE: executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; default: executor = new PlainExecutor<T, S>(statementProxy, statementCallback); break; } } T rs = null; try { rs = executor.execute(args); } catch (Throwable ex) { if (!(ex