Integrating Seata with Spring Cloud Netflix: Eureka as the registry, Oracle as the database, and handling XID propagation


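Background

After splitting the system into microservices, the company chose Spring Cloud Netflix rather than Spring Cloud Alibaba, so Seata is not integrated through the Alibaba stack here.

Seata mode: AT

Registry: Eureka

Service calls: OpenFeign

Project environment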

springboot 2.1.3.RELEASE

springcloud Greenwich.SR1

seata 1.3 

oracle 11g

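Preparation

1. Download Seata

Download page: https://seata.io/zh-cn/blog/download.html

2. Get the SQL scripts for Seata

After downloading the Seata source, the server script is in the source tree at ./seata-1.3.0/script/db/oracle.sql

The undo_log script is at ./seata/script/client/at/db/oracle.sql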

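3. Get the Oracle 6 driver JAR (ojdbc6), used to connect to Oracle 11g

Download page: https://github.com/oldboyooxx/resource/tree/main/oracle

Setup

1. Move the downloaded Oracle driver JAR into Seata's lib directory

2. Modify the Seata configuration

Edit registry.conf. The settings to change are the registry type and the eureka block.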

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "http://127.0.0.1:8761/eureka"
    application = "middle-service-seata"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

Edit file.conf. The settings to change are the store mode and the db block.

## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  mode = "db"

  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "oracle"
    driverClassName = "oracle.jdbc.driver.OracleDriver"
    url = "jdbc:oracle:thin:@172.***.***.***:1521:****"
    user = "******"
    password = "*******"
    minConn = 5
    maxConn = 30
    globalTable = "SEATA_GLOBAL_TABLE"
    branchTable = "SEATA_BRANCH_TABLE"
    lockTable = "SEATA_LOCK_TABLE"
    queryLimit = 100
    maxWait = 5000
  }

  ## redis store property
  redis {
    host = "127.0.0.1"
    port = "6379"
    password = ""
    database = "0"
    minConn = 1
    maxConn = 10
    queryLimit = 100
  }

}

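Create the database tables Seata needs. The script below creates global_table, branch_table and lock_table, while file.conf above sets globalTable, branchTable and lockTable to SEATA_GLOBAL_TABLE, SEATA_BRANCH_TABLE and SEATA_LOCK_TABLE. The names have to match, so rename one side or the other.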

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE global_table
(
    xid                       VARCHAR2(128) NOT NULL,
    transaction_id            NUMBER(19),
    status                    NUMBER(3)     NOT NULL,
    application_id            VARCHAR2(32),
    transaction_service_group VARCHAR2(32),
    transaction_name          VARCHAR2(128),
    timeout                   NUMBER(10),
    begin_time                NUMBER(19),
    application_data          VARCHAR2(2000),
    gmt_create                TIMESTAMP(0),
    gmt_modified              TIMESTAMP(0),
    PRIMARY KEY (xid)
);

CREATE INDEX idx_gmt_modified_status ON global_table (gmt_modified, status);
CREATE INDEX idx_transaction_id ON global_table (transaction_id);

-- the table to store BranchSession data
CREATE TABLE branch_table
(
    branch_id         NUMBER(19)    NOT NULL,
    xid               VARCHAR2(128) NOT NULL,
    transaction_id    NUMBER(19),
    resource_group_id VARCHAR2(32),
    resource_id       VARCHAR2(256),
    branch_type       VARCHAR2(8),
    status            NUMBER(3),
    client_id         VARCHAR2(64),
    application_data  VARCHAR2(2000),
    gmt_create        TIMESTAMP(6),
    gmt_modified      TIMESTAMP(6),
    PRIMARY KEY (branch_id)
);

CREATE INDEX idx_xid ON branch_table (xid);

-- the table to store lock data
CREATE TABLE lock_table
(
    row_key        VARCHAR2(128) NOT NULL,
    xid            VARCHAR2(96),
    transaction_id NUMBER(19),
    branch_id      NUMBER(19)    NOT NULL,
    resource_id    VARCHAR2(256),
    table_name     VARCHAR2(32),
    pk             VARCHAR2(36),
    gmt_create     TIMESTAMP(0),
    gmt_modified   TIMESTAMP(0),
    PRIMARY KEY (row_key)
);

CREATE INDEX idx_branch_id ON lock_table (branch_id);

AT mode is used here, so the undo_log table also has to be added to the business database.

-- For AT mode you must run this script in your business database; the Seata server does not need it.
CREATE TABLE undo_log
(
    id            NUMBER(19)    NOT NULL,
    branch_id     NUMBER(19)    NOT NULL,
    xid           VARCHAR2(128) NOT NULL,
    context       VARCHAR2(128) NOT NULL,
    rollback_info BLOB          NOT NULL,
    log_status    NUMBER(10)    NOT NULL,
    log_created   TIMESTAMP(0)  NOT NULL,
    log_modified  TIMESTAMP(0)  NOT NULL,
    PRIMARY KEY (id),
    CONSTRAINT ux_undo_log UNIQUE (xid, branch_id)
);

COMMENT ON TABLE undo_log IS 'AT transaction mode undo table';

-- Generate ID using sequence and trigger
CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;

 

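3. Start the service

Once all the changes are in place, start the server with the seata-server.sh script, then check in Eureka that it registered successfully.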
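
Besides looking at the Eureka dashboard, the registration can also be checked by querying Eureka's REST endpoint for the application. The following is a minimal sketch, assuming the serviceUrl and application name from registry.conf above. The class EurekaRegistrationCheck and its methods are hypothetical helpers written for this example, not part of Seata or Eureka.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Locale;

/** Hypothetical helper for this article; not part of Seata or Eureka. */
final class EurekaRegistrationCheck {

    /** Builds the Eureka REST URL for one application, e.g. .../eureka/apps/MIDDLE-SERVICE-SEATA. */
    static String appUrl(String serviceUrl, String application) {
        String base = serviceUrl.endsWith("/")
                ? serviceUrl.substring(0, serviceUrl.length() - 1)
                : serviceUrl;
        // Eureka lists application names in upper case.
        return base + "/apps/" + application.toUpperCase(Locale.ROOT);
    }

    /** Returns true when Eureka answers 200 for the application; 404 means it is not registered. */
    static boolean isRegistered(String serviceUrl, String application) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(appUrl(serviceUrl, application)).openConnection();
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(2000);
        conn.setRequestMethod("GET");
        try {
            return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        // Values taken from the registry.conf shown earlier.
        String serviceUrl = "http://127.0.0.1:8761/eureka";
        String application = "middle-service-seata";
        try {
            System.out.println(application + " registered: " + isRegistered(serviceUrl, application));
        } catch (IOException e) {
            System.out.println("Could not reach Eureka at " + serviceUrl + ": " + e.getMessage());
        }
    }
}
```

If the check reports false, the eureka block in registry.conf and the server log are the first things to look at.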

Using Seata in the project

1. Add the dependency to the pom of each module that needs it

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.4.0</version>
</dependency>

2. Edit the application.yml used at startup

Add the following configuration, paying attention to the uncommented settings.

seata:
  enabled: true
  application-id: **-seata-***-server # service name
  tx-service-group: default # "default" is a user-defined transaction group name
  enable-auto-data-source-proxy: true # enable automatic data source proxying
  use-jdk-proxy: false
  #    excludes-for-auto-proxying:
  #    client:
  #        rm:
  #            async-commit-buffer-limit: 1000
  #            report-retry-count: 5
  #            table-meta-check-enable: false
  #            report-success-enable: false
  #            saga-branch-register-enable: false
  #            lock:
  #                retry-interval: 10
  #                retry-times: 30
  #                retry-policy-branch-rollback-on-conflict: true
  #        tm:
  #            commit-retry-count: 5
  #            rollback-retry-count: 5
  #        undo:
  #            data-validation: true
  #            log-serialization: jackson
  #            log-table: undo_log
  #        log:
  #            exceptionRate: 100
  service:
    vgroup-mapping:
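      default: middle-service-seata # key: the user-defined transaction group name; value: the service name the TC registers under in the registry
    #        grouplist:
    #            default: 127.0.0.1:8091 # only used when the registry type is file
    enable-degrade: false # whether to enable degradation
    disable-global-transaction: false # whether to disable global transactions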
  #    transport:
  #        shutdown:
  #            wait: 3
  #        thread-factory:
  #            boss-thread-prefix: NettyBoss
  #            worker-thread-prefix: NettyServerNIOWorker
  #            server-executor-thread-prefix: NettyServerBizHandler
  #            share-boss-worker: false
  #            client-selector-thread-prefix: NettyClientSelector
  #            client-selector-thread-size: 1
  #            client-worker-thread-prefix: NettyClientWorkerThread
  #            worker-thread-size: default
  #            boss-thread-size: 1
  #        type: TCP
  #        server: NIO
  #        heartbeat: true
  #        serialization: seata
  #        compressor: none
  #        enable-client-batch-send-request: true
  config:
    type: file # the config center uses file mode
  #        consul:
  #            server-addr: 127.0.0.1:8500
  #        apollo:
  #            apollo-meta: http://192.168.1.204:8801
  #            app-id: seata-server
  #            namespace: application
  #        etcd3:
  #            server-addr: http://localhost:2379
  #        nacos:
  #            namespace:
  #            serverAddr: localhost
  #            group: SEATA_GROUP
  #            userName: ""
  #            password: ""
  #        zk:
  #            server-addr: 127.0.0.1:2181
  #            session-timeout: 6000
  #            connect-timeout: 2000
  #            username: ""
  #            password: ""
  registry:
    type: eureka # the registry is Eureka
    eureka:
      weight: 1
      service-url: http://127.0.0.1:8761/eureka # registry address
#        consul:
#            server-addr: 127.0.0.1:8500
#        etcd3:
#            serverAddr: http://localhost:2379
#        nacos:
#            application: seata-server
#            server-addr: localhost
#            namespace:
#            userName: ""
#            password: ""
#        redis:
#            server-addr: localhost:6379
#            db: 0
#            password:
#            timeout: 0
#        sofa:
#            server-addr: 127.0.0.1:9603
#            region: DEFAULT_ZONE
#            datacenter: DefaultDataCenter
#            group: SEATA_GROUP
#            addressWaitTime: 3000
#            application: default
#        zk:
#            server-addr: 127.0.0.1:2181
#            session-timeout: 6000
#            connect-timeout: 2000
#            username: ""
#            password: ""

# -----------seata--------------
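
The relationship between tx-service-group, vgroup-mapping and the name the TC registers under can be read as a two-step lookup: the client takes its transaction group, finds the matching vgroup-mapping entry, and looks that value up in the registry. The sketch below only illustrates this lookup in plain Java under that reading. It is not Seata's implementation, and the class TxGroupLookupSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: a plain-Java picture of the lookup, not Seata's implementation. */
final class TxGroupLookupSketch {

    // Mirrors seata.service.vgroup-mapping: transaction group -> TC service name.
    private final Map<String, String> vgroupMapping = new HashMap<>();

    void addMapping(String txServiceGroup, String tcServiceName) {
        vgroupMapping.put(txServiceGroup, tcServiceName);
    }

    /** Returns the TC service name for a transaction group, or fails if no mapping exists. */
    String resolveTcServiceName(String txServiceGroup) {
        String tcServiceName = vgroupMapping.get(txServiceGroup);
        if (tcServiceName == null) {
            throw new IllegalStateException(
                    "no vgroup-mapping entry for transaction group: " + txServiceGroup);
        }
        return tcServiceName;
    }

    /** Builds the mapping used in the application.yml above. */
    static TxGroupLookupSketch fromArticleConfig() {
        TxGroupLookupSketch sketch = new TxGroupLookupSketch();
        sketch.addMapping("default", "middle-service-seata");
        return sketch;
    }

    public static void main(String[] args) {
        TxGroupLookupSketch sketch = fromArticleConfig();
        // tx-service-group: default
        System.out.println("TC service name: " + sketch.resolveTcServiceName("default"));
    }
}
```

The point of the sketch is that the key under vgroup-mapping has to equal tx-service-group, and the value has to equal the application name in the eureka block of the server's registry.conf.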

3. Add file.conf and registry.conf to the module's resources directory

Example:

file.conf contents:

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  #thread factory for netty
  thread-factory {
    boss-thread-prefix = "NettyBoss"
    worker-thread-prefix = "NettyServerNIOWorker"
    server-executor-thread-prefix = "NettyServerBizHandler"
    share-boss-worker = false
    client-selector-thread-prefix = "NettyClientSelector"
    client-selector-thread-size = 1
    client-worker-thread-prefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    boss-thread-size = 1
    #auto default pin or 8
    worker-thread-size = 8
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}

service {
  #vgroup->rgroup
  vgroup_mapping.fsp_tx_group = "default"
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
  disableGlobalTransaction = false
}

client {
  async.commit.buffer.limit = 10000
  lock {
    retry.internal = 10
    retry.times = 30
  }
  report.retry.count = 5
  tm.commit.retry.count = 1
  tm.rollback.retry.count = 1
}

transaction {
  undo.data.validation = true
  undo.log.serialization = "jackson"
  undo.log.save.days = 7
  #schedule delete expired undo_log in milliseconds
  undo.log.delete.period = 86400000
  undo.log.table = "undo_log"
}

support {
  ## spring
  spring {
    # auto proxy the DataSource bean
    datasource.autoproxy = false
  }
}

registry.conf contents:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "http://127.0.0.1:8761/eureka"
    application = "middle-service-seata"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

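4. Repeat the steps above for the other services, paying attention to the same settings

5. Finally, start the project

If the registration information shows up in the Seata console, registration succeeded, and the @GlobalTransactional annotation can then be used in code.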

 

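Handling cross-service XID propagation and global exception handling

If the XID is not propagated, each service ends up with a different transaction ID, so a global rollback is impossible.

The principle is:

Service A calls --> Service B

Service A puts the XID into a request header, and Service B uses an interceptor to bind that XID to its own context.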
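
The principle above can be sketched without any framework. The example below is a plain-Java simulation, assuming a ThreadLocal as a stand-in for Seata's thread-bound context and a Map as a stand-in for HTTP headers. The class XidPropagationSketch and all of its methods are hypothetical. Only the header name "TX_XID" is taken from Seata, where it is the value of RootContext.KEY_XID.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java simulation of XID propagation; not Seata code. */
final class XidPropagationSketch {

    /** Same value as Seata's RootContext.KEY_XID. */
    static final String XID_HEADER = "TX_XID";

    /** Stand-in for the thread-bound transaction context. */
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static void bind(String xid) { CONTEXT.set(xid); }

    static String currentXid() { return CONTEXT.get(); }

    static void unbind() { CONTEXT.remove(); }

    /** Service A side: copy the current XID, if any, into the outgoing headers. */
    static Map<String, String> outgoingHeaders() {
        Map<String, String> headers = new HashMap<>();
        String xid = currentXid();
        if (xid != null && !xid.isEmpty()) {
            headers.put(XID_HEADER, xid);
        }
        return headers;
    }

    /** Service B side: bind the XID from the headers, run the handler, then unbind. */
    static String handleIncoming(Map<String, String> headers) {
        String xid = headers.get(XID_HEADER);
        if (xid != null) {
            bind(xid);
        }
        try {
            return currentXid(); // business logic would run here and see the same XID
        } finally {
            unbind();
        }
    }

    /** Simulates A calling B; B runs on another thread, like a separate service would. */
    static String callServiceB(String xidInServiceA) {
        bind(xidInServiceA);
        final Map<String, String> headers = outgoingHeaders();
        unbind();
        final String[] seenByB = new String[1];
        Thread serviceB = new Thread(() -> seenByB[0] = handleIncoming(headers));
        serviceB.start();
        try {
            serviceB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for service B", e);
        }
        return seenByB[0];
    }

    public static void main(String[] args) {
        System.out.println("XID seen by service B: " + callServiceB("127.0.0.1:8091:1"));
    }
}
```

Without the header step, service B's thread starts with an empty context, which is the situation where the two services end up in different transactions.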

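Note that only the Spring Cloud Alibaba stack ships with built-in XID propagation. For other stacks, refer to the source code and handle it yourself. My approach is shown below, for reference only.

Add an XID check to the Feign interceptor: if an XID exists, put it into the request header.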

 

import com.icsshs.xxxx.common.pojo.NonRequestAttributes;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import io.seata.core.context.RootContext;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Feign interceptor.
 * 1. Forwards the headers of the incoming request.
 * 2. Propagates the Seata XID.
 *
 * @author zhongzm
 */
@Configuration
public class FeignInterceptor implements RequestInterceptor {

    private final String AUTHORIZATION_HEADER = "Authorization";

    private final String BEARER_TOKEN_TYPE = "Bearer";

    @Override
    public void apply(RequestTemplate requestTemplate) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        if (requestAttributes != null) {
            if (requestAttributes instanceof ServletRequestAttributes) {
                HttpServletRequest request = ((ServletRequestAttributes) requestAttributes).getRequest();
                Map<String, String> headers = getHeaders(request);
                // Iterate over the map once instead of rebuilding it for every header.
                for (Map.Entry<String, String> header : headers.entrySet()) {
                    requestTemplate.header(header.getKey(), header.getValue());
                }
            } else if (requestAttributes instanceof NonRequestAttributes) {
                String authorization = String.valueOf(requestAttributes.getAttribute(AUTHORIZATION_HEADER, 0));
                requestTemplate.header(AUTHORIZATION_HEADER, authorization);
            }
        }
    }

    private Map<String, String> getHeaders(HttpServletRequest request) {
        Map<String, String> map = new LinkedHashMap<>();
        Enumeration<String> enumeration = request.getHeaderNames();
        while (enumeration.hasMoreElements()) {
            String key = enumeration.nextElement();
            String value = request.getHeader(key);
            map.put(key, value);
        }
        // If a global transaction is active, pass its XID along in a header.
        String xid = RootContext.getXID();
        if (StringUtils.isNotEmpty(xid)) {
            map.put(RootContext.KEY_XID, xid);
        }
        return map;
    }

}

 

Add an MVC configurer that registers the TransactionPropagationIntercepter from the Seata source as an interceptor.

import io.seata.integration.http.TransactionPropagationIntercepter;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * Spring MVC interceptor configuration.
 *
 * @author pgy
 * @date 2021/9/29 5:54 PM
 **/
@Configuration
public class MvcConfigurer implements WebMvcConfigurer {

    /**
     * Registers the interceptors.
     *
     * @param registry the interceptor registry
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // Seata transaction propagation interceptor
        registry.addInterceptor(new TransactionPropagationIntercepter());
    }

}

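Seata also conflicts with the global exception handler. I am not sure whether this is caused by my configuration or is an upstream issue. My workaround is to trigger the rollback manually inside the global exception handler.

The official exception handling can be used as a reference: io.seata.integration.http.HttpHandlerExceptionResolver

Below is our approach, which rolls back manually.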

    /**
     * Handles runtime exceptions.
     *
     * @param req the current request
     * @param e   the exception that was thrown
     * @return an error response carrying the exception message
     * @throws Exception if the manual rollback fails
     */
    @ExceptionHandler(value = RuntimeException.class)
    @ResponseBody
    public HsResponse runtimeExceptionHandler(HttpServletRequest req,
                                              Exception e) throws Exception {
        String errorMsg = e.getMessage();
        // If a global transaction is active, roll it back manually and clear the XID bound to this thread.
        if (RootContext.inGlobalTransaction()) {
            String xid = RootContext.getXID();
            GlobalTransactionContext.reload(xid).rollback();
            XidResource.cleanXid(xid);
        }
        logger.error(ExceptionUtils.getFullStackTrace(e));
        return new HsResponse().error(errorMsg);
    }

 

Finally, if these classes live in a shared module, remember to import the configuration when the project starts.

@EnableEurekaClient
@EnableFeignClients
@ImportAutoConfiguration(value = {
        com.icsshs.xxxx.common.config.GlobalExceptionHandler.class,
        com.icsshs.xxxx.common.config.MvcConfigurer.class,})
@SpringBootApplication
public class XxxxxxxxxxApplication {

    public static void main(String[] args) {
        SpringApplication.run(XxxxxxxxxxApplication.class, args);
    }

}
