分布式事務五-TC全局事務協調器


Seata Server - TC全局事務協調器

《分布式事務(三)Seata分布式事務框架-AT模式介紹》 中介紹了 Seata AT 事務原理,介紹了 AT 事務的三個角色:TC(事務協調器)、TM(事務管理器)和RM(資源管理器),其中 TM 和 RM 是嵌入在業務應用中的,而 TC 則是一個獨立服務。

a

Seata Server 就是 TC,直接從官方倉庫下載啟動即可,下載地址:https://github.com/seata/seata/releases

Seata Server 配置

Seata Server 的配置文件有兩個:

  • seata/conf/registry.conf
  • seata/conf/file.conf

registry.conf

a
Seata Server 要向注冊中心進行注冊,這樣,其他服務就可以通過注冊中心去發現 Seata Server,與 Seata Server 進行通信。

Seata 支持多款注冊中心服務:nacos 、eureka、redis、zk、consul、etcd3、sofa。

我們項目中要使用 eureka 注冊中心,eureka服務的連接地址、注冊的服務名,這需要在 registry.conf 文件中進行配置:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  # 這里選擇 eureka 注冊配置
  type = "eureka"

  nacos {
	......
  }

  # eureka的注冊配置
  eureka {
    # 注冊中心地址
    serviceUrl = "http://localhost:8761/eureka"
    # 注冊的服務ID
    application = "seata-server"
    weight = "1"
  }
  
  redis {
	......
  }
  ......
12345678910111213141516171819202122

file.conf

Seata 需要存儲全局事務信息、分支事務信息、全局鎖信息,這些數據存儲到什么位置?

針對存儲位置的配置,支持放在配置中心,或者也可以放在本地文件。Seata Server 支持的配置中心服務有:nacos 、apollo、zk、consul、etcd3。

這里我們選擇最簡單的,使用本地文件,這需要在 registry.conf 配置文件中來指定:

......

config {
  # file、nacos 、apollo、zk、consul、etcd3
  # 在這里選擇使用本地文件來保存配置
  type = "file"


......

  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  
  file {
    # 在這里設置配置文件的文件名
    name = "file.conf"
  }
}
12345678910111213141516171819

file.conf 中對事務信息的存儲位置進行配置,存儲位置支持:file、db、redis。

這里我們選擇數據庫作為存儲位置,這需要在 file.conf 中進行配置:

store {
  ## store mode: file、db、redis
  # 這里選擇數據庫存儲
  mode = "db"

  ## file store property
  file {
  	......
  }

  # 數據庫存儲
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"

	# 數據庫連接配置
    url = "jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8"
    user = "root"
    password = "root"
    minConn = 5
    maxConn = 30

	# 事務日志表表名設置
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"

    queryLimit = 100
    maxWait = 5000
  }

  ## redis store property
  redis {
  	......
  }
}
123456789101112131415161718192021222324252627282930313233343536373839

a

啟動參數設置

啟動文件:seata-server.bat

用文本編輯器打開文件,找到文件中這一行:

%JAVACMD% %JAVA_OPTS% -server -Xmx2048m -Xms2048m -Xmn1024m -Xss512k -XX:Sur......
1

看到 Seata Server 默認使用 2G 內存,測試環境我們可以把內存調低:

%JAVACMD% %JAVA_OPTS% -server -Xmx256m -Xms256m -Xmn128m -Xss512k -XX:Sur......
1

啟動 Seata Server

雙擊 seata-server.bat 啟動 Seata Server。

查看 Eureka 注冊中心 Seata Server 的注冊信息:

a

order訂單服務添加 Seata AT 事務

業務

訂單調用庫存和賬戶,我們先從前面的訂單開始。

在訂單項目中要啟動全局事務,還要執行訂單保存的分支事務

order-parent 添加 seata 依賴

 <!-- 先排除,然后添加最新版本-->
        <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-alibaba-seata</artifactId>
          <version>${spring-cloud-alibaba-seata.version}</version>
          <exclusions>
            <exclusion>
              <artifactId>seata-all</artifactId>
              <groupId>io.seata</groupId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>io.seata</groupId>
          <artifactId>seata-all</artifactId>
          <version>${seata.version}</version>
        </dependency>

添加配置

application.yml

TC 事務協調器通過“事務組”的方式將多個服務組織成一個全局事務。每個服務啟動時要向TC注冊,加入同一個事務組。

spring:
  ......
  
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group

......

registry.conf

需要從注冊中心獲得 TC 的地址,這里配置注冊中心的地址。

TC 在注冊中心注冊的服務ID在下面 file.conf 中指定。

只需要修改type 以及type對應的配置

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    # application = "default"
    # weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

file.conf

在這里我們指定 TC 的服務ID seata-server

vgroupMapping.order_tx_group = "seata-server"

order_tx_group 對應 application.yml 中注冊的事務組名。

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  # order_tx_group 與 yml 中的 “tx-service-group: order_tx_group” 配置一致
  # “seata-server” 與 TC 服務器的注冊名一致
  # 從eureka獲取seata-server的地址,再向seata-server注冊自己,設置group
  vgroupMapping.order_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  order_tx_group.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

創建 seata 數據源代理

Seata AT 事務對業務代碼無侵入,全自動化處理全局事務,其功能是靠 Seata 的數據源代理工具實現的。

這里我們創建 Seata 的數據源代理,並排除 Spring 默認的數據源。

package cn.tedu.order;

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;

@Configuration
public class DatasourceConfiguration {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        return new DruidDataSource();;
    }

    @Primary  //設置首選數據源對象
    @Bean("dataSource")
    public DataSourceProxy dataSource(DataSource druidDataSource){
        return new DataSourceProxy(druidDataSource);
    }
}

主程序中排除Springboot 的默認數據源:

package cn.tedu.order;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.openfeign.EnableFeignClients;

@EnableFeignClients
@MapperScan("cn.tedu.order.mapper")
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) 
//排除默認的數據源自動配置
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }

}

啟動全局事務

Seata AT 對業務無侵入,所以啟動全局事務非常簡單,只需要添加一個 @GlobalTransactional 注解即可。

另外我們一步一步地添加全局事務並測試,這里先把 storage 和 account 調用注掉。

package cn.tedu.order.service;

import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.AccountClient;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.feign.StorageClient;
import cn.tedu.order.mapper.OrderMapper;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    EasyIdGeneratorClient easyIdGeneratorClient;
    @Autowired
    private AccountClient accountClient;
    @Autowired
    private StorageClient storageClient;

    // 1 創建TM  2.連接TC 3.注冊全局事務 向TC注冊
    @GlobalTransactional //全局事務 
    @Override
    public void create(Order order) {
        // 從全局唯一id發號器獲得id
        Long orderId = easyIdGeneratorClient.nextId("order_business");
        order.setId(orderId);

        orderMapper.create(order);

        // 修改庫存
        //storageClient.decrease(order.getProductId(), order.getCount());

        // 修改賬戶余額
        //accountClient.decrease(order.getUserId(), order.getMoney());

    }
}

啟動 order 項目進行測試

按順序啟動服務:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Order

調用保存訂單,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

觀察控制台,看到全局事務和訂單的分支事務已經啟動,並可以看到全局事務ID(XID)和分支事務ID(Branch ID):

a

然后觀察數據庫中新添加的訂單數據:

a

測試出現異常,回滾的情況

在業務代碼中加一個模擬異常再試一下:

package cn.tedu.order.service;

import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.AccountClient;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.feign.StorageClient;
import cn.tedu.order.mapper.OrderMapper;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    EasyIdGeneratorClient easyIdGeneratorClient;
    @Autowired
    private AccountClient accountClient;
    @Autowired
    private StorageClient storageClient;

    @GlobalTransactional
    @Override
    public void create(Order order) {
        // 從全局唯一id發號器獲得id
        Long orderId = easyIdGeneratorClient.nextId("order_business");
        order.setId(orderId);

        orderMapper.create(order);
		//模擬回滾業務
        if (Math.random() < 0.5) {
            throw new RuntimeException("模擬異常");
        }

        // 修改庫存
        //storageClient.decrease(order.getProductId(), order.getCount());

        // 修改賬戶余額
        //accountClient.decrease(order.getUserId(), order.getMoney());

    }
}

重啟 order 項目,並調用保存訂單:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

可以看到全局事務回滾的日志:

a

訂單啟動全局事務部分完成,在繼續之前,先把模擬異常注釋掉:

        ......

        //if (Math.random() < 0.5) {
        //    throw new RuntimeException("模擬異常");
        //}

        ......

storage庫存服務添加 Seata AT 事務

配置

與訂單項目中添加的配置完全相同,請參考訂單配置章節配置下面三個文件:

  • application.yml
  • registry.conf
  • file.conf

創建 seata 數據源代理

與訂單項目中數據源代理完全相同,請參考訂單中數據源代理章節,在 cn.tedu.storage 包下創建數據源配置類 DatasourceConfiguration。主程序注解排除 DataSourceAutoConfiguration 自動配置類。

package cn.tedu.storage;

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;

@Configuration  //自動配置
public class DatasourceConfiguration {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        return new DruidDataSource();
    }
    @Primary //設置首選數據源對象
    @Bean("dataSource")
    public DataSourceProxy dataSource(DataSource druidDataSource){
        return  new DataSourceProxy(druidDataSource);
    }

}

啟動分支事務

在業務方法上添加 @Transactional 注解啟動本地事務:

package cn.tedu.storage.service;

import cn.tedu.storage.mapper.StorageMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class StorageServiceImpl implements StorageService {
    @Autowired
    private StorageMapper storageMapper;

    @Transactional //該注解可加可不加
    @Override
    public void decrease(Long productId, Integer count) throws Exception {
        storageMapper.decrease(productId,count);
    }
}

order 的業務類中調用減少商品庫存

前面我們把調用商品庫存注釋掉了,現把注釋打開:

// 修改庫存
storageClient.decrease(order.getProductId(), order.getCount());

啟動 storage 項目進行測試

按順序啟動項目:

  1. Eureka
  2. Seata Server -單獨的服務器需要開啟
  3. Easy Id Generator ---id發號器
  4. Storage
  5. Order

調用保存訂單,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

訂單會調用庫存,這兩個服務會分別啟動一個分支事務,兩個分支事務一起組成一個全局事務:

a

觀察兩個項目的控制台都有Seata AT事務的日志,Storage 項目控制台如下:

a

然后觀察數據庫中新添加的訂單和減少的庫存:

減少的庫存:
a

測試出現異常,進行回滾

在業務代碼中加一個模擬異常再試一下:

通過拋出運行時異常模擬事務回滾

package cn.tedu.storage.service;

import cn.tedu.storage.mapper.StorageMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class StorageServiceImpl implements StorageService {
    @Autowired
    private StorageMapper storageMapper;

    @Transactional
    @Override
    public void decrease(Long productId, Integer count) throws Exception {
        storageMapper.decrease(productId,count);

        if (Math.random() < 0.5) {
            throw new RuntimeException("模擬異常");
        }
    }
}

重啟 storage 項目,並調用保存訂單:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

查看數據庫表 order 和 storage,如果執行成功會新增訂單、減少庫存,如果執行失敗則數據沒有變化,被回滾了。

storage 分支事務部分完成,在繼續之前,先把模擬異常注釋掉:

        ......

        //if (Math.random() < 0.5) {
        //    throw new RuntimeException("模擬異常");
        //}

        ......

account賬戶服務添加 Seata AT 事務

配置

與訂單項目中添加的配置完全相同,請參考訂單配置章節配置下面三個文件:

  • application.yml
  • registry.conf
  • file.conf

創建 seata 數據源代理

與訂單項目中數據源代理完全相同,請參考訂單中數據源代理章節,在 cn.tedu.account 包下創建數據源配置類 DatasourceConfiguration。主程序注解排除 DataSourceAutoConfiguration 自動配置類。

package cn.tedu.account;

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;

@Configuration
public class DatasourceConfiguration {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource(){
        return  new DruidDataSource();
    }

    @Primary
    @Bean("dataSource")
    public DataSourceProxy dataSourceProxy(DataSource druidDataSource){
        return  new DataSourceProxy(druidDataSource);
    }

}

啟動分支事務

在業務方法上添加 @Transactional 注解啟動本地事務:

package cn.tedu.account.service;

import cn.tedu.account.mapper.AccountMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountMapper accountMapper;

    @Transactional
    @Override
    public void decrease(Long userId, BigDecimal money) {
        accountMapper.decrease(userId,money);
    }
}

order 的業務類中調用扣減賬戶金額

把調用賬戶注釋打開:

// 修改賬戶余額
accountClient.decrease(order.getUserId(), order.getMoney());

啟動 account 項目進行測試

按順序啟動項目:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Storage
  5. Account
  6. Order

調用保存訂單,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

訂單會調用庫存和賬戶,這三個服務會分別啟動一個分支事務,三個分支事務一起組成一個全局事務:

a

觀察三個項目的控制台都有Seata AT事務的日志,account 項目控制台如下:

a

然后觀察數據庫中的訂單表、庫存表和賬戶表。

這是賬戶表,看到金額已經被扣減:
a

測試出現異常,進行回滾

在業務代碼中加一個模擬異常再試一下:

package cn.tedu.account.service;

import cn.tedu.account.mapper.AccountMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountMapper accountMapper;

    @Transactional
    @Override
    public void decrease(Long userId, BigDecimal money) {
        accountMapper.decrease(userId,money);

        if (Math.random() < 0.5) {
            throw new RuntimeException("模擬異常");
        }
    }
}

重啟 account 項目,並調用保存訂單:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

查看數據庫表 order、storage 和 account,如果執行成功會新增訂單、減少庫存、扣減金額,如果執行失敗則數據沒有變化,被回滾了。

失敗時,在 order 和 storage 控制台可以看到回滾日志。

這是 storage 的回滾日志:

 rm handle branch rollback 
.....
io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 
....
PhaseTwo_Rollbacked

a

account 分支事務部分完成,最后把模擬異常注釋掉:

        ......

        //if (Math.random() < 0.5) {
        //    throw new RuntimeException("模擬異常");
        //}

        ......


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM