seata基本使用


seata基本使用

官方文檔:https://seata.io/zh-cn/docs/overview/what-is-seata.html

seata是一款開源的分布式事務解決方案,致力於在微服務架構下提供高性能和簡單易用的分布式事務服務

一ID+三組件模型

Transaction ID XID:全局唯一的事務ID

Transaction Coordinator(TC):事務協調者,維護全局事務的運行狀態,負責協調並驅動全局事務的提交或回滾

Transaction Manager(TM):事務管理器,定義全局事務的范圍:開始全局事務、提交或回滾全局事務

Resource Manager(RM):資源管理器,控制分支事務,負責分支注冊、狀態匯報、並接收事務協調器的指令,驅動分支事務的提交和回滾

seata分布式事務處理過程

  • TM向TC申請開啟一個全局事務,全局事務創建成功並生成一個全局唯一的XID
  • XID在微服務調用鏈路的上下文中傳播
  • RM向TC注冊分支事務,將其納入XID對應全局事務的管轄
  • TM向TC發起針對XIK的全局提交或回滾決議
  • TC調度XID下管轄的全部分支事務完成提交或回滾請求

windows安裝啟動seata服務端

下載seata,地址:https://github.com/seata/seata/releases

下載后解壓,不同版本的seata解壓后的內容不一樣,但配置都沒有太大的區別,筆者下載的是1.4.1版本

修改conf目錄下的file.conf配置文件

1、設置存儲模式為db

mode = "db"

2、設置db配置的用戶名和密碼(自己本地mysql的用戶名和密碼)

user = "root"
password = "123456"

注意:如果mysql版本是8.0及以上,driverClassName和url需要如下配置:

driverClassName = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata?serverTimezone=UTC"

3、新建seata數據庫並執行對應sql腳本,conf目錄下的README-zh.md文件中server鏈接了server端的配置和sql,,為了方便,腳本內容貼在下面:

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_status` (`status`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
    `lock_key`       CHAR(20) NOT NULL,
    `lock_value`     VARCHAR(20) NOT NULL,
    `expire`         BIGINT,
    primary key (`lock_key`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('HandleAllSession', ' ', 0);

修改conf目錄下的registry.conf配置文件

4、設置注冊類型為nacos

type = "nacos"

5、設置服務注冊到nacos(namespace不指定默認為public)

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "426851b4-b7d3-4977-a440-bf6efa561760"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

至此,配置完成,啟動nacos,然后雙擊bin目錄下的seata-server.bat啟動seata,可以在nacos看到seata服務成功注冊進去

registry.conf默認將配置存儲在file.conf,也可以設置存儲在nacos

實戰案例

業務背景

創建三個服務,一個訂單服務,一個庫存服務,一個賬戶服務

當用戶下單時,會在訂單服務中創建一個訂單,然后通過遠程調用庫存服務來扣減下單商品的庫存,再通過遠程調用賬戶服務來扣減用戶賬戶里面的余額,最后在訂單服務中修改訂單狀態為已完成

該操作跨越三個數據庫,有兩次遠程調用,很明顯會有分布式事務問題

數據庫

准備三個服務對應數據庫和表,sql腳本如下:

CREATE DATABASE seata_order;
USE seata_order;
CREATE TABLE t_order(
    id BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY ,
    user_id BIGINT(11) DEFAULT NULL COMMENT '用戶id',
    product_id BIGINT(11) DEFAULT NULL COMMENT '產品id',
    count INT(11) DEFAULT NULL COMMENT '數量',
    money DECIMAL(11,0) DEFAULT NULL COMMENT '金額',
    status INT(1) DEFAULT NULL COMMENT '訂單狀態:0創建中,1已完結'
)ENGINE=InnoDB AUTO_INCREMENT=7 CHARSET=utf8;

CREATE DATABASE seata_storage;
USE seata_storage;
CREATE TABLE t_storage(
    id BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY ,
    product_id BIGINT(11) DEFAULT NULL COMMENT '產品id',
    total INT(11) DEFAULT NULL COMMENT '總庫存',
    used INT(11) DEFAULT NULL COMMENT '已用庫存',
    residue INT(11) DEFAULT NULL COMMENT '剩余庫存'
)ENGINE=InnoDB AUTO_INCREMENT=7 CHARSET=utf8;
INSERT INTO t_storage(id, product_id, total, used, residue) VALUES(1,1,100,0,100);

CREATE DATABASE seata_account;
USE seata_account;
CREATE TABLE t_account(
    id BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY ,
    user_id BIGINT(11) DEFAULT NULL COMMENT '用戶id',
    total DECIMAL(10,0) DEFAULT NULL COMMENT '總額度',
    used DECIMAL(10,0) DEFAULT NULL COMMENT '已用額度',
    residue DECIMAL(10,0) DEFAULT 0 COMMENT '剩余可用額度'
)ENGINE=InnoDB AUTO_INCREMENT=7 CHARSET=utf8;
INSERT INTO t_account(id, user_id, total, used, residue) VALUES(1,1,1000,0,1000);

每個數據庫需要分別創建一張seata回滾日志表,conf目錄下的README-zh.md文件中client 鏈接了該sql腳本(和舊版seata有所不同),為了方便,腳本內容貼在下面:

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';

新建三個項目,分別對應訂單、庫存、賬戶,項目除了業務不同配置基本一致,由於業務代碼量太多(基本就是增刪改查比較簡單)所以下面只講解訂單項目相關配置:

導入依賴

spring-cloud-starter-alibaba-seata中包含了seata-all依賴,由於spring-cloud-starter-alibaba-seata版本可能跟本地安裝的seata版本不一致,所以需要排除spring-cloud-starter-alibaba-seata中的seata依賴,重新導入seata依賴保持版本和本地安裝的seata版本一致

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>seata-all</artifactId>
            <groupId>io.seata</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>1.4.1</version>
</dependency>

seata客戶端配置

在項目resource文件夾下新建registry.conf配置文件,內容如下:

# 注冊管理
registry {
  # 注冊類型:file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "426851b4-b7d3-4977-a440-bf6efa561760"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

# 配置管理
config {
  # 配置信息存儲類型:file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
      serverAddr = "127.0.0.1:8848"
      namespace = ""
      group = "SEATA_GROUP"
      username = "nacos"
      password = "nacos"
    }
}

conf目錄下的README-zh.md文件中config-center鏈接了seata客戶端配置文件(config.txt)和讀取seata客戶端配置文件在nacos生成配置的腳本(nacos-confg-interactive.sh)

將config.txt和nacos-confg-interactive.sh下載到本地(config.txt需要放在nacos-confg-interactive.sh所在目錄的上一級目錄)

config.txt內容包含了seata服務端和客戶端所有配置,我們只需要保存客戶端相關配置即可,為了方便,內容貼在下面:

#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none

#Transaction routing rules configuration, only for the client 事務組,default_tx_group可以自定義配置
service.vgroupMapping.default_tx_group=default

#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.rm.sqlParserType=druid
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h

#Log rule configuration, for client and server
log.exceptionRate=100

在nacos-confg-interactive.sh所在目錄啟動git終端執行以下命令運行腳本(本地需要先安裝git)

sh nacos-confg-interactive.sh

運行后根據提示在終端輸入nacos相關參數(和registry.conf配置保持一致)在nacos生成seata配置文件

application.yml配置文件seata相關配置

spring:
  cloud:
    alibaba:
      seata:
        # 配置事務組名稱
        tx-service-group: default_tx_group
        # 配置事務組值(service.vgroupMapping.default_tx_group中的default_tx_group是tx-service-group配置的值)
        service:
          vgroupMapping:
            default_tx_group: default

注意:tx-service-group配置的值和config.txt中service.vgroupMapping配置保持一致

使用Seata對數據源進行代理

新建配置類

package com.yl.seata.order.config;

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

/**
 * 使用Seata對數據源進行代理
 *
 * @auther Y-wee
 */
@Configuration
public class DataSourceProxyConfig {

    @Value("${mybatis.mapperLocations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSourceProxy);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
        sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
        return sqlSessionFactoryBean.getObject();
    }

}

主啟動類

主動類取消數據源的自動創建

package com.yl.seata.order;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

//@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
// 取消數據源的自動創建
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class SeataOrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeataOrderApplication.class,args);
    }

}

業務層

業務方法加@GlobalTransactional注解實現事務控制

package com.yl.seata.order.service.impl;

import com.yl.seata.order.entity.Order;
import com.yl.seata.order.mapper.OrderMapper;
import com.yl.seata.order.service.AccountService;
import com.yl.seata.order.service.OrderService;
import com.yl.seata.order.service.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import io.seata.spring.annotation.GlobalTransactional;

import javax.annotation.Resource;

/**
 * 訂單
 *
 * @auther Y-wee
 */
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    @Resource
    private OrderMapper orderDao;
    @Resource
    private StorageService storageService;
    @Resource
    private AccountService accountService;

    /**
     * 創建訂單->調用庫存服務扣減庫存->調用賬戶服務扣減賬戶余額->修改訂單狀態
     * 簡單說:下訂單->扣庫存->減余額->改狀態
     *
     * @param order
     */
    @Override
    @GlobalTransactional(rollbackFor = Exception.class)
    public void create(Order order) {
        log.info("-----開始新建訂單");
        //1 新建訂單
        orderDao.create(order);

        //2 扣減庫存
        log.info("-----訂單微服務開始調用庫存,做扣減Count");
        storageService.decrease(order.getProductId(), order.getCount());
        log.info("-----訂單微服務開始調用庫存,做扣減end");

        //3 扣減賬戶
        log.info("-----訂單微服務開始調用賬戶,做扣減Money");
        accountService.decrease(order.getUserId(), order.getMoney());
        log.info("-----訂單微服務開始調用賬戶,做扣減end");

        //4 修改訂單狀態,從零到1,1代表已經完成
        log.info("-----修改訂單狀態開始");
        orderDao.update(order.getUserId(), 0);
        log.info("-----修改訂單狀態結束");

        log.info("-----下訂單結束了,O(∩_∩)O哈哈~");
    }
}

配置完成->啟動nacos->啟動seata服務端->分別啟動訂單、庫存、賬戶項目->發送請求拋出異常事務回滾


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM