本文只介紹Seata的簡單使用,沒有涉及其原理.
1.在本地搭建一個TC服務(事務協調者).
1.1 下載seata的安裝包
官網(https://github.com/seata/seata/releases)
1.2 配置
打開解壓目錄下的conf/registry.conf文件如下
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
# 可以把seata-server理解為一個服務,它需要把自己注冊到某個注冊中心上去,方便使用seata的服務來找到自己
#在這里就是指定注冊中心的類型,由於我們項目用的是eureka,所以這里我選擇eureka,即這一堆配置就下面一個eureka生效了
#這里默認的是file,即文件,選了文件就可以不用搭注冊中心,直接從文件里讀取服務列表
#復制之后一定要改一改
type = "eureka"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
eureka { #"只有我生效啦"
serviceUrl = "http://localhost:10086/eureka" #eureka地址
application = "seata_tc_server" #在eureka里顯示的名字
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
#在這里選擇配置中心,這里我們選擇file
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "localhost"
namespace = ""
group = "SEATA_GROUP"
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
app.id = "seata-server"
apollo.meta = "http://192.168.1.204:8801"
namespace = "application"
}
zk {
serverAddr = "127.0.0.1:2181"
session.timeout = 6000
connect.timeout = 2000
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
#由於選擇了file,所以這里生效了
name = "file.conf"
}
}
## transaction log store, only used in seata-server
store {
## store mode: file、db
#選擇配置中心的存儲模式,由於選擇file存到文件里(性能高)會變為二進制流不好觀察,所以選擇數據庫
#復制之后一定要改一改
mode = "db"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
## database store property
db {
#選擇了數據庫必定要做出一些配置,數據庫里一定要有這3張表
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://192.168.206.99:3306/seata"
user = "root"
password = "root"
minConn = 1
maxConn = 10
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
}
}
建表SQL如下:
CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` ( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` ( `row_key` VARCHAR(128) NOT NULL, `xid` VARCHAR(96), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_branch_id` (`branch_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8;
1.3 啟動
如果是linux環境(要有JRE),執行seata-server.sh
如果是windows環境,執行
2 改造微服務
只要是需要用到seata(分布式事務)的服務,都要做類似的配置.
2.1 引入依賴
我這里是springboot項目,所以我先在父pom中聲明了.如下
<properties> <alibaba.seata.version>2.1.0.RELEASE</alibaba.seata.version> <seata.version>1.1.0</seata.version> </properties> <dependencyManagement> <dependencies> <!--seata--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-seata</artifactId> <version>${alibaba.seata.version}</version> <exclusions> <exclusion> <artifactId>seata-all</artifactId> <groupId>io.seata</groupId> </exclusion> </exclusions> </dependency> <dependency> <artifactId>seata-all</artifactId> <groupId>io.seata</groupId> <version>${seata.version}</version> </dependency> </dependencies> </dependencyManagement>
接下來只要在需要seata的微服務里添加依賴就好了.
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-seata</artifactId> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-all</artifactId> </dependency>
spring:
cloud:
alibaba:
seata:
tx-service-group: test_tx_group # 定義事務組的名稱
2.3 在resources目錄下添加2個文件file.conf和registry.conf
registry.conf和前面的一樣,直接復制過來就好.
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#這里注意,等號前后都是配置,前面是yml里配置的事務組,后面是register.conf里定義的seata-server
vgroupMapping.test_tx_group = "seata_tc_server"
#only support when registry.type=file, please don't set multiple addresses
seata_tc_server.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
-
transport:與TC交互的一些配置-
heartbeat:client和server通信心跳檢測開關 -
enableClientBatchSendRequest:客戶端事務消息請求是否批量合並發送
-
-
service:TC的地址配置,用於獲取TC的地址-
vgroupMapping.test_tx_group = "seata_tc_server":-
test_tx_group:是事務組名稱,要與application.yml中配置一致, -
seata_tc_server:是TC服務端集群的名稱,將來通過注冊中心獲取TC地址 -
enableDegrade:服務降級開關,默認關閉。如果開啟,當業務重試多次失敗后會放棄全局事務 -
disableGlobalTransaction:全局事務開關,默認false。false為開啟,true為關閉
-
-
default.grouplist:這個當注冊中心為file的時候,才用到
-
-
client:客戶端配置-
rm:資源管理器配-
asynCommitBufferLimit:二階段提交默認是異步執行,這里指定異步隊列的大小 -
lock:全局鎖配置-
retryInterval:校驗或占用全局鎖重試間隔,默認10,單位毫秒 -
retryTimes:校驗或占用全局鎖重試次數,默認30次 -
retryPolicyBranchRollbackOnConflict:分支事務與其它全局回滾事務沖突時鎖策略,默認true,優先釋放本地鎖讓回滾成功
-
-
reportRetryCount:一階段結果上報TC失敗后重試次數,默認5次
-
-
tm:事務管理器配置-
commitRetryCount:一階段全局提交結果上報TC重試次數,默認1 -
rollbackRetryCount:一階段全局回滾結果上報TC重試次數,默認1
-
-
undo:undo_log的配置-
dataValidation:是否開啟二階段回滾鏡像校驗,默認true -
logSerialization:undo序列化方式,默認Jackson -
logTable:自定義undo表名,默認是undo_log
-
-
log:日志配置-
exceptionRate:出現回滾異常時的日志記錄頻率,默認100,百分之一概率。回滾失敗基本是臟數據,無需輸出堆棧占用硬盤空間
-
-
2.4 代理DataSource
由於在一階段是通過攔截sql分析語義來生成回滾策略,原來的數據源已經不夠用了,得換個牛逼的.在服務里新建一個配置類.
-
如果是使用的是mybatis
import io.seata.rm.datasource.DataSourceProxy; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.sql.DataSource; @Configuration public class DataSourceProxyConfig { @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception { // 因為使用的是mybatis,這里定義SqlSessionFactoryBean SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); // 配置數據源代理 sqlSessionFactoryBean.setDataSource(new DataSourceProxy(dataSource)); return sqlSessionFactoryBean.getObject(); } }
如果使用的是mybatis-plus
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; import io.seata.rm.datasource.DataSourceProxy; import org.apache.ibatis.session.SqlSessionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.sql.DataSource; @Configuration public class DataSourceProxyConfig { @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception { // 訂單服務中引入了mybatis-plus,所以要使用特殊的SqlSessionFactoryBean MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean(); // 代理數據源 sqlSessionFactoryBean.setDataSource(new DataSourceProxy(dataSource)); // 生成SqlSessionFactory return sqlSessionFactoryBean.getObject(); } }
2.5 加上注解
給事務發起者的方法上加上@GlobalTransactional即可,其它的參與者只要加@Transactional
3.踩坑記錄
1.由於更換了數據源,不知道為什么我在yml里給mybatis配置的駝峰映射失效了,導致我查到的數據缺少了某些字段.不知道這個問題在mybatis-plus中會不會出現.
解決辦法:單獨給數據源配置映射規則就好了.所以我把上面的配置類加了一個設置,修改后的代碼如下.
import io.seata.rm.datasource.DataSourceProxy; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.sql.DataSource; @Configuration public class DataSourceProxyConfig { @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSource dataSource) throws Exception { // 因為使用的是mybatis,這里定義SqlSessionFactoryBean SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); // 配置數據源代理 sqlSessionFactoryBean.setDataSource(new DataSourceProxy(dataSource)); SqlSessionFactory object = sqlSessionFactoryBean.getObject(); assert object != null;
// 單獨給數據源設置駝峰映射 object.getConfiguration().setMapUnderscoreToCamelCase(true); return object; } }
