分布式事務之 Seata


Seata 是什么?

  官網 :https://seata.io/zh-cn/docs/overview/what-is-seata.html

  Seata 是一款開源的分布式事務解決方案,致力於在微服務架構下提供高性能和簡單易用的分布式事務服務。在 Seata 開源之前,Seata 對應的內部版本在阿里經濟體內部一直扮演着分布式一致性中間件的角色,幫助經濟體平穩的度過歷年的雙11,對各BU業務進行了有力的支撐。經過多年沉淀與積累,商業化產品先后在阿里雲、金融雲進行售賣。2019.1 為了打造更加完善的技術生態和普惠技術成果,Seata 正式宣布對外開源,未來 Seata 將以社區共建的形式幫助其技術更加可靠與完備。

Seata術語:

  • TC (Transaction Coordinator) - 事務協調者:維護全局和分支事務的狀態,驅動全局事務提交或回滾。
  • TM (Transaction Manager) - 事務管理器:定義全局事務的范圍:開始全局事務、提交或回滾全局事務。
  • RM (Resource Manager) - 資源管理器:管理分支事務處理的資源,與TC交談以注冊分支事務和報告分支事務的狀態,並驅動分支事務提交或回滾。

  其實 Seata 之所以能保證分布式事務的一致性,我的理解是Seata鎖扮演的角色跟Zookeeper是類似的。在服務進行拆分后進行多服務節點的部署,這使得多個節點的事務操作失去了聯系,而Seata 作為事務協調者,扮演着一個上帝視角。對於托管了事務操作的服務來說,Seata是完全可見的。事務是否提交/回滾。均要受到Seata的控制。也正是如此。Seata 可以控制分布式事務。Seata 提供了 AT、TCC、SAGA 和 XA 事務模式,為用戶打造一站式的分布式解決方案。

Seata 各事務模式

Seata AT 模式

  前提

  • 基於支持本地 ACID 事務的關系型數據庫。

  • Java 應用,通過 JDBC 訪問數據庫。

  整體機制

  兩階段提交協議的演變:

  • 一階段:業務數據和回滾日志記錄在同一個本地事務中提交,釋放本地鎖和連接資源。

  • 二階段:

    • 提交異步化,非常快速地完成。

    • 回滾通過一階段的回滾日志進行反向補償。

Seata TCC 模式

  根據兩階段行為模式的不同,我們將分支事務划分為 Automatic (Branch) Transaction Mode 和 TCC (Branch) Transaction Mode.

  AT 模式(參考鏈接 TBD)基於 支持本地 ACID 事務 的 關系型數據庫:

    • 一階段 prepare 行為:在本地事務中,一並提交業務數據更新和相應回滾日志記錄。
    • 二階段 commit 行為:馬上成功結束,自動 異步批量清理回滾日志。
    • 二階段 rollback 行為:通過回滾日志,自動 生成補償操作,完成數據回滾。

  相應的,TCC 模式,不依賴於底層數據資源的事務支持:

    • 一階段 prepare 行為:調用 自定義 的 prepare 邏輯。
    • 二階段 commit 行為:調用 自定義 的 commit 邏輯。
    • 二階段 rollback 行為:調用 自定義 的 rollback 邏輯。

  所謂 TCC 模式,是指支持把 自定義 的分支事務納入到全局事務的管理中。

SEATA Saga 模式

  適用場景:

  • 業務流程長、業務流程多

  • 參與者包含其它公司或遺留系統服務,無法提供 TCC 模式要求的三個接口

  優勢:

  • 一階段提交本地事務,無鎖,高性能

  • 事件驅動架構,參與者可異步執行,高吞吐

  • 補償服務易於實現

  缺點:

  • 不保證隔離性

  Saga的實現:

  基於狀態機引擎的 Saga 實現:

  目前SEATA提供的Saga模式是基於狀態機引擎來實現的,機制是:

  1. 通過狀態圖來定義服務調用的流程並生成 json 狀態語言定義文件

  2. 狀態圖中一個節點可以是調用一個服務,節點可以配置它的補償節點

  3. 狀態圖 json 由狀態機引擎驅動執行,當出現異常時狀態引擎反向執行已成功節點對應的補償節點將事務回滾

  4. 可以實現服務編排需求,支持單項選擇、並發、子流程、參數轉換、參數映射、服務執行狀態判斷、異常捕獲等功能

Seata XA 模式

  從編程模型上,XA 模式與 AT 模式保持完全一致。

  可以參考 Seata 官網的樣例:seata-xa

  樣例場景是 Seata 經典的,涉及庫存、訂單、賬戶 3 個微服務的商品訂購業務。

  在樣例中,上層編程模型與 AT 模式完全相同。只需要修改數據源代理,即可實現 XA 模式與 AT 模式之間的切換。

Seata 服務啟動:

   下載服務器軟件包,將其解壓縮。

  Seata-Server包含兩個核心配置文件,其中registry.conf表示配置Seata服務注冊的地址,它目前支持所有主流的注冊中心(nacos 、eureka、redis、zk、consul、etcd3、sofa)。默認是file,表示不依賴於注冊中心以及配置中心。

  file.conf存儲的是Seata服務端的配置信息,完整的配置包含transport、Server、Metrics,分別表示通信配置,服務端配置,監控等。

  配置信息修改參考 :https://seata.io/zh-cn/docs/user/configurations.html

   然后保持默認配置啟動服務即可 sh seata-server.sh  ,后台啟動 nohup ./seata-server.sh >log.out 2>1 &

springboot-dubbo-seata 服務測試 :

  官方提供了常見的各種集成方式,github地址 :https://github.com/seata/seata-samples

  將工程 clone 下來,其中有個 springboot-dubbo-seata 工程,基於 AT 模式。涉及組件 SpringBoot + Dubbo + Mybatis + Nacos + Seata 。按照演示步驟操作

  • 預先安裝並啟動  Nacos 、Mysql
  • 導入sql 目錄下的sql腳本
  • 啟動 Seata 服務
  • 修改應用中application.propertie的相關IP、file.conf 文件的default.grouplist 屬性,指向我們Seata的服務IP端口。然后分別啟動 演示應用服務。連接成功會打印以下日志:

  整合 Seata 中比較重要的操作:

1.依賴 

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>1.3.0</version>
</dependency>

2.配置事務掃描器

@Bean
public GlobalTransactionScanner globalTransactionScanner(){ return new GlobalTransactionScanner("account-gts-seata-example", "my_test_tx_group"); }

  關於事務分組的概念參考:https://seata.io/zh-cn/docs/user/transaction-group.html

3.配置文件file.conf

transport {
  # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true # the client batch send request enable enableClientBatchSendRequest = true #thread factory for netty threadFactory { bossThreadPrefix = "NettyBoss" workerThreadPrefix = "NettyServerNIOWorker" serverExecutorThread-prefix = "NettyServerBizHandler" shareBossWorker = false clientSelectorThreadPrefix = "NettyClientSelector" clientSelectorThreadSize = 1 clientWorkerThreadPrefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT bossThreadSize = 1 #auto default pin or 8 workerThreadSize = "default" } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none" } service { #transaction service group mapping vgroupMapping.my_test_tx_group = "default" #only support when registry.type=file, please don't set multiple addresses default.grouplist = "192.168.1.101:8091" #degrade, current not support enableDegrade = false #disable seata disableGlobalTransaction = false } client { rm { asyncCommitBufferLimit = 10000 lock { retryInterval = 10 retryTimes = 30 retryPolicyBranchRollbackOnConflict = true } reportRetryCount = 5 tableMetaCheckEnable = false reportSuccessEnable = false } tm { commitRetryCount = 5 rollbackRetryCount = 5 } undo { dataValidation = true logSerialization = "jackson" logTable = "undo_log" } log { exceptionRate = 100 } }

  registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file" nacos { application = "seata-server" serverAddr = "localhost" namespace = "" username = "" password = "" } eureka { serviceUrl = "http://localhost:8761/eureka" weight = "1" } redis { serverAddr = "localhost:6379" db = "0" password = "" timeout = "0" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { serverAddr = "127.0.0.1:8500" } etcd3 { serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } } config { # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig type = "file" nacos { serverAddr = "localhost" namespace = "" group = "SEATA_GROUP" username = "" password = "" } consul { serverAddr = "127.0.0.1:8500" } apollo { appId = "seata-server" apolloMeta = "http://192.168.1.204:8801" namespace = "application" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }

  然后在 samples-business 模塊的 io.seata.samples.integration.call.service 包下有個 BusinessServiceImpl 測試服務:

@Service
public class BusinessServiceImpl implements BusinessService{ @Reference(version = "1.0.0") private StorageDubboService storageDubboService; @Reference(version = "1.0.0") private OrderDubboService orderDubboService; private boolean flag; /** * 處理業務邏輯 * @Param: * @Return: */ @Override @GlobalTransactional(timeoutMills = 300000, name = "dubbo-gts-seata-example") public ObjectResponse handleBusiness(BusinessDTO businessDTO) { System.out.println("開始全局事務,XID = " + RootContext.getXID()); ObjectResponse<Object> objectResponse = new ObjectResponse<>(); //1、扣減庫存 CommodityDTO commodityDTO = new CommodityDTO(); commodityDTO.setCommodityCode(businessDTO.getCommodityCode()); commodityDTO.setCount(businessDTO.getCount()); ObjectResponse storageResponse = storageDubboService.decreaseStorage(commodityDTO); //2、創建訂單 OrderDTO orderDTO = new OrderDTO(); orderDTO.setUserId(businessDTO.getUserId()); orderDTO.setCommodityCode(businessDTO.getCommodityCode()); orderDTO.setOrderCount(businessDTO.getCount()); orderDTO.setOrderAmount(businessDTO.getAmount()); ObjectResponse<OrderDTO> response = orderDubboService.createOrder(orderDTO); //打開注釋測試事務發生異常后,全局回滾功能 if (!flag) { throw new RuntimeException("測試拋異常后,分布式事務回滾!"); } if (storageResponse.getStatus() != 200 || response.getStatus() != 200) { throw new DefaultException(RspStatusEnum.FAIL); } objectResponse.setStatus(RspStatusEnum.SUCCESS.getCode()); objectResponse.setMessage(RspStatusEnum.SUCCESS.getMessage()); objectResponse.setData(response.getData()); return objectResponse; } }

  可以看到最核心的還是注解  @GlobalTransactional。 可以通過其中的異常測試事務的回滾,當我們在這行代碼打上斷點的時候,再去看數據庫的 undo_log 表,會插入三條回滾日志,用於回滾操作

 

  其中 rollback_info 內容大致如下:

{
    "@class": "io.seata.rm.datasource.undo.BranchUndoLog", "xid": "192.168.1.101:8091:58238671456194560", "branchId": 58238672634793984, "sqlUndoLogs": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.undo.SQLUndoLog", "sqlType": "UPDATE", "tableName": "t_storage", "beforeImage": { "@class": "io.seata.rm.datasource.sql.struct.TableRecords", "tableName": "t_storage", "rows": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.sql.struct.Row", "fields": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PRIMARY_KEY", "type": 4, "value": 1 }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "count", "keyType": "NULL", "type": 4, "value": 1000 }]] }]] }, "afterImage": { "@class": "io.seata.rm.datasource.sql.struct.TableRecords", "tableName": "t_storage", "rows": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.sql.struct.Row", "fields": ["java.util.ArrayList", [{ "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "id", "keyType": "PRIMARY_KEY", "type": 4, "value": 1 }, { "@class": "io.seata.rm.datasource.sql.struct.Field", "name": "count", "keyType": "NULL", "type": 4, "value": 998 }]] }]] } }]] }

  大概就是數據的操作前后鏡像,然后根據這個做回滾操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM