一直說寫有關最新技術的文章,但前面似乎都有點偏了,只能說算主流技術,今天這個主題,我覺得應該名副其實。分布式微服務的深水區並不是單個微服務的設計,而是服務間的數據一致性問題!解決了這個問題,才算是把分布式正式收編了!但分布式事務解決方案並沒有統一的標准,只能說根據業務特點來適配,有實時的,非實時的,同步或異步的,之前已經實現了異步MQ的分布式事務方案,今天來看看Seata方案,自19年初才推出,還幾易其名,目前還不算特別完善,但其光環太耀眼,作為一名IT人,還是有必要來瞧一瞧的。單說Seata,就有AT、TCC、Saga和XA模式,看來是盤大菜。
作者原創文章,謝絕一切轉載!
本文只發表在"公眾號"和"博客園",其他均屬復制粘貼!如果覺得排版不清晰,請查看公眾號文章。
**工具:**
Idea201902/JDK11/Gradle5.6.2/Mysql8.0.11/Lombok0.26/Postman7.5.0/SpringBoot2.1.9/Nacos1.1.3/Seata0.8.1/SeataServer0.8.1/Dubbo2.7.3
**難度:**
新手--戰士--老兵--大師
**目標:**
1.多模塊微服務Dubbo框架整合Seata實現分布式事務的AT模式
2.使用Seata實現訂單模塊與其他模塊的關聯型事務的TCC模式
***
**步驟:**
**為了更好的遇到各種問題,同時保持時效性,我盡量使用最新的軟件版本。代碼地址:其中的day17,https://github.com/xiexiaobiao/dubbo-project.git**
文中圖片有些顯示不全,是圖片很大,我擔心縮放會看不清,所以部分顯示不全的,可以下載圖片再看。
1.先照搬來點背景材料,分布式事務典型場景如下圖,一個business主事務發起多個分支事務,並需要保證一致的commit或rollback:
Seata框架,有三個模塊,分別是
- - TM-TransactionManager事務管理器:定義全局事務的范圍,開啟、提交或回滾全局事務;
- - RM -ResourceManager資源管理器:管理分支事務的資源,注冊分支事務到TC,與TC通信反饋分支事務狀態,驅動分支事務提交或回滾;
- - TC-TransactionCoordinator事務協調器:維護全局和分支事務,驅動全局提交或回滾;
分布式事務流程:
I. TM 請求TC 發起一個全局事務,同時TC生成一個 XID作為全局事務ID.
II. XID將分發給事務調用鏈上的所有微服務.
III. RM響應全局事務XID向TC注冊本地分支事務.
IV. TM向TC發出提交或回滾全局事務XID的請求.
V. TC響應全局事務XID,驅動所有分支事務提交或 回滾本地分支事務.
其中 TM 和 RM 是作為 Seata 的客戶端與業務系統集成在一起,TC 作為 Seata 的服務端獨立部署。
再說seata的AT模式:AT 模式是一種無侵入的分布式事務解決方案。在 AT 模式下,用戶只需關注自己的“業務 SQL”,用戶的 “業務 SQL” 作為一階段,Seata 框架會自動生成事務的二階段提交和回滾操作。
- - 在一階段,Seata 會攔截“業務 SQL”,首先解析 SQL 語義,找到“業務 SQL”要更新的業務數據,在業務數據被更新前,將其保存成“before image”,然后執行“業務 SQL”更新業務數據,在業務數據更新之后,再將其保存成“after image”,最后生成行鎖。以上操作全部在一個數據庫事務內完成,這樣保證了一階段操作的原子性。
- - 二階段如果是提交的話,因為“業務 SQL”在一階段已經提交至數據庫, 所以 Seata 框架只需將一階段保存的快照數據和行鎖刪掉,完成數據清理即可。
- - 二階段如果是回滾的話,Seata 就需要回滾一階段已經執行的“業務 SQL”,還原業務數據。回滾方式便是用“before image”還原業務數據;但在還原前要首先要校驗臟寫,對比“數據庫當前業務數據”和 “after image”,如果兩份數據完全一致就說明沒有臟寫,可以還原業務數據,如果不一致就說明有臟寫,出現臟寫就需要轉人工處理。
2.為了單一化技術點,我直接新建一個gradle項目,以官方例子為原型做抽取制作,模擬電商業務,整體架構為多模塊微服務Dubbo框架,建立5個module,common為公共模塊,account為用戶賬戶處理,order為訂單處理,storage為庫存處理,business為業務處理,整體的處理邏輯為第一圖。
3.在build.gradle中引入依賴,強烈建議邊寫代碼邊逐步引入,比如使用到druid才加入druid的依賴,這樣才能知道每個依賴的作用和用法。
4.建表,項目文件中已有SQL.script,幾個業務模塊的對應的表,比較簡單,略。重點關注下undo_log,此表為MQ存儲事務執行前后的日志表,為**AT模式所必須**,用於事務提交和回滾,其中最關鍵字段即xid(全局事務ID)和branch_id(分支事務ID)。另外,我將各模塊DB獨立,是為了模擬分布式DB環境。
5.使用common模塊的mbg快速生成各模塊的Entity、Service、Impl、Mapper、Dao和Controller,可參考往期文章《》。注意每次生成時,需修改配置。
6.common模塊:放公共的對象,如全局Enum,Exception,Dto等,還有Dubbo的接口。
7.storage模塊:`com.biao.mall.storage.conf.SeataAutoConfig`進行Seata配置:
- - 先通過SpringBoot自動取得DataSourceProperties,並獲取JDBC的連接信息;
- - 注入Druid連接池對象,並對DruidDataSource做屬性設置,如線程池參數,超時參數等;
- - 注入RM的DataSourceProxy代理,來代理DruidDataSource;
- - 初始化Mybatis的SqlSessionFactory,這里使用的是DataSourceProxy實參,並將Mapper文件加入,映射Entity和Table;
- - 分支通過GlobalTransactionScanner來掃描XID,並注冊本地事務;
@Configuration public class SeataAutoConfig { private DataSourceProperties dataSourceProperties; @Autowired public SeataAutoConfig(DataSourceProperties dataSourceProperties){ this.dataSourceProperties = dataSourceProperties; } /** * init durid datasource * @Return: druidDataSource datasource instance */ @Bean @Primary public DruidDataSource druidDataSource(){ DruidDataSource druidDataSource = new DruidDataSource(); druidDataSource.setUrl(dataSourceProperties.getUrl()); druidDataSource.setUsername(dataSourceProperties.getUsername()); druidDataSource.setPassword(dataSourceProperties.getPassword()); druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName()); druidDataSource.setInitialSize(0); druidDataSource.setMaxActive(180); druidDataSource.setMaxWait(60000); druidDataSource.setMinIdle(0); druidDataSource.setValidationQuery("Select 1 from DUAL"); druidDataSource.setTestOnBorrow(false); druidDataSource.setTestOnReturn(false); druidDataSource.setTestWhileIdle(true); druidDataSource.setTimeBetweenEvictionRunsMillis(60000); druidDataSource.setMinEvictableIdleTimeMillis(25200000); druidDataSource.setRemoveAbandoned(true); druidDataSource.setRemoveAbandonedTimeout(1800); druidDataSource.setLogAbandoned(true); return druidDataSource; } /** * init datasource proxy * @Param: druidDataSource datasource bean instance * @Return: DataSourceProxy datasource proxy */ @Bean public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){ return new DataSourceProxy(druidDataSource); } /** * init mybatis sqlSessionFactory * @Param: dataSourceProxy datasource proxy * @Return: DataSourceProxy datasource proxy */ @Bean public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(dataSourceProxy); factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver() .getResources("classpath:/mapper/*Mapper.xml")); factoryBean.setTransactionFactory(new JdbcTransactionFactory()); return factoryBean.getObject(); } /** * init global transaction scanner * @Return: GlobalTransactionScanner */ @Bean public GlobalTransactionScanner globalTransactionScanner(){ return new GlobalTransactionScanner("${spring.application.name}", "my_test_tx_group"); } }
com.biao.mall.storage.dubbo.StorageDubboServiceImpl
:Dubbo微服務storage的具體實現,@Service注解為com.apache.dubbo.config.annotation.Service
,將該服務注冊到注冊中心,本項目注冊中心使用Nacos,不是ZK。
@Service(version = "1.0.0",protocol = "${dubbo.protocol.id}", application = "${dubbo.application.id}",registry = "${dubbo.registry.id}") public class StorageDubboServiceImpl implements StorageDubboService { @Autowired private ProductService productService; @Override public ObjectResponse decreaseStorage(CommodityDTO commodityDTO) { System.out.println("全局事務id :" + RootContext.getXID()); return productService.decreaseStorage(commodityDTO); } }
另外注意, `com.biao.mall.storage.impl.ProductServiceImpl`中,這里的本地方法,並不需要@Transactional注解。
8.account模塊和order模塊和storage模塊類似,只是order模塊中`com.biao.mall.order.impl.OrdersServiceImpl`多了一個通過@Reference調用account服務的注解,其他,略。
9.business模塊:SeataAutoConfig中因無本地事務,只需一個GlobalTransactionScanner,BusinessServiceImpl中:
- - 類注解@Service是Spring的注解;
- - 通過@Reference從注冊中心獲取storage和order服務;
- - handleBusiness方法上通過@GlobalTransactional發起全局事務,方法內就是具體使用storage和order服務;
@Service public class BusinessServiceImpl implements BusinessService { @Reference(version = "1.0.0") private StorageDubboService storageDubboService; @Reference(version = "1.0.0") private OrderDubboService orderDubboService; private boolean flag; @Override @GlobalTransactional(timeoutMills = 30000,name = "dubbo-seata-at-springboot") public ObjectResponse handleBusiness(BusinessDTO businessDTO) { System.out.println("開始全局事務,XID = " + RootContext.getXID()); ObjectResponse<Object> objectResponse = new ObjectResponse<>(); //1,減庫存 CommodityDTO commodityDTO = new CommodityDTO(); commodityDTO.setCommodityCode(businessDTO.getCommodityCode()); commodityDTO.setCount(businessDTO.getCount()); ObjectResponse storageResponse = storageDubboService.decreaseStorage(commodityDTO); //2,創建訂單 OrderDTO orderDTO = new OrderDTO(); orderDTO.setUserId(businessDTO.getUserId()); orderDTO.setCommodityCode(businessDTO.getCommodityCode()); orderDTO.setOrderCount(businessDTO.getCount()); orderDTO.setOrderAmount(businessDTO.getAmount()); ObjectResponse<OrderDTO> response = orderDubboService.createOrder(orderDTO); //打開注釋測試事務發生異常后,全局回滾功能 // if (!flag) { // throw new RuntimeException("測試拋異常后,分布式事務回滾!"); // } if (storageResponse.getStatus() != 200 || response.getStatus() != 200) { throw new DefaultException(RspStatusEnum.FAIL); } objectResponse.setStatus(RspStatusEnum.SUCCESS.getCode()); objectResponse.setMessage(RspStatusEnum.SUCCESS.getMessage()); objectResponse.setData(response.getData()); return objectResponse; } }
10.寫個BusinessController的方法,用於測試:
@PostMapping("/buy") ObjectResponse handleBusiness(@RequestBody BusinessDTO businessDTO){ LOGGER.info("請求參數:{}",businessDTO.toString()); return businessService.handleBusiness(businessDTO); }
11.下載安裝TC ,即 Seata 的服務端,需要獨立部署運行,下載地址:https://github.com/seata/seata/releases,解壓,支持window和linux下直接啟動運行,如下linux命令,運行參數將指定port、host和imageFile的存儲方式:
sh seata-server.sh -p 8091 -h 127.0.0.1 -m file
12.測試,按順序啟動:Nacos-->SeataServer-->account-->order-->storage-->business ,啟動后的效果。
Nacos注冊的服務信息,注意Dubbo是區分provider和consumer的,這是不同於SpringCloud的地方,所以同一服務不同身份就有兩個了:
可以看到各RM向TC注冊的信息:
Postman提交至Controller:
提交運行后,一階段更新DB,二階段只需釋放鎖:
數據庫情況:
13.回滾測試:將`com.biao.mall.business.service.BusinessServiceImpl`中回滾測試代碼注釋去掉!手動拋出異常,再次Postman提交,可見:
- - business先開啟了全局事務,並傳播了XID,二階段向TC提交rollback狀態;
- - order中,可以看到分支事務是一階段提交了,異常后,二階段根據XID做了rollback;
- - order有SQL信息,是application.yml中設置了logging.level為debug;
- 數據庫信息不變,貼圖,略;
14.測試undo_log表用途:
`com.biao.mall.business.service.BusinessServiceImpl`加個斷點:
其他模塊正常啟動,postman提交:
看undo_log表,這里只是個臨時的數據,二階段后會刪除:
***
復盤記:
1.Seata只能支持RPC模式的事務,對MQ模式的分布式事務不能實施,比較好的搭配是Dubbo+Seata。
2.啟動應用向SeataServer注冊,不一定能一次成功,有時要嘗試多次,可見穩定性一般!
3.依賴沖突問題:報錯提示:`Class path contains multiple SLF4J bindings`,因其來自於以下兩個jar,
`logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class`
,`slf4j-nop-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class`
由於logback是主流,不排除,直接去掉`slf4j-nop`依賴,問題解決!
4.報錯:`NoSuchMethodError:org.yaml.snakeyaml.nodes.ScalarNode.getScalarStyle`,**特別注意**這種情況很多時候也是依賴沖突,而不是缺少類,處理方法:
a.先百度,需要加入snakeyaml依賴,結果還是報錯,
b.再先全局搜索,雙擊shift鍵,查找`ScalarNode`類,發現出現在兩個地方,估計沖突了,
c.在Idea中使用依賴分析命令,`order`為module名,`snakeyaml`為依賴名:
`gradle :order:dependencyInsight --dependency snakeyaml`
發現有多方引入的情況,結果是dubbo本身也使用了snakeyaml,直接在dubbo依賴中使用exclude語法排除,問題解決!
5.報錯:`NoSuchBeanDefinitionException: No qualifying bean of type 'com.biao.mall.order.dao.OrdersDao' available`:
表面上看是Mapper類無Bean實例,確定加了@Mapper和@Repository注解,還是錯誤!想到既然是缺少注入的Bean,可能是缺少mybatis-plus依賴導致,添加`mybatis-plus-boot-starter`,問題解決!
6.報錯:`io.seata.common.exception.NotSupportYetException: not support register type: null`,需添加 registry.conf 和 file.conf。
7.seata server安裝和啟動方法:
https://github.com/seata/seata/wiki/Quick-Start
8.報錯:`com.alibaba.nacos.api.exception.NacosException: java.lang.ClassNotFoundException`,添加Nacos相關依賴 dubbo-registry-nacos/spring-context-support/nacos-api/nacos-client。
9.dubbo的service是明顯區分consumer和provider的,如果使用Nacos做注冊中心,可以通過detail查看其服務角色,還有其提供的方法。
10.`com.biao.mall.storage.conf.SeataAutoConfig`中設置Mapper路徑,需使用`getResources("classpath:/mapper/*Mapper.xml"))`;不可使用`getResources("${mybatis.mapper-locations}"))`配置方式,
會告警:`Property 'mapperLocations' was specified but matching resources are not found`,最后導致Mapper文件無法加載,Dao方法讀取失敗,應用運行會異常,我估計是Bean加載順序問題,但沒有驗證,sorry。
11.本文參考文章地址:https://www.sofastack.tech/blog/seata-distributed-transaction-deep-dive/
***
微信公眾號,只寫原創文章!
推薦閱讀:
- 1 Docker部署RocketMQ
- 2 流式計算(五)-Flink 計算模型
- 3 流式計算(四)-Flink Stream API 篇二
- 4 流式計算(三)-Flink Stream 篇一
- 5 流式計算(二)-Kafka Stream