Dubbo學習系列之十四(Seata分布式事務方案AT模式)


一直說寫有關最新技術的文章,但前面似乎都有點偏了,只能說算主流技術,今天這個主題,我覺得應該名副其實。分布式微服務的深水區並不是單個微服務的設計,而是服務間的數據一致性問題!解決了這個問題,才算是把分布式正式收編了!但分布式事務解決方案並沒有統一的標准,只能說根據業務特點來適配,有實時的,非實時的,同步或異步的,之前已經實現了異步MQ的分布式事務方案,今天來看看Seata方案,自19年初才推出,還幾易其名,目前還不算特別完善,但其光環太耀眼,作為一名IT人,還是有必要來瞧一瞧的。單說Seata,就有AT、TCC、Saga和XA模式,看來是盤大菜。

 

作者原創文章,謝絕一切轉載!

 本文只發表在"公眾號"和"博客園",其他均屬復制粘貼!如果覺得排版不清晰,請查看公眾號文章。 

 

**工具:**

Idea201902/JDK11/Gradle5.6.2/Mysql8.0.11/Lombok0.26/Postman7.5.0/SpringBoot2.1.9/Nacos1.1.3/Seata0.8.1/SeataServer0.8.1/Dubbo2.7.3

**難度:**
新手--戰士--老兵--大師

**目標:**

1.多模塊微服務Dubbo框架整合Seata實現分布式事務的AT模式

2.使用Seata實現訂單模塊與其他模塊的關聯型事務的TCC模式
***

**步驟:**

**為了更好的遇到各種問題,同時保持時效性,我盡量使用最新的軟件版本。代碼地址:其中的day17,https://github.com/xiexiaobiao/dubbo-project.git**

文中圖片有些顯示不全,是圖片很大,我擔心縮放會看不清,所以部分顯示不全的,可以下載圖片再看。

1.先照搬來點背景材料,分布式事務典型場景如下圖,一個business主事務發起多個分支事務,並需要保證一致的commit或rollback:

 

 

Seata框架,有三個模塊,分別是

  • - TM-TransactionManager事務管理器:定義全局事務的范圍,開啟、提交或回滾全局事務;
  • - RM -ResourceManager資源管理器:管理分支事務的資源,注冊分支事務到TC,與TC通信反饋分支事務狀態,驅動分支事務提交或回滾;
  • - TC-TransactionCoordinator事務協調器:維護全局和分支事務,驅動全局提交或回滾;

 分布式事務流程:

I. TM 請求TC 發起一個全局事務,同時TC生成一個 XID作為全局事務ID.

II. XID將分發給事務調用鏈上的所有微服務.

III. RM響應全局事務XID向TC注冊本地分支事務.

IV. TM向TC發出提交或回滾全局事務XID的請求.

V. TC響應全局事務XID,驅動所有分支事務提交或 回滾本地分支事務.

其中 TM 和 RM 是作為 Seata 的客戶端與業務系統集成在一起,TC 作為 Seata 的服務端獨立部署。

再說seata的AT模式:AT 模式是一種無侵入的分布式事務解決方案。在 AT 模式下,用戶只需關注自己的“業務 SQL”,用戶的 “業務 SQL” 作為一階段,Seata 框架會自動生成事務的二階段提交和回滾操作。

  • - 在一階段,Seata 會攔截“業務 SQL”,首先解析 SQL 語義,找到“業務 SQL”要更新的業務數據,在業務數據被更新前,將其保存成“before image”,然后執行“業務 SQL”更新業務數據,在業務數據更新之后,再將其保存成“after image”,最后生成行鎖。以上操作全部在一個數據庫事務內完成,這樣保證了一階段操作的原子性。

  

  • - 二階段如果是提交的話,因為“業務 SQL”在一階段已經提交至數據庫, 所以 Seata 框架只需將一階段保存的快照數據和行鎖刪掉,完成數據清理即可。

  

  • - 二階段如果是回滾的話,Seata 就需要回滾一階段已經執行的“業務 SQL”,還原業務數據。回滾方式便是用“before image”還原業務數據;但在還原前要首先要校驗臟寫,對比“數據庫當前業務數據”和 “after image”,如果兩份數據完全一致就說明沒有臟寫,可以還原業務數據,如果不一致就說明有臟寫,出現臟寫就需要轉人工處理。

 

 2.為了單一化技術點,我直接新建一個gradle項目,以官方例子為原型做抽取制作,模擬電商業務,整體架構為多模塊微服務Dubbo框架,建立5個module,common為公共模塊,account為用戶賬戶處理,order為訂單處理,storage為庫存處理,business為業務處理,整體的處理邏輯為第一圖。

 

3.在build.gradle中引入依賴,強烈建議邊寫代碼邊逐步引入,比如使用到druid才加入druid的依賴,這樣才能知道每個依賴的作用和用法。

 

4.建表,項目文件中已有SQL.script,幾個業務模塊的對應的表,比較簡單,略。重點關注下undo_log,此表為MQ存儲事務執行前后的日志表,為**AT模式所必須**,用於事務提交和回滾,其中最關鍵字段即xid(全局事務ID)和branch_id(分支事務ID)。另外,我將各模塊DB獨立,是為了模擬分布式DB環境。

 

5.使用common模塊的mbg快速生成各模塊的Entity、Service、Impl、Mapper、Dao和Controller,可參考往期文章《》。注意每次生成時,需修改配置。

 

6.common模塊:放公共的對象,如全局Enum,Exception,Dto等,還有Dubbo的接口。

 

7.storage模塊:`com.biao.mall.storage.conf.SeataAutoConfig`進行Seata配置:

  • - 先通過SpringBoot自動取得DataSourceProperties,並獲取JDBC的連接信息;
  • - 注入Druid連接池對象,並對DruidDataSource做屬性設置,如線程池參數,超時參數等;
  • - 注入RM的DataSourceProxy代理,來代理DruidDataSource;
  • - 初始化Mybatis的SqlSessionFactory,這里使用的是DataSourceProxy實參,並將Mapper文件加入,映射Entity和Table;
  • - 分支通過GlobalTransactionScanner來掃描XID,並注冊本地事務;
@Configuration
public class SeataAutoConfig {

    private DataSourceProperties dataSourceProperties;

    @Autowired
    public SeataAutoConfig(DataSourceProperties dataSourceProperties){
        this.dataSourceProperties = dataSourceProperties;
    }

    /**
     * init durid datasource
     * @Return: druidDataSource  datasource instance
     */
    @Bean
    @Primary
    public DruidDataSource druidDataSource(){
        DruidDataSource druidDataSource = new DruidDataSource();
        druidDataSource.setUrl(dataSourceProperties.getUrl());
        druidDataSource.setUsername(dataSourceProperties.getUsername());
        druidDataSource.setPassword(dataSourceProperties.getPassword());
        druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
        druidDataSource.setInitialSize(0);
        druidDataSource.setMaxActive(180);
        druidDataSource.setMaxWait(60000);
        druidDataSource.setMinIdle(0);
        druidDataSource.setValidationQuery("Select 1 from DUAL");
        druidDataSource.setTestOnBorrow(false);
        druidDataSource.setTestOnReturn(false);
        druidDataSource.setTestWhileIdle(true);
        druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
        druidDataSource.setMinEvictableIdleTimeMillis(25200000);
        druidDataSource.setRemoveAbandoned(true);
        druidDataSource.setRemoveAbandonedTimeout(1800);
        druidDataSource.setLogAbandoned(true);
        return druidDataSource;
    }

    /**
     * init datasource proxy
     * @Param: druidDataSource  datasource bean instance
     * @Return: DataSourceProxy  datasource proxy
     */
    @Bean
    public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){
        return new DataSourceProxy(druidDataSource);
    }

    /**
     * init mybatis sqlSessionFactory
     * @Param: dataSourceProxy  datasource proxy
     * @Return: DataSourceProxy  datasource proxy
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSourceProxy);
        factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath:/mapper/*Mapper.xml"));
        factoryBean.setTransactionFactory(new JdbcTransactionFactory());
        return factoryBean.getObject();
    }

    /**
     * init global transaction scanner
     * @Return: GlobalTransactionScanner
     */
    @Bean
    public GlobalTransactionScanner globalTransactionScanner(){
        return new GlobalTransactionScanner("${spring.application.name}", "my_test_tx_group");
    }
}

 

com.biao.mall.storage.dubbo.StorageDubboServiceImpl:Dubbo微服務storage的具體實現,@Service注解為com.apache.dubbo.config.annotation.Service,將該服務注冊到注冊中心,本項目注冊中心使用Nacos,不是ZK。

@Service(version = "1.0.0",protocol = "${dubbo.protocol.id}",
        application = "${dubbo.application.id}",registry = "${dubbo.registry.id}")
public class StorageDubboServiceImpl implements StorageDubboService {

    @Autowired
    private ProductService  productService;

    @Override
    public ObjectResponse decreaseStorage(CommodityDTO commodityDTO) {
        System.out.println("全局事務id :" + RootContext.getXID());
        return productService.decreaseStorage(commodityDTO);
    }
}

另外注意, `com.biao.mall.storage.impl.ProductServiceImpl`中,這里的本地方法,並不需要@Transactional注解。

 

8.account模塊和order模塊和storage模塊類似,只是order模塊中`com.biao.mall.order.impl.OrdersServiceImpl`多了一個通過@Reference調用account服務的注解,其他,略。

 

9.business模塊:SeataAutoConfig中因無本地事務,只需一個GlobalTransactionScanner,BusinessServiceImpl中:

  • - 類注解@Service是Spring的注解;
  • - 通過@Reference從注冊中心獲取storage和order服務;
  • - handleBusiness方法上通過@GlobalTransactional發起全局事務,方法內就是具體使用storage和order服務;
@Service
public class BusinessServiceImpl implements BusinessService {

    @Reference(version = "1.0.0")
    private StorageDubboService storageDubboService;

    @Reference(version = "1.0.0")
    private OrderDubboService orderDubboService;

    private boolean flag;

    @Override
    @GlobalTransactional(timeoutMills = 30000,name = "dubbo-seata-at-springboot")
    public ObjectResponse handleBusiness(BusinessDTO businessDTO) {
        System.out.println("開始全局事務,XID = " + RootContext.getXID());
        ObjectResponse<Object> objectResponse = new ObjectResponse<>();
        //1,減庫存
        CommodityDTO commodityDTO = new CommodityDTO();
        commodityDTO.setCommodityCode(businessDTO.getCommodityCode());
        commodityDTO.setCount(businessDTO.getCount());
        ObjectResponse storageResponse = storageDubboService.decreaseStorage(commodityDTO);
        //2,創建訂單
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setUserId(businessDTO.getUserId());
        orderDTO.setCommodityCode(businessDTO.getCommodityCode());
        orderDTO.setOrderCount(businessDTO.getCount());
        orderDTO.setOrderAmount(businessDTO.getAmount());
        ObjectResponse<OrderDTO> response = orderDubboService.createOrder(orderDTO);

        //打開注釋測試事務發生異常后,全局回滾功能
//        if (!flag) {
//            throw new RuntimeException("測試拋異常后,分布式事務回滾!");
//        }
        if (storageResponse.getStatus() != 200 || response.getStatus() != 200) {
            throw new DefaultException(RspStatusEnum.FAIL);
        }

        objectResponse.setStatus(RspStatusEnum.SUCCESS.getCode());
        objectResponse.setMessage(RspStatusEnum.SUCCESS.getMessage());
        objectResponse.setData(response.getData());
        return objectResponse;
    }
} 

 

10.寫個BusinessController的方法,用於測試:

    @PostMapping("/buy")
    ObjectResponse handleBusiness(@RequestBody BusinessDTO businessDTO){
        LOGGER.info("請求參數:{}",businessDTO.toString());
        return businessService.handleBusiness(businessDTO);
    }

 

11.下載安裝TC ,即 Seata 的服務端,需要獨立部署運行,下載地址:https://github.com/seata/seata/releases,解壓,支持window和linux下直接啟動運行,如下linux命令,運行參數將指定port、host和imageFile的存儲方式:

sh seata-server.sh -p 8091 -h 127.0.0.1 -m file

 

12.測試,按順序啟動:Nacos-->SeataServer-->account-->order-->storage-->business ,啟動后的效果。

Nacos注冊的服務信息,注意Dubbo是區分provider和consumer的,這是不同於SpringCloud的地方,所以同一服務不同身份就有兩個了:

可以看到各RM向TC注冊的信息:

 

Postman提交至Controller:


提交運行后,一階段更新DB,二階段只需釋放鎖:

 

數據庫情況:

 

13.回滾測試:將`com.biao.mall.business.service.BusinessServiceImpl`中回滾測試代碼注釋去掉!手動拋出異常,再次Postman提交,可見:

  • - business先開啟了全局事務,並傳播了XID,二階段向TC提交rollback狀態;
  • - order中,可以看到分支事務是一階段提交了,異常后,二階段根據XID做了rollback;
  • - order有SQL信息,是application.yml中設置了logging.level為debug;

- 數據庫信息不變,貼圖,略;

 

14.測試undo_log表用途:
`com.biao.mall.business.service.BusinessServiceImpl`加個斷點:

 

其他模塊正常啟動,postman提交:

 

看undo_log表,這里只是個臨時的數據,二階段后會刪除:

  

***
復盤記: 

1.Seata只能支持RPC模式的事務,對MQ模式的分布式事務不能實施,比較好的搭配是Dubbo+Seata。

2.啟動應用向SeataServer注冊,不一定能一次成功,有時要嘗試多次,可見穩定性一般!

3.依賴沖突問題:報錯提示:`Class path contains multiple SLF4J bindings`,因其來自於以下兩個jar,
`logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class`
,`slf4j-nop-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class`
由於logback是主流,不排除,直接去掉`slf4j-nop`依賴,問題解決!

4.報錯:`NoSuchMethodError:org.yaml.snakeyaml.nodes.ScalarNode.getScalarStyle`,**特別注意**這種情況很多時候也是依賴沖突,而不是缺少類,處理方法:

  a.先百度,需要加入snakeyaml依賴,結果還是報錯,

  b.再先全局搜索,雙擊shift鍵,查找`ScalarNode`類,發現出現在兩個地方,估計沖突了,

  c.在Idea中使用依賴分析命令,`order`為module名,`snakeyaml`為依賴名:

  `gradle :order:dependencyInsight --dependency snakeyaml`

發現有多方引入的情況,結果是dubbo本身也使用了snakeyaml,直接在dubbo依賴中使用exclude語法排除,問題解決!

5.報錯:`NoSuchBeanDefinitionException: No qualifying bean of type 'com.biao.mall.order.dao.OrdersDao' available`:
表面上看是Mapper類無Bean實例,確定加了@Mapper和@Repository注解,還是錯誤!想到既然是缺少注入的Bean,可能是缺少mybatis-plus依賴導致,添加`mybatis-plus-boot-starter`,問題解決!

6.報錯:`io.seata.common.exception.NotSupportYetException: not support register type: null`,需添加 registry.conf 和 file.conf。

7.seata server安裝和啟動方法:
https://github.com/seata/seata/wiki/Quick-Start

8.報錯:`com.alibaba.nacos.api.exception.NacosException: java.lang.ClassNotFoundException`,添加Nacos相關依賴 dubbo-registry-nacos/spring-context-support/nacos-api/nacos-client。

9.dubbo的service是明顯區分consumer和provider的,如果使用Nacos做注冊中心,可以通過detail查看其服務角色,還有其提供的方法。

10.`com.biao.mall.storage.conf.SeataAutoConfig`中設置Mapper路徑,需使用`getResources("classpath:/mapper/*Mapper.xml"))`;不可使用`getResources("${mybatis.mapper-locations}"))`配置方式,
會告警:`Property 'mapperLocations' was specified but matching resources are not found`,最后導致Mapper文件無法加載,Dao方法讀取失敗,應用運行會異常,我估計是Bean加載順序問題,但沒有驗證,sorry。

11.本文參考文章地址:https://www.sofastack.tech/blog/seata-distributed-transaction-deep-dive/

 

***

微信公眾號,只寫原創文章!


推薦閱讀:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM