分布式事務:Seata框架AT模式及TCC模式執行流程剖析


Seata角色術語

TC - 事務協調者

維護全局和分支事務的狀態,驅動全局事務提交或回滾,即Seata服務端。

TM - 事務管理器

定義全局事務的范圍:開始全局事務、提交或回滾全局事務,在事務發起的客戶端。

RM - 資源管理器

管理分支事務處理的資源,與TC交談以注冊分支事務和報告分支事務的狀態,並驅動分支事務提交或回滾,在分支事務執行的客戶端。

Seata執行流程

Seata AT模式

流程圖解

第一階段

通過代理數據源DataSourceProxy對業務SQL進行解析,轉換成undolog,並與業務SQL在一個事務內入庫,然后注冊分支事務、提交、上報狀態。

 

第二階段

分布式事務操作成功,則TC通知RM異步刪除undolog。

 

分布式事務操作失敗,TM向TC發送回滾請求,RM 收到協調器TC發來的回滾請求,通過 XID 和 Branch ID 找到相應的回滾日志記錄,通過回滾記錄生成反向的更新 SQL 並執行,以完成分支的回滾。

 

工作機制

以一個示例來說明整個 AT 分支的工作過程。

業務表:product

Field

Type

Key

id

bigint(20)

PRI

name

varchar(100)

 

since

varchar(100)

 

AT 分支事務的業務邏輯:

update product set name = 'GTS' where name = 'TXC';
一階段

過程:

  1. 解析 SQL:得到 SQL 的類型(UPDATE),表(product),條件(where name = 'TXC')等相關的信息。
  2. 查詢前鏡像:根據解析得到的條件信息,生成查詢語句,定位數據。
select id, name, since from product where name = 'TXC';

得到前鏡像:

id

name

since

1

TXC

2014

 

  1. 執行業務 SQL:更新這條記錄的 name 為 'GTS'。
  2. 查詢后鏡像:根據前鏡像的結果,通過 主鍵 定位數據。
select id, name, since from product where id = 1;

得到后鏡像:

id

name

since

1

GTS

2014

  1. 插入回滾日志:把前后鏡像數據以及業務 SQL 相關的信息組成一條回滾日志記錄,插入到 UNDO_LOG 表中。
{
    "branchId": 641789253,
    "undoItems": [{
        "afterImage": {
            "rows": [{
                "fields": [{
                    "name": "id",
                    "type": 4,
                    "value": 1
                }, {
                    "name": "name",
                    "type": 12,
                    "value": "GTS"
                }, {
                    "name": "since",
                    "type": 12,
                    "value": "2014"
                }]
            }],
            "tableName": "product"
        },
        "beforeImage": {
            "rows": [{
                "fields": [{
                    "name": "id",
                    "type": 4,
                    "value": 1
                }, {
                    "name": "name",
                    "type": 12,
                    "value": "TXC"
                }, {
                    "name": "since",
                    "type": 12,
                    "value": "2014"
                }]
            }],
            "tableName": "product"
        },
        "sqlType": "UPDATE"
    }],
    "xid": "xid:xxx"
}
  1. 提交前,向 TC 注冊分支:申請 product 表中,主鍵值等於 1 的記錄的 全局鎖 。
  2. 本地事務提交:業務數據的更新和前面步驟中生成的 UNDO LOG 一並提交。
  3. 將本地事務提交的結果上報給 TC。
二階段-回滾
  1. 收到 TC 的分支回滾請求,開啟一個本地事務,執行如下操作。
  2. 通過 XID 和 Branch ID 查找到相應的 UNDO LOG 記錄。
  3. 數據校驗:拿 UNDO LOG 中的后鏡與當前數據進行比較,如果有不同,說明數據被當前全局事務之外的動作做了修改。這種情況,需要根據配置策略來做處理,詳細的說明在另外的文檔中介紹。
  4. 根據 UNDO LOG 中的前鏡像和業務 SQL 的相關信息生成並執行回滾的語句:
update product set name = 'TXC' where id = 1;
  1. 提交本地事務。並把本地事務的執行結果(即分支事務回滾的結果)上報給 TC。
二階段-提交
  1. 收到 TC 的分支提交請求,把請求放入一個異步任務的隊列中,馬上返回提交成功的結果給 TC。
  2. 異步任務階段的分支提交請求將異步和批量地刪除相應 UNDO LOG 記錄。

事務使用

我們只需要使用一個 @GlobalTransactional 注解在業務方法上:

 @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount) {
        ......
    }

源碼分析

相關配置

SeataAutoConfiguration

Spring自動配置類中配置了全局事務掃描器GlobalTransactionScanner。

GlobalTransactionScanner

GlobalTransactionScanner內容如下。

可以看到分別實現了Spring的3個接口InitializingBean,ApplicationContextAware,DisposableBean。

在afterPropertiesSet()中,調用了initClient方法:

initClient方法里面對TmClient,RmClient進行了初始化(參數就是配置文件bean里配置的applicationId和txServiceGroup),並注冊了一個Spring的ShutdownHook。

TmClient.init()

TmClient的初始化方法。

其最終調用到的是AbstractNettyRemotingClient的init()方法,啟動了一個定時器不斷進行重連操作。

NettyClientChannelManager的reconnect方法內容如下:

方法getAvailServerList內從注冊中心獲取服務器列表。

RegistryFactory.getInstance().lookup(transactionServiceGroup)是針對不同注冊中心做了適配的,默認看下File形式的實現。

進到FileRegistryServiceImpl#lookup方法,這里結合File.conf配置來說明。

 

FileRegistryServiceImpl的服務器查找方法如下:

 

1、現根據事務分組(key=vgroup_mapping.事務分組名稱)找到分組所屬的server集群名稱,這里是default

2、然后根據集群名稱(key=集群名稱.grouplist)找到server對應ip端口地址

梳理下TmClient的初始化流程:

  1. 啟動定時執行器,每10秒嘗試進行一次重連seata-server
  2. 重連時,先從file.conf中根據分組名稱(service_group)找到集群名稱(cluster_name)
  3. 再根據集群名稱找到seata-serverr集群ip端口列表
  4. 從ip列表中選擇一個用netty進行連接
RmClient.init()

RmClient的初始化方法。

1、設置了資源管理器resourceManager

2、設置了消息回調監聽器,rmHandler用於接收seata-server在二階段發出的提交或者回滾請求

ResourceManager管理資源的注冊和注銷。

RMHandlerAT在收到TC二階段回滾消息時執行回滾。

第一階段

攔截器中開啟事務

在需要加全局事務的方法中,會加上GlobalTransactional注解,注解往往對應着攔截器,Seata中攔截全局事務的攔截器是GlobalTransactionalInterceptor,看下其攔截方法。

判斷:

l  如果方法上有全局事務注解,調用handleGlobalTransaction開啟全局事務

l  如果沒有,按普通方法執行,避免性能下降

看下handleGlobalTransaction()方法:

可以看到最終調用的是TransactionalTemplate的execute方法,execute方法如下:

分為幾步:

l  開啟全局事務beginTransaction

l  執行業務方法

l  提交事務commitTransaction(若沒拋異常)

l  執行completeTransactionAfterThrowing回滾操作(拋異常)

beginTransaction最終調用到了io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)方法,代碼如下:

  1. 調用transactionManager.begin()方法通過TmNettyRemotingClient與server通信並生成一個xid
  2. 將xid綁定到Root上下文中

看到這里,也就明確了一點,全局事務開啟時,是由TM來發起的。

commitTransaction和rollbackTransaction方法類似,由TM發送事務commit或rollback信息給seata-server。

SQL解析與undolog生成

由於Seata對數據源做了代理,所以sql解析與undolog入庫操作是在數據源代理中執行的。數據源代理類創建器的配置。

DataSourceProxy,ConnectionProxy,StatementProxy是Seata提供的代理封裝類。

最終對Sql進行解析操作,發生在StatementProxy類中:

然后交給了ExecuteTemplate執行,跟到ExecuteTemplate中查看:

  1. 先判斷是否開啟了全局事務,如果沒有,不走代理,不解析sql,避免性能下降
  2. 調用SQLVisitorFactory對目標sql進行解析
  3. 針對特定類型sql操作(INSERT,UPDATE,DELETE,SELECT_FOR_UPDATE)等進行特殊解析
  4. 執行sql並返回結果

關鍵點在於特定類型執行器中的execute方法,挑選InsertExecutor為例說明,其execute方法調用的是父類BaseTransactionalExecutor中的execute方法,看下源碼。

將ROOT上下文中的xid綁定到了connectionProxy中,並調用了doExecute方法,看下AbstractDMLBaseExecutor中的doExecute方法。

查看代碼,生成undolog在executeAutoCommitFalse方法中:

executeAutoCommitTrue中先將autoCommit設置為false(因為要對sql進行解析,生成undolog在一個事務中入庫,避免提前入庫)。

再執行到executeAutoCommitFalse中,分為4步:

  1. 獲取sql執行前鏡像beforeImage
  2. 執行sql
  3. 獲取sql執行后afterimage
  4. 根據beforeImage,afterImage生成undolog記錄並添加到connectionProxy的上下文中

到此為止,紅色框中幾步已經完成。

 

分支事務注冊與事務提交

業務sql執行以及undolog執行完后會在ConnectionProxy中執行commit操作,

看下代碼。

1、如果處於全局事務中,則調用processGlobalTransactionCommit()處理全局事務提交

2、如果加了全局鎖注解,加全局鎖並提交

3、如果沒有對應注釋,按直接進行事務提交

主要看processGlobalTransactionCommit()方法,也是核心代碼:

流程分為如下幾步:

  1. 注冊分支事務register(),並將branchId分支id綁定到上下文中。
  2. UndoLogManager.flushUndoLogs(this) 如果包含undolog,則將之前綁定到上下文中的undolog進行入庫。
  3. 提交本地事務。
  4. 如果操作失敗,report()中通過RM提交第一階段失敗消息,如果成功,report()提交第一階段成功消息。

undolog入庫和普通業務sql的執行用的一個connection,處於一個本地事務中,保證了業務數據變更時,一定會有對應undolog存在。

至此,第一階段中undolog提交與本地事務提交,分支事務注冊與匯報也已完成。

第二階段

在前面分析RmClient.init()方法時,提到了Seata會使用SPI拓展機制找到RmClient的回調處理器RMHandlerAT,該類是負責接送二階段seata-server發給RmClient的提交、回滾消息,並作出提交,回滾操作。

RMHandlerAT繼承自AbstractRMHandler,AbstractRMHandler中兩個handle方法對應,事務提交、回滾操作。

全局事務提交

全局事務提交對應了doBranchCommit(request, response)方法。

調用的是getResourceManager(),上面提到SPI拓展提到的DataSourceManager類。

DataSourceManager中調用了asyncWorker來異步提交,看下AsyncWorker中branchCommit方法。

這邊只是往一個ASYNC_COMMIT_BUFFER緩沖List中新增了一個二階段提交的context。

但真正提交在哪呢?答案在AsyncWorker的init()方法中,其init()方法會在DataSourceManager中被調用,內部啟動一個定時器不斷進行全局事務提交操作。

真正的分支事務提交就是在doBranchCommits中完成的,主要工作是刪除回滾日志。

主要分為幾步:

  1. 先按resourceId(也就是數據連接)對提交操作進行分組,一個數據庫的可以一起操作,提升效率
  2. 根據resourceId找到對應DataSourceProxy,並獲取一個普通的數據庫連接getPlainConnection(),估計這本身不需要做代理操作,故用了普通的數據庫連接
  3. 調用UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn)刪除undolog

回過頭來看下設計原理圖:

 

全局事務回滾

接下來我們看看全局事務回滾的方法,AbstractRMHandler#doBranchRollback 。

該方法調用了DataSourceManager的branchRollback方法。

最終回滾方法調用的是UndoLogManager.undo(dataSourceProxy, xid, branchId),大體是根據Undolog進行反解析並執行回滾操作。

然后進行回滾日志的清理和提交。

具體的Undolog反解析操作實現在AbstractUndoExecutor的子類中。

再回頭看下回滾設計原理圖:

 

Seata TCC模式

流程圖解

TCC執行流程如下圖所示:

大致流程如下:

1.全局事務攔截器攔截到@GlobalTransational注解,調用TM開啟全局事務

2.執行TCC參與者的prepare方法時,被TCC攔截器攔截,在prepare方法執

行前注冊分支事務到TC,在prepare方法執行后向TC報告分支事務的狀態

3.如果執行發生異常則TM通知TC回滾事務,否則TM通知TC執行提交事務

4.TC收到TM的提交或回滾通知,遍歷各TCC分支事務,逐個進行提交或回滾

事務使用

跟AT模式一樣,TCC模式也通過@GlobalTransactional注解開啟全局事務,然后調用各個兩階段參與者的prepare方法即可。

兩階段的參與者格式如下:

l  TwoPhaseBusinessAction注解標記這是個TCC接口,同時指定commitMethod,rollbackMethod的名稱

l  BusinessActionContext是TCC事務中的上下文對象

l  BusinessActionContextParameter注解標記的參數會在上下文中傳播,即能通過BusinessActionContext對象在commit方法及cancle方法中取到該參數值

源碼分析

TCC資源注冊

GlobalTransactionScanner繼承了AbstractAutoProxyCreator抽象類,並重新實現了wrapIfNecessary接口,該接口用來在spring啟動時,生成代理類。

看下重寫的wrapIfNecessary方法。

可以看到這段邏輯中,判斷了bean如果是個TCC的接口實現,則將攔截器初始化為TccActionInterceptor,TccActionInterceptor是TCC方法的核心攔截器,后面會具體介紹,先跟到TCCBeanParserUtils.isTccAutoProxy()中看下源碼。

isTccAutoProxy()中又會調用DefaultRemotingParser#parserRemotingServiceInfo來進行TCC資源注冊。

 

可以看到,通過反射拿到了TwoPhaseBusinessAction注解中聲明的Commit方法和Rollback方法並封裝成TCCResource對象,最終調用ResourceManager的registerResource方法。

TCC模式下ResourceManger的實現為TCCResourceManager,AbstractRMHandler的實現為RMHandlerTCC。

跟到TCCResourceManager中查看registerResource方法。

看到將TCCResource對象存儲在本地Map中,方便后續通過ResourceId找到對應Resource來進行提交,回滾操作。super.registerResource代碼如下,通過RmNettyRemotingClient發送rpc請求給Seata-server進行資源注冊。

至此,本地內存中會有個TCCResourceCache,注冊完成后,seata-server端也會有個TCC的資源列表。

服務端接收RM注冊信息的接口在DefaultServerMessageListenerImpl 的onRegRmMessage中,看下代碼。

最終調用了ChannelManager.registerRMChannel方法。

服務端也會對RpcContext進行緩存,緩存Map嵌套層次較多,最外層key為resourceId,往內一次是applicationId,clientIIp,port。

至此,TCC資源管理器RM已完成注冊,本地及服務端均有以resourceId為key的緩存Map。

開啟TCC全局事務

TCC模式業務調用方和AT模式一樣,需要使用GlobalTransactional注解來開啟全局事務。

業務方法執行時,最終會被AT模式源碼分析中提到過的攔截器GlobalTransactionalInterceptor攔截,開啟一個全局事務,獲得全局事務id,即xid。

具體代碼是TransactionalTemplate的execute方法,execute方法如下:

分為一下幾步:

1.開啟全局事務beginTransaction(TM與TC通信並獲得Xid)

服務端接收全局事務開啟請求的方法在DefaultCore的begin方法中,可以看到創建了一個GlabalSession。

2.執行業務方法

3.提交事務commitTransaction(TM與TC通信,發起事務提交請求)

4.如果發生異常,執行completeTransactionAfterThrowing回滾操作(TM與TC通信,發起事務回滾請求)

 

TCC攔截器-注冊分支事務

TCC注冊過程分析時,如果bean是個TCC的bean(即bean中方法包含TwoPhaseBusinessAction注解),會初始出TccActionInterceptor攔截器,其實現了MethodInterceptor,這也是TCC接口的方法級別核心攔截器。

看下源碼中的invoke方法:

方法調用了actionInterceptorHandler.proceed方法:

接着看doTccActionLogStore方法:

 

服務端接收分支注冊的代碼也在DefaultCore(見Seata源碼)中,代碼如下:

至此,TCC分支事務注冊完畢。

全局事務提交

TransactionalTemplate的execute方法中,若業務執行無異常,則會調用commitTransaction方法。

最終調用的DefaultGlobalTransaction的commit方法。

其中調用TM的commit方法,來通知TC對全局事務進行提交。

TC收到commit消息的處理在DefaultCore(見Seata源碼)的commit方法中,查看代碼:

分為幾步:

  1. 根據xid取出GlabalSession
  2. 關閉Session,防止再有分支注冊盡量
  3. 修改狀態為提交中
  4. 如果可以異步提交,則異步提交,否則同步提交

看下同步提交代碼doGlobalCommit:

遍歷每個branchSession,對每個分支事務進行提交,失敗會無限重試。

resourceManagerInbound.branchCommit方法會調用DefaultCoordinator中branchCommit方法來與TCC資源管理器通信,發送分支事務提交消息,這里sendSyncRequest方法中就會根據resourceId去找到第一步(TCC資源管理器注冊)中RpcContext的緩存,並得到對應Channel來建立Netty通信。

各TCC資源管理器接收到分支事務提交請求后,會調用TCCResourceManager的branchCommit方法實際對事務進行提交。

自此客戶端收到seata-server提交信息后,完成了對分支事務的提交。

總結一下全局事務提交的大致流程:

  1. 業務方調用微服務無異常,通過TM發起事務提交請求
  2. TC接收到事務提交請求后,通過Xid找到全局事務,再取出所有分支事務
  3. 遍歷分支事務,發出分支事務提交請求
  4. TCC資源管理器RM接收到提交請求后,從本地TCCResource緩存中根據resourceId取出對應方法bean,反射調用commit方法

全局事務回滾

全局事務回滾思路與全局事務提交過程基本一致。

全局事務回滾的大致流程:

1.    業務方調用微服務發生異常,通過TM發起事務回滾請求

2.    TC接收到事務回滾請求后,通過Xid找到全局事務,再取出所有分支事務

3.    遍歷分支事務,發出分支事務回滾請求

4.    TCC資源管理器RM接收到回滾請求后,從本地TCCResource緩存中根據resourceId取出對應方法bean,反射調用rollback方法

 

到此,我們完成了對Seata框架AT模式和TCC模式完整執行流程的分析。

 


作者:朝雨憶輕塵
出處:https://www.cnblogs.com/xifengxiaoma/ 
版權所有,歡迎轉載,轉載請注明原文作者及出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM