seata AT模式實戰 (圖解_秒懂_史上最全)


文章很長,建議收藏起來,慢慢讀! Java 高並發 發燒友社群:瘋狂創客圈 奉上以下珍貴的學習資源:


推薦:入大廠 、做架構、大力提升Java 內功 的 精彩博文

入大廠 、做架構、大力提升Java 內功 必備的精彩博文 2021 秋招漲薪1W + 必備的精彩博文
1:Redis 分布式鎖 (圖解-秒懂-史上最全) 2:Zookeeper 分布式鎖 (圖解-秒懂-史上最全)
3: Redis與MySQL雙寫一致性如何保證? (面試必備) 4: 面試必備:秒殺超賣 解決方案 (史上最全)
5:面試必備之:Reactor模式 6: 10分鍾看懂, Java NIO 底層原理
7:TCP/IP(圖解+秒懂+史上最全) 8:Feign原理 (圖解)
9:DNS圖解(秒懂 + 史上最全 + 高薪必備) 10:CDN圖解(秒懂 + 史上最全 + 高薪必備)
11: 分布式事務( 圖解 + 史上最全 + 吐血推薦 ) 12:seata AT模式實戰(圖解+秒懂+史上最全)
13:seata 源碼解讀(圖解+秒懂+史上最全) 14:seata TCC模式實戰(圖解+秒懂+史上最全)

Java 面試題 30個專題 , 史上最全 , 面試必刷 阿里、京東、美團... 隨意挑、橫着走!!!
1: JVM面試題(史上最強、持續更新、吐血推薦) 2:Java基礎面試題(史上最全、持續更新、吐血推薦
3:架構設計面試題 (史上最全、持續更新、吐血推薦) 4:設計模式面試題 (史上最全、持續更新、吐血推薦)
17、分布式事務面試題 (史上最全、持續更新、吐血推薦) 一致性協議 (史上最全)
29、多線程面試題(史上最全) 30、HR面經,過五關斬六將后,小心陰溝翻船!
9.網絡協議面試題(史上最全、持續更新、吐血推薦) 更多專題, 請參見【 瘋狂創客圈 高並發 總目錄

SpringCloud 精彩博文
nacos 實戰(史上最全) sentinel (史上最全+入門教程)
SpringCloud gateway (史上最全) 更多專題, 請參見【 瘋狂創客圈 高並發 總目錄

seata AT模式實戰(圖解+秒懂+史上最全)

閱讀此文之前,請先閱讀 :

分布式事務( 圖解 + 史上最全 + 吐血推薦 )

參考鏈接
系統架構知識圖譜(一張價值10w的系統架構知識圖譜)

https://www.processon.com/view/link/60fb9421637689719d246739

秒殺系統的架構

https://www.processon.com/view/link/61148c2b1e08536191d8f92f

先來看下為什么會產生分布式事務問題

分布式事務使用場景

簡單來說:

一次業務操作需要跨多個數據源或需要跨多個系統進行遠程調用,就會產生分布式事務問題

作為典型案例,搬出經典的銀行轉賬問題:

需求:

假設銀行(bank)中有兩個客戶(name)張三和李四, 我們需要將張三的1000元存款(sal)轉到李四的賬戶上

約束:

不能出現中間狀態,張三減1000,李四沒加 , 或者 反之

在這里插入圖片描述

如果兩個用戶對應的銀行存款數據在一個數據源中,即一個數據庫中,那么通過spring框架下的@Transactional注解來保證單一數據源增刪改查的一致性。

數據庫的水平分割倒逼分布式事務

但是隨着業務的不斷擴大,用戶數在不斷變多,幾百萬幾千萬用戶時數據可以存一個庫甚至一個表里,假設有10個億的用戶?

一個表當然放不下,需要分表,當然需要分庫來配合。

為了解決數據庫上的瓶頸,分庫是很常見的解決方案,不同用戶就可能落在不同的數據庫里,原來一個庫里的事務操作,現在變成了跨數據庫的事務操作。
在這里插入圖片描述

此時@Transactional注解就失效了,這就是跨數據庫分布式事務問題

微服務化倒逼分布式事務

當然,更多的情形是隨着業務不斷增長,將業務中不同模塊服務拆分成微服務后,同時調用多個微服務所產生的

設想一個傳統的單體應用,無論多少內部調用,最后終歸是在同一個數據庫上進行操作來完成一向業務操作,如圖:

img

隨着業務量的發展,業務需求和架構發生了巨大的變化,整體架構由原來的單體應用逐漸拆分成為了微服務,

原來的3個服務被從一個單體架構上拆開了,成為了3個獨立的服務,分別使用獨立的數據源,也不在之前共享同一個數據源了,具體的業務將由三個服務的調用來完成,如圖:

img

此時,每一個服務的內部數據一致性仍然有本地事務來保證。

但是面對整個業務流程上的事務應該如何保證呢?這就是在微服務架構下面臨的挑戰,如何保證在微服務中的數據一致性。

微服務化的銀行轉賬情景往往是這樣的

  1. 調用交易系統服務創建交易訂單;

  2. 調用支付系統記錄支付明細;

  3. 調用賬務系統執行 A 扣錢;

  4. 調用賬務系統執行 B 加錢;

  5. 調用賬務系統執行 B 加錢;

在這里插入圖片描述

如圖所示,每個系統都對應一個獨立的數據源,且可能位於不同機房,同時調用多個系統的服務很難保證同時成功,這就是跨服務分布式事務問題

10WQPS秒殺實操的分庫架構

在這里插入圖片描述

Spring Cloud Alibaba Seata

解決分布式事務問題,有兩個設計初衷

  • 對業務無侵入:即減少技術架構上的微服務化所帶來的分布式事務問題對業務的侵入
  • 高性能:減少分布式事務解決方案所帶來的性能消耗

seata中常用的有兩種分布式事務實現方案,AT及TCC

  • AT模式主要關注多 DB 訪問的數據一致性,當然也包括多服務下的多 DB 數據訪問一致性問題
  • TCC 模式主要關注業務拆分,在按照業務橫向擴展資源時,解決微服務間調用的一致性問題

AT模式(業務侵入小)

Seata AT模式是基於XA事務演進而來的一個分布式事務中間件,

XA是一個基於數據庫實現的分布式事務協議,本質上和兩階段提交一樣,需要數據庫支持,Mysql5.6以上版本支持XA協議,其他數據庫如Oracle,DB2也實現了XA接口

AT模式角色如下

在這里插入圖片描述

  1. Transaction Coordinator (TC):
    事務協調器,維護全局事務的運行狀態,負責協調並驅動全局事務的提交或回滾

  2. Transaction Manager ™:

控制全局事務的邊界,負責開啟一個全局事務,並最終發起全局提交或全局回滾的決議

  1. Resource Manager (RM):

控制分支事務,負責分支注冊、狀態匯報,並接收事務協調器的指令,驅動分支(本地)事務的提交和回滾

AT模式(2PC)基本處理邏輯如下

在這里插入圖片描述
Branch就是指的分布式事務中每個獨立的本地局部事務

AT模式第一階段

Seata 的 JDBC 數據源代理通過對業務 SQL 的解析,把業務數據在更新前后的數據鏡像組織成回滾日志,利用 本地事務 的 ACID 特性,將業務數據的更新和回滾日志的寫入在同一個 本地事務 中提交。

這樣,可以保證:任何提交的業務數據的更新一定有相應的回滾日志存在

在這里插入圖片描述

基於這樣的機制,分支的本地事務便可以在全局事務的第一階段提交,並馬上釋放本地事務鎖定的資源

這也是Seata和XA事務的不同之處:

經典的2PC兩階段提交(XA)往往對資源的鎖定需要持續到第二階段實際的提交或者回滾操作

在這里插入圖片描述

AT模式,可以在第一階段釋放對資源的鎖定,降低了鎖范圍

誰的功勞:回滾日志

AT模式第二階段

在這里插入圖片描述

場景一:提交,全局提交

如果決議是全局提交,此時分支事務此時已經完成提交,不需要同步協調處理(只需要異步清理回滾日志),Phase2 可以非常快速地完成
在這里插入圖片描述

場景2:回滾,全局回滾

如果決議是全局回滾,RM 收到協調器發來的回滾請求,通過 XID 和 Branch ID 找到相應的回滾日志記錄,通過回滾記錄生成反向的更新 SQL 並執行,以完成分支的回滾
在這里插入圖片描述

AT模式相對於XA模式的優勢

在這里插入圖片描述

提高效率,即使第二階段發生異常需要回滾,只需找對undolog中對應數據並反解析成sql來達到回滾目的

同時Seata無入侵,通過代理數據源將業務sql的執行解析成undolog來與業務數據的更新同時入庫,達到了對業務無侵入的效果

10WQPS秒殺實操的AT分布式事務架構

在這里插入圖片描述

快速開始搭建Seata環境

全局事務和分支事務的存儲模式

因為 TC 需要進行全局事務和分支事務的記錄,所以需要對應的存儲。

目前,TC 有兩種存儲模式( store.mode ):

  • file 模式:適合單機模式,全局事務會話信息在內存中讀寫,並持久化本地文件 root.data,性能較高。
  • db 模式:適合集群模式,全局事務會話信息通過 db 共享,相對性能差點。

file 模式

全局事務會話信息在內存中讀寫

並持久化本地文件 root.data

file 模式,最終部署單機 TC Server 如下圖所示:

c97604a80f3dad9727473e9350a753be.png

db 模式

集群 Seata TC Server,實現高可用,生產環境下必備。

在集群時,多個 Seata TC Server 通過 db 數據庫,實現全局事務會話信息的共享。

每個 Seata TC Server 可以注冊自己到注冊中心上,方便應用從注冊中心獲得到他們。 集群 TC Server 如下圖所示:

d6fd48e578b11107e6fd321e389dbe06.png

Seata TC Server 對主流的注冊中心都提供了集成。國內使用 Nacos 作為注冊中心越來越流行,推薦使用nacos。

配置和啟動Seata-server服務(TC服務)

seata 官方文檔地址:

http://seata.io/zh-cn/docs/overview/what-is-seata.html

下載seata-server-1.3.0和seata-1.3.0源碼

seate-server下載: https://seata.io/zh-cn/blog/download.html,

seata-1.3.0源碼下載:

https://github.com/seata/seata/releases

https://github.com/seata/seata/releases/download/v1.3.0/seata-server-1.3.0.tar.gz

https://gitee.com/seata-io/seata.git

下載所有的源碼后,切換到1.3的分支

seata-server包下載和解壓

第一步:https://github.com/seata/seata/releases 下載seata-server包

推薦使用1.3.0版本,最新版本有些配套的依賴包,不一定來得及更新,可能會出現一些奇怪的問題

seata-server-1.3 上傳后,大致如下:

[root@cdh1 ~]# cd /work/
[root@cdh1 work]# ll
total 333360

-rw-r--r--  1 root root 33959771 Sep 13 16:00 seata-server-1.3.0.tar.gz

drwxr-xr-x  5 root root       87 Dec 26  2020 zookeeper

然后解壓縮,tar 命令如下:

[root@cdh1 ~]# rm -rf /work/seata
[root@cdh1 ~]# cd /work/
[root@cdh1 work]# tar -zxvf seata-server-1.3.0.tar.gz
seata/LICENSE
seata/conf/
seata/conf/META-INF/
seata/conf/META-INF/services/
seata/conf/logback.xml
seata/conf/file.conf
seata/conf/registry.conf
seata/conf/META-INF/services/io.seata.server.session.SessionManager
seata/conf/META-INF/services/io.seata.core.rpc.RegisterCheckAuthHandler
seata/conf/META-INF/services/io.seata.core.store.db.DataSourceProvider
seata/conf/META-INF/services/io.seata.server.coordinator.AbstractCore
seata/conf/META-INF/servi......

啟動 TC Server(單體的實例)

bin下執行 sh ./seata-server.sh 命令,啟動 TC Server 在后台。

我們看到如下日志,說明啟動成功:

[root@cdh1 bin]# cd /work/seata/bin/
[root@cdh1 bin]# sh bin/seata-server.sh
sh: bin/seata-server.sh: No such file or directory
[root@cdh1 bin]# sh ./seata-server.sh
Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /work/seata/logs/seata_gc.log due to No such file or directory

.....
2021-09-15 04:47:36.143 WARN [main]i.s.c.l.EnhancedServiceLoader$InnerEnhancedServiceLoader.loadFile:482 -The same extension io.seata.server.storage.file.lock.FileLockManager has already been loaded, skipped
2021-09-15 04:47:36.248 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...


  • 默認配置下,Seata TC Server 啟動在 8091 端點。

使用 File 存儲器全局會話

因為我們沒有修改任何配置文件,默認情況seata使用的是file模式進行數據持久化,所以可以看到用於持久化的本地文件 root.data;操作命令如下:

[root@cdh1 bin]# ll -ls /work/seata/bin/sessionStore/
total 0
0 -rw-r--r-- 1 root root 0 Sep 15 11:47 root.data

seata-server的TC端配置

file.conf 配置文件,是RM(各大微服務的)和TC之間的通信配置

TC端的參數清單,大致如下:

key desc remark
transaction.undo.log.save.days undo保留天數 默認7天,log_status=1(附錄3)和未正常清理的undo
transaction.undo.log.delete.period undo清理線程間隔時間 默認86400000,單位毫秒
service.max.commit.retry.timeout 二階段提交重試超時時長 單位ms,s,m,h,d,對應毫秒,秒,分,小時,天,默認毫秒。默認值-1表示無限重試。公式: timeout>=now-globalTransactionBeginTime,true表示超時則不再重試
service.max.rollback.retry.timeout 二階段回滾重試超時時長 同commit
recovery.committing-retry-period 二階段提交未完成狀態全局事務重試提交線程間隔時間 默認1000,單位毫秒
recovery.asyn-committing-retry-period 二階段異步提交狀態重試提交線程間隔時間 默認1000,單位毫秒
recovery.rollbacking-retry-period 二階段回滾狀態重試回滾線程間隔時間 默認1000,單位毫秒
recovery.timeout-retry-period 超時狀態檢測重試線程間隔時間 默認1000,單位毫秒,檢測出超時將全局事務置入回滾會話管理器
store.mode 事務會話信息存儲方式 file本地文件(不支持HA),db數據庫(支持HA)
store.file.dir file模式文件存儲文件夾名 默認sessionStore
store.db.datasource db模式數據源類型 默認dbcp
store.db.db-type db模式數據庫類型 默認mysql
store.db.driver-class-name db模式數據庫驅動 默認com.mysql.jdbc.Driver
store.db.url db模式數據源庫url 默認jdbc:mysql://127.0.0.1:3306/seata
store.db.user db模式數據庫賬戶 默認mysql
store.db.min-conn db模式數據庫初始連接數 默認1
store.db.max-conn db模式數據庫最大連接數 默認3
store.db.global.table db模式全局事務表名 默認global_table
store.db.branch.table db模式分支事務表名 默認branch_table
store.db.lock-table db模式全局鎖表名 默認lock_table
store.db.query-limit db模式查詢全局事務一次的最大條數 默認1000
metrics.enabled 是否啟用Metrics 默認false關閉,在False狀態下,所有與Metrics相關的組件將不會被初始化,使得性能損耗最低
metrics.registry-type 指標注冊器類型 Metrics使用的指標注冊器類型,默認為內置的compact(簡易)實現,這個實現中的Meter僅使用有限內存計數,性能高足夠滿足大多數場景;目前只能設置一個指標注冊器實現
metrics.exporter-list 指標結果Measurement數據輸出器列表 默認prometheus,多個輸出器使用英文逗號分割,例如"prometheus,jmx",目前僅實現了對接prometheus的輸出器
metrics.exporter-prometheus-port prometheus輸出器Client端口號 默認9898

部署集群 TC Server

本小節,我們來學習部署集群 Seata TC Server,實現高可用,生產環境下必備。

在集群時,多個 Seata TC Server 通過 db 數據庫,實現全局事務會話信息的共享。

每個 Seata TC Server 可以注冊自己到注冊中心上,方便應用從注冊中心獲得到他們。最終我們部署 集群 TC Server 如下圖所示:

在這里插入圖片描述

使用 Nacos 作為注冊中心

修改 conf/registry.conf 配置文件,設置使用 Nacos 注冊中心。如下圖所示:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  # type = "file"
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

如果設置的 -t 參數,表示設置了命名空間,則需要提前創建命名空間。然后拿到其 命名空間ID---- uuid。

在這里插入圖片描述

TC端數據庫初始化

下載TC端的配置文件,可以從github:

https://github.com/seata-io/seata/tree/v1.3.0/script/server/db

或者 gitee:

https://gitee.com/seata-io/seata/tree/v1.3.0/script/server/db

下載Seata1.3的版本對應的腳本。目前支持mysql、oracle、postgresql這三種數據庫,

上述三種腳本是針對Seata的Sever端在協調處理分布式事務時所需要的3張表,提供了不同數據庫的global_table表、branch_table表、lock_table表創建腳本,根據自身數據庫執行對應的sql腳本執行即可。

這里以mysql為例,在你的mysql數據庫中創建名為seata的庫,並執行以下sql,將會生成三張表:global_table表、branch_table表、lock_table表

在這里插入圖片描述

創建TC端的專屬獨立庫seata。

連接MYSQL:

格式: mysql -h主機地址 -u用戶名 -p用戶密碼

mysql -h192.168.9.1 -uroot -p123456

mysql -uroot -p123456
/usr/bin/mysql -uroot -p"123456" --connect-expired-password <<EOF
    CREATE DATABASE `seata` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
    show databases;
	grant all privileges on seata.* to root@'%' identified by '123456'  WITH GRANT OPTION;
	flush privileges;
EOF


授權的時候,如果出現出現以下提示:

ERROR 1819 (HY000): Your password does not satisfy the current policy requirements
那么導致問題的原因就是密碼規則沒有修改嘍,以下提供兩種方法更改簡單密碼:

1.在/etc/my.cnf關閉密碼規則
validate_password = off

2.進入mysql以后修改密碼規則

   set global validate_password_policy=0; ----------把密碼策略設置成0
    set global validate_password_length=0; ----------把密碼長度限制設置成0
    flush privileges; -------------------------------刷新權限使其生效


需要在剛才配置的數據庫中執行數據初始腳本 db_store.sql ,這個是全局事務控制的表,需要提前初始化。

db_store.sql 為seata庫必須的表,執行sql即可

通過 source 命令,導致sql腳本就可以了。

mysql> source /work/seata/conf/db_store.sql;

mysql> use seata;
Database changed
mysql> source   /work/seata/conf/db_store.sql;
Query OK, 0 rows affected, 1 warning (0.00 sec)

Query OK, 0 rows affected (0.02 sec)

Query OK, 0 rows affected, 1 warning (0.00 sec)

Query OK, 0 rows affected (0.00 sec)

Query OK, 0 rows affected, 1 warning (0.00 sec)

Query OK, 0 rows affected (0.01 sec)

這里我們只是做演示,理論上上面三個業務服務應該分屬不同的數據庫,這里我們只是在同一台數據庫下面創建三個 Schema ,分別為 db_account 、 db_order 和 db_storage ,具體如圖:


mysql> show tables;
+-----------------+
| Tables_in_seata |
+-----------------+
| branch_table    |
| global_table    |
| lock_table      |
+-----------------+
3 rows in set (0.00 sec)


通過windows 工具操作,也是類似的,導入 db_store.sql之后,創建了3個表 db_account 、 db_order 和 db_storage ,具體如圖:

10bc6bbb40535db878403d34014d156b.png

db 數據庫共享全局事務會話信息:

如果使用file作為配置文件

修改 conf/file.conf 配置文件,修改使用 db 數據庫,實現 Seata TC Server 的全局事務會話信息的共享。

如下圖所示:


## transaction log store, only used in seata-server
store {
  ## store mode: file、db
  ## mode = "file"
  mode = "db"

  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    ## driverClassName = "com.mysql.jdbc.Driver"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://192.168.56.121:3306/seata?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=true&serverTimezone=UTC"
    user = "root"
    password = "123456"
    minConn = 5
    maxConn = 30
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }
}

如何使用nacos作為配置中心:

修改使用 db 數據庫,實現 Seata TC Server 的全局事務會話信息的共享。

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.vgroupMapping.seata-seckill-demo-seata-service-group=default
service.vgroupMapping.seata-order-demo-seata-service-group=default
service.vgroupMapping.seata-stock-demo-seata-service-group=default
service.default.grouplist=cdh1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
#store.mode=file
store.mode=db
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://cdh1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
補充:MySQL8 的支持

如果使用的 MySQL 是 8.X 版本,需要下載 MySQL 8.X JDBC 驅動,命令行操作如下:

$ cd lib
$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar

然后,修改 conf/file 配置文件,使用該 MySQL 8.X JDBC 驅動。如下圖所示:

在這里插入圖片描述

啟動 第一個TC Server

執行 nohup sh bin/seata-server.sh -p 18091 -n 1 & 命令,啟動第一個 TC Server 在后台。

-p:Seata TC Server 監聽的端口。

-n:Server node。在多個 TC Server 時,需區分各自節點,用於生成不同區間的 transactionId 事務編號,以免沖突。

nohup sh /work/seata/bin/seata-server.sh -p 18091 -n 1 > /work/seata/bin/consol.log & 
	
tail -f /work/seata/bin/consol.log

在 consol.log 文件中,我們看到如下日志,說明啟動成功:

e.session.FileSessionManager has already been loaded, skipped
2021-09-15 07:46:33.472 WARN [main]i.s.c.l.EnhancedServiceLoader$InnerEnhancedServiceLoader.loadFile:482 -The same extension io.seata.server.storage.db.session.DataBaseSessionManager has already been loaded, skipped
.....
2021-09-15 07:46:34.094 WARN [main]i.s.c.l.EnhancedServiceLoader$InnerEnhancedServiceLoader.loadFile:482 -The same extension io.seata.server.storage.file.lock.FileLockManager has already been loaded, skipped
2021-09-15 07:46:34.220 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ...


啟動 第2個TC Server

執行 nohup sh bin/seata-server.sh -p 18092 -n 2 & 命令,啟動第二個 TC Server 在后台。

nohup sh /work/seata/bin/seata-server.sh -p 18092 -n 1 > /work/seata/bin/consol2.log & 
	
tail -f /work/seata/bin/consol2.log

實驗演示:注冊到nacos之后,可以在nacos上看效果

具體演示,參見此博客的配套視頻

控制台提交nacos配置腳本

下載nacos的配置文件,原始config.txt文件可以從github:

https://github.com/seata/seata/tree/develop/script/config-center

或者 gitee:

https://gitee.com/seata-io/seata/tree/develop/script/config-center

下載的並修改:

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=true
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroup_mapping.my_test_tx_group=default
service.vgroup_mapping.seata-seckill-demo=default
service.vgroup_mapping.seata-order-demo=default
service.vgroup_mapping.seata-stock-demo=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
store.mode=file
store.lock.mode=file
store.session.mode=file
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://cdh1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=cdh1
store.redis.single.port=6379
store.redis.sentinel.masterName=
store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=123456
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

修改 自己定義的服務組

在 nacos-config.txt 文件 修改 自己定義的服務組 ,參考如下:

service.vgroup_mapping.my_test_tx_group=default
service.vgroup_mapping.seata-seckill-demo=default
service.vgroup_mapping.seata-order-demo=default
service.vgroup_mapping.seata-stock-demo=default

中間的${your-service-gruop}為自己定義的服務組名稱,這里需要我們在程序的配置文件中配置,筆者這里直接使用程序的 spring.application.name。

修改數據庫連接

用到2個文件,nacos-config.txt和nacos-config.sh,都在config目錄下。

config.txt需要:

  • 修改數據庫連接

store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver # 這里
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true # 這里
store.db.user=username # 這里
store.db.password=password # 這里

將文件中的配置導入nacos 即可

下載nacos的配置文件,原始config.txt文件可以從github:

https://github.com/seata/seata/tree/develop/script/config-center

或者 gitee:

https://gitee.com/seata-io/seata/tree/develop/script/config-center

提供的nacos腳本nacos-config.sh,將上面的config.txt文件復制到seata的config目錄,通過 nacos-config.sh ,將文件中的配置導入nacos 即可。執行以下命令:

sh /work/seata/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u nacos -w nacos

將以上信息提交到nacos控制台,當然,如果有需要修改參數,可直接通過登錄nacos控制台修改。

[root@cdh1 script]# sh /work/seata/script/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u nacos -w nacos
set nacosAddr=localhost:8848
set group=SEATA_GROUP
Set transport.type=TCP successfully
Set transport.server=NIO successfully
Set transport.heartbeat=true successfully
Set transport.enableClientBatchSendRequest=true successfully
Set transport.threadFactory.bossThreadPrefix=NettyBoss successfully
Set transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker successfully
Set transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler successfully
Set transport.threadFactory.shareBossWorker=false successfully
Set transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector successfully
Set transport.threadFactory.clientSelectorThreadSize=1 successfully
Set transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread successfully
Set transport.threadFactory.bossThreadSize=1 successfully
Set transport.threadFactory.workerThreadSize=default successfully
Set transport.shutdown.wait=3 successfully
Set service.vgroup_mapping.my_test_tx_group=default successfully
Set service.vgroup_mapping.seata-seckill-demo=default successfully
Set service.vgroup_mapping.seata-order-demo=default successfully
Set service.vgroup_mapping.seata-stock-demo=default successfully
Set service.default.grouplist=127.0.0.1:8091 successfully
Set service.enableDegrade=false successfully
Set service.disableGlobalTransaction=false successfully
Set client.rm.asyncCommitBufferLimit=10000 successfully
Set client.rm.lock.retryInterval=10 successfully
Set client.rm.lock.retryTimes=30 successfully
Set client.rm.lock.retryPolicyBranchRollbackOnConflict=true successfully
Set client.rm.reportRetryCount=5 successfully
Set client.rm.tableMetaCheckEnable=false successfully
Set client.rm.tableMetaCheckerInterval=60000 successfully
Set client.rm.sqlParserType=druid successfully
Set client.rm.reportSuccessEnable=false successfully
Set client.rm.sagaBranchRegisterEnable=false successfully
Set client.rm.tccActionInterceptorOrder=-2147482648 successfully
Set client.tm.commitRetryCount=5 successfully
Set client.tm.rollbackRetryCount=5 successfully
Set client.tm.defaultGlobalTransactionTimeout=60000 successfully
Set client.tm.degradeCheck=false successfully
Set client.tm.degradeCheckAllowTimes=10 successfully
Set client.tm.degradeCheckPeriod=2000 successfully
Set client.tm.interceptorOrder=-2147482648 successfully
Set store.mode=file successfully
Set store.lock.mode=file successfully
Set store.session.mode=file successfully
Set store.publicKey= failure
Set store.file.dir=file_store/data successfully
Set store.file.maxBranchSessionSize=16384 successfully
Set store.file.maxGlobalSessionSize=512 successfully
Set store.file.fileWriteBufferCacheSize=16384 successfully
Set store.file.flushDiskMode=async successfully
Set store.file.sessionReloadReadSize=100 successfully
Set store.db.datasource=druid successfully
Set store.db.dbType=mysql successfully
Set store.db.driverClassName=com.mysql.jdbc.Driver successfully
Set store.db.url=jdbc:mysql://cdh1:3306/seata?useUnicode=true&rewriteBatchedStatements=true successfully
Set store.db.user=root successfully
Set store.db.password=123456 successfully
Set store.db.minConn=5 successfully
Set store.db.maxConn=30 successfully
Set store.db.globalTable=global_table successfully
Set store.db.branchTable=branch_table successfully
Set store.db.queryLimit=100 successfully
Set store.db.lockTable=lock_table successfully
Set store.db.maxWait=5000 successfully
Set store.redis.mode=single successfully
Set store.redis.single.host=cdh1 successfully
Set store.redis.single.port=6379 successfully
Set store.redis.sentinel.masterName= failure
Set store.redis.sentinel.sentinelHosts= failure
Set store.redis.maxConn=10 successfully
Set store.redis.minConn=1 successfully
Set store.redis.maxTotal=100 successfully
Set store.redis.database=0 successfully
Set store.redis.password=123456 successfully
Set store.redis.queryLimit=100 successfully
Set server.recovery.committingRetryPeriod=1000 successfully
Set server.recovery.asynCommittingRetryPeriod=1000 successfully
Set server.recovery.rollbackingRetryPeriod=1000 successfully
Set server.recovery.timeoutRetryPeriod=1000 successfully
Set server.maxCommitRetryTimeout=-1 successfully
Set server.maxRollbackRetryTimeout=-1 successfully
Set server.rollbackRetryTimeoutUnlockEnable=false successfully
Set server.distributedLockExpireTime=10000 successfully
Set client.undo.dataValidation=true successfully
Set client.undo.logSerialization=jackson successfully
Set client.undo.onlyCareUpdateColumns=true successfully
Set server.undo.logSaveDays=7 successfully
Set server.undo.logDeletePeriod=86400000 successfully
Set client.undo.logTable=undo_log successfully
Set client.undo.compress.enable=true successfully
Set client.undo.compress.type=zip successfully
Set client.undo.compress.threshold=64k successfully
Set log.exceptionRate=100 successfully
Set transport.serialization=seata successfully
Set transport.compressor=none successfully
Set metrics.enabled=false successfully
Set metrics.registryType=compact successfully
Set metrics.exporterList=prometheus successfully
Set metrics.exporterPrometheusPort=9898 successfully
=========================================================================
 Complete initialization parameters,  total-count:97 ,  failure-count:3
=========================================================================
 init nacos config fail.

順利完成后,nacos中如下

在這里插入圖片描述

如果設置的 -t 參數,表示設置了命名空間,則需要提前創建命名空間。然后拿到其 命名空間ID---- uuid。

在這里插入圖片描述

直接使用nacos 的data-id配置文件

制作和上傳配置文件

把上面的配置,做成一個文件,放在nacos中

在這里插入圖片描述

config文件中的修改

##配置seata-server的注冊中心,支持file 、nacos 、eureka、redis、zk、consul、etcd3、sofa

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  # type = "file"
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "cdh1:8848"
    namespace = "e385bfe2-e743-4910-8c32-e05759f9f9f4"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}
##配置seata-server的配置中心,支持file、nacos 、apollo、zk、consul、etcd3

config {
  # file、nacos 、apollo、zk、consul、etcd3
  # type = "file"
  type = "nacos"
  #nacos {
  #   serverAddr = "ch"
  #  namespace = "e385bfe2-e743-4910-8c32-e05759f9f9f4"
  #  group = "SEATA_GROUP"
  #  username = ""
  #  password = ""
  # }
  nacos {
    application = "seata-server"
    #  serverAddr = "192.168.56.121:8848"
    serverAddr = "cdh1:8848"
    namespace = "e385bfe2-e743-4910-8c32-e05759f9f9f4"
    group = "SEATA_GROUP"
    dataId = "seata-tc.properties"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }

  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

重啟 TC,一切OK

NameSpace、Group、DataId

nacos中提供了NameSpace、Group、DataId,他的作用是能讓我們對配置文件進行分類管理,三個能夠確定唯一的配置文件。我說一點,可能不同的公司,會對這3個的定義是不同的。

 比如 定義一:

    1. NameSpace:區分不同的環境

    2. Group:區分不同的項目或系統

    3. DataId:項目中的配置文件

 定義二:

    1. NameSpace:區分不同的項目

    2. Group:區分不同的模塊

    3. DataId:區分不同的環境

 

 定義三:

    1. NameSpace:區分不同的租戶

    2. Group:區分不同的應用

    3. DataId:區分不同的環境

 還有其他的定義,看公司。

NameSpace:區分不同的環境

一般開發都會有多套環境,如果多套環境公用一個nacos,那么配置中心和注冊中心都會發生沖突,所以需要用namespace隔離開

  address: 0.0.0.0
  port: 8083
  servlet:
    # 這里設置了context-path
    context-path: /settlement/v1

spring:
  application:
    name: settlement
  cloud:
    nacos:
      config:
        server-addr: nacos-headless.default.svc.cluster.local:8848
        # 控制台創建命名空間得到的uuid
        namespace: c9ad103a-5420-4628-aba7-c147e3048d9d
      discovery:
        server-addr: nacos-headless.default.svc.cluster.local:8848
        # 控制台創建命名空間得到的uuid
        namespace: c9ad103a-5420-4628-aba7-c147e3048d9d
        metadata:
          management:
            # 這里要適配下健康檢查的endpoint
            context-path: '${server.servlet.context-path}/actuator'

management:
  endpoints:
    web:
      exposure:
        # actuator暴露所有endpoint
        include: "*"

進行Group的切換

 

假設要進行Group的切換,只需要改下面的配置的值:

server:
  port: 3377
spring:
  application:
    name: nacos-config-client
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 #注冊中心的地址
      config:
        server-addr: localhost:8848 #配置中心的地址
        file-extension: yaml # 要讀取nacos上的配置文件的后綴,這里只能是yaml,不能是yml
        group: TEST_GROUP

  

進行NameSpace的切換

假設要進行NameSpace的切換,只需要改下面的配置的值:

server:
  port: 3377
spring:
  application:
    name: nacos-config-client
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 #注冊中心的地址
        namespace: 命名空間的ID
      config:
        server-addr: localhost:8848 #配置中心的地址
        file-extension: yaml # 要讀取nacos上的配置文件的后綴,這里只能是yaml,不能是yml
        namespace: 命名空間的ID

 假設同一個NameSpace、Group,要進行DataId的 profile 后綴切換,只需要改下面的配置的值:

spring:
  profiles:
    active: dev

  

AT模式的RM&TM&TC直接的交互流程

在這里插入圖片描述

- 事務協調器 TC :  維護全局和分支事務的狀態,指示全局提交或者回滾。
- 事務管理者 TM :  開啟、提交或者回滾一個全局事務。  
- 資源管理者 RM(數據庫) :  管理執行分支事務的那些資源,向TC注冊分支事務、上報分支事務狀態、控制分支事務的提交或者回滾。

大致的流程

① TM 請求 TC, 開始一個新的全局事務,TC 會為這個全局事務生成一個 唯一XID
② XID 通過微服務的調用鏈傳遞到其他微服務。
③ RM 向TC 注冊分支事務, 將其納入XID 對應全局事務的管轄 ;
④ TM 請求 TC 對這個 XID 進行提交或回滾
⑤ TC 指揮這個 XID 下面的所有分支事務進行提交、回滾。

上面的流程有點復雜,如果搞不清楚,可以看下 配套視頻

TM&RM 應用開發

如果你經過前面的步驟搭建Seata環境完成了,那么你可以嘗試一下分布式事務的應用開發,也就是TM&RM這塊。

在這里插入圖片描述

業務場景

那么下面准備以Seata官方文檔上的一個經典例子為題,模擬用戶下單,創建訂單同時扣減庫存數量這一過程中產生的分布式事務問題,然后使用Seata解決,正好使用以下Seata的特性。

在這里插入圖片描述

中間件版本選型

在當下微服務架構比較火熱時,新一代微服務解決方案Spring Cloud Alibaba提供的開源分布式事務解決框架Seata無疑成為了我們在解決分布式事務時的首要之選,

版本選擇: Spring Cloud Alibaba與Spring Boot、Spring Cloud版本對應關系

img

坑點:

如果項目中使用了druid數據庫連接池,引入的是SpringBoot的Starter依賴druid-spring-boot-starter,那么需要把druid-spring-boot-starter依賴換成druid1.1.23,

因為seata源碼中引入的druid依賴跟druid-spring-boot-starter的自動裝配類沖突了,沖突的情況下項目啟動出現異常

數據庫准備

在這里插入圖片描述

連接MYSQL:

格式: mysql -h主機地址 -u用戶名 -p用戶密碼

mysql -h192.168.9.1 -uroot -p123456

mysql -uroot -p123456

創建RM端的專屬獨立庫seata。

/usr/bin/mysql -uroot -p"123456" --connect-expired-password <<EOF
CREATE DATABASE `seata-stock-demo` default character set utf8mb4 collate utf8mb4_unicode_ci;
CREATE DATABASE `seata-order-demo`  default character set utf8mb4 collate utf8mb4_unicode_ci;
show databases;
grant all privileges on `seata-order-demo`.* to root@'%' identified by '123456'  WITH GRANT OPTION;
grant all privileges on `seata-stock-demo`.* to root@'%' identified by '123456'  WITH GRANT OPTION;
flush privileges;
EOF


在這里插入圖片描述

創建RM端的專屬獨立的表。

在自己微服務數據庫里面創建undo_log表,sql在源碼里面有

下載TM端的配置文件,原始config.txt文件可以從github:

https://github.com/seata-io/seata/blob/v1.3.0/script/client/at/db/mysql.sql

或者 gitee:

https://gitee.com/seata-io/seata/blob/v1.3.0/script/client/at/db/mysql.sql

mysql> source /work/seata/conf/un_do.sql;

mysql -uroot -p123456

mysql> use seata-stock-demo;
Database changed
mysql>source   /work/seata/conf/un_do.sql;


mysql> use seata-order-demo;
Database changed
mysql>source   /work/seata/conf/un_do.sql;

這里我們只是做演示,理論上上面三個業務服務應該分屬不同的數據庫,這里我們只是在同一台數據庫下面創建三個 Schema ,分別為 db_account 、 db_order 和 db_storage ,具體如圖:

mysql> show tables;
+-----------------+
| Tables_in_seata |
+-----------------+
| branch_table    |
| global_table    |
| lock_table      |
+-----------------+
3 rows in set (0.00 sec)


TM端的注冊中心配置

下載TM端的配置文件,原始config.txt文件可以從github:

https://github.com/seata/seata/tree/develop/script/client

或者 gitee:

https://gitee.com/seata-io/seata/tree/develop/script/client

下載的並修改 register.config:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom
  # type = "file"
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "cdh1"
    namespace = "e385bfe2-e743-4910-8c32-e05759f9f9f4"
    cluster = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
  }
 #  nacos {
  #   application = "seata-server"
 #    serverAddr = "127.0.0.1:8848"
 #    group = "SEATA_GROUP"
 #    namespace = ""
 #    username = ""
 #    password = ""
 #  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    timeout = "0"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
  custom {
    name = ""
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig、custom
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
    dataId = "seata.properties"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
    apolloAccesskeySecret = ""
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
    nodePath = "/seata/seata.properties"
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
  custom {
    name = ""
  }
}

file.conf本地配置

如果配置中心的類型都不是file時,就不需要file.conf文件了

如果配置中心的類型是file時,就需要file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  # my_test_tx_group為分組名稱,需要與應用中的配置分組名稱一致
  # "default"為seata-server的集群名稱,即seata-server注冊到注冊中心的名稱
  vgroupMapping.my_test_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  # 當registry.type=file時,客戶端通過ip:port連接到seata-server
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    # undo表名稱,SEATA AT模式需要UNDO_LOG表
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

file.conf 配置文件分部分說明

這里說一下配置的內容,已默認的file為例:
在file.conf中有3部分配置內容:

1.transport

transport部分的配置是關於Netty的配置,主要體現在io.seata.core.rpc.netty包下的NettyBaseConfig、NettyServerConfig、NettyClientConfig,client與server的通信使用的是Netty

2.service

僅針對client有效

# service configuration, only used in client side
service {
  #transaction service group mapping
  #my_test_tx_group--->自定義分布式事務組名稱  
  vgroupMapping.my_test_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  #db模式改配置無效
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  #是否啟用seata的分布式事務
  disableGlobalTransaction = false
}

io.seata.spring.annotation.GlobalTransactionScanner

3.client

僅針對client有效

#client transaction configuration, only used in client side
client {
  rm {
  	#RM接收TC的commit通知后緩沖上限
    asyncCommitBufferLimit = 10000
    lock {
      #校驗或占用全局鎖重試間隔 默認10,單位毫秒
      retryInterval = 10
      #校驗或占用全局鎖重試次數 默認30
      retryTimes = 30
      #分支事務與其它全局回滾事務沖突時鎖策略	默認true,優先釋放本地鎖讓回滾成功
      retryPolicyBranchRollbackOnConflict = true
    }
    #一階段結果上報TC重試次數	默認5次
    reportRetryCount = 5
    #自動刷新緩存中的表結構	默認false
    tableMetaCheckEnable = false
    #是否上報一階段成功	true、false,從1.1.0版本開始,默認false.true用於保持分支事務生命周期記錄完整,false可提高不少性能
    reportSuccessEnable = false
    sqlParserType = druid
  }
  tm {
  	#一階段全局提交結果上報TC重試次數	默認1次,建議大於1
    commitRetryCount = 5
    #一階段全局回滾結果上報TC重試次數	默認1次,建議大於1
    rollbackRetryCount = 5
  }
  undo {
  	#二階段回滾鏡像校驗	默認true開啟,false關閉
    dataValidation = true
    #undo序列化方式	默認jackson
    logSerialization = "jackson"
    #自定義undo表名	默認undo_log
    logTable = "undo_log"
  }
  log {
  	#日志異常輸出概率 默認100,目前用於undo回滾失敗時異常堆棧輸出,百分之一的概率輸出,回滾失敗基本是臟數據,無需輸出堆棧占用硬盤空間
    exceptionRate = 100
  }
}

項目准備

模塊架構

在這里插入圖片描述

模塊的角色架構

在這里插入圖片描述

maven依賴

<!-- 分布式事務seata包 --> 
<!--seata begin--> 
<dependency> 
   <groupId>com.alibaba.cloud</groupId> 
   <artifactId>spring-cloud-starter-alibaba-seata</artifactId> 
   <version>2.1.3.RELEASE</version> 
   <exclusions> 
     <exclusion> 
        <groupId>io.seata</groupId> 
        <artifactId>seata-spring-boot-starter</artifactId> 
     </exclusion>    
   </exclusions> 
</dependency> 
<dependency> 
    <groupId>io.seata</groupId> 
    <artifactId>seata-spring-boot-starter</artifactId> 
    <version>1.3.0</version> 
</dependency> 
<!--seata end-->

事務分組是什么?

於是,上官網查看相關的參數配置,搜索serviceGroup

https://seata.io/zh-cn/docs/user/transaction-group.html

事務分組是什么?
事務分組是 Seata 的資源邏輯,類似於服務實例。在file.conf中的
my_test_tx_group就是一個事務分組。

通過事務分組如何找到后端集群?
首先程序中配置了事務分組(GlobalTransactionScanner 構造方法的txServiceGroup參數),程序會通過用戶配置的配置中心去尋找
service.vgroupMapping.事務分組配置項,取得配置項的值就是TC集群的名稱。拿到集群名稱程序通過一定的前后綴+集群名稱去構造服務名,
各配置中心的服務名實現不同。拿到服務名去相應的注冊中心去拉取相應服務名的服務列表,獲得后端真實的TC服務列表。

為什么這么設計,不直接取服務名?
這里多了一層獲取事務分組到映射集群的配置。這樣設計后,事務分組可以作為資源的邏輯隔離單位,當發生故障時可以快速failover。

根據第 2 點的說明,就可以知道問題所在了:

TM&RM配置詳解

  1. registry.conf:配置注冊中心和配置中心,默認是file。
  2. file.conf:seata工作規則信息
  3. DataSourceConfig:配置代理數據源實現分支事務,如果沒有注入,事務無法成功回滾

registry.conf配置:配置注冊中心和配置中心

該文件包含兩部分配置:

  1. 注冊中心

  2. 配置中心

注冊中心

registry { # 注冊中心配置
  # 可選項:file 、nacos 、eureka、redis、zk
  type = "nacos" # 指定nacos注冊中心,默認是file。由於項目整體使用nacos,所以后續選擇nacos

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = "public"
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:1001/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6381"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  file {
    name = "file.conf"
  }
}

配置中心

config { # 配置中心
  # 可選項:file、nacos 、apollo、zk
  type = "file" # 指向file配置中心,也可以指向nacos等其他注冊中心

  nacos {
    serverAddr = "localhost"
    namespace = "public"
    cluster = "default"
  }
  apollo {
    app.id = "fescar-server"
    apollo.meta = "http://192.168.1.204:8801"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  file {
    name = "file.conf"   # 通過file.conf配置seata參數,指向第二個配置文件
  }
}

file.conf

該文件的命名取決於registry.conf配置中心的配置

由於registry.conf中配置的是

img

也就是說:file.conf

文件名取決於registry的配置中心(config{...})配置,如果registry配置的配置中心不是file,可以沒有改文件。

如果配置中心是nacos,這是file.conf文件就不需要了,把file.conf文件內容交給nacos就可

事務日志存儲配置:

store {
  ## store mode: file、db
  mode = "file"  # 存儲方式file、db

  ## file store
  file {
    dir = "sessionStore"

    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    max-branch-session-size = 16384
    # globe session size , if exceeded throws exceptions
    max-global-session-size = 512
    # file buffer size , if exceeded allocate new buffer
    file-write-buffer-cache-size = 16384
    # when recover batch read size
    session.reload.read_size = 100
    # async, sync
    flush-disk-mode = async
  }

  ## database store
  db {
    driver_class = ""
    url = ""
    user = ""
    password = ""
  }
}

TC信息配置

當前微服務在seata服務器中注冊的信息配置:

service {
  #vgroup->rgroup
  #必須和服務名一致:${spring.applicaiton.name}
  #vgroup_mapping.${spring.application.name}-fescar-service-group = "default"
  vgroup_mapping.${spring.application.name}-fescar-service-group = "default"
  #only support single node
  default.grouplist = "cdh1:18091" #seata-server服務器地址,默認是8091
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
}

客戶端相關工作的機制

client {
  async.commit.buffer.limit = 10000
  lock {
    retry.internal = 10
    retry.times = 30
  }
}

DataSourceConfig

每一個微服務原來自己的數據源都必須使用DataSourceProxy代理,這樣seata才能掌控所有事務。

@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    /**
     * 需要將 DataSourceProxy 設置為主數據源,否則事務無法回滾
     *
     * @param druidDataSource The DruidDataSource
     * @return The default datasource
     */
    @Primary
    @Bean("dataSource")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}

事務注解

RM主業務方法添加全局事務:@GlobalTransactional

TM分支業務方法添加本地事務注解:@Transactional

說明:以上的配置,如果不能夠理解,請參考配套視頻

代碼實現

10WQPS秒殺實操的AT分布式事務架構

在這里插入圖片描述

如果你經過前面的步驟搭建Seata環境完成了,那么你可以嘗試一下啟動項目,控制台無異常則搭建成功。

那么下面准備以Seata官方文檔上的一個經典例子為題,模擬用戶下單,創建訂單同時扣減庫存數量這一過程中產生的分布式事務問題,然后使用Seata解決,正好使用以下Seata的特性。

服務秒殺

seckillController


@RestController
@RequestMapping("/api/seckill/seglock/")
@Api(tags = "秒殺練習分布式事務 版本")
public class SeckillBySegmentLockController {


    @Resource
    SeataSeckillServiceImpl seataSeckillServiceImpl;


    /**
     * 執行秒殺的操作
     * 減庫存,下訂單
     * <p>
     * {
     * "exposedKey": "4b70903f6e1aa87788d3ea962f8b2f0e",
     * "newStockNum": 10000,
     * "seckillSkuId": 1247695238068177920,
     * "seckillToken": "0f8459cbae1748c7b14e4cea3d991000",
     * "userId": 37
     * }
     *
     * @return
     */
    @ApiOperation(value = "秒殺")
    @PostMapping("/doSeckill/v1")
    RestOut<SeckillDTO> doSeckill(@RequestBody SeckillDTO dto) {

        seataSeckillServiceImpl.doSeckill(dto);

        return RestOut.success(dto).setRespMsg("秒殺成功");
    }


}

seckillServiceImpl

public class SeataSeckillServiceImpl {

    @Autowired
    private SeataDemoOrderFeignClient stockFeignClient;
    @Autowired
    private SeataDemoStockFeignClient orderFeignClient;

    /**
     * 減庫存,下訂單
     */
    @GlobalTransactional  //開啟全局事務(重點) 使用 seata 的全局事務
    public void doSeckill(@RequestBody SeckillDTO dto) {

        orderFeignClient.minusStock(dto);
        stockFeignClient.addOrder(dto);
    }
}

庫存服務的Feign類

@FeignClient(name = "seata-stock-demo", path = "/seata-stock-demo/api/seckill/sku/")
public interface SeataDemoStockFeignClient {
    /**
     * minusStock 秒殺庫存
     *
     * @param dto 商品與庫存
     * @return 商品 skuDTO
     */
    @RequestMapping(value = "/minusStock/v1", method = RequestMethod.POST)
    RestOut<SeckillSkuDTO> minusStock(@RequestBody SeckillDTO dto);


}

訂單服務的Feign類


@FeignClient(name = "seata-order-demo", path = "/seata-order-demo/api/seckill/order/")
public interface SeataDemoOrderFeignClient {

    @RequestMapping(value = "/addOrder/v1", method = RequestMethod.POST)
    RestOut<SeckillOrderDTO> addOrder(@RequestBody SeckillDTO dto);
}

訂單服務

orderController



@RestController
@RequestMapping("/api/seckill/order/")
@Api(tags = "秒殺練習 訂單管理")
public class SeataATOrderController {
    @Resource
    SeckillOrderServiceImpl seckillOrderService;


    /**
     * 查詢用戶訂單信息
     *
     * @param userId 用戶id
     * @return 商品 dto
     */
    @PostMapping("/user/{id}/list/v1")
    @ApiOperation(value = "查詢用戶訂單信息")
    RestOut<PageOut<SeckillOrderDTO>> userOrders(
            @PathVariable(value = "id") Long userId, @RequestBody PageReq pageReq) {
        PageOut<SeckillOrderDTO> dto = seckillOrderService.findOrderByUserID(userId, pageReq);
        if (null != dto) {
            return RestOut.success(dto).setRespMsg("查詢成功");
        }
        return RestOut.error("查詢失敗");
    }

    /**
     * 查詢用戶的訂單信息
     *
     * @param userId 用戶id
     * @param skuId  商品id
     * @return 商品 dto
     */
    @GetMapping("/{userId}/{skuId}/v1")
    @ApiOperation(value = "查詢用戶訂單信息")
    RestOut<SeckillOrderDTO> userOrders(
            @PathVariable(value = "userId") Long userId,
            @PathVariable(value = "skuId") Long skuId,
            @RequestBody PageReq pageReq) {
        List<SeckillOrderPO> pos = seckillOrderService.findOrderByUserIDAndSkuId(userId, skuId);
        if (null != pos && pos.size() > 0) {
            SeckillOrderDTO orderDTO = new SeckillOrderDTO();
            BeanUtils.copyProperties(pos.get(0), orderDTO);
            return RestOut.success(orderDTO).setRespMsg("查詢成功");
        }
        return RestOut.error("查詢失敗");
    }


    /**
     * 清除用戶訂單信息
     *
     * @param dto 含有  用戶id的dto
     * @return 操作結果
     */
    @PostMapping("/user/clear/v1")
    @ApiOperation(value = "清除用戶訂單信息")
    RestOut<String> userOrdersClear(@RequestBody SeckillDTO dto) {
        Long userId = dto.getUserId();
        String result = seckillOrderService.clearOrderByUserID(userId);

        return RestOut.success(result).setRespMsg("處理完成");
    }


    /**
     * 執行秒殺的操作
     * <p>
     * <p>
     * {
     * "exposedKey": "4b70903f6e1aa87788d3ea962f8b2f0e",
     * "newStockNum": 10000,
     * "seckillSkuId": 1157197244718385152,
     * "seckillToken": "0f8459cbae1748c7b14e4cea3d991000",
     * "userId": 37
     * }
     *
     * @return
     */
    @ApiOperation(value = "下訂單")
    @PostMapping("/addOrder/v1")
    RestOut<SeckillOrderDTO> addOrder(@RequestBody SeckillDTO dto) {
        SeckillOrderDTO orderDTO = seckillOrderService.addOrder(dto);
        return RestOut.success(orderDTO).setRespMsg("下訂單成功");
    }
}

OrderServiceImpl

 /**
     * 執行秒殺下單
     *
     * @param inDto
     * @return
     */
    @Transactional //開啟本地事務
    // @GlobalTransactional//不,開啟全局事務(重點) 使用 seata 的全局事務
    public SeckillOrderDTO addOrder(SeckillDTO inDto) {

        long skuId = inDto.getSeckillSkuId();
        Long userId = inDto.getUserId();


        /**
         * 創建訂單對象
         */
        SeckillOrderPO order =
                SeckillOrderPO.builder()
                        .skuId(skuId).userId(userId).build();


        Date nowTime = new Date();
        order.setCreateTime(nowTime);
        order.setStatus(SeckillConstants.ORDER_VALID);


        SeckillOrderDTO dto = null;

        /**
         * 創建重復性檢查的訂單對象
         */
        SeckillOrderPO checkOrder =
                SeckillOrderPO.builder().skuId(
                        order.getSkuId()).userId(order.getUserId()).build();

        //記錄秒殺訂單信息
        long insertCount = seckillOrderDao.count(Example.of(checkOrder));

        //唯一性判斷:skuId,id 保證一個用戶只能秒殺一件商品
        if (insertCount >= 1) {
            //重復秒殺
            log.error("重復秒殺");
            throw BusinessException.builder().errMsg("重復秒殺").build();
        }


        /**
         * 插入秒殺訂單
         */
        seckillOrderDao.save(order);


        dto = new SeckillOrderDTO();
        BeanUtils.copyProperties(order, dto);


        return dto;

    }


庫存服務

stockController


@Slf4j
@RestController
@RequestMapping("/api/seckill/sku/")
@Api(tags = "商品庫存")
public class SeataSeckillStockController {
    @Resource
    SeataStockServiceImpl seckillSkuStockService;

    /**
     * minusStock 秒殺庫存
     *
     * @param dto 商品與庫存
     * @return 商品 skuDTO
     */
    @PostMapping("/minusStock/v1")
    @ApiOperation(value = "減少秒殺庫存")
    RestOut<SeckillSkuDTO> minusStock(@RequestBody SeckillDTO dto, HttpServletRequest request) {

        // 綁定 XID,自動創建分支事物
        // 異常后,整個調用鏈路回滾
        String keyId = request.getHeader(RootContext.KEY_XID);


        if (null != keyId) {

            log.info("RootContext.KEY_XID is {}", keyId);
            // 綁定 XID,自動創建分支事物
            RootContext.bind(keyId);
        }


        Long skuId = dto.getSeckillSkuId();

        SeckillSkuDTO skuDTO = seckillSkuStockService.minusStock(dto);

        if (null != skuDTO) {
            return RestOut.success(skuDTO).setRespMsg("減少秒殺庫存成功");
        }
        return RestOut.error("未找到指定秒殺商品");
    }

}

StockServiceImpl

   /**
     * 執行秒殺下單
     *
     * @param inDto
     * @return
     */
    @Transactional
    public SeckillSkuDTO minusStock(SeckillDTO inDto) {

        long skuId = inDto.getSeckillSkuId();


        Optional<SeckillSkuPO> optional = seckillSkuDao.findById(skuId);

        if (!optional.isPresent()) {
            throw BusinessException.builder().errMsg("商品不存在").build();
        }

        SeckillSkuPO po = optional.get();
        if (po.getStockCount() <= 0) {
            throw BusinessException.builder().errMsg("庫存不夠").build();
        }
        seckillSkuDao.decreaseStockCountById(skuId);
//        po.setStockCount(po.getStockCount() - 1);
        SeckillSkuDTO dto = new SeckillSkuDTO();
        dto.setStockCount(po.getStockCount() - 1);
        BeanUtils.copyProperties(po, dto);
        return dto;


    }


實驗演示:啟動后進行事務演示

啟動之后,可以進行秒殺下單演示,注意,第二階段的成功或者回滾

具體演示,參見此博客的配套視頻

錯誤排除

NettyClientChannelManager : no available service ‘null‘ found, please make sure registry config...

項目中在使用 Seata 作為分布式事務的時候,好像特別容易遇到這個錯誤~
總結原因,還是對Seata配置參數不熟悉導致的。
首先是看到錯誤描述,找到拋出錯誤的地方:

在這里插入圖片描述

在這里插入圖片描述

看到 getAvailServerList(transactionServiceGroup); 的入參是:transactionServiceGroup。剛好在配置文件中也有一項配置與之對應:

在這里插入圖片描述

Seata AT分布式事務執行流程演示

同步調用的3個階段:

1、主事務調用分支事務之前;

2、分支事務結束並返回之前;

3、分支事務提交后,且主事務提交前

同步調用出問題的情況:

1、主事務成功,分支事務失敗

2、主事務失敗,分支事務成功

正常執行

觀察每個階段的undo_log和TC的記錄

階段一全局事務:

通過seckill 服務,觀察 TC的記錄 幾個表的數據。

階段一分支事務觀察:

通過庫存服務,觀察undolog記錄;

階段一觀察:

觀察seata全局鎖。

階段二全局事務觀察:

通過seckill 服務,觀察 TC的記錄 幾個表的數據。

階段二分支事務觀察:

通過庫存服務,觀察undolog記錄;

本實驗演示:通過配套視頻給出

異常執行

觀察每個階段的undo_log和TC的記錄

階段一全局事務:

通過seckill 服務,觀察 TC的記錄 幾個表的數據。

階段一分支事務觀察:

通過庫存服務,觀察undolog記錄;

階段二全局事務觀察:

通過seckill 服務,觀察 TC的記錄 幾個表的數據。

階段二分支事務觀察:

通過庫存服務,觀察undolog記錄;

本實驗演示:通過配套視頻給出

參考面試題

簡述:Seata AT模式的事務流程

Seata的分布式事務解決方案是業務層面的解決方案,只依賴於單台數據庫的事務能力。Seata框架中一個分布式事務包含3中角色:

  • Transaction Coordinator (TC): 事務協調器,維護全局事務的運行狀態,負責協調並驅動全局事務的提交或回滾。
  • Transaction Manager (TM): 控制全局事務的邊界,負責開啟一個全局事務,並最終發起全局提交或全局回滾的決議。
  • Resource Manager (RM): 控制分支事務,負責分支注冊、狀態匯報,並接收事務協調器的指令,驅動分支(本地)事務的提交和回滾。

其中,TM是一個分布式事務的發起者和終結者,TC負責維護分布式事務的運行狀態,而RM則負責本地事務的運行。如下圖所示:

img

下面是一個分布式事務在Seata中的執行流程:

  1. TM 向 TC 申請開啟一個全局事務,全局事務創建成功並生成一個全局唯一的 XID。
  2. XID 在微服務調用鏈路的上下文中傳播。
  3. RM 向 TC 注冊分支事務,接着執行這個分支事務並提交(重點:RM在第一階段就已經執行了本地事務的提交/回滾),最后將執行結果匯報給TC。
  4. TM 根據 TC 中所有的分支事務的執行情況,發起全局提交或回滾決議。
  5. TC 調度 XID 下管轄的全部分支事務完成提交或回滾請求。

回顧: Seata執行流程

下面是一個Seata中一個分布式事務執行的詳細過程:

  1. 首先TM 向 TC 申請開啟一個全局事務,全局事務創建成功並生成一個全局唯一的 XID。
  2. XID 在微服務調用鏈路的上下文中傳播。
  3. RM 開始執行這個分支事務,RM首先解析這條SQL語句,生成對應的UNDO_LOG記錄。下面是一條UNDO_LOG中的記錄:
{
    "branchId": 641789253,
    "undoItems": [{
        "afterImage": {
            "rows": [{
                "fields": [{
                    "name": "id",
                    "type": 4,
                    "value": 1
                }, {
                    "name": "name",
                    "type": 12,
                    "value": "GTS"
                }, {
                    "name": "since",
                    "type": 12,
                    "value": "2014"
                }]
            }],
            "tableName": "product"
        },
        "beforeImage": {
            "rows": [{
                "fields": [{
                    "name": "id",
                    "type": 4,
                    "value": 1
                }, {
                    "name": "name",
                    "type": 12,
                    "value": "TXC"
                }, {
                    "name": "since",
                    "type": 12,
                    "value": "2014"
                }]
            }],
            "tableName": "product"
        },
        "sqlType": "UPDATE"
    }],
    "xid": "xid:xxx"
}

可以看到,UNDO_LOG表中記錄了分支ID,全局事務ID,以及事務執行的redo和undo數據以供二階段恢復。

  1. RM在同一個本地事務中執行業務SQL和UNDO_LOG數據的插入。

    在提交這個本地事務前,RM會向TC申請關於這條記錄的全局鎖。

    如果申請不到,則說明有其他事務也在對這條記錄進行操作,因此它會在一段時間內重試,重試失敗則回滾本地事務,並向TC匯報本地事務執行失敗。如下圖所示:

img

  1. RM在事務提交前,申請到了相關記錄的全局鎖,因此直接提交本地事務,並向TC匯報本地事務執行成功。此時全局鎖並沒有釋放,全局鎖的釋放取決於二階段是提交命令還是回滾命令。
  2. TC根據所有的分支事務執行結果,向RM下發提交或回滾命令。
  3. RM如果收到TC的提交命令,首先立即釋放相關記錄的全局鎖,然后把提交請求放入一個異步任務的隊列中,馬上返回提交成功的結果給 TC。異步隊列中的提交請求真正執行時,只是刪除相應 UNDO LOG 記錄而已。

img

提交.png

  1. RM如果收到TC的回滾命令,則會開啟一個本地事務,通過 XID 和 Branch ID 查找到相應的 UNDO LOG 記錄。將 UNDO LOG 中的后鏡與當前數據進行比較,如果有不同,說明數據被當前全局事務之外的動作做了修改。這種情況,需要根據配置策略來做處理。否則,根據 UNDO LOG 中的前鏡像和業務 SQL 的相關信息生成並執行回滾的語句並執行,然后提交本地事務達到回滾的目的,最后釋放相關記錄的全局鎖。

img

回滾.png

為什么Seata在第一階段就直接提交了分支事務?

Seata能夠在第一階段直接提交事務,原因是啥?

因為Seata框架為每一個RM維護了一張UNDO_LOG表(這張表需要客戶端自行創建),其中保存了每一次本地事務的回滾數據。

因此,二階段的回滾並不依賴於本地數據庫事務的回滾,而是RM直接讀取這張UNDO_LOG表,並將數據庫中的數據更新為UNDO_LOG中存儲的歷史數據。

Seata的本地事務提交,並不是發生在第二個階段,為啥?

如果第二階段是提交命令,那么RM事實上並不會對數據進行提交(因為一階段已經提交了),而事實上,seata是發起一個異步請求,刪除UNDO_LOG中關於本事務的記錄。

怎么使用Seata框架,來保證事務的隔離性?

Seata隔離級別

由於Seata一階段直接提交了本地事務,因此會造成隔離性問題,因此Seata的默認隔離級別為Read Uncommitted。

然而Seata也支持Read Committed的隔離級別,該如何如何實現呢?

Seata由於一階段RM自動提交本地事務的原因,默認隔離級別為Read Uncommitted。如果希望隔離級別為Read Committed,那么可以使用SELECT...FOR UPDATE語句。Seata引擎重寫了SELECT...FOR UPDATE語句執行邏輯,SELECT...FOR UPDATE 語句的執行會申請 全局鎖 ,如果 全局鎖 被其他事務持有,則釋放本地鎖(回滾 SELECT...FOR UPDATE 語句的本地執行)並重試。這個過程中,查詢是被 block 住的,直到 全局鎖 拿到,即讀取的相關數據是已提交的才返回。

img

SELECT FOR UPDATE.PNG

出於總體性能上的考慮,Seata 目前的方案並沒有對所有 SELECT 語句都進行代理,僅針對 FOR UPDATE 的 SELECT 語句。

因 seata一階段本地事務已提交,為防止其他事務臟讀臟寫需要加強隔離。
【1】臟讀 select語句加 for update(會去申請全局鎖),代理方法增加 @GlobalLock或 @GlobalTransaction;
【2】臟寫 必須使用 @GlobalTransaction;
【3】注:如果你查詢的業務的接口沒有 GlobalTransactional 包裹,也就是這個方法上壓根沒有分布式事務的需求,這時你可以在方法上標注 @GlobalLock注解,並且在查詢語句上加 for update。 如果你查詢的接口在事務鏈路上外層有GlobalTransactional注解,那么你查詢的語句只要加for update就行。設計這個注解的原因是在沒有這個注解之前,需要查詢分布式事務讀已提交的數據,但業務本身不需要分布式事務。 若使用 GlobalTransactional注解就會增加一些沒用的額外的 rpc開銷比如 begin 返回xid,提交事務等。GlobalLock簡化了 rpc過程,使其做到更高的性能。

默認spring只在發生未被捕獲的runtimeexcetpion時才回滾。

請比較一下:經典 XA和Seata AT模式

img

XA和Seata.PNG

注:Seata的曾用名為FESCAR。

如圖所示,XA 方案的 RM 實際上是在數據庫層,RM 本質上就是數據庫自身(通過提供支持 XA 的驅動程序來供應用使用)。而 Seata 的 RM 是以二方包的形式作為中間件層部署在應用程序這一側的,不依賴與數據庫本身對協議的支持,當然也不需要數據庫支持 XA 協議。這點對於微服務化的架構來說是非常重要的:應用層不需要為本地事務和分布式事務兩類不同場景來適配兩套不同的數據庫驅動。

另外,XA方案無論 Phase2 的決議是 commit 還是 rollback,事務性資源的鎖都要保持到 Phase2 完成才釋放。而對於Seata,將鎖分為了本地鎖和全局鎖,本地鎖由本地事務管理,在分支事務Phase1結束時就直接釋放。而全局鎖由TC管理,在決議 Phase2 全局提交時,全局鎖馬上可以釋放。只有在決議全局回滾的情況下,全局鎖 才被持有至分支的 Phase2 結束。因此,Seata對於資源的占用時間要少的多。對比如下圖所示:

img

XA鎖資源.PNG

img

Seata鎖資源.PNG

Seata 的方案其實一個 XA 兩階段提交的改進版,具體區別如下:

架構的層面

img

XA 方案的 RM 實際上是在數據庫層,RM 本質上就是數據庫自身(通過提供支持 XA 的驅動程序來供應用使用)。

而 Seata 的 RM 是以二方包的形式作為中間件層部署在應用程序這一側的,不依賴與數據庫本身對協議的支持,當然也不需要數據庫支持 XA 協議。這點對於微服務化的架構來說是非常重要的:應用層不需要為本地事務和分布式事務兩類不同場景來適配兩套不同的數據庫驅動。

這個設計,剝離了分布式事務方案對數據庫在 協議支持 上的要求。

兩階段提交

img

無論 Phase2 的決議是 commit 還是 rollback,事務性資源的鎖都要保持到 Phase2 完成才釋放。

設想一個正常運行的業務,大概率是 90% 以上的事務最終應該是成功提交的,我們是否可以在 Phase1 就將本地事務提交呢?這樣 90% 以上的情況下,可以省去 Phase2 持鎖的時間,整體提高效率。

img

  • 分支事務中數據的 本地鎖 由本地事務管理,在分支事務 Phase1 結束時釋放。
  • 同時,隨着本地事務結束,連接 也得以釋放。
  • 分支事務中數據的 全局鎖 在事務協調器側管理,在決議 Phase2 全局提交時,全局鎖馬上可以釋放。只有在決議全局回滾的情況下,全局鎖 才被持有至分支的 Phase2 結束。

這個設計,極大地減少了分支事務對資源(數據和連接)的鎖定時間,給整體並發和吞吐的提升提供了基礎。

簡述:AT模式的性能問題?

參考文檔:

seata 官方文檔地址:

http://seata.io/zh-cn/docs/overview/what-is-seata.html

https://www.cnblogs.com/babycomeon/p/11504210.html

https://www.cnblogs.com/javashare/p/12535702.html

https://blog.csdn.net/qq853632587/article/details/111356009

https://blog.csdn.net/qq_35721287/article/details/103573862

https://www.cnblogs.com/anhaogoon/p/13033986.html

https://blog.51cto.com/u_15072921/2606182

https://blog.csdn.net/weixin_45661382/article/details/105539999

https://blog.csdn.net/f4761/article/details/89077400

https://blog.csdn.net/qq_27834905/article/details/107353159

https://zhuanlan.zhihu.com/p/266584169


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM