前景
公司划分微服務后,選擇使用了springcloud netflix,沒有使用 springlcoud alibaba 所以這邊沒有通過阿里巴巴整合,
seata 使用 AT 模式
注冊中心 euraka
服務調用 openfeign
項目環境
springboot 2.1.3.RELEASE
springcloud Greenwich.SR1
seata 1.3
oracle 11g
准備工作
1、先下載 seata
下載地址:https://seata.io/zh-cn/blog/download.html
2、seata 對應的 sql 腳本
下載 seata 源碼后,在源碼目錄./seata-1.3.0/script/db/oracle.sql
undo_log腳本 ./seata/script/client/at/db/oracle.sql
3、oracle6驅動jar包,用於連接 oracle11g
下載地址:https://github.com/oldboyooxx/resource/tree/main/oracle
開始搭建
1、將下載后oracle 驅動包移到 lib 目錄下
2、修改 seata 配置
修改 registry.conf 文件(紅色為修改的地方)
registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "eureka" nacos { application = "seata-server" serverAddr = "127.0.0.1:8848" group = "SEATA_GROUP" namespace = "" cluster = "default" username = "" password = "" } eureka { serviceUrl = "http://127.0.0.1:8761/eureka" application = "middle-service-seata" weight = "1" } redis { serverAddr = "localhost:6379" db = 0 password = "" cluster = "default" timeout = 0 } zk { cluster = "default" serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } } config { # file、nacos 、apollo、zk、consul、etcd3 type = "file" nacos { serverAddr = "127.0.0.1:8848" namespace = "" group = "SEATA_GROUP" username = "" password = "" } consul { serverAddr = "127.0.0.1:8500" } apollo { appId = "seata-server" apolloMeta = "http://192.168.1.204:8801" namespace = "application" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }
修改 file.conf 文件 (紅色為修改的地方)
## transaction log store, only used in seata-server store { ## store mode: file、db、redis mode = "db" ## file store property file { ## store location dir dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions maxBranchSessionSize = 16384 # globe session size , if exceeded throws exceptions maxGlobalSessionSize = 512 # file buffer size , if exceeded allocate new buffer fileWriteBufferCacheSize = 16384 # when recover batch read size sessionReloadReadSize = 100 # async, sync flushDiskMode = async } ## database store property db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "oracle" driverClassName = "oracle.jdbc.driver.OracleDriver" url = "jdbc:oracle:thin:@172.***.***.***:1521:****" user = "******" password = "*******" minConn = 5 maxConn = 30 globalTable = "SEATA_GLOBAL_TABLE" branchTable = "SEATA_BRANCH_TABLE" lockTable = "SEATA_LOCK_TABLE" queryLimit = 100 maxWait = 5000 } ## redis store property redis { host = "127.0.0.1" port = "6379" password = "" database = "0" minConn = 1 maxConn = 10 queryLimit = 100 } }
創建 seata 需要的數據庫表
-- -------------------------------- The script used when storeMode is 'db' -------------------------------- -- the table to store GlobalSession data CREATE TABLE global_table ( xid VARCHAR2(128) NOT NULL, transaction_id NUMBER(19), status NUMBER(3) NOT NULL, application_id VARCHAR2(32), transaction_service_group VARCHAR2(32), transaction_name VARCHAR2(128), timeout NUMBER(10), begin_time NUMBER(19), application_data VARCHAR2(2000), gmt_create TIMESTAMP(0), gmt_modified TIMESTAMP(0), PRIMARY KEY (xid) ); CREATE INDEX idx_gmt_modified_status ON global_table (gmt_modified, status); CREATE INDEX idx_transaction_id ON global_table (transaction_id); -- the table to store BranchSession data CREATE TABLE branch_table ( branch_id NUMBER(19) NOT NULL, xid VARCHAR2(128) NOT NULL, transaction_id NUMBER(19), resource_group_id VARCHAR2(32), resource_id VARCHAR2(256), branch_type VARCHAR2(8), status NUMBER(3), client_id VARCHAR2(64), application_data VARCHAR2(2000), gmt_create TIMESTAMP(6), gmt_modified TIMESTAMP(6), PRIMARY KEY (branch_id) ); CREATE INDEX idx_xid ON branch_table (xid); -- the table to store lock data CREATE TABLE lock_table ( row_key VARCHAR2(128) NOT NULL, xid VARCHAR2(96), transaction_id NUMBER(19), branch_id NUMBER(19) NOT NULL, resource_id VARCHAR2(256), table_name VARCHAR2(32), pk VARCHAR2(36), gmt_create TIMESTAMP(0), gmt_modified TIMESTAMP(0), PRIMARY KEY (row_key) ); CREATE INDEX idx_branch_id ON lock_table (branch_id);
這邊用的是 AT 模式,所以需要添加 undo_log 表
-- for AT mode you must to init this sql for you business database. the seata server not need it. CREATE TABLE undo_log ( id NUMBER(19) NOT NULL, branch_id NUMBER(19) NOT NULL, xid VARCHAR2(128) NOT NULL, context VARCHAR2(128) NOT NULL, rollback_info BLOB NOT NULL, log_status NUMBER(10) NOT NULL, log_created TIMESTAMP(0) NOT NULL, log_modified TIMESTAMP(0) NOT NULL, PRIMARY KEY (id), CONSTRAINT ux_undo_log UNIQUE (xid, branch_id) ); COMMENT ON TABLE undo_log IS 'AT transaction mode undo table'; -- Generate ID using sequence and trigger CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;
3、啟動服務
全部修改后運行seata-server.sh腳本啟動,到eureka 中查看是否注冊成功
在項目中引用 seata
1、對應的模塊 pom 添加依賴
<dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.4.0</version> </dependency>
2、修改啟動的 application.yml
添加以下配置注意紅色部分
seata: enabled: true application-id: **-seata-***-server #服務名 tx-service-group: default # default是自定義的事務分組名稱 enable-auto-data-source-proxy: true # 啟用自動數據源代理 use-jdk-proxy: false # excludes-for-auto-proxying: # client: # rm: # async-commit-buffer-limit: 1000 # report-retry-count: 5 # table-meta-check-enable: false # report-success-enable: false # saga-branch-register-enable: false # lock: # retry-interval: 10 # retry-times: 30 # retry-policy-branch-rollback-on-conflict: true # tm: # commit-retry-count: 5 # rollback-retry-count: 5 # undo: # data-validation: true # log-serialization: jackson # log-table: undo_log # log: # exceptionRate: 100 service: vgroup-mapping: default: middle-service-seata # default是自定義的事務分組名稱,seata-server是tc注冊到注冊中心的服務名稱 # grouplist: # default: 127.0.0.1:8091 # 僅注冊中心為file時使用 enable-degrade: false # 是否啟用降級 disable-global-transaction: false # 是否禁用全局事務 # transport: # shutdown: # wait: 3 # thread-factory: # boss-thread-prefix: NettyBoss # worker-thread-prefix: NettyServerNIOWorker # server-executor-thread-prefix: NettyServerBizHandler # share-boss-worker: false # client-selector-thread-prefix: NettyClientSelector # client-selector-thread-size: 1 # client-worker-thread-prefix: NettyClientWorkerThread # worker-thread-size: default # boss-thread-size: 1 # type: TCP # server: NIO # heartbeat: true # serialization: seata # compressor: none # enable-client-batch-send-request: true config: type: file # 配置中心為file模式 # consul: # server-addr: 127.0.0.1:8500 # apollo: # apollo-meta: http://192.168.1.204:8801 # app-id: seata-server # namespace: application # etcd3: # server-addr: http://localhost:2379 # nacos: # namespace: # serverAddr: localhost # group: SEATA_GROUP # userName: "" # password: "" # zk: # server-addr: 127.0.0.1:2181 # session-timeout: 6000 # connect-timeout: 2000 # username: "" # password: "" registry: type: eureka # 注冊中心為eureka eureka: weight: 1 service-url: http://127.0.0.1:8761/eureka # 注冊中心地址 # consul: # server-addr: 127.0.0.1:8500 # etcd3: # serverAddr: http://localhost:2379 # nacos: # application: seata-server # server-addr: localhost # namespace: # userName: "" # password: "" # redis: # server-addr: localhost:6379 # db: 0 # password: # timeout: 0 # sofa: # server-addr: 127.0.0.1:9603 # region: DEFAULT_ZONE # datacenter: DefaultDataCenter # group: SEATA_GROUP # addressWaitTime: 3000 # application: default # zk: # server-addr: 127.0.0.1:2181 # session-timeout: 6000 # connect-timeout: 2000 # username: "" # password: "" # -----------seata--------------
3、模塊的資源文件目錄下添加 file.conf、registry.conf 文件
例:
file.conf 內容如下
transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true #thread factory for netty thread-factory { boss-thread-prefix = "NettyBoss" worker-thread-prefix = "NettyServerNIOWorker" server-executor-thread-prefix = "NettyServerBizHandler" share-boss-worker = false client-selector-thread-prefix = "NettyClientSelector" client-selector-thread-size = 1 client-worker-thread-prefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT boss-thread-size = 1 #auto default pin or 8 worker-thread-size = 8 } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none" } service { #vgroup->rgroup vgroup_mapping.fsp_tx_group = "default" #only support single node default.grouplist = "127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1" disableGlobalTransaction = false } client { async.commit.buffer.limit = 10000 lock { retry.internal = 10 retry.times = 30 } report.retry.count = 5 tm.commit.retry.count = 1 tm.rollback.retry.count = 1 } transaction { undo.data.validation = true undo.log.serialization = "jackson" undo.log.save.days = 7 #schedule delete expired undo_log in milliseconds undo.log.delete.period = 86400000 undo.log.table = "undo_log" } support { ## spring spring { # auto proxy the DataSource bean datasource.autoproxy = false } }
registry.conf
registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "eureka" nacos { application = "seata-server" serverAddr = "127.0.0.1:8848" group = "SEATA_GROUP" namespace = "" cluster = "default" username = "" password = "" } eureka { serviceUrl = "http://127.0.0.1:8761/eureka" application = "middle-service-seata" weight = "1" } redis { serverAddr = "localhost:6379" db = 0 password = "" cluster = "default" timeout = 0 } zk { cluster = "default" serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } } config { # file、nacos 、apollo、zk、consul、etcd3 type = "file" nacos { serverAddr = "127.0.0.1:8848" namespace = "" group = "SEATA_GROUP" username = "" password = "" } consul { serverAddr = "127.0.0.1:8500" } apollo { appId = "seata-server" apolloMeta = "http://192.168.1.204:8801" namespace = "application" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }
4、其它服務配置重復上面的步驟既可,注意紅色部分
5、最后啟動項目
可以在 seata 控制台中看到注冊信息表示注冊成功,然后就可以再代碼中使用@GlobalTransactional注解了
處理 xid 跨傳遞以及全局異常捕獲的問題
xid 無法傳遞的話會出現跨服務事務 id 不一樣的問題,導致無法全局回滾,
原理就是
服務 A 調用 -- > 服務 B
服務 A將 xid 放入 header 中,服務 B使用攔截器將 xid 放到上下文
注意,springcloud alibaba 那套才有自帶處理 xid 傳遞的方案,其它架構參考源碼處理,下面貼一下我的處理辦法吧,僅供參考
feign 攔截器添加xid 判斷,如果存在將 xid 放入到header 中
import com.icsshs.xxxx.common.pojo.NonRequestAttributes; import feign.RequestInterceptor; import feign.RequestTemplate; import io.seata.core.context.RootContext; import org.apache.commons.lang3.StringUtils; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import javax.servlet.http.HttpServletRequest; import java.util.Enumeration; import java.util.LinkedHashMap; import java.util.Map; /** * feign 攔截器 * 1、傳遞 request * 2、seata xid 傳遞 * * @author zhongzm */ @Configuration public class FeignInterceptor implements RequestInterceptor { private final String AUTHORIZATION_HEADER = "Authorization"; private final String BEARER_TOKEN_TYPE = "Bearer"; @Override public void apply(RequestTemplate requestTemplate) { RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); if (requestAttributes != null) { if (requestAttributes instanceof ServletRequestAttributes) { HttpServletRequest request = ((ServletRequestAttributes) requestAttributes).getRequest(); Map<String, String> headers = getHeaders(request); for (String headerName : headers.keySet()) { requestTemplate.header(headerName, getHeaders(request).get(headerName)); } } else if (requestAttributes instanceof NonRequestAttributes) { String authorization = String.valueOf(requestAttributes.getAttribute(AUTHORIZATION_HEADER, 0)); requestTemplate.header(AUTHORIZATION_HEADER, authorization); } } } private Map<String, String> getHeaders(HttpServletRequest request) { Map<String, String> map = new LinkedHashMap<>(); Enumeration<String> enumeration = request.getHeaderNames(); while (enumeration.hasMoreElements()) { String key = enumeration.nextElement(); String value = request.getHeader(key); map.put(key, value); } //全局事務判斷是否存在 xid,通過 header 傳遞 String xid = RootContext.getXID(); if (StringUtils.isNotEmpty(xid)) { map.put(RootContext.KEY_XID, xid); } return map; } }
添加 mvc 適配器,將 seata 源碼的TransactionPropagationIntercepter添加到攔截器中
import io.seata.integration.http.TransactionPropagationIntercepter; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; /** * Springmvc Intercepter. * * @author pgy * @date 2021/9/29 5:54 下午 **/ @Configuration public class MvcConfigurer implements WebMvcConfigurer { /** * 添加攔截器 * * @param registry */ @Override public void addInterceptors(InterceptorRegistry registry) { //seata事務傳遞攔截器 registry.addInterceptor(new TransactionPropagationIntercepter()); } }
然后seata 和全局異常也有沖突,不清楚是我配置的問題還是官方就有這個問題,我這邊的處理方案是再全局異常的地方手動添加回滾,
可以參考官方的異常處理方案 io.seata.integration.http.HttpHandlerExceptionResolver
下面是我們的處理方案,這邊我是手動回滾
/** * 捕獲運行時異常 * * @param req * @param e * @return * @throws Exception */ @ExceptionHandler(value = RuntimeException.class) @ResponseBody public HsResponse runtimeExceptionHandler(HttpServletRequest req, Exception e) throws Exception { String errorMsg = e.getMessage(); //RuntimeException判斷是否開啟全局事務,然后手動回滾 if (RootContext.inGlobalTransaction()) { GlobalTransactionContext.reload(RootContext.getXID()).rollback(); XidResource.cleanXid(RootContext.getXID()); } logger.error(ExceptionUtils.getFullStackTrace(e)); return new HsResponse().error(errorMsg); }
最后如果是公共模塊記得再項目啟動的時候導入配置
@EnableEurekaClient @EnableFeignClients @ImportAutoConfiguration(value = { com.icsshs.xxxx.common.config.GlobalExceptionHandler.class, com.icsshs.xxxx.common.config.MvcConfigurer.class,}) @SpringBootApplication public class XxxxxxxxxxApplication { public static void main(String[] args) { SpringApplication.run(XxxxxxxxxxxxApplication.class, args); } }