EasyTransaction主要源碼分析


EasyTransaction是一個全功能的分布式事務框架,以下特性摘抄自其首頁:https://github.com/QNJR-GROUP/EasyTransaction

  • 一個框架包含多種事務形態,一個框架搞定所有類型的事務
  • 多種事務形態可混合使用
  • 高性能,大多數業務系統瓶頸在業務數據庫,若不啟用框架的冪等功能,對業務數據庫的額外消耗僅為寫入25字節的一行
  • 可選的框架自帶冪等實現及調用錯亂次序處理,大幅減輕業務開發工作量,但啟用的同時會在業務數據庫增加一條冪等控制行
  • 業務代碼可實現完全無入侵
  • 支持嵌套事務
  • 無需額外部署協調者,不同APP的服務協調自身發起的事務,也避免了單點故障
  • 分布式事務ID可關聯業務ID,業務類型,APPID,便於監控各個業務的分布式事務執行情況

本文主要分享EasyTransaction core中各個package的作用其主要實現。

請先閱讀 Seata架構的比對思考 https://www.cnblogs.com/skyesx/p/10674700.html ,再結合 代碼以及demo調試過程看這篇,直接看的話這里的點太零碎了

一、context包

主要類

LogProcessContext

其用於存儲ET事務的上下文信息。在開啟ET事務(第一次ET遠程調用,或者主動調用startSoftTrans方法)時,將創建本類的實例並將其與Spring的本地事務上下文綁定,通過:

TransactionSynchronizationManager.bindResource()

執行綁定。當需要取得ET上下文時,通過

TransactionSynchronizationManager.getResource()

取得。

ET上下文中包含的主要內容有:

  • 最終事務狀態
  • 全部的全局事務日志
  • 未Flush到外部的全局事務日志
  • 事務ID等內容

二、包core

本包主要類為

EasyTransFacade
TransactionHook
ConsistentGuardian
ExecuteCacheManager

類EasyTransFacade

其定義了業務調用方的接口,只包含兩個:

public void startEasyTrans(String busCode,long trxId);

public <P extends EasyTransRequest<R,E>,E extends EasyTransExecutor, R extends Serializable> Future<R> execute(P params);

第一個用於開啟全局事務,主要的操作為:

  • 掛載TransactionHook到當前的Spring本地事務中,使得可以在關鍵節點(如本地事務提交前、本地事務回滾后等等)嵌入ET的代碼
  • 將 LogProcessContext 綁定到當前的Spring本地事務,使得ET可以在當前Spring本地事務中隨時取得ET全局事務的狀態。
  • 在當前已開啟的本地事務中,寫入一條事務執行記錄到業務庫中,其對Crash恢復時識別全局事務的狀態起關鍵作用

第二個表示執行某個遠程事務方法。

  • 通過調用參數Object對應的Class獲取對應的處理器(如TCC處理器,可靠消息處理器等)並執行調用,具體調用的形態后續專門的章節再繼續

基於注解的接口調用也是通過這兩個方法封裝而成。

類TransactionHook

其為ET框架代碼與Spring原生事務的主要交界點,ET通過TransactionSynchronization定義的方法,在Spring本地事務執行過程中,擴展支持了全局事務。主要擴展了以下兩個方法

beforeCommit(boolean readOnly)
afterCompletion(int status)

beforeCommit方法將會

  • 在Spring本地事務提交前將所有未落盤的全局事務日志落盤
  • 並執行所有未執行的遠程調用(ET會盡量延后全局事務以此堆積並批量執行)
  • 若有不成功的全局事務,則拋出異常,回滾事務(包括本地以及全局)

afterCompletion方法將會

  • 獲取本地事務的最終結果(提交/回滾/未知)以及 ET父級事務的狀態(提交/回滾/未知)來決定本級ET事務的最終狀態(提交/回滾/未知)
  • 獲得最終的本級ET事務狀態后,異步執行最終一致處理(調用consistentGuardian.process)

類ConsistentGuardian

本類用於處理ET全局事務的最終一致,例如TCC的Conifrim/Cancel,可靠消息的發送消息。

最終一致處理通常會在同步操作(TCC的TRY等)對應的本地事務執行完成后拋到線程池異步執行,但執行失敗的話,會有兜底的補償(recovery包),后續再詳細講述

該類的主要工作機制是根據之前寫入的全局事務日志,獲取日志對應的處理器(如從TCC的事務日志獲取對應的TCC日志處理器),以此

  • 判斷當前ET事務的最終狀態(若當前ET事務狀態仍未確定的話)
  • 傳入最終ET事務狀態到日志處理器,依次處理對應的事務日志,處理的典型過程例子:
    • 若存在TRY方法對應的日志
    • 並且找不到TRY對應的CONFRIM/CANCEL日志
    • 則根據ET最終事務狀態,調用對應CONFIRM/CANCEL方法

類ExecuteCacheManager

本類主要服務於ET的以下期望

  • 批量寫入ET事務日志(以減少IO)
  • 批量並發執行遠程業務調用(以減少串行等待遠程相應時間)

其主要實現的是,

  • 對每個傳入的Calleble對象都返回一個經過改寫的Futrure對象
  • 當任意一個Futrue的get方法都沒有被調用前,所有之前傳入的Callable對象都不會執行。
  • 當任意一個Future的get被調用時,所有callable都會被批量執行,這里包含了批量寫入日志以及批量並發執行遠程調用

三、包datasource

主要包含以下兩個接口,其主要作用於業務數據源。

DataSourceSelector
TransStatusLogger

類DataSourceSelector

該類主要用於獲取當前事務/請求對應的數據源及其事務管理器,若應用有多個業務數據源,則需要自行實現對應的數據源選擇器,主要包含以下方法

DataSource selectDataSource(String appId,String busCode,long trxId);
DataSource selectDataSource(String appId,String busCode,EasyTransRequest<?, ?> request);

第一個方法是開啟ET事務時候選擇對應的數據源

第二個方法是被調用方接受到請求時選擇對應的數據源(用於冪等、防懸掛處理,若不需要可忽略)

該接口包含一個默認實現,當只有單數據源時,可以直接用該實現

SingleDataSourceSelector

類TransStatusLogger

該類主要用來讀寫用於判斷ET事務狀態的記錄,該記錄會在ET事務開啟時,寫入當前的數據庫表中,隨着業務對應事務(Spring本地事務)提交而提交,回滾而回滾。

更具體請直接看實現

四、包executor

該包存儲的是事務發起方(遠程服務調用方)相關處理類的位置,不同的事務類型(TCC,可靠事務等)有不同的Executor,以TCC為例講解,其他的事務類型實現都類似。

TccMethodExecutor

該類實現了三個接口

  • EasyTransExecutor
  • LogProcessor
  • DemiLogEventHandler

EasyTransExecutor接口定義了方法

	<P extends EasyTransRequest<R,E>,E extends EasyTransExecutor,R  extends Serializable> Future<R> execute(Integer sameBusinessCallSeq, P params);

該方法供類EasyTransFacade.execute使用,其對應的是執行TCC里的TRY方法,具體的,它

  • 將TRY方法調用對應的RPC請求包裝成Runnable類
  • 構建本次調用對應的全局事務日志(主要包含本次調用的具體參數、對應遠程方法等)
  • 然后傳入上面章節提到的類ExecuteCacheManager方法中

LogProcessor接口定義了如何處理事務日志,其包含一個主要方法

	boolean logProcess(LogProcessContext ctx, Content currentContent) 

該方法將會判斷,如果傳入的日志類型是PreTccCallContent(TCC TRY請求對應的日志)的話,將會監聽該日志最終的配對信息(類ConsistentGuardian會在處理當前ET事務的日志后,發送消息,告知所有需要配對的日志的配對結果),如果

  • 監聽到成功配對(找到CONFIRM或者CANCEL對應的日志)的消息,則不再做后續處理
  • 監聽到配對失敗(沒有存在對應的CONFIRM/CANCEL日志)的消息,則根據當前的ET事務狀態執行對應的CONFIRM或者CANCEL操作,並記錄對應的日志

其他的事務形態的實現也類似,不再贅述

五、包recovery

用於兜底恢復事務,實現最終一致。

代碼不復雜,可以自行查看。

六、包Filter

該包主要用於實現ET對應的Filter,該Filter作用於被調用端。我們可以通過實現ET的Filter擴展被調用端的功能,如處理冪等、處理嵌套事務、增加調用上下文的處理等等。

七、包idempotent

實現冪等、方懸掛等處理對應的包

冪等及防懸掛處理的主要原理:

  • 當遠程調用過來時,寫入調用日志到當前的開啟的業務日志中,並記錄 調用對應的ID,調用參數對應的MD5
  • 有結果返回時就將結果更新存儲到日志中
  • 當有重復請求過來時,就檢查ID對應的記錄是否存在,若存在則檢查參數的MD5是否一致,若一致則返回之前的存儲結果
  • 防懸掛也類似,在上述的日志中,將會記錄調用的方法是什么,如
    • 當找不到請求對應日志時,但當前為cancel操作的話,框架將直接返回成功
    • 上述cancel已經成功執行后,try方法再來到時,發現cancel已經執行,就直接將try報錯返回

八、包idgen

用於生成ET的分布式事務ID,當自行制定ID時,本包對應的方法不會被調用。當不指定ID時,將會自動生成一個。

九、包log

定義ET事務日志對應的Class,以及其讀寫接口。
事務日志在之前TccExecutor等章節已提到,不再贅述。

需要擴展事務日志存儲實現的,直接實現以下接口即可

TransactionLogReader
TransactionLogWritter

(ET事務日志的讀寫不需要與業務事務在同一個事務中,也不能在同一個事務中)

十、包master

用於在同一個appId中選擇一個作為master進行兜底最終一致補償的包。

實際上選擇不需要太精確,任意一個appId下的實例均可,也可以同時有多個master存在(但目前沒有意義,也會浪費性能)

十一、包monitor

用於提供ET實例狀態的包,可供Dashboard,監控等擴展使用

十二、包protocol

供客戶直接定義分布式事務服務的包,其包含一些客戶直接使用的 父類、配置接口、配置注解等

十三、包provider.factory

從Spring中獲取並存儲對應的bean實現,以便於快速方便地通過ET對應的定義,取得對應bean

十四、包queue

若要擴展新增對應的消息隊列實現,則實現這個包對應的接口

十五、包rpc

同上

十六、包serialization

ET框架所使用的序列化形式,可自行擴展

十七、包stingcodec

用於壓縮字符串,將字符串替換為數字id,以提高存儲效率


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM