什么是2PC
2PC即兩階段提交協議,是將整個事務流程分為兩個階段,准備階段(Prepare phase)、提交階段(commit
phase),2是指兩個階段,P是指准備階段,C是指提交階段。
舉例:張三和李四好久不見,老友約起聚餐,飯店老板要求先買單,才能出票。這時張三和李四分別抱怨近況不如
意,囊中羞澀,都不願意請客,這時只能AA。只有張三和李四都付款,老板才能出票安排就餐。但由於張三和李四
都是鐵公雞,形成了尷尬的一幕:
准備階段:老板要求張三付款,張三付款。老板要求李四付款,李四付款。
提交階段:老板出票,兩人拿票紛紛落座就餐。
例子中形成了一個事務,若張三或李四其中一人拒絕付款,或錢不夠,店老板都不會給出票,並且會把已收款退回。
整個事務過程由事務管理器和參與者組成,店老板就是事務管理器,張三、李四就是事務參與者,事務管理器負責
決策整個分布式事務的提交和回滾,事務參與者負責自己本地事務的提交和回滾。
在計算機中部分關系數據庫如Oracle、MySQL支持兩階段提交協議,如下圖:
1. 准備階段(Prepare phase):事務管理器給每個參與者發送Prepare消息,每個數據庫參與者在本地執行事
務,並寫本地的Undo/Redo日志,此時事務沒有提交。
(Undo日志是記錄修改前的數據,用於數據庫回滾,Redo日志是記錄修改后的數據,用於提交事務后寫入數
據文件)
2. 提交階段(commit phase):如果事務管理器收到了參與者的執行失敗或者超時消息時,直接給每個參與者
發送回滾(Rollback)消息;否則,發送提交(Commit)消息;參與者根據事務管理器的指令執行提交或者回滾操
作,並釋放事務處理過程中使用的鎖資源。注意:必須在最后階段釋放鎖資源。
下圖展示了2PC的兩個階段,分成功和失敗兩個情況說明:
成功情況:
失敗情況:
解決方案
XA方案
2PC的傳統方案是在數據庫層面實現的,如Oracle、MySQL都支持2PC協議,為了統一標准減少行業內不必要的對
接成本,需要制定標准化的處理模型及接口標准,國際開放標准組織Open Group定義了分布式事務處理模型
DTP(Distributed Transaction Processing Reference Model)。
為了讓大家更明確XA方案的內容程,下面新用戶注冊送積分為例來說明:
執行流程如下:
1、應用程序(AP)持有用戶庫和積分庫兩個數據源。
2、應用程序(AP)通過TM通知用戶庫RM新增用戶,同時通知積分庫RM為該用戶新增積分,RM此時並未提交事務,此時用戶和積分資源鎖定。
3、TM收到執行回復,只要有一方失敗則分別向其他RM發起回滾事務,回滾完畢,資源鎖釋放。
4、TM收到執行回復,全部成功,此時向所有RM發起提交事務,提交完畢,資源鎖釋放。
DTP模型定義如下角色:
AP(Application Program):即應用程序,可以理解為使用DTP分布式事務的程序。
RM(Resource Manager):即資源管理器,可以理解為事務的參與者,一般情況下是指一個數據庫實例,通過
資源管理器對該數據庫進行控制,資源管理器控制着分支事務。
TM(Transaction Manager):事務管理器,負責協調和管理事務,事務管理器控制着全局事務,管理事務生命
周期,並協調各個RM。全局事務是指分布式事務處理環境中,需要操作多個數據庫共同完成一個工作,這個
工作即是一個全局事務。
DTP模型定義TM和RM之間通訊的接口規范叫XA,簡單理解為數據庫提供的2PC接口協議,基於數據庫的XA
協議來實現2PC又稱為XA方案。
以上三個角色之間的交互方式如下:
1)TM向AP提供 應用程序編程接口,AP通過TM提交及回滾事務。
2)TM交易中間件通過XA接口來通知RM數據庫事務的開始、結束以及提交、回滾等。
總結:
整個2PC的事務流程涉及到三個角色AP、RM、TM。AP指的是使用2PC分布式事務的應用程序;RM指的是資
源管理器,它控制着分支事務;TM指的是事務管理器,它控制着整個全局事務。
1)在准備階段RM執行實際的業務操作,但不提交事務,資源鎖定;
2)在提交階段TM會接受RM在准備階段的執行回復,只要有任一個RM執行失敗,TM會通知所有RM執行回滾操
作,否則,TM將會通知所有RM提交該事務。提交階段結束資源鎖釋放。
XA方案的問題:
1、需要本地數據庫支持XA協議。
2、資源鎖需要等到兩個階段結束才釋放,性能較差。
Seata方案
Seata是由阿里中間件團隊發起的開源項目 Fescar,后更名為Seata,它是一個是開源的分布式事務框架。
傳統2PC的問題在Seata中得到了解決,它通過對本地關系數據庫的分支事務的協調來驅動完成全局事務,是工作
在應用層的中間件。主要優點是性能較好,且不長時間占用連接資源,它以高效並且對業務0侵入的方式解決微服
務場景下面臨的分布式事務問題,它目前提供AT模式(即2PC)及TCC模式的分布式事務解決方案。
Seata的設計思想如下:
Seata的設計目標其一是對業務無侵入,因此從業務無侵入的2PC方案着手,在傳統2PC的基礎上演進,並解決
2PC方案面臨的問題。
Seata把一個分布式事務理解成一個包含了若干分支事務的全局事務。全局事務的職責是協調其下管轄的分支事務
達成一致,要么一起成功提交,要么一起失敗回滾。此外,通常分支事務本身就是一個關系數據庫的本地事務,下
圖是全局事務與分支事務的關系圖:
與 傳統2PC 的模型類似,Seata定義了3個組件來協議分布式事務的處理過程:
Transaction Coordinator (TC): 事務協調器,它是獨立的中間件,需要獨立部署運行,它維護全局事務的運
行狀態,接收TM指令發起全局事務的提交與回滾,負責與RM通信協調各各分支事務的提交或回滾。
Transaction Manager (TM): 事務管理器,TM需要嵌入應用程序中工作,它負責開啟一個全局事務,並最終
向TC發起全局提交或全局回滾的指令。
Resource Manager (RM): 控制分支事務,負責分支注冊、狀態匯報,並接收事務協調器TC的指令,驅動分
支(本地)事務的提交和回滾。
還拿新用戶注冊送積分舉例Seata的分布式事務過程:
具體的執行流程如下:
1. 用戶服務的 TM 向 TC 申請開啟一個全局事務,全局事務創建成功並生成一個全局唯一的XID。
2. 用戶服務的 RM 向 TC 注冊 分支事務,該分支事務在用戶服務執行新增用戶邏輯,並將其納入 XID 對應全局
事務的管轄。
3. 用戶服務執行分支事務,向用戶表插入一條記錄。
4. 邏輯執行到遠程調用積分服務時(XID 在微服務調用鏈路的上下文中傳播)。積分服務的RM 向 TC 注冊分支事
務,該分支事務執行增加積分的邏輯,並將其納入 XID 對應全局事務的管轄。
5. 積分服務執行分支事務,向積分記錄表插入一條記錄,執行完畢后,返回用戶服務。
6. 用戶服務分支事務執行完畢。
7. TM 向 TC 發起針對 XID 的全局提交或回滾決議。
8. TC 調度 XID 下管轄的全部分支事務完成提交或回滾請求。
Seata實現2PC與傳統2PC的差別:
架構層次方面,傳統2PC方案的 RM 實際上是在數據庫層,RM 本質上就是數據庫自身,通過 XA 協議實現,而
Seata的 RM 是以jar包的形式作為中間件層部署在應用程序這一側的。
兩階段提交方面,傳統2PC無論第二階段的決議是commit還是rollback,事務性資源的鎖都要保持到Phase2完成
才釋放。而Seata的做法是在Phase1 就將本地事務提交,這樣就可以省去Phase2持鎖的時間,整體提高效率。
seata實現2PC事務
業務說明
本示例通過Seata中間件實現分布式事務,模擬三個賬戶的轉賬交易過程。
兩個賬戶在三個不同的銀行(張三在bank1、李四在bank2),bank1和bank2是兩個個微服務。交易過程是,張三
給李四轉賬指定金額。
交互流程如下:
1、請求bank1進行轉賬,傳入轉賬金額。
2、bank1減少轉賬金額,調用bank2,傳入轉賬金額。
3.創建數據庫
/* SQLyog v10.2 MySQL - 5.7.21-log : Database - bank1 ********************************************************************* */ /*!40101 SET NAMES utf8 */; /*!40101 SET SQL_MODE=''*/; /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1` /*!40100 DEFAULT CHARACTER SET utf8 */; USE `bank1`; /*Table structure for table `account_info` */ DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '戶主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '銀行卡號', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帳戶密碼', `account_balance` double DEFAULT NULL COMMENT '帳戶余額', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `account_info` */ insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (2,'張三','1',NULL,1000); /*Table structure for table `de_duplication` */ DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `de_duplication` */ /*Table structure for table `local_cancel_log` */ DROP TABLE IF EXISTS `local_cancel_log`; CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事務id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_cancel_log` */ /*Table structure for table `local_confirm_log` */ DROP TABLE IF EXISTS `local_confirm_log`; CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事務id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_confirm_log` */ /*Table structure for table `local_trade_log` */ DROP TABLE IF EXISTS `local_trade_log`; CREATE TABLE `local_trade_log` ( `tx_no` bigint(20) NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `local_trade_log` */ DROP TABLE IF EXISTS `local_try_log`; CREATE TABLE `local_try_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事務id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_try_log` */ /*Table structure for table `undo_log` */ DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=167 DEFAULT CHARSET=utf8; /*!40101 SET SQL_MODE=@OLD_SQL_MODE */; /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */; /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
bank2庫,包含李四賬戶
/* SQLyog v10.2 MySQL - 5.7.21-log : Database - bank2 ********************************************************************* */ /*!40101 SET NAMES utf8 */; /*!40101 SET SQL_MODE=''*/; /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank2` /*!40100 DEFAULT CHARACTER SET utf8 */; USE `bank2`; /*Table structure for table `account_info` */ DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '戶主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '銀行卡號', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帳戶密碼', `account_balance` double DEFAULT NULL COMMENT '帳戶余額', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `account_info` */ insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (3,'李四的賬戶','2',NULL,0); /*Table structure for table `de_duplication` */ DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Data for the table `de_duplication` */ /*Table structure for table `local_cancel_log` */ DROP TABLE IF EXISTS `local_cancel_log`; CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事務id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_cancel_log` */ /*Table structure for table `local_confirm_log` */ DROP TABLE IF EXISTS `local_confirm_log`; CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事務id', `create_time` datetime DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_confirm_log` */ /*Table structure for table `local_trade_log` */ DROP TABLE IF EXISTS `local_trade_log`; CREATE TABLE `local_trade_log` ( `tx_no` bigint(20) NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC; /*Table structure for table `local_try_log` */ DROP TABLE IF EXISTS `local_try_log`; CREATE TABLE `local_try_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事務id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `local_try_log` */ /*Table structure for table `undo_log` */ DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; /*Data for the table `undo_log` */ /*!40101 SET SQL_MODE=@OLD_SQL_MODE */; /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */; /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
4.啟動TC(事務協調器)
(1)下載seata服務器
下載地址:https://github.com/seata/seata/releases/download/v0.7.1/seata-server-0.7.1.zip
也可以直接解壓:資料\seata-server-0.7.1.zip
(2)解壓並啟動
[seata服務端解壓路徑]/bin/seata-server.bat -p 8888 -m file
注:其中8888為服務端口號;file為啟動模式,這里指seata服務將采用文件的方式存儲信息。
由於阿里雲沒有安裝jdk環境,使用就在本地開啟
測試代碼結構
注冊中心配置
spring: application: name: seata-demo-discovery server: port: 9900 #啟動端口 eureka: server: enable-self-preservation: false #關閉服務器自我保護,客戶端心跳檢測15分鍾內錯誤達到80%服務會保護,導致別人還認為是好用的服務 eviction-interval-timer-in-ms: 10000 #清理間隔(單位毫秒,默認是60*1000)5秒將客戶端剔除的服務在服務注冊列表中剔除# shouldUseReadOnlyResponseCache: true #eureka是CAP理論種基於AP策略,為了保證強一致性關閉此切換CP 默認不關閉 false關閉 response-cache-update-interval-ms: 3000 #eureka server刷新readCacheMap的時間,注意,client讀取的是readCacheMap,這個時間決定了多久會把readWriteCacheMap的緩存更新到readCacheMap上 #eureka server刷新readCacheMap的時間,注意,client讀取的是readCacheMap,這個時間決定了多久會把readWriteCacheMap的緩存更新到readCacheMap上默認30s response-cache-auto-expiration-in-seconds: 180 #eureka server緩存readWriteCacheMap失效時間,這個只有在這個時間過去后緩存才會失效,失效前不會更新,過期后從registry重新讀取注冊服務信息,registry是一個ConcurrentHashMap。 client: register-with-eureka: false #false:不作為一個客戶端注冊到注冊中心 fetch-registry: false #為true時,可以啟動,但報異常:Cannot execute request on any known server instance-info-replication-interval-seconds: 10 serviceUrl: defaultZone: http://localhost:${server.port}/eureka/ instance: hostname: ${spring.cloud.client.ip-address} prefer-ip-address: true instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}} lease-renewal-interval-in-seconds: 5 # 續約更新時間間隔(默認30秒) lease-expiration-duration-in-seconds: 10 # 續約到期時間(默認90秒) logging: config: classpath:log4j2-dev.xml
<?xml version="1.0" encoding="UTF-8"?> <Configuration monitorInterval="180" packages=""> <properties> <property name="prjname">${project.name}</property> <property name="logdir">logs</property> <property name="PATTERN"> %date{YYYY-MM-dd HH:mm:ss,SSS} %level [%thread][%file:%line] - %msg%n%throwable</property> </properties> <Appenders> <Console name="Console" target="SYSTEM_OUT"> <PatternLayout pattern="${PATTERN}"/> </Console> <RollingFile name="ErrorAppender" fileName="${logdir}/${prjname}_error.log" filePattern="${logdir}/$${date:yyyy-MM-dd}/${prjname}_error.%d{yyyy-MM-dd-HH}.log" append="true"> <PatternLayout pattern="${PATTERN}"/> <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/> <Policies> <TimeBasedTriggeringPolicy interval="1" modulate="true" /> </Policies> </RollingFile> <RollingFile name="DebugAppender" fileName="${logdir}/${prjname}_info.log" filePattern="${logdir}/$${date:yyyy-MM-dd}/${prjname}_info.%d{yyyy-MM-dd-HH}.log" append="true"> <PatternLayout pattern="${PATTERN}"/> <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/> <Policies> <TimeBasedTriggeringPolicy interval="1" modulate="true" /> </Policies> </RollingFile> <!--異步appender--> <Async name="AsyncAppender" includeLocation="true"> <AppenderRef ref="ErrorAppender"/> <AppenderRef ref="DebugAppender"/> </Async> </Appenders> <Loggers> <logger name="org.springframework" level="INFO"> </logger> <logger name="org.mybatis" level="INFO"> </logger> <logger name="springfox" level="INFO"> </logger> <logger name="org.apache.http" level="INFO"> </logger> <logger name="com.netflix.discovery" level="INFO"> </logger> <Root level="INFO" includeLocation="true"> <AppenderRef ref="AsyncAppender"/> <AppenderRef ref="Console"/> </Root> </Loggers> </Configuration>
配置
把壓縮包里面的2個配置文件拿到resource下面,修改配置
在file.conf中更改service.vgroup_mapping.[springcloud服務名]-fescar-service-group = "default",並修改
service.default.grouplist =[seata服務端地址]
關於vgroup_mapping的配置:
vgroup_mapping.事務分組服務名=Seata Server集群名稱(默認名稱為default)
default.grouplist = Seata Server集群地址
Seata執行流程
1、正常提交流程
2、回滾流程
回滾流程省略前的RM注冊過程。
要點說明:
1、每個RM使用DataSourceProxy連接數據庫,其目的是使用ConnectionProxy,使用數據源和數據連接代理的目
的就是在第一階段將undo_log和業務數據放在一個本地事務提交,這樣就保存了只要有業務操作就一定有
undo_log。
2、在第一階段undo_log中存放了數據修改前和修改后的值,為事務回滾作好准備,所以第一階段完成就已經將分
支事務提交,也就釋放了鎖資源。
3、TM開啟全局事務開始,將XID全局事務id放在事務上下文中,通過feign調用也將XID傳入下游分支事務,每個
分支事務將自己的Branch ID分支事務ID與XID關聯。
4、第二階段全局事務提交,TC會通知各各分支參與者提交分支事務,在第一階段就已經提交了分支事務,這里各
各參與者只需要刪除undo_log即可,並且可以異步執行,第二階段很快可以完成。
5、第二階段全局事務回滾,TC會通知各各分支參與者回滾分支事務,通過 XID 和 Branch ID 找到相應的回滾日
志,通過回滾日志生成反向的 SQL 並執行,以完成分支事務回滾到之前的狀態,如果回滾失敗則會重試回滾操
作。
bank1實現如下功能:
1、張三賬戶減少金額,開啟全局事務。
2、遠程調用bank2向李四轉賬。
配置類:
@Configuration public class DatabaseConfiguration { private final ApplicationContext applicationContext; public DatabaseConfiguration(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } @Bean @ConfigurationProperties(prefix = "spring.datasource.ds0") public DruidDataSource ds0() { DruidDataSource druidDataSource = new DruidDataSource(); return druidDataSource; } @Primary @Bean public DataSource dataSource(DruidDataSource ds0) { DataSourceProxy pds0 = new DataSourceProxy(ds0); return pds0; } }
web層
@RestController public class Bank1Controller { @Autowired AccountInfoService accountInfoService; //張三轉賬 @GetMapping("/transfer") public String transfer(Double amount){ accountInfoService.updateAccountBalance("1",amount); return "bank1"+amount; } }
dao
@Mapper @Component public interface AccountInfoDao { //更新賬戶金額 @Update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountNo}") int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); }
Service層
@Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Autowired AccountInfoDao accountInfoDao; @Autowired Bank2Client bank2Client; @Transactional @GlobalTransactional//開啟全局事務 @Override public void updateAccountBalance(String accountNo, Double amount) { log.info("bank1 service begin,XID:{}", RootContext.getXID()); //扣減張三的金額 accountInfoDao.updateAccountBalance(accountNo,amount *-1); //調用李四微服務,轉賬 String transfer = bank2Client.transfer(amount); if("fallback".equals(transfer)){ //調用李四微服務異常 throw new RuntimeException("調用李四微服務異常"); } if(amount == 2){ //人為制造異常 throw new RuntimeException("bank1 make exception.."); } } }
將@GlobalTransactional注解標注在全局事務發起的Service實現方法上,開啟全局事務:
GlobalTransactionalInterceptor會攔截@GlobalTransactional注解的方法,生成全局事務ID(XID),XID會在整個
分布式事務中傳遞。
在遠程調用時,spring-cloud-alibaba-seata會攔截Feign調用將XID傳遞到下游服務。
FeignClient
@FeignClient(value="seata-demo-bank2",fallback=Bank2ClientFallback.class) public interface Bank2Client { //遠程調用李四的微服務 @GetMapping("/bank2/transfer") public String transfer(@RequestParam("amount") Double amount); }
@Component public class Bank2ClientFallback implements Bank2Client { @Override public String transfer(Double amount) { return "fallback"; } }
bank2實現如下功能:
1、李四賬戶增加金額。
bank2在本賬號事務中作為分支事務不使用@GlobalTransactional。
Web層
@RestController public class Bank2Controller { @Autowired AccountInfoService accountInfoService; //接收張三的轉賬 @GetMapping("/transfer") public String transfer(Double amount){ //李四增加金額 accountInfoService.updateAccountBalance("2",amount); return "bank2"+amount; } }
Dao層
@Mapper @Component public interface AccountInfoDao { //更新賬戶 @Update("UPDATE account_info SET account_balance = account_balance + #{amount} WHERE account_no = #{accountNo}") int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); }
Service
@Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Autowired AccountInfoDao accountInfoDao; @Transactional @Override public void updateAccountBalance(String accountNo, Double amount) { log.info("bank2 service begin,XID:{}",RootContext.getXID()); //李四增加金額 accountInfoDao.updateAccountBalance(accountNo,amount); if(amount==3){ //人為制造異常 throw new RuntimeException("bank2 make exception.."); } } }
測試場景
張三向李四轉賬成功。
李四事務失敗,張三事務回滾成功。
張三事務失敗,李四事務回滾成功。
到最后發現金額表都沒有發送變化
當你把ribbon和feign連接時間調大點,斷點一下可以發現undo_log表會有數據,結果以后會刪掉
當把表刪了,就會報錯,之前剛測試的時候,發現一直沒有數據,以為那里有問題,雖然每次查都是空表,只是清空了