Seata分布式事務簡單使用


  在分布式開發過程中,分布式事務是必須面臨的問題。因為分布式系統中,存在多個服務之間的調用。服務與服務之間存在事務問題,可能在某個服務調用鏈過程中某個服務發生異常導致數據不一致問題。

  每個服務內部的數據一致性由本地事務控制,通常用@Transactional 來控制。但是服務拆分之后,多個服務協同完成的操作如果需要保證數據一致性就需要引入分布式事務。

0.理論基礎

  剛性事務:遵循ACID原則,強一致性,最常見的就是數據庫事務。

  柔性事務:遵循BASE理論,最終一致性;與剛性事務不同,柔性事務允許一定時間內,不同節點的數據不一致,但要求最終一致。一般說的分布式事務也就是柔性事務。

1.事務

  一個操作單元,在這個操作中的所有操作最終要保持一致的行為,要么所有操作都成功,要么所有操作都被撤銷。

單機版事務ACID:

A(Atomicity):原子性,操作這些指令時,要么全部失敗,要么全部成功。

C(Consistency):一致性。事務的執行結果是從一個狀態變為另一個狀態,數據庫的完整性約束沒有被破壞。能量守恆,總量不變。  比如轉賬操作轉來轉去總額不變。

I(Isolation):隔離性。多個並發事務之間相互隔離。信息彼此隔離,互不干擾。

D(Durability):當事務正確完成后,它對於數據的改變是永久性的。

   分布式事務是因為提供服務的節點在不同的機器上,相互之間通過網絡交互,因此分布式事務需要進一步的理論支持。

2.CAP原則

  在之間學習注冊中心的時候也學習過CAP原則,又稱CAP定理,指的是在一個分布式系統中,一致性(Consistency)、可用性(Avaliability)、分區容錯性(Partition tolerance)。這三個原則最多同時兼顧兩個,不能三者都實現。P是一定要滿足的,且一定保留,如果不保證P,那就不是一個分布式系統了。

一致性:在分布式系統的所有數據備份,在同一時刻是否有同樣的值。也就是副本最新。

可用性:在集群中一部分節點故障后,集群是否還能整體響應客戶端的讀寫請求。也就是高可用。每次向未崩潰的節點發送請求,總能收到響應數據,允許數據不是最新的。

分區容忍性:系統任意分區后,在網絡故障時,仍能操作。

(1)由於當前的網絡硬件肯定會出現延遲丟包等問題,所以分區容忍性是我們必須需要實現的。所以我們只能在一致性和可用性之間進行權衡。

CA滿足的情況下,P不能滿足的原因:數據同步需要時間,也要正常的時間內響應(A),那么機器數量就要少,所以P就不滿足

CP 滿足的情況下,A不能滿足的原因:數據同步需要時間, 機器數量也多,但是同步數據需要時間,所以不能再正常時間內響應,所以A就不滿足

AP 滿足的情況下,C不能滿足的原因:機器數量也多,正常的時間內響應(A),那么數據就不能及時同步到其他節點,所以C不滿足。

(2)關於三個注冊中心滿足的原則:

Zookeeper和Consul :CP設計,保證了一致性,集群搭建的時候,某個節點失效,則會進行選舉行的leader,或者半數以上節點不可用,則無法提供服務,因此可用性沒法滿足。

Eureka:AP原則,無主從節點,一個節點掛了,自動切換其他節點可以使用,去中心化。

3.一致性:數據一致性

一致性可以分為強一一致性與弱一致性。所謂強一致性就是復制數據是同步的,弱一致性就是復制數據是異步的。

CAP定理中只能在CAP中選擇兩個,一般是犧牲一致性換取可用性和分區容錯性,所謂犧牲一致性並不是完全放棄數據一致性,而是換取強一致性換取弱一致性。

1.強一致性:系統中的某個數據被更新成功后,后續任何對該數據的操作都將獲取最新的值,也稱為原子一致性。簡單的說就是任意時刻,所有節點的數據都是一樣的。

(1)一個集群如果保證強一致性,集群中某一台服務器的數據變了,那么就需要等待集群內其他服務器的數據同步完成后,才能正常對外提供服務

(2)保證了強一致性,務必會損耗可用性

2.弱一致性

  系統中的某個節點數據更新后,后續對該數據的讀取有可能是更新前的值,也有可能是更新后的值。可以理解為更新數據后,如果能容忍后續的訪問只能訪問到部分或者全部訪問不到,則是弱一致性。

3.最終一致性

是弱一致性的特殊形式,存儲系統保證在沒有新的更新條件下,最終所有的訪問都是最后更新的值。

不保證在任意時刻任意節點上的同一份數據都是相同的,但是隨着時間的遷移,不同節點上的同一份數據總是在向趨同的方向變化。

 簡單說,就是在一段時間后,節點間的數據會達到一致狀態。

4.Base理論

  BASE理論和CAP其實是互補的。BASE理論是對CAP中的一致性和可用性進行權衡的一個結果,理論的核心思想就是:我們無法做到強一致性,但每個應用都可以根據自身業務的特點,采用適當的方法來使系統達到最終一致性。

BA:BasicAvaliable 基本可用。

  整個系統某些不可抗力的情況下,仍然能保證可用性,即一定時間內仍然能返回一個明確的結果。只不過是"基本可用"和"高可用"的區別是:

  • 一定時間內可以適當延長。比如高可用3響應時間3s,基本可用5s響應時間
  • 給部分用戶返回一個服務降級頁面。降級頁面仍然是一個明確的結果。

S:Soft State  柔性狀態。是指允許系統中的數據存在中間狀態,並認為該中間狀態的存在不會影響系統的整體可用性,即允許系統不同節點的數據副本之間進行數據同步的過程存在延時。比如數據加在狀態"同步中"、"等待同步"等。

E:Eventual Consisstency 最終一致性。 同一數據的不同副本的狀態,可以不要實時一致性,但一定要保證經過有一定時間后仍然是一致的。

5. 2階段提交協議2PC

  Two-Phase Commit 是常用的分布式事務解決方法,即將事務的提交過程分為兩個階段來進行處理。

階段:

  • 准備階段,又稱為投票階段

(1)協調者向所有參與者發送事務內容,詢問是否可以提交事務,且等待回復,進入表決過程;

(2)參與者執行執行事務操作,將redo和undo日志記入事務日志中(但不提交事務);

(3)如參與者執行成功給協調者反饋成功,否則反饋失敗

  • 提交階段

  協調者基於第一個階段的投票結果進行決策: 提交或取消。當且僅當所有的參與者同意提交事務,協調者才通知所有的參與者提交事務,否則協調者將通知所有的參與者取消事務,參與者在接收到協調者發來的消息后將執行響應的操作並且釋放事務期間內占用的資源。

參與的角色:

  • 協調者:事務的發起者
  • 參與者:事務的執行者

2PC的優缺點:
優點:盡量保證了數據的強一致性,但也不能百分百保證強一致性
缺點:
(1)性能問題:執行過程中,所有參與節點都是事務阻塞型的。當參與者占用公共資源時,其他第三方節點訪問資源必須處於阻塞狀態
(2)可靠性問題:參與者發生故障,協調者需要給每個參與者額外指定超時機制,超時后整個事務失敗。協調者發生故障,參與者一定會阻塞下去,需要額外的備機進行容錯。
(3)數據一致性問題:二階段無法解決的問題:協調者在發出commit消息之后宕機,而唯一接受消息的參與者也同時宕機,那么即使協調者選舉產生了新的協調者,這條事務的狀態也是不確定的,無法確定事務是否已經被提交。

6.3階段提交3PC

相比於2PC,有兩個改動點:

(1)協調者和參與者都引入超時機制
(2)第一階段和第二階段中間插入了一個准備階段,保證了再最后提交階段之前各參與節點的狀態是一致的。
  也就是說除了引入超時機制之外,3pc把2pc的准備階段一分為二,這樣三階段就有CanCommit,PreCommit,DoCommit
CanCommit:協調者向參與者發送commit請求,參與者如果可以提交就返回Yes響應,否則返回No響應。只詢問是否可以但是不會執行
PreCommit: 進行事務的預執行,記錄undo和redo日志,但不會提交
doCommit:執行事務提交。
  在上面的過程中,如果參與者給協調者的反饋是no,則會中斷事務的操作。在doCommit階段,如果參與者無法及時接收到來自協調者的doCommit或者rebort請求時,會在等待超時之后,會繼續進行事務的提交。簡單的說就是當進入第三階段時,由於網絡超時等原因,雖然參與者沒有收到commit或者abort響應,但是他有理由相信:成功提交的幾率很大。

優缺點:
優點:相比於二階段提交,三階段提交降低了阻塞范圍,在等待超時后協調者或參與者會中斷事務,避免了協調者單點問題,如果階段3 doCommit 階段出現問題,參與者會繼續提交事務,會導致事務的不一致性。
缺點:數據不一致問題依然存在,當參與者接到preCommit請求后等待doCommit指令時,此時如果協調者請求中斷事務,而協調者與參與者無法正常通信,會導致參與者繼續提交事務,造成數據不一致。

7.分布式事務解決方案

1. TCC Try-Confirm-Cancel 最常見。補償事務。核心思想是:針對每個操作,都要注冊一個與其對應的確認和補償(撤銷)操作
  開源的實現: https://github.com/liuyangming/ByteTCC.git

  當然seata也有TCC模式的實現。
2. 全局消息
3. 基於可靠消息服務的分布式事務
4. 最大努力通知:業務活動的主動方在完成處理之后向業務活動的被動方發送消息,允許消息丟失。業務活動的被動方根據定時策略,向業務活動的主動方查詢,恢復丟失的業務消息。

1.Seata簡介

Seata 是一款開源的分布式事務解決方案,致力於在微服務架構下提供高性能和簡單易用的分布式事務服務。

官網: http://seata.io/zh-cn/

1. 術語

一個ID加三個組件。

(1)XID:Transaction  ID 全局事務唯一ID

(2)三組件:

TC (Transaction Coordinator) - 事務協調者:維護全局和分支事務的狀態,驅動全局事務提交或回滾。

TM (Transaction Manager) - 事務管理器:定義全局事務的范圍:開始全局事務、提交或回滾全局事務。

RM (Resource Manager) - 資源管理器:管理分支事務處理的資源,與TC交談以注冊分支事務和報告分支事務的狀態,並驅動分支事務提交或回滾。

2. 處理過程

如下圖:

1.TM 開啟分布式事務(TM 向 TC 注冊全局事務記錄);

2.按業務場景,編排數據庫、服務等事務內資源(RM 向 TC 匯報資源准備狀態 );

3.TM 結束分布式事務,事務一階段結束(TM 通知 TC 提交/回滾分布式事務);

4.TC 匯總事務信息,決定分布式事務是提交還是回滾;

5.TC 通知所有 RM 提交/回滾 資源,事務二階段結束;

 2.Seta安裝

1.下載

http://seata.io/zh-cn/blog/download.html

例如我下載的版本是:seata-server-1.4.0.zip

2.解壓后目錄如下:

3. 查看README文件,內容如下:

# 腳本說明

## [client](https://github.com/seata/seata/tree/develop/script/client) 

> 存放用於客戶端的配置和SQL

- at: AT模式下的 `undo_log` 建表語句
- conf: 客戶端的配置文件
- saga: SAGA 模式下所需表的建表語句
- spring: SpringBoot 應用支持的配置文件

## [server](https://github.com/seata/seata/tree/develop/script/server)

> 存放server側所需SQL和部署腳本

- db: server 側的保存模式為 `db` 時所需表的建表語句
- docker-compose: server 側通過 docker-compose 部署的腳本
- helm: server 側通過 Helm 部署的腳本
- kubernetes: server 側通過 Kubernetes 部署的腳本

## [config-center](https://github.com/seata/seata/tree/develop/script/config-center)

> 用於存放各種配置中心的初始化腳本,執行時都會讀取 `config.txt`配置文件,並寫入配置中心

- nacos: 用於向 Nacos 中添加配置
- zk: 用於向 Zookeeper 中添加配置,腳本依賴 Zookeeper 的相關腳本,需要手動下載;ZooKeeper相關的配置可以寫在 `zk-params.txt` 中,也可以在執行的時候輸入
- apollo: 向 Apollo 中添加配置,Apollo 的地址端口等可以寫在 `apollo-params.txt`,也可以在執行的時候輸入
- etcd3: 用於向 Etcd3 中添加配置
- consul: 用於向 consul 中添加配置

4.到https://github.com/seata/seata/tree/develop/script/server 查看建表語句。(這里暫時記住是AT模式,區別之后分析)

到數據庫執行mysql的建表語句

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

5.將配置導入到nacos (這一步也可以使用本地文件作為配置中心)

(1)nacos新建分命名空間

 

 (2) 參考 https://github.com/seata/seata/tree/develop/script/config-center  將配置文件導入nacos

1》下載config.txt, 並且修改里面的store.mode=db以及賬戶密碼信息,如下:

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=db
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.host=127.0.0.1
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

2》將config.txt拷貝到與nacos-config.sh同級目錄(這個文件也是在config-center 下載)

3》執行下面:

liqiang@root MINGW64 ~/Desktop/file/seata/configuration/nacos
$ sh nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 6a1ba3ab-1821-43a6-b7ba-77272ea94c7d -u nacos -w nacos

 4》導入成功可以看到:

=========================================================================
 Complete initialization parameters,  total-count:80 ,  failure-count:0
=========================================================================
 Init nacos config finished, please start seata-server.

6.修改conf/registry.conf文件:修改指明注冊中心和配置中心

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = "6a1ba3ab-1821-43a6-b7ba-77272ea94c7d"
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
  }
}

7.啟動seata服務

seata\bin/seata-server.bat雙擊即可

8.啟動成功可以到nacos查看服務列表與配置列表進行驗證

3. 簡單使用實現分布式事務

 如下場景涉及分布式事務:

模擬用戶下單,會在訂單服務創建一個訂單,然后遠程調用庫存服務減去庫存,再通過賬戶服務來減去用戶賬戶的余額,最后修改訂單狀態為已完成。

涉及到的服務:訂單、庫存、賬戶服務。

參考:https://gitee.com/itCjb/spring-cloud-alibaba-seata-demo

這里的版本是基於seta1.4.0。 之前看0.9版本的教程都是將register.conf 拷到工程,目前的版本可以直接基於yml配置。

1. 庫存服務

1.數據庫  (注意每個業務數據庫都需要加undo_log表)

/*
Navicat MySQL Data Transfer

Source Server         : mysql
Source Server Version : 50721
Source Host           : localhost:3306
Source Database       : test_storage

Target Server Type    : MYSQL
Target Server Version : 50721
File Encoding         : 65001

Date: 2019-12-08 15:11:00
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for product
-- ----------------------------
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `price` double DEFAULT NULL,
  `stock` int(11) DEFAULT NULL,
  `last_update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of product
-- ----------------------------
INSERT INTO `product` VALUES ('1', '5', '9', '2019-12-06 21:51:01');

-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of undo_log
-- ----------------------------

2. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-cloud-alibaba-seata-demo</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>product-service</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!-- nacos 作為服務注冊中心 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- nacos 作為配置中心 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.alibaba</groupId>
                    <artifactId>druid</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
    </dependencies>

</project>

3.yml配置

1》application.yml

  注意   tx-service-group: my_test_tx_group  需要和之前server端配置的保持一致。 seata.config 可以理解為從nacos讀取一些配置,之前和服務端部署的時候有一些共有的配置;seata.registry 表示seata要注冊到的seata server,從nacos讀取該服務並注冊進去,用於RM和TC直接的交互。

spring:
   datasource:
      type: com.alibaba.druid.pool.DruidDataSource
      url: jdbc:mysql://127.0.0.1:3306/test_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: root
      password: 123456
      max-wait: 60000
      max-active: 100
      min-idle: 10
      initial-size: 10
mybatis-plus:
   mapper-locations: classpath:/mapper/*Mapper.xml
   typeAliasesPackage: icu.funkye.entity
   global-config:
      db-config:
         field-strategy: not-empty
         id-type: auto
         db-type: mysql
   configuration:
      map-underscore-to-camel-case: true
      cache-enabled: true
      auto-mapping-unknown-column-behavior: none
      log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
seata:
   enabled: true
   application-id: product-service
   tx-service-group: my_test_tx_group
   config:
      type: nacos
      nacos:
         namespace: 6a1ba3ab-1821-43a6-b7ba-77272ea94c7d
         serverAddr: 127.0.0.1:8848
         group: SEATA_GROUP
         username: "nacos"
         password: "nacos"
   registry:
      type: nacos
      nacos:
         application: seata-server
         server-addr: 127.0.0.1:8848
         group: SEATA_GROUP
         namespace:
         username: "nacos"
         password: "nacos"

2》bootstrap.yml

spring:
   application:
      name: product-service
   main:
      allow-bean-definition-overriding: true
   cloud:
      nacos:
         discovery:
            server-addr: 127.0.0.1:8848
            username: "nacos"
            password: "nacos"
         config:
            server-addr: 127.0.0.1:8848
            username: "nacos"
            password: "nacos"
server:
   port: 8083

4.主啟動類

package icu.funkye;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author funkye
 */
@SpringBootApplication
public class ProductServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProductServiceApplication.class, args);
    }

}

5. 業務代碼

業務代碼就不貼了,就是一個簡單的CRUD

6.啟動: 注意先啟動nacos,再啟動seata-server,最后啟動項目

啟動項目可以看到RM相關注冊信息:

 2. 賬戶服務和訂單服務

。。。在最后的git地址貼出

3.最后的client服務

client服務相當於一個網關層,調用feignclient服務完成相關的操作。

測試Controller如下:

    /**
     * 測試異常回滾
     *
     * @return
     * @throws TransactionException
     */
    @GetMapping(value = "testRollback")
    @GlobalTransactional
    public Object testRollback() throws TransactionException {
        Product product = productService.getById(1);
        if (product.getStock() > 0) {
            LocalDateTime now = LocalDateTime.now();
            logger.info("seata分布式事務Id:{}", RootContext.getXID());
            Account account = accountService.getById(1);
            Orders orders = new Orders();
            orders.setCreateTime(now);
            orders.setProductId(product.getId());
            orders.setReplaceTime(now);
            orders.setSum(1);
            orders.setAmount(product.getPrice());
            orders.setAccountId(account.getId());
            product.setStock(product.getStock() - 1);
            account.setSum(account.getSum() != null ? account.getSum() + 1 : 1);
            account.setLastUpdateTime(now);
            // 庫存減去一
            productService.updateById(product);
            // 賬戶加1
            accountService.updateById(account);

            int i = 1 / 0;

            // 創建訂單
            orderService.save(orders);

            return true;
        }
        return false;
    }

4. debug之后測試

1.斷點打在client中    int i = 1 / 0    這一行。

2.代碼執行到這里,查看相關服務日志:

(1) 庫存服務日志如下: 可以看到將庫存數量  stock修改為8

Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@73753ab8] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@7db2daa6] will not be managed by Spring
Original SQL: SELECT id,price,stock,last_update_time FROM product WHERE id=? 
parser sql: SELECT id, price, stock, last_update_time FROM product WHERE id = ?
==>  Preparing: SELECT id, price, stock, last_update_time FROM product WHERE id = ? 
==> Parameters: 1(Integer)
<==    Columns: id, price, stock, last_update_time
<==        Row: 1, 5.0, 9, 2019-12-06 21:51:01
<==      Total: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@73753ab8]
2020-12-13 21:19:35.759  INFO 14932 --- [nio-8083-exec-2] icu.funkye.controller.ProductController  : product:Product(id=1, price=5.0, stock=8, lastUpdateTime=2019-12-06T21:51:01)
Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6351d47d] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@27e7719a] will not be managed by Spring
Original SQL: UPDATE product  SET price=?,
stock=?,
last_update_time=?  WHERE id=?
parser sql: UPDATE product SET price = ?, stock = ?, last_update_time = ? WHERE id = ?
==>  Preparing: UPDATE product SET price = ?, stock = ?, last_update_time = ? WHERE id = ? 
==> Parameters: 5.0(BigDecimal), 8(Integer), 2019-12-06T21:51:01(LocalDateTime), 1(Integer)
<==    Updates: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6351d47d]

(2)賬戶服務日志如下: 可以看到將sum修改為2,可以理解為消費2個

Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@1ff5e152] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@11321cdc] will not be managed by Spring
Original SQL: SELECT id,user_name,sum,last_update_time FROM account WHERE id=? 
parser sql: SELECT id, user_name, sum, last_update_time FROM account WHERE id = ?
==>  Preparing: SELECT id, user_name, sum, last_update_time FROM account WHERE id = ? 
==> Parameters: 1(Integer)
<==    Columns: id, user_name, sum, last_update_time
<==        Row: 1, 1, 1, 2019-12-08 15:05:05
<==      Total: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@1ff5e152]
Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@24d434f9] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@2bbfadf0] will not be managed by Spring
Original SQL: UPDATE account  SET user_name=?,
sum=?,
last_update_time=?  WHERE id=?
parser sql: UPDATE account SET user_name = ?, sum = ?, last_update_time = ? WHERE id = ?
==>  Preparing: UPDATE account SET user_name = ?, sum = ?, last_update_time = ? WHERE id = ? 
==> Parameters: 1(String), 2(Integer), 2020-12-13T21:19:35.710(LocalDateTime), 1(Integer)
<==    Updates: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@24d434f9]
2020-12-13 21:20:37.238  INFO 10632 --- [ch_RMROLE_1_5_8] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=169.254.51.32:8091:81496901952864256,branchId=81496905018900480,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/test_pay,applicationData=null
2020-12-13 21:20:37.239  INFO 10632 --- [ch_RMROLE_1_5_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 169.254.51.32:8091:81496901952864256 81496905018900480 jdbc:mysql://127.0.0.1:3306/test_pay
Sun Dec 13 21:20:37 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2020-12-13 21:20:37.404  INFO 10632 --- [ch_RMROLE_1_5_8] i.s.r.d.undo.AbstractUndoLogManager      : xid 169.254.51.32:8091:81496901952864256 branch 81496905018900480, undo_log deleted with GlobalFinished
2020-12-13 21:20:37.406  INFO 10632 --- [ch_RMROLE_1_5_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

(3)client服務日志如下:

2020-12-13 21:19:35.677  INFO 19848 --- [nio-8081-exec-6] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [169.254.51.32:8091:81496901952864256]
2020-12-13 21:19:35.710  INFO 19848 --- [nio-8081-exec-6] icu.funkye.controller.TestController     : seata分布式事務Id:169.254.51.32:8091:81496901952864256

3.代碼執行到這里,查看數據庫

(1) 查看seataserver服務器的三個表數據:

mysql> select * from lock_table\G
*************************** 1. row ***************************
       row_key: jdbc:mysql://127.0.0.1:3306/test_pay^^^account^^^1
           xid: 169.254.51.32:8091:81498624360579072
transaction_id: 81498624360579072
     branch_id: 81498628714266625
   resource_id: jdbc:mysql://127.0.0.1:3306/test_pay
    table_name: account
            pk: 1
    gmt_create: 2020-12-13 21:26:27
  gmt_modified: 2020-12-13 21:26:27
*************************** 2. row ***************************
       row_key: jdbc:mysql://127.0.0.1:3306/test_storage^^^product^^^1
           xid: 169.254.51.32:8091:81498624360579072
transaction_id: 81498624360579072
     branch_id: 81498626721972225
   resource_id: jdbc:mysql://127.0.0.1:3306/test_storage
    table_name: product
            pk: 1
    gmt_create: 2020-12-13 21:26:26
  gmt_modified: 2020-12-13 21:26:26
2 rows in set (0.00 sec)

mysql> select * from branch_table\G
*************************** 1. row ***************************
        branch_id: 81498626721972225
              xid: 169.254.51.32:8091:81498624360579072
   transaction_id: 81498624360579072
resource_group_id: NULL
      resource_id: jdbc:mysql://127.0.0.1:3306/test_storage
      branch_type: AT
           status: 0
        client_id: product-service:169.254.51.32:56219
 application_data: NULL
       gmt_create: 2020-12-13 21:26:26.893741
     gmt_modified: 2020-12-13 21:26:26.893741
*************************** 2. row ***************************
        branch_id: 81498628714266625
              xid: 169.254.51.32:8091:81498624360579072
   transaction_id: 81498624360579072
resource_group_id: NULL
      resource_id: jdbc:mysql://127.0.0.1:3306/test_pay
      branch_type: AT
           status: 0
        client_id: account-service:169.254.51.32:56051
 application_data: NULL
       gmt_create: 2020-12-13 21:26:27.387070
     gmt_modified: 2020-12-13 21:26:27.387070
2 rows in set (0.07 sec)

mysql> select * from global_table\G
*************************** 1. row ***************************
                      xid: 169.254.51.32:8091:81498624360579072
           transaction_id: 81498624360579072
                   status: 1
           application_id: client
transaction_service_group: my_test_tx_group
         transaction_name: testRollback()
                  timeout: 60000
               begin_time: 1607865986218
         application_data: NULL
               gmt_create: 2020-12-13 21:26:26
             gmt_modified: 2020-12-13 21:26:26
1 row in set (0.12 sec)

可以看到lock_table 記錄了行鎖的信息,鎖住了數據庫某個表某條數據的ID信息。

branch_table 記錄了當前Tx參與的會話分支。

global_table 記錄了Tx信息。

(2)查看庫存庫中的產品表:發現庫存數是8

(3) 查看庫存庫中的undo_log中數據如下

 可以看到第三條數據,記錄branchId、xid、以及一個rollback_info與log_status(1是已經處理,2是未處理)。rollback_info信息如下:

{
    "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
    "xid": "169.254.51.32:8091:81498624360579072",
    "branchId": 81498626721972225,
    "sqlUndoLogs": ["java.util.ArrayList", [{
        "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
        "sqlType": "UPDATE",
        "tableName": "product",
        "beforeImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "product",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [{
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "id",
                    "keyType": "PRIMARY_KEY",
                    "type": 4,
                    "value": 1
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "price",
                    "keyType": "NULL",
                    "type": 8,
                    "value": 5.0
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "stock",
                    "keyType": "NULL",
                    "type": 4,
                    "value": 9
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "last_update_time",
                    "keyType": "NULL",
                    "type": 93,
                    "value": ["java.sql.Timestamp", [1575669061000, 0]]
                }]]
            }]]
        },
        "afterImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "product",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [{
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "id",
                    "keyType": "PRIMARY_KEY",
                    "type": 4,
                    "value": 1
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "price",
                    "keyType": "NULL",
                    "type": 8,
                    "value": 5.0
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "stock",
                    "keyType": "NULL",
                    "type": 4,
                    "value": 8
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "last_update_time",
                    "keyType": "NULL",
                    "type": 93,
                    "value": ["java.sql.Timestamp", [1575640261000, 0]]
                }]]
            }]]
        }
    }]]
}

4. 放開斷點讓程序報錯,查看產品庫存:發現仍然是9,證明分布式事務確實回滾。

也可以查看業務日志:如下是庫存服務的日志(xid、branchid與上面不一致是多次試驗截得)

2020-12-13 21:39:11.182  INFO 14932 --- [ch_RMROLE_1_4_8] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=169.254.51.32:8091:81501788107309056,branchId=81501789592092673,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/test_storage,applicationData=null
2020-12-13 21:39:11.182  INFO 14932 --- [ch_RMROLE_1_4_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 169.254.51.32:8091:81501788107309056 81501789592092673 jdbc:mysql://127.0.0.1:3306/test_storage
2020-12-13 21:39:11.408  INFO 14932 --- [ch_RMROLE_1_4_8] i.s.r.d.undo.AbstractUndoLogManager      : xid 169.254.51.32:8091:81501788107309056 branch 81501789592092673, undo_log deleted with GlobalFinished
2020-12-13 21:39:11.410  INFO 14932 --- [ch_RMROLE_1_4_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

 5. Seata過程簡單總結

 1. 五步驟再次理解

1.TM 開啟分布式事務(TM 向 TC 注冊全局事務記錄);
2.按業務場景,編排數據庫、服務等事務內資源(RM 向 TC 匯報資源准備狀態 );
3.TM 結束分布式事務,事務一階段結束(TM 通知 TC 提交/回滾分布式事務);
4.TC 匯總事務信息,決定分布式事務是提交還是回滾;
5.TC 通知所有 RM 提交/回滾 資源,事務二階段結束;

 

 TM可以理解為加了GlobalTransactional 注解的方法,會向TC申請一個全局事務ID(XID),TC就是seataserver。RM就是資源管理器,可以理解為一個數據庫連接就是一個RM,一個業務庫對應一個RM。

2. 上面其實是 Seata的AT模式,可以理解為無代碼侵入,不需自己編寫回滾以及其他方法,seata自己控制。官方對AT模式解釋如下:提供無侵入自動補償的事務模式,目前已支持 MySQL、 Oracle 、PostgreSQL和 TiDB的AT模式。

3.AT模式整體機制如下:

兩階段提交協議的演變:

一階段:業務數據和回滾日志記錄在同一個本地事務中提交,釋放本地鎖和連接資源。

二階段:

  提交異步化,非常快速地完成。
  回滾通過一階段的回滾日志進行反向補償。

解釋:

第一階段seata會攔截業務SQL(使用seata代理數據源進行處理,也就是對數據源做手腳):

(1)解析業務SQL將要更新的數據,記錄其更新前的值,"BeforeImage"

(2)執行業務SQL,進行更新

(3)記錄更新后的值,"AfterImage",最后生成行鎖。

  上面的操作在一個事務內操作,可以保證原子性。可以通過上面業務庫的undo_log日志表的rollback_info 字段進行查看,說白了就是記錄一下更新前、更新后的值;如果報錯需要回滾將數據修改為更新前的值,如果正常提交刪掉undo_log中的記錄即可。

第二階段:如果報錯就回滾,否則提交。

提交》如果正常操作進行提交事務。Seata框架只需要將一階段生成的快照和行鎖刪掉即可,完成數據清理。

回滾》回滾時,使用"before image"進行還原數據,但是還原之前要校驗臟寫,對比"數據庫當前數據"和"after image",如果兩份數據一致證明沒有臟寫,可以還原;如果不一致,證明有臟寫,這時候就需要人工處理(可以根據undo_log中的記錄進行處理)。

 

 seataserver安裝文件以及上面測試代碼參考:https://gitee.com/Qiao-Zhi/seata-test

分布式事務 Seata 及其三種模式詳解:  http://seata.io/zh-cn/blog/seata-at-tcc-saga.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM