EasyTransaction是一個全功能的分布式事務框架,以下特性摘抄自其首頁:https://github.com/QNJR-GROUP/EasyTransaction
- 一個框架包含多種事務形態,一個框架搞定所有類型的事務
- 多種事務形態可混合使用
- 高性能,大多數業務系統瓶頸在業務數據庫,若不啟用框架的冪等功能,對業務數據庫的額外消耗僅為寫入25字節的一行
- 可選的框架自帶冪等實現及調用錯亂次序處理,大幅減輕業務開發工作量,但啟用的同時會在業務數據庫增加一條冪等控制行
- 業務代碼可實現完全無入侵
- 支持嵌套事務
- 無需額外部署協調者,不同APP的服務協調自身發起的事務,也避免了單點故障
- 分布式事務ID可關聯業務ID,業務類型,APPID,便於監控各個業務的分布式事務執行情況
本文主要分享EasyTransaction core中各個package的作用其主要實現。
請先閱讀 Seata架構的比對思考 https://www.cnblogs.com/skyesx/p/10674700.html ,再結合 代碼以及demo調試過程看這篇,直接看的話這里的點太零碎了
一、context包
主要類
LogProcessContext
其用於存儲ET事務的上下文信息。在開啟ET事務(第一次ET遠程調用,或者主動調用startSoftTrans方法)時,將創建本類的實例並將其與Spring的本地事務上下文綁定,通過:
TransactionSynchronizationManager.bindResource()
執行綁定。當需要取得ET上下文時,通過
TransactionSynchronizationManager.getResource()
取得。
ET上下文中包含的主要內容有:
- 最終事務狀態
- 全部的全局事務日志
- 未Flush到外部的全局事務日志
- 事務ID等內容
二、包core
本包主要類為
EasyTransFacade
TransactionHook
ConsistentGuardian
ExecuteCacheManager
類EasyTransFacade
其定義了業務調用方的接口,只包含兩個:
public void startEasyTrans(String busCode,long trxId);
public <P extends EasyTransRequest<R,E>,E extends EasyTransExecutor, R extends Serializable> Future<R> execute(P params);
第一個用於開啟全局事務,主要的操作為:
- 掛載TransactionHook到當前的Spring本地事務中,使得可以在關鍵節點(如本地事務提交前、本地事務回滾后等等)嵌入ET的代碼
- 將 LogProcessContext 綁定到當前的Spring本地事務,使得ET可以在當前Spring本地事務中隨時取得ET全局事務的狀態。
- 在當前已開啟的本地事務中,寫入一條事務執行記錄到業務庫中,其對Crash恢復時識別全局事務的狀態起關鍵作用
第二個表示執行某個遠程事務方法。
- 通過調用參數Object對應的Class獲取對應的處理器(如TCC處理器,可靠消息處理器等)並執行調用,具體調用的形態后續專門的章節再繼續
基於注解的接口調用也是通過這兩個方法封裝而成。
類TransactionHook
其為ET框架代碼與Spring原生事務的主要交界點,ET通過TransactionSynchronization定義的方法,在Spring本地事務執行過程中,擴展支持了全局事務。主要擴展了以下兩個方法
beforeCommit(boolean readOnly)
afterCompletion(int status)
beforeCommit方法將會
- 在Spring本地事務提交前將所有未落盤的全局事務日志落盤
- 並執行所有未執行的遠程調用(ET會盡量延后全局事務以此堆積並批量執行)
- 若有不成功的全局事務,則拋出異常,回滾事務(包括本地以及全局)
afterCompletion方法將會
- 獲取本地事務的最終結果(提交/回滾/未知)以及 ET父級事務的狀態(提交/回滾/未知)來決定本級ET事務的最終狀態(提交/回滾/未知)
- 獲得最終的本級ET事務狀態后,異步執行最終一致處理(調用consistentGuardian.process)
類ConsistentGuardian
本類用於處理ET全局事務的最終一致,例如TCC的Conifrim/Cancel,可靠消息的發送消息。
最終一致處理通常會在同步操作(TCC的TRY等)對應的本地事務執行完成后拋到線程池異步執行,但執行失敗的話,會有兜底的補償(recovery包),后續再詳細講述
該類的主要工作機制是根據之前寫入的全局事務日志,獲取日志對應的處理器(如從TCC的事務日志獲取對應的TCC日志處理器),以此
- 判斷當前ET事務的最終狀態(若當前ET事務狀態仍未確定的話)
- 傳入最終ET事務狀態到日志處理器,依次處理對應的事務日志,處理的典型過程例子:
- 若存在TRY方法對應的日志
- 並且找不到TRY對應的CONFRIM/CANCEL日志
- 則根據ET最終事務狀態,調用對應CONFIRM/CANCEL方法
類ExecuteCacheManager
本類主要服務於ET的以下期望
- 批量寫入ET事務日志(以減少IO)
- 批量並發執行遠程業務調用(以減少串行等待遠程相應時間)
其主要實現的是,
- 對每個傳入的Calleble對象都返回一個經過改寫的Futrure對象
- 當任意一個Futrue的get方法都沒有被調用前,所有之前傳入的Callable對象都不會執行。
- 當任意一個Future的get被調用時,所有callable都會被批量執行,這里包含了批量寫入日志以及批量並發執行遠程調用
三、包datasource
主要包含以下兩個接口,其主要作用於業務數據源。
DataSourceSelector
TransStatusLogger
類DataSourceSelector
該類主要用於獲取當前事務/請求對應的數據源及其事務管理器,若應用有多個業務數據源,則需要自行實現對應的數據源選擇器,主要包含以下方法
DataSource selectDataSource(String appId,String busCode,long trxId);
DataSource selectDataSource(String appId,String busCode,EasyTransRequest<?, ?> request);
第一個方法是開啟ET事務時候選擇對應的數據源
第二個方法是被調用方接受到請求時選擇對應的數據源(用於冪等、防懸掛處理,若不需要可忽略)
該接口包含一個默認實現,當只有單數據源時,可以直接用該實現
SingleDataSourceSelector
類TransStatusLogger
該類主要用來讀寫用於判斷ET事務狀態的記錄,該記錄會在ET事務開啟時,寫入當前的數據庫表中,隨着業務對應事務(Spring本地事務)提交而提交,回滾而回滾。
更具體請直接看實現
四、包executor
該包存儲的是事務發起方(遠程服務調用方)相關處理類的位置,不同的事務類型(TCC,可靠事務等)有不同的Executor,以TCC為例講解,其他的事務類型實現都類似。
TccMethodExecutor
該類實現了三個接口
- EasyTransExecutor
- LogProcessor
- DemiLogEventHandler
EasyTransExecutor接口定義了方法
<P extends EasyTransRequest<R,E>,E extends EasyTransExecutor,R extends Serializable> Future<R> execute(Integer sameBusinessCallSeq, P params);
該方法供類EasyTransFacade.execute使用,其對應的是執行TCC里的TRY方法,具體的,它
- 將TRY方法調用對應的RPC請求包裝成Runnable類
- 構建本次調用對應的全局事務日志(主要包含本次調用的具體參數、對應遠程方法等)
- 然后傳入上面章節提到的類ExecuteCacheManager方法中
LogProcessor接口定義了如何處理事務日志,其包含一個主要方法
boolean logProcess(LogProcessContext ctx, Content currentContent)
該方法將會判斷,如果傳入的日志類型是PreTccCallContent(TCC TRY請求對應的日志)的話,將會監聽該日志最終的配對信息(類ConsistentGuardian會在處理當前ET事務的日志后,發送消息,告知所有需要配對的日志的配對結果),如果
- 監聽到成功配對(找到CONFIRM或者CANCEL對應的日志)的消息,則不再做后續處理
- 監聽到配對失敗(沒有存在對應的CONFIRM/CANCEL日志)的消息,則根據當前的ET事務狀態執行對應的CONFIRM或者CANCEL操作,並記錄對應的日志
其他的事務形態的實現也類似,不再贅述
五、包recovery
用於兜底恢復事務,實現最終一致。
代碼不復雜,可以自行查看。
六、包Filter
該包主要用於實現ET對應的Filter,該Filter作用於被調用端。我們可以通過實現ET的Filter擴展被調用端的功能,如處理冪等、處理嵌套事務、增加調用上下文的處理等等。
七、包idempotent
實現冪等、方懸掛等處理對應的包
冪等及防懸掛處理的主要原理:
- 當遠程調用過來時,寫入調用日志到當前的開啟的業務日志中,並記錄 調用對應的ID,調用參數對應的MD5
- 有結果返回時就將結果更新存儲到日志中
- 當有重復請求過來時,就檢查ID對應的記錄是否存在,若存在則檢查參數的MD5是否一致,若一致則返回之前的存儲結果
- 防懸掛也類似,在上述的日志中,將會記錄調用的方法是什么,如
- 當找不到請求對應日志時,但當前為cancel操作的話,框架將直接返回成功
- 上述cancel已經成功執行后,try方法再來到時,發現cancel已經執行,就直接將try報錯返回
八、包idgen
用於生成ET的分布式事務ID,當自行制定ID時,本包對應的方法不會被調用。當不指定ID時,將會自動生成一個。
九、包log
定義ET事務日志對應的Class,以及其讀寫接口。
事務日志在之前TccExecutor等章節已提到,不再贅述。
需要擴展事務日志存儲實現的,直接實現以下接口即可
TransactionLogReader
TransactionLogWritter
(ET事務日志的讀寫不需要與業務事務在同一個事務中,也不能在同一個事務中)
十、包master
用於在同一個appId中選擇一個作為master進行兜底最終一致補償的包。
實際上選擇不需要太精確,任意一個appId下的實例均可,也可以同時有多個master存在(但目前沒有意義,也會浪費性能)
十一、包monitor
用於提供ET實例狀態的包,可供Dashboard,監控等擴展使用
十二、包protocol
供客戶直接定義分布式事務服務的包,其包含一些客戶直接使用的 父類、配置接口、配置注解等
十三、包provider.factory
從Spring中獲取並存儲對應的bean實現,以便於快速方便地通過ET對應的定義,取得對應bean
十四、包queue
若要擴展新增對應的消息隊列實現,則實現這個包對應的接口
十五、包rpc
同上
十六、包serialization
ET框架所使用的序列化形式,可自行擴展
十七、包stingcodec
用於壓縮字符串,將字符串替換為數字id,以提高存儲效率