分布式事務解決方案之2PC(Seata)


 什么是2PC

2PC即兩階段提交協議,是將整個事務流程分為兩個階段,准備階段(Prepare phase)、提交階段(commit
phase),2是指兩個階段,P是指准備階段,C是指提交階段。
舉例:張三和李四好久不見,老友約起聚餐,飯店老板要求先買單,才能出票。這時張三和李四分別抱怨近況不如
意,囊中羞澀,都不願意請客,這時只能AA。只有張三和李四都付款,老板才能出票安排就餐。但由於張三和李四
都是鐵公雞,形成了尷尬的一幕:
准備階段:老板要求張三付款,張三付款。老板要求李四付款,李四付款。
提交階段:老板出票,兩人拿票紛紛落座就餐。
例子中形成了一個事務,若張三或李四其中一人拒絕付款,或錢不夠,店老板都不會給出票,並且會把已收款退回。
整個事務過程由事務管理器和參與者組成,店老板就是事務管理器,張三、李四就是事務參與者,事務管理器負責
決策整個分布式事務的提交和回滾,事務參與者負責自己本地事務的提交和回滾。
在計算機中部分關系數據庫如Oracle、MySQL支持兩階段提交協議,如下圖:
1. 准備階段(Prepare phase):事務管理器給每個參與者發送Prepare消息,每個數據庫參與者在本地執行事
務,並寫本地的Undo/Redo日志,此時事務沒有提交。
(Undo日志是記錄修改前的數據,用於數據庫回滾,Redo日志是記錄修改后的數據,用於提交事務后寫入數
據文件)
2. 提交階段(commit phase):如果事務管理器收到了參與者的執行失敗或者超時消息時,直接給每個參與者
發送回滾(Rollback)消息;否則,發送提交(Commit)消息;參與者根據事務管理器的指令執行提交或者回滾操
作,並釋放事務處理過程中使用的鎖資源。注意:必須在最后階段釋放鎖資源。
下圖展示了2PC的兩個階段,分成功和失敗兩個情況說明:
成功情況:

 失敗情況:

解決方案
XA方案
2PC的傳統方案是在數據庫層面實現的,如Oracle、MySQL都支持2PC協議,為了統一標准減少行業內不必要的對
接成本,需要制定標准化的處理模型及接口標准,國際開放標准組織Open Group定義了分布式事務處理模型
DTP(Distributed Transaction Processing Reference Model)。
為了讓大家更明確XA方案的內容程,下面新用戶注冊送積分為例來說明:

執行流程如下:
1、應用程序(AP)持有用戶庫和積分庫兩個數據源。
2、應用程序(AP)通過TM通知用戶庫RM新增用戶,同時通知積分庫RM為該用戶新增積分,RM此時並未提交事務,此時用戶和積分資源鎖定。
3、TM收到執行回復,只要有一方失敗則分別向其他RM發起回滾事務,回滾完畢,資源鎖釋放。
4、TM收到執行回復,全部成功,此時向所有RM發起提交事務,提交完畢,資源鎖釋放。

DTP模型定義如下角色:
AP(Application Program):即應用程序,可以理解為使用DTP分布式事務的程序。
RM(Resource Manager):即資源管理器,可以理解為事務的參與者,一般情況下是指一個數據庫實例,通過
資源管理器對該數據庫進行控制,資源管理器控制着分支事務。
TM(Transaction Manager):事務管理器,負責協調和管理事務,事務管理器控制着全局事務,管理事務生命
周期,並協調各個RM。全局事務是指分布式事務處理環境中,需要操作多個數據庫共同完成一個工作,這個
工作即是一個全局事務。
DTP模型定義TM和RM之間通訊的接口規范叫XA,簡單理解為數據庫提供的2PC接口協議,基於數據庫的XA
協議來實現2PC又稱為XA方案。
以上三個角色之間的交互方式如下:
1)TM向AP提供 應用程序編程接口,AP通過TM提交及回滾事務。
2)TM交易中間件通過XA接口來通知RM數據庫事務的開始、結束以及提交、回滾等。

總結:
整個2PC的事務流程涉及到三個角色AP、RM、TM。AP指的是使用2PC分布式事務的應用程序;RM指的是資
源管理器,它控制着分支事務;TM指的是事務管理器,它控制着整個全局事務。
1)在准備階段RM執行實際的業務操作,但不提交事務,資源鎖定;
2)在提交階段TM會接受RM在准備階段的執行回復,只要有任一個RM執行失敗,TM會通知所有RM執行回滾操
作,否則,TM將會通知所有RM提交該事務。提交階段結束資源鎖釋放。
XA方案的問題:
1、需要本地數據庫支持XA協議。
2、資源鎖需要等到兩個階段結束才釋放,性能較差。

Seata方案
Seata是由阿里中間件團隊發起的開源項目 Fescar,后更名為Seata,它是一個是開源的分布式事務框架。
傳統2PC的問題在Seata中得到了解決,它通過對本地關系數據庫的分支事務的協調來驅動完成全局事務,是工作
在應用層的中間件。主要優點是性能較好,且不長時間占用連接資源,它以高效並且對業務0侵入的方式解決微服
務場景下面臨的分布式事務問題,它目前提供AT模式(即2PC)及TCC模式的分布式事務解決方案。
Seata的設計思想如下:

Seata的設計目標其一是對業務無侵入,因此從業務無侵入的2PC方案着手,在傳統2PC的基礎上演進,並解決
2PC方案面臨的問題。
Seata把一個分布式事務理解成一個包含了若干分支事務的全局事務。全局事務的職責是協調其下管轄的分支事務
達成一致,要么一起成功提交,要么一起失敗回滾。此外,通常分支事務本身就是一個關系數據庫的本地事務,下
圖是全局事務與分支事務的關系圖:

 與 傳統2PC 的模型類似,Seata定義了3個組件來協議分布式事務的處理過程:

Transaction Coordinator (TC): 事務協調器,它是獨立的中間件,需要獨立部署運行,它維護全局事務的運
行狀態,接收TM指令發起全局事務的提交與回滾,負責與RM通信協調各各分支事務的提交或回滾。
Transaction Manager (TM): 事務管理器,TM需要嵌入應用程序中工作,它負責開啟一個全局事務,並最終
向TC發起全局提交或全局回滾的指令。
Resource Manager (RM): 控制分支事務,負責分支注冊、狀態匯報,並接收事務協調器TC的指令,驅動分
支(本地)事務的提交和回滾。

還拿新用戶注冊送積分舉例Seata的分布式事務過程:

具體的執行流程如下:
1. 用戶服務的 TM 向 TC 申請開啟一個全局事務,全局事務創建成功並生成一個全局唯一的XID。
2. 用戶服務的 RM 向 TC 注冊 分支事務,該分支事務在用戶服務執行新增用戶邏輯,並將其納入 XID 對應全局
事務的管轄。
3. 用戶服務執行分支事務,向用戶表插入一條記錄。
4. 邏輯執行到遠程調用積分服務時(XID 在微服務調用鏈路的上下文中傳播)。積分服務的RM 向 TC 注冊分支事
務,該分支事務執行增加積分的邏輯,並將其納入 XID 對應全局事務的管轄。
5. 積分服務執行分支事務,向積分記錄表插入一條記錄,執行完畢后,返回用戶服務。
6. 用戶服務分支事務執行完畢。
7. TM 向 TC 發起針對 XID 的全局提交或回滾決議。
8. TC 調度 XID 下管轄的全部分支事務完成提交或回滾請求。

Seata實現2PC與傳統2PC的差別:
架構層次方面,傳統2PC方案的 RM 實際上是在數據庫層,RM 本質上就是數據庫自身,通過 XA 協議實現,而
Seata的 RM 是以jar包的形式作為中間件層部署在應用程序這一側的。
兩階段提交方面,傳統2PC無論第二階段的決議是commit還是rollback,事務性資源的鎖都要保持到Phase2完成
才釋放。而Seata的做法是在Phase1 就將本地事務提交,這樣就可以省去Phase2持鎖的時間,整體提高效率。

seata實現2PC事務
業務說明
本示例通過Seata中間件實現分布式事務,模擬三個賬戶的轉賬交易過程。
兩個賬戶在三個不同的銀行(張三在bank1、李四在bank2),bank1和bank2是兩個個微服務。交易過程是,張三
給李四轉賬指定金額。

交互流程如下:
1、請求bank1進行轉賬,傳入轉賬金額。
2、bank1減少轉賬金額,調用bank2,傳入轉賬金額。
3.創建數據庫

/*
SQLyog v10.2 
MySQL - 5.7.21-log : Database - bank1
*********************************************************************
*/


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `bank1`;

/*Table structure for table `account_info` */

DROP TABLE IF EXISTS `account_info`;

CREATE TABLE `account_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '戶主姓名',
  `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '銀行卡號',
  `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帳戶密碼',
  `account_balance` double DEFAULT NULL COMMENT '帳戶余額',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;

/*Data for the table `account_info` */

insert  into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (2,'張三','1',NULL,1000);

/*Table structure for table `de_duplication` */

DROP TABLE IF EXISTS `de_duplication`;

CREATE TABLE `de_duplication` (
  `tx_no` varchar(64) COLLATE utf8_bin NOT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;

/*Data for the table `de_duplication` */

/*Table structure for table `local_cancel_log` */

DROP TABLE IF EXISTS `local_cancel_log`;

CREATE TABLE `local_cancel_log` (
  `tx_no` varchar(64) NOT NULL COMMENT '事務id',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `local_cancel_log` */

/*Table structure for table `local_confirm_log` */

DROP TABLE IF EXISTS `local_confirm_log`;

CREATE TABLE `local_confirm_log` (
  `tx_no` varchar(64) NOT NULL COMMENT '事務id',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `local_confirm_log` */

/*Table structure for table `local_trade_log` */

DROP TABLE IF EXISTS `local_trade_log`;

CREATE TABLE `local_trade_log` (
  `tx_no` bigint(20) NOT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;

/*Data for the table `local_trade_log` */



DROP TABLE IF EXISTS `local_try_log`;

CREATE TABLE `local_try_log` (
  `tx_no` varchar(64) NOT NULL COMMENT '事務id',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `local_try_log` */

/*Table structure for table `undo_log` */

DROP TABLE IF EXISTS `undo_log`;

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=167 DEFAULT CHARSET=utf8;


/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

bank2庫,包含李四賬戶

/*
SQLyog v10.2 
MySQL - 5.7.21-log : Database - bank2
*********************************************************************
*/


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank2` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `bank2`;

/*Table structure for table `account_info` */

DROP TABLE IF EXISTS `account_info`;

CREATE TABLE `account_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '戶主姓名',
  `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '銀行卡號',
  `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帳戶密碼',
  `account_balance` double DEFAULT NULL COMMENT '帳戶余額',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;

/*Data for the table `account_info` */

insert  into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (3,'李四的賬戶','2',NULL,0);

/*Table structure for table `de_duplication` */

DROP TABLE IF EXISTS `de_duplication`;

CREATE TABLE `de_duplication` (
  `tx_no` varchar(64) COLLATE utf8_bin NOT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;

/*Data for the table `de_duplication` */

/*Table structure for table `local_cancel_log` */

DROP TABLE IF EXISTS `local_cancel_log`;

CREATE TABLE `local_cancel_log` (
  `tx_no` varchar(64) NOT NULL COMMENT '事務id',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `local_cancel_log` */

/*Table structure for table `local_confirm_log` */

DROP TABLE IF EXISTS `local_confirm_log`;

CREATE TABLE `local_confirm_log` (
  `tx_no` varchar(64) NOT NULL COMMENT '事務id',
  `create_time` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `local_confirm_log` */

/*Table structure for table `local_trade_log` */

DROP TABLE IF EXISTS `local_trade_log`;

CREATE TABLE `local_trade_log` (
  `tx_no` bigint(20) NOT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;


/*Table structure for table `local_try_log` */

DROP TABLE IF EXISTS `local_try_log`;

CREATE TABLE `local_try_log` (
  `tx_no` varchar(64) NOT NULL COMMENT '事務id',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `local_try_log` */

/*Table structure for table `undo_log` */

DROP TABLE IF EXISTS `undo_log`;

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `undo_log` */

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

4.啟動TC(事務協調器)
(1)下載seata服務器
下載地址:https://github.com/seata/seata/releases/download/v0.7.1/seata-server-0.7.1.zip
也可以直接解壓:資料\seata-server-0.7.1.zip
(2)解壓並啟動
[seata服務端解壓路徑]/bin/seata-server.bat -p 8888 -m file
注:其中8888為服務端口號;file為啟動模式,這里指seata服務將采用文件的方式存儲信息。

 由於阿里雲沒有安裝jdk環境,使用就在本地開啟

測試代碼結構

注冊中心配置

spring:
    application:
        name: seata-demo-discovery
server:
    port: 9900 #啟動端口

eureka:
  server:
    enable-self-preservation: false    #關閉服務器自我保護,客戶端心跳檢測15分鍾內錯誤達到80%服務會保護,導致別人還認為是好用的服務
    eviction-interval-timer-in-ms: 10000 #清理間隔(單位毫秒,默認是60*1000)5秒將客戶端剔除的服務在服務注冊列表中剔除# 
    shouldUseReadOnlyResponseCache: true #eureka是CAP理論種基於AP策略,為了保證強一致性關閉此切換CP 默認不關閉 false關閉
    response-cache-update-interval-ms: 3000  #eureka server刷新readCacheMap的時間,注意,client讀取的是readCacheMap,這個時間決定了多久會把readWriteCacheMap的緩存更新到readCacheMap上 #eureka server刷新readCacheMap的時間,注意,client讀取的是readCacheMap,這個時間決定了多久會把readWriteCacheMap的緩存更新到readCacheMap上默認30s
    response-cache-auto-expiration-in-seconds: 180   #eureka server緩存readWriteCacheMap失效時間,這個只有在這個時間過去后緩存才會失效,失效前不會更新,過期后從registry重新讀取注冊服務信息,registry是一個ConcurrentHashMap。
  client: 
    register-with-eureka: false  #false:不作為一個客戶端注冊到注冊中心
    fetch-registry: false      #為true時,可以啟動,但報異常:Cannot execute request on any known server
    instance-info-replication-interval-seconds: 10 
    serviceUrl: 
      defaultZone: http://localhost:${server.port}/eureka/
  instance:
    hostname: ${spring.cloud.client.ip-address}
    prefer-ip-address: true
    instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}
    lease-renewal-interval-in-seconds: 5    # 續約更新時間間隔(默認30秒)
    lease-expiration-duration-in-seconds: 10 # 續約到期時間(默認90秒)

logging: 
  config: classpath:log4j2-dev.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration monitorInterval="180" packages="">
    <properties>
    	<property name="prjname">${project.name}</property>
        <property name="logdir">logs</property>
        <property name="PATTERN"> %date{YYYY-MM-dd HH:mm:ss,SSS} %level [%thread][%file:%line] - %msg%n%throwable</property>
    </properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="${PATTERN}"/>
        </Console>

        <RollingFile name="ErrorAppender" fileName="${logdir}/${prjname}_error.log"
            filePattern="${logdir}/$${date:yyyy-MM-dd}/${prjname}_error.%d{yyyy-MM-dd-HH}.log" append="true">
            <PatternLayout pattern="${PATTERN}"/>
            <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" modulate="true" />
            </Policies>
        </RollingFile>

        <RollingFile name="DebugAppender" fileName="${logdir}/${prjname}_info.log"
            filePattern="${logdir}/$${date:yyyy-MM-dd}/${prjname}_info.%d{yyyy-MM-dd-HH}.log" append="true">
            <PatternLayout pattern="${PATTERN}"/>
            <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1" modulate="true" />
            </Policies>
        </RollingFile>
        
        <!--異步appender-->
         <Async name="AsyncAppender" includeLocation="true">
            <AppenderRef ref="ErrorAppender"/>
            <AppenderRef ref="DebugAppender"/>
        </Async>
    </Appenders>
    
    <Loggers>
        <logger name="org.springframework" level="INFO">
        </logger>
        <logger name="org.mybatis" level="INFO">
        </logger>
        <logger name="springfox" level="INFO">
        </logger>
		<logger name="org.apache.http" level="INFO">
        </logger>
        <logger name="com.netflix.discovery" level="INFO">
        </logger>

        <Root level="INFO" includeLocation="true">
            <AppenderRef ref="AsyncAppender"/>
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>

配置

把壓縮包里面的2個配置文件拿到resource下面,修改配置

 

 

 

 在file.conf中更改service.vgroup_mapping.[springcloud服務名]-fescar-service-group = "default",並修改

service.default.grouplist =[seata服務端地址]

關於vgroup_mapping的配置:

vgroup_mapping.事務分組服務名=Seata Server集群名稱(默認名稱為default)

default.grouplist = Seata Server集群地址

Seata執行流程

1、正常提交流程

 

 2、回滾流程

回滾流程省略前的RM注冊過程。

 

 要點說明:

1、每個RM使用DataSourceProxy連接數據庫,其目的是使用ConnectionProxy,使用數據源和數據連接代理的目

的就是在第一階段將undo_log和業務數據放在一個本地事務提交,這樣就保存了只要有業務操作就一定有

undo_log。

2、在第一階段undo_log中存放了數據修改前和修改后的值,為事務回滾作好准備,所以第一階段完成就已經將分

支事務提交,也就釋放了鎖資源。

3、TM開啟全局事務開始,將XID全局事務id放在事務上下文中,通過feign調用也將XID傳入下游分支事務,每個

分支事務將自己的Branch ID分支事務ID與XID關聯。

4、第二階段全局事務提交,TC會通知各各分支參與者提交分支事務,在第一階段就已經提交了分支事務,這里各

各參與者只需要刪除undo_log即可,並且可以異步執行,第二階段很快可以完成。

5、第二階段全局事務回滾,TC會通知各各分支參與者回滾分支事務,通過 XID 和 Branch ID 找到相應的回滾日

志,通過回滾日志生成反向的 SQL 並執行,以完成分支事務回滾到之前的狀態,如果回滾失敗則會重試回滾操

作。

bank1實現如下功能:

1、張三賬戶減少金額,開啟全局事務。

2、遠程調用bank2向李四轉賬。

配置類:

@Configuration
public class DatabaseConfiguration {

    private final ApplicationContext applicationContext;

    public DatabaseConfiguration(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.ds0")
    public DruidDataSource ds0() {
        DruidDataSource druidDataSource = new DruidDataSource();
        return druidDataSource;
    }

    @Primary
    @Bean
    public DataSource dataSource(DruidDataSource ds0)  {
        DataSourceProxy pds0 = new DataSourceProxy(ds0);
        return pds0;
    }

}

web層

@RestController
public class Bank1Controller {

    @Autowired
    AccountInfoService accountInfoService;

    //張三轉賬
    @GetMapping("/transfer")
    public String transfer(Double amount){
        accountInfoService.updateAccountBalance("1",amount);
        return "bank1"+amount;
    }
}

dao

@Mapper
@Component
public interface AccountInfoDao {

    //更新賬戶金額
    @Update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

}

Service層

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    AccountInfoDao accountInfoDao;

    @Autowired
    Bank2Client bank2Client;

    @Transactional
    @GlobalTransactional//開啟全局事務
    @Override
    public void updateAccountBalance(String accountNo, Double amount) {
        log.info("bank1 service begin,XID:{}", RootContext.getXID());
        //扣減張三的金額
        accountInfoDao.updateAccountBalance(accountNo,amount *-1);
        //調用李四微服務,轉賬
        String transfer = bank2Client.transfer(amount);
        if("fallback".equals(transfer)){
            //調用李四微服務異常
            throw new RuntimeException("調用李四微服務異常");
        }
        if(amount == 2){
            //人為制造異常
            throw new RuntimeException("bank1 make exception..");
        }
    }
}

將@GlobalTransactional注解標注在全局事務發起的Service實現方法上,開啟全局事務:

GlobalTransactionalInterceptor會攔截@GlobalTransactional注解的方法,生成全局事務ID(XID),XID會在整個

分布式事務中傳遞。

在遠程調用時,spring-cloud-alibaba-seata會攔截Feign調用將XID傳遞到下游服務。

FeignClient

@FeignClient(value="seata-demo-bank2",fallback=Bank2ClientFallback.class)
public interface Bank2Client {
    //遠程調用李四的微服務
    @GetMapping("/bank2/transfer")
    public  String transfer(@RequestParam("amount") Double amount);
}
@Component
public class Bank2ClientFallback implements Bank2Client {

    @Override
    public String transfer(Double amount) {

        return "fallback";
    }
}

bank2實現如下功能:

1、李四賬戶增加金額。

bank2在本賬號事務中作為分支事務不使用@GlobalTransactional。

Web層

@RestController
public class Bank2Controller {

    @Autowired
    AccountInfoService accountInfoService;

    //接收張三的轉賬
    @GetMapping("/transfer")
    public String transfer(Double amount){
        //李四增加金額
        accountInfoService.updateAccountBalance("2",amount);
        return "bank2"+amount;
    }
}

Dao層

@Mapper
@Component
public interface AccountInfoDao {
    //更新賬戶
    @Update("UPDATE account_info SET account_balance = account_balance + #{amount} WHERE account_no = #{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

}

Service

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    AccountInfoDao accountInfoDao;


    @Transactional
    @Override
    public void updateAccountBalance(String accountNo, Double amount) {
        log.info("bank2 service begin,XID:{}",RootContext.getXID());
        //李四增加金額
        accountInfoDao.updateAccountBalance(accountNo,amount);
        if(amount==3){
            //人為制造異常
            throw new RuntimeException("bank2 make exception..");
        }
    }
}

測試場景

張三向李四轉賬成功。

 

李四事務失敗,張三事務回滾成功。

 

張三事務失敗,李四事務回滾成功。

 

到最后發現金額表都沒有發送變化

 當你把ribbon和feign連接時間調大點,斷點一下可以發現undo_log表會有數據,結果以后會刪掉

 當把表刪了,就會報錯,之前剛測試的時候,發現一直沒有數據,以為那里有問題,雖然每次查都是空表,只是清空了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM