AT 模式下,把每個數據庫被當做是一個 Resource,Seata 里稱為 DataSource Resource。業務通過 JDBC 標准接口訪問數據庫資源時,Seata 框架會對所有請求進行攔截,做一些操作。每個本地事務提交時,Seata RM(Resource Manager,資源管理器) 都會向 TC(Transaction Coordinator,事務協調器) 注冊一個分支事務。當請求鏈路調用完成后,發起方通知 TC 提交或回滾分布式事務,進入二階段調用流程。此時,TC 會根據之前注冊的分支事務回調到對應參與者去執行對應資源的第二階段。TC 是怎么找到分支事務與資源的對應關系呢?每個資源都有一個全局唯一的資源 ID,並且在初始化時用該 ID 向 TC 注冊資源。在運行時,每個分支事務的注冊都會帶上其資源 ID。這樣 TC 就能在二階段調用時正確找到對應的資源。
Seata 中有三大模塊,分別是 TM、RM 和 TC。 其中 TM 和 RM 是作為 Seata 的客戶端與業務系統集成在一起,TC 作為 Seata 的服務端獨立部署。

在 Seata 中,分布式事務的執行流程:
- TM 開啟分布式事務(TM 向 TC 注冊全局事務記錄);
- 按業務場景,編排數據庫、服務等事務內資源(RM 向 TC 匯報資源准備狀態 );
- TM 結束分布式事務,事務一階段結束(TM 通知 TC 提交/回滾分布式事務);
- TC 匯總事務信息,決定分布式事務是提交還是回滾;
- TC 通知所有 RM 提交/回滾 資源,事務二階段結束。
Seata 會有 4 種分布式事務解決方案,分別是 AT 模式、TCC 模式、Saga 模式和 XA 模式,本文主要介紹AT模式。
Seata類圖
TC類圖
TC在Seata中其實就是Server端,Client端其實屬於RM和TM,但是為了看起來完整,把Client端的RpcCient和Listener也放在了這里,類圖如下圖所示:

CLient listenner監聽服務端發過來的消息,從而進行相應的操作。
RM類圖
RM相關類圖如下圖所示:

TM類圖
TM相關類圖如下圖所示:

AT模式
AT 模式是一種無侵入的分布式事務解決方案。在 AT 模式下,用戶只需關注自己的“業務 SQL”,用戶的 “業務 SQL” 作為一階段,Seata 框架會自動生成事務的二階段提交和回滾操作。

在分析AT模式前,我們先提問幾個問題:
1、分布式Seata,怎么在系統中集成使用?
2、開始事務前,做了什么初始化工作?
3、AT模式具體怎么實現的?
初始化工作
Seata在GlobalTransactionScanner中進行了TM和RM初始化:
private void initClient() { //init TM TMClient.init(applicationId, txServiceGroup); //init RM RMClient.init(applicationId, txServiceGroup); //Register Spring ShutdownHook registerSpringShutdownHook(); }
其中TMClient的初始化如下:
public static void init(String applicationId, String transactionServiceGroup) { TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup); tmRpcClient.init(); }
其中初始化了一個TmRpcClient對象,它的init方法中,會啟動與Server的重連接線程(每5秒),以及消息發送線程和超時線程。重連接事,按照配置方式,尋找Server信息,比如采用file形式,那么首先從file.conf中根據分組名稱(service_group)找到集群名稱(cluster_name),再根據集群名稱找到fescar-server集群ip端口列表,然后從ip列表中選擇一個用netty進行連接。
RmClient的初始化如下:
public static void init(String applicationId, String transactionServiceGroup) { RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup); rmRpcClient.setResourceManager(DefaultResourceManager.get()); rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get())); rmRpcClient.init(); }
其中,主要工作為:
- 創建一個RmRpcClient對象
- RmRpcClient對象設置ResourceManager;
- RmRpcClient對象設置消息Listener;
- RmRpcClient對象初始化
初始化階段總結:
1、Spring啟動時,初始化了2個客戶端TmClient、RmClient
2、TmClient與Server通過Netty建立連接並發送消息
3、RmClient與Server通過Netty建立連接,負責接收二階段提交、回滾消息並在回調器(RmHandler)中做處理
因此,使用Seata,需要初始化GlobalTransactionScanner進行初始化工作!
TM全局事務控制
那些方法需要進行全局事務控制了?Seata定義了注解GlobalTransactional,只要添加了注解的方法,就代表需要分布式事務。
注解對應着攔截器,Seata定義的攔截器為GlobalTransactionalInterceptor。其核心處理邏輯如下:
@Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class); if (globalTransactionalAnnotation != null) { return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); } else if (globalLockAnnotation != null) { return handleGlobalLock(methodInvocation); } else { return methodInvocation.proceed(); } }
上面方法的邏輯為:如果業務方法上有全局事務GlobalTransactional注解時,就執行全局事務處理handleGlobalTransaction;如果業務方法上有全局鎖GlobalLock注解時,就執行全局鎖處理handleGlobalLock;否則,就按照普通方法執行。
handleGlobalTransaction中,會調用TransactionalTemplate的execute方法,execute方法如下:
public Object execute(TransactionalExecutor business) throws Throwable { // 1. get or create a transaction GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // 1.1 get transactionInfo TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } try { // 2. begin transaction beginTransaction(txInfo, tx); Object rs = null; try { // 3. Do Your Business rs = business.execute(); } catch (Throwable ex) { // 4.the needed business exception to rollback. completeTransactionAfterThrowing(txInfo,tx,ex); throw ex; } // 5. everything is fine, commit. commitTransaction(tx); return rs; } finally { //6. clear triggerAfterCompletion(); cleanUp(); } }
1、創建或創建一個GlobalTransaction,並根據業務TransactionalExecutor獲取到事務需要的相關信息;
2、開始全局事務
開始全局事務的方法為beginTransaction方法,方法內部調用了DefaultGlobalTransaction.begin方法,而DefaultGlobalTransaction.begin方法的內部又調用了TransactionManager的begin方法,
xid = transactionManager.begin(null, null, name, timeout);
TransactionManager接口類的默認實現為DefaultTransactionManager,其begin方法為:
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request); return response.getXid(); }
方法中,向服務器發起開始全局事務請求,服務會返回一個全局事務標示XID。OK,此時,由TM發起的一個全局事務就開始了!
3、執行業務邏輯
4、回滾全局事務
如果業務都執行沒有全部成功,那么就執行全局事務回滾,方法為completeTransactionAfterThrowing。方法執行邏輯與開始事務方法內部邏輯基本一致:內部調用了DefaultGlobalTransaction.rollback方法,而DefaultGlobalTransaction.rollback方法的內部又調用了TransactionManager的rollback方法,向服務器發起rollback,服務會返回GlobalStatus.
5、提交全局事務
如果業務都執行成功,那么就全局commit,方法為commitTransaction。方法執行邏輯與開始事務方法和回滾事務內部邏輯基本一致:內部調用了DefaultGlobalTransaction.commit方法,而DefaultGlobalTransaction.commit方法的內部又調用了TransactionManager的commit方法,向服務器發起Commit,服務會返回GlobalStatus.
6、清理
完成整個事務后,進行相關資源變量清理。
AT模式一階段
在一階段,Seata 會攔截“業務 SQL”,首先解析 SQL 語義,找到“業務 SQL”要更新的業務數據,在業務數據被更新前,將其保存成“before image”,然后執行“業務 SQL”更新業務數據,在業務數據更新之后,再將其保存成“after image”,最后生成行鎖。以上操作全部在一個數據庫事務內完成,這樣保證了一階段操作的原子性。

實現上,Seata對數據源做了封裝代理,然后對於數據源的操作處理,就由Seata內部邏輯完成了。看一個Demo中的數據源加載配置:
@Bean(name = "order") public DataSourceProxy masterDataSourceProxy(@Qualifier("originOrder") DataSource dataSource) { return new DataSourceProxy(dataSource); }
從Demo中可以看到,我們使用的是Seata封裝的代理數據源DataSourceProxy。DataSourceProxy初始化時,會進行Resouce注冊:
private void init(DataSource dataSource, String resourceGroupId) { this.resourceGroupId = resourceGroupId; Connection connection = dataSource.getConnection() jdbcUrl = connection.getMetaData().getURL(); dbType = JdbcUtils.getDbType(jdbcUrl, null); DefaultResourceManager.get().registerResource(this); }
Seata除了對數據庫的DataSource進行了封裝,同樣也對Connection,Statement進行了封裝代理,分別為ConnectionProxy和StatementProxy。
DataSourceProxy中重寫了DataSource的getConnection方法,以此來獲得ConnectionProxy,方法如下:
@Override public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy(this, targetConnection); }
ConnectionProxy代理了Sql connection。我們重點關於一下其createStatement方法,prepareStatement方法,commit方法,rollback方法以及setAutoCommit方法。
ConnectionProxy的createStatement會返回一個代理的StatementProxy,如下:
@Override public Statement createStatement() throws SQLException { Statement targetStatement = getTargetConnection().createStatement(); return new StatementProxy(this, targetStatement); }
ConnectionProxy的prepareStatement同樣會返回一個代理的PreparedStatementProxy,如下:
@Override public PreparedStatement prepareStatement(String sql) throws SQLException { PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql); return new PreparedStatementProxy(this, targetPreparedStatement, sql); }
StatementProxy和PreparedStatementProxy是對執行Sql的Statement的一個封裝,兩者基本一樣,我們看看StatementProxy的execute方法:
@Override
public boolean execute(String sql) throws SQLException { this.targetSQL = sql; return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() { @Override public Boolean execute(T statement, Object... args) throws SQLException { return statement.execute((String) args[0]); } }, sql); }
可以看到,內部交給了ExecuteTemplate執行,ExecuteTemplate的execute方法如下:
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) { // Just work as original statement return statementCallback.execute(statementProxy.getTargetStatement(), args); } if (sqlRecognizer == null) { sqlRecognizer = SQLVisitorFactory.get( statementProxy.getTargetSQL(), statementProxy.getConnectionProxy().getDbType()); } Executor<T> executor = null; if (sqlRecognizer == null) { executor = new PlainExecutor<T, S>(statementProxy, statementCallback); } else { switch (sqlRecognizer.getSQLType()) { case INSERT: executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; case UPDATE: executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE: executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; default: executor = new PlainExecutor<T, S>(statementProxy, statementCallback); break; } } T rs = null; try { rs = executor.execute(args); } catch (Throwable ex) { if (!(ex