Spring如何支持分布式事務


一、分布式事務介紹

名詞解釋

XA :XA規范的目的是允許多個資源(如數據庫,應用服務器,消息隊列,等等)在同一事務中訪問,這樣可以使ACID屬性跨越應用程序而保持有效。XA使用兩階段提交來保證所有資源同時提交或回滾任何特定的事務。

JTA: Java事務API(Java Transaction API,簡稱JTA ) 是一個Java企業版 的應用程序接口,在Java環境中,允許完成跨越多個XA資源的分布式事務。

分布式事務要解決的問題

把不同支援放到一個事物中,實現ACID。舉例說明,一個方法中需要操作兩個數據庫 db1,db2, 本地事物是基於connection ,所以無法保證兩個庫的事物,這是后需要用到分布式事物。

分布式事務原理

兩階段提交。簡單來說,引入事務管理器(txManager)的概念,開啟事務前,txManager 創建一個 tx ,txId 是全局事務的唯一標示, 方法中db1操作完成后,告知tx  db1操作成功,但db1沒有真的提交,而是block住了。db2 繼續執行,執行完了自己block住,然后告知txManager,這個事物可以提交了,然后txManager 通知db1 db2 你們可以真正提交了,事物結束。

db block 住屬於第一階段, 真正提交或者回滾屬於第二階段,這就是兩階段提交。

二、java事務類型

Java事務的類型有三種:JDBC事務、JTA(Java Transaction API)事務、容器事務。 常見的容器事務如Spring事務,容器事務主要是J2EE應用服務器提供的,容器事務大多是基於JTA完成,這是一個基於JNDI的,相當復雜的API實現。本人不推薦使用容器事務。

JDBC事務

JDBC的一切行為包括事務是基於一個Connection的,在JDBC中是通過Connection對象進行事務管理。在JDBC中,常用的和事務相關的方法是: setAutoCommit、commit、rollback等。

事務案例
public void JdbcTransfer() {
    java.sql.Connection conn = null;
     try{
        conn = conn =DriverManager.getConnection("jdbc:oracle:thin:@host:1521:SID","username","userpwd");
         // 將自動提交設置為 false,
         //若設置為 true 則數據庫將會把每一次數據更新認定為一個事務並自動提交
         conn.setAutoCommit(false);

         stmt = conn.createStatement();
         // 將 A 賬戶中的金額減少 500
         stmt.execute("\
         update t_account set amount = amount - 500 where account_id = 'A'");
         // 將 B 賬戶中的金額增加 500
         stmt.execute("\
         update t_account set amount = amount + 500 where account_id = 'B'");

         // 提交事務
         conn.commit();
         // 事務提交:轉賬的兩步操作同時成功
     } catch(SQLException sqle){
         try{
             // 發生異常,回滾在本事務中的操做
            conn.rollback();
             // 事務回滾:轉賬的兩步操作完全撤銷
             stmt.close();
             conn.close();
         }catch(Exception ignore){

         }
         sqle.printStackTrace();
     }
}
JDBC事務的優缺點

JDBC為使用Java進行數據庫的事務操作提供了最基本的支持。通過JDBC事務,我們可以將多個SQL語句放到同一個事務中,保證其ACID特性。JDBC事務的主要優點就是API比較簡單,可以實現最基本的事務操作,性能也相對較好。

但是,JDBC事務有一個局限:一個 JDBC 事務不能跨越多個數據庫!!!所以,如果涉及到多數據庫的操作或者分布式場景,JDBC事務就無能為力了。

容器事務

容器事務也是基於jndi實現的。

jndi(java naming directory interface),可以把JNDI看成一個全局的目錄服務接口,實現了這個接口的類可以提供你想要的東西,不管這個東西是什么,只要注冊到了目錄中就可以被找到並且返回給你。有點像webservbice。

Spring配置JNDI和通過JNDI獲取DataSource。

SpringJNDI數據源配置信息
<bean id="dataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName">
        <value>java:comp/env/myDataSourceJNDI</value>
    </property>
</bean>    

上面<value>中myDataSourceJNDI是tomcat或者其他應用服務器配置的JNDI。

關於JNDI的配置(tomcat中)

修改tomcat目錄conf/context.xml文件:

<Resource name="myDataSourceJNDI"
  auth="Container" type="javax.sql.DataSource"
  maxActive="100" maxIdle="30" maxWait="10000" username="root"
  password="root" driverClassName="oracle.jdbc.driver.OracleDriver"
  url="jdbc:oracle:thin:@127.0.0.1:1521:TEST"/>
通過JNDI獲取DataSource
Context context = new InitialContext();
DataSource ds = (DataSource)context.lookup("java:comp/env/myDataSourceJNDI");

它和jdbc的區別是,jdbc是java去找數據庫驅動,jndi是通過你的服務器配置(如Tomcat)的配置文件context來找數據庫驅動。

正如上面所見,致命缺陷,由於jndi要訪問容器組件(tomcat)的配置,所以耦合比較嚴重,不喜歡用。

JTA事務
為什么需要JTA

通常,JDBC事務就可以解決數據的一致性等問題,鑒於他用法相對簡單,所以很多人關於Java中的事務只知道有JDBC事務,或者有人知道框架中的事務(比如Hibernate、Spring)等。但是,由於JDBC無法實現分布式事務,而如今的分布式場景越來越多,所以,JTA事務就應運而生。

如果,你在工作中沒有遇到JDBC事務無法解決的場景,那么只能說你做的項目還都太小。拿電商網站來說,我們一般把一個電商網站橫向拆分成商品模塊、訂單模塊、購物車模塊、消息模塊、支付模塊等。然后我們把不同的模塊部署到不同的機器上,各個模塊之間通過遠程服務調用(RPC)等方式進行通信。以一個分布式的系統對外提供服務。

一個支付流程就要和多個模塊進行交互,每個模塊都部署在不同的機器中,並且每個模塊操作的數據庫都不一致,這時候就無法使用JDBC來管理事務。我們看一段代碼:

/** 支付訂單處理 **/
@Transactional(rollbackFor = Exception.class)
public void completeOrder() {
    orderDao.update(); // 訂單服務本地更新訂單狀態
    accountService.update(); // 調用資金賬戶服務給資金帳戶加款
    pointService.update(); // 調用積分服務給積分帳戶增加積分
    accountingService.insert(); // 調用會計服務向會計系統寫入會計原始憑證
    merchantNotifyService.notify(); // 調用商戶通知服務向商戶發送支付結果通知
}

上面的代碼是一個簡單的支付流程的操作,其中調用了五個服務,這五個服務都通過RPC的方式調用,請問使用JDBC如何保證事務一致性?我在方法中增加了@Transactional注解,但是由於采用調用了分布式服務,該事務並不能達到ACID的效果。

JTA事務比JDBC事務更強大。一個JTA事務可以有多個參與者,而一個JDBC事務則被限定在一個單一的數據庫連接。下列任一個Java平台的組件都可以參與到一個JTA事務中:JDBC連接、JDO PersistenceManager 對象、JMS 隊列、JMS 主題、企業JavaBeans(EJB)、一個用J2EE Connector Architecture 規范編譯的資源分配器。

JTA的定義

Java事務API(Java Transaction API,簡稱JTA ) 是一個Java企業版 的應用程序接口,在Java環境中,允許完成跨越多個XA資源的分布式事務。

 JTA和它的同胞Java事務服務(JTS;Java TransactionService),為J2EE平台提供了分布式事務服務。不過JTA只是提供了一個接口,並沒有提供具體的實現,而是由j2ee服務器提供商根據JTS規范提供的,常見的JTA實現有以下幾種:

  • J2EE容器所提供的JTA實現(JBoss)
  • 獨立的JTA實現:如JOTM,Atomikos.這些實現可以應用在那些不使用J2EE應用服務器的環境里用以提供分布事事務保證。如Tomcat,Jetty以及普通的java應用。

JTA里面提供了 java.transaction.UserTransaction ,里面定義了下面幾個方法

begin:開啟一個事務

commit:提交當前事務

rollback:回滾當前事務

setRollbackOnly:把當前事務標記為回滾

setTransactionTimeout:設置事務的事件,超過這個事件,就拋出異常,回滾事務

這里,值得注意的是,不是使用了UserTransaction就能把普通的JDBC操作直接轉成JTA操作,JTA對DataSource、Connection和Resource 都是有要求的,只有符合XA規范,並且實現了XA規范的相關接口的類才能參與到JTA事務中來。目前主流的數據庫都支持XA規范。

要想使用用 JTA 事務,那么就需要有一個實現 javax.sql.XADataSource 、javax.sql.XAConnection 和 javax.sql.XAResource 接口的 JDBC 驅動程序。一個實現了這些接口的驅動程序將可以參與 JTA 事務。一個 XADataSource 對象就是一個 XAConnection 對象的工廠。XAConnection 是參與 JTA 事務的 JDBC 連接。

要使用JTA事務,必須使用XADataSource來產生數據庫連接,產生的連接為一個XA連接。

XA連接(javax.sql.XAConnection)和非XA(java.sql.Connection)連接的區別在於:XA可以參與JTA的事務,而且不支持自動提交。

 

public void JtaTransfer() {
        javax.transaction.UserTransaction tx = null;
        java.sql.Connection conn = null;
         try{
             tx = (javax.transaction.UserTransaction) context.lookup("java:comp/UserTransaction");  //取得JTA事務,本例中是由Jboss容器管理
             javax.sql.DataSource ds = (javax.sql.DataSource) context.lookup("java:/XAOracleDS");  //取得數據庫連接池,必須有支持XA的數據庫、驅動程序
             tx.begin();
            conn = ds.getConnection();

             // 將自動提交設置為 false,
             //若設置為 true 則數據庫將會把每一次數據更新認定為一個事務並自動提交
             conn.setAutoCommit(false);

             stmt = conn.createStatement();
             // 將 A 賬戶中的金額減少 500
             stmt.execute("\
             update t_account set amount = amount - 500 where account_id = 'A'");
             // 將 B 賬戶中的金額增加 500
             stmt.execute("\
             update t_account set amount = amount + 500 where account_id = 'B'");

             // 提交事務
             tx.commit();
             // 事務提交:轉賬的兩步操作同時成功
         } catch(SQLException sqle){
             try{
                 // 發生異常,回滾在本事務中的操做
              tx.rollback();
                 // 事務回滾:轉賬的兩步操作完全撤銷
                 stmt.close();
                 conn.close();
             }catch(Exception ignore){

             }
             sqle.printStackTrace();
         }
     }

上面的例子就是一個使用JTA事務的轉賬操作,該操作相對依賴於J2EE容器,並且需要通過JNDI的方式獲取UserTransaction和Connection。

標准的分布式事務

一個分布式事務(Distributed Transaction)包括一個事務管理器(transaction manager)和一個或多個資源管理器(resource manager)。一個資源管理器(resource manager)是任意類型的持久化數據存儲。事務管理器(transaction manager)承擔着所有事務參與單元者的相互通訊的責任。

JTA的實現方式也是基於以上這些分布式事務參與者實現的,具體的關於JTA的實現細節不是本文的重點,感興趣的同學可以閱讀JTA 深度歷險 – 原理與實現

看上面關於分布式事務的介紹是不是和2PC中的事務管理比較像?的卻,2PC其實就是符合XA規范的事務管理器協調多個資源管理器的一種實現方式。 我之前有幾篇文章關於2PC和3PC的,那幾篇文章中介紹過分布式事務中的事務管理器是如何協調多個事務的統一提交或回滾的,后面我還會有幾篇文章詳細的介紹一下和分布式事務相關的內容,包括但不限於全局事務、DTP模型、柔性事務等。

JTA的優缺點

JTA的優點很明顯,就是提供了分布式事務的解決方案,嚴格的ACID。但是,標准的JTA方式的事務管理在日常開發中並不常用,因為他有很多缺點:

  • 實現復雜,通常情況下,JTA UserTransaction需要從JNDI獲取。這意味着,如果我們使用JTA,就需要同時使用JTA和JNDI。
  • JTA本身就是個笨重的API
  • 通常JTA只能在應用服務器環境下使用,因此使用JTA會限制代碼的復用性。
在spring中使用JTA分布式事務

spring的org.springframework.transaction.jta.JtaTransactionManager,提供了分布式事務支持。如果使用WAS的JTA支持,把它的屬性改為WebSphere對應TransactionManager。
在tomcat下,是沒有分布式事務的,不過可以借助於第三方軟件jotm(Java Open Transaction Manager )和AtomikosTransactionsEssentials實現,在spring中分布式事務是通過jta(jotm,atomikos)來進行實現。

三、分布式事務實現方案

提示:多數據源的切換一般可以根據項目結構划分(即哪些包路徑是使用哪個數據源,在一開始就初始化所有關系。)

Atomikos的JTA事務實現
引入依賴 
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>5.1.16.RELEASE</version>
</dependency>
<!--atomikos是分布式事務使用,jta是spring整合分布式事務使用 -->
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jdbc</artifactId>
    <version>5.0.8</version>
</dependency>
<dependency>
    <groupId>javax.transaction</groupId>
    <artifactId>jta</artifactId>
    <version>1.1</version>
</dependency>

<!-- druidDataSource, 支持XA規范 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.2.3</version>
</dependency>

<!-- mysql, 它提供了MysqlXADataSource支持XA規范 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
數據源配置

db.properties

# 數據源1
ps.datasource.one.uniqueResourceName=first_unique_db
ps.datasource.one.driverClassName=com.mysql.jdbc.Driver
ps.datasource.one.jdbcUrl=jdbc:mysql://localhost:3305/spring?useTimezone=true&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useUnicode=true&characterEncoding=utf-8&tcpRcvBuf=1024000&useOldAliasMetadataBehavior=true&useSSL=false&rewriteBatchedStatements=true&useAffectedRows=true
ps.datasource.one.username=root
ps.datasource.one.password=123456
ps.datasource.one.poolSize=2
ps.datasource.one.minPoolSize=1
ps.datasource.one.maxPoolSize=5
#獲取連接失敗重新獲等待最大時間,在這個時間內如果有可用連接,將返回
ps.datasource.one.borrowConnectionTimeout=60
#最大獲取數據時間,如果不設置這個值,Atomikos使用默認的5分鍾,那么在處理大批量數據讀取的時候,一旦超過5分鍾,就會拋出類似 Resultset is close 的錯誤
ps.datasource.one.reapTimeout=20
#最大閑置時間,超過最小連接池連接的連接將關閉
ps.datasource.one.maxIdleTime=20
#連接回收時間
ps.datasource.one.maintenanceInterval=20
#java數據庫連接池,最大可等待獲取dataSource的時間
ps.datasource.one.loginTimeout=60
ps.datasource.one.testQuery=select 1

# 數據源2
ps.datasource.two.uniqueResourceName=second_unique_db
ps.datasource.two.driverClassName=com.mysql.jdbc.Driver
ps.datasource.two.jdbcUrl=jdbc:mysql://localhost:3305/mos_sp?useTimezone=true&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useUnicode=true&characterEncoding=utf-8&tcpRcvBuf=1024000&useOldAliasMetadataBehavior=true&useSSL=false&rewriteBatchedStatements=true&useAffectedRows=true
ps.datasource.two.username=root
ps.datasource.two.password=123456
ps.datasource.two.poolSize=2
ps.datasource.two.minPoolSize=1
ps.datasource.two.maxPoolSize=5
#獲取連接失敗重新獲等待最大時間,在這個時間內如果有可用連接,將返回
ps.datasource.two.borrowConnectionTimeout=60
#最大獲取數據時間,如果不設置這個值,Atomikos使用默認的5分鍾,那么在處理大批量數據讀取的時候,一旦超過5分鍾,就會拋出類似 Resultset is close 的錯誤
ps.datasource.two.reapTimeout=20
#最大閑置時間,超過最小連接池連接的連接將關閉
ps.datasource.two.maxIdleTime=20
#連接回收時間
ps.datasource.two.maintenanceInterval=20
#java數據庫連接池,最大可等待獲取dataSource的時間
ps.datasource.two.loginTimeout=60
ps.datasource.two.testQuery=select 1

數據源1

@Component
@PropertySource("classpath:db.properties")
public class FirstDbConfigBean {

    @Value("${ps.datasource.one.uniqueResourceName}")
    private String uniqueResourceName;
    @Value("${ps.datasource.one.driverClassName}")
    private String driverClassName;
    @Value("${ps.datasource.one.jdbcUrl}")
    private String jdbcUrl;
    @Value("${ps.datasource.one.username}")
    private String username;
    @Value("${ps.datasource.one.password}")
    private String password;
    @Value("${ps.datasource.one.poolSize}")
    private int poolSize;
    @Value("${ps.datasource.one.minPoolSize}")
    private int minPoolSize;
    @Value("${ps.datasource.one.maxPoolSize}")
    private int maxPoolSize;
    @Value("${ps.datasource.one.borrowConnectionTimeout}")
    private int borrowConnectionTimeout;
    @Value("${ps.datasource.one.reapTimeout}")
    private int reapTimeout;
    @Value("${ps.datasource.one.maxIdleTime}")
    private int maxIdleTime;
    @Value("${ps.datasource.one.maintenanceInterval}")
    private int maintenanceInterval;
    @Value("${ps.datasource.one.loginTimeout}")
    private int loginTimeout;
    @Value("${ps.datasource.one.testQuery}")
    private String testQuery;

    public String getUniqueResourceName() {
        return uniqueResourceName;
    }

    public void setUniqueResourceName(String uniqueResourceName) {
        this.uniqueResourceName = uniqueResourceName;
    }

    public String getDriverClassName() {
        return driverClassName;
    }

    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }

    public String getJdbcUrl() {
        return jdbcUrl;
    }

    public void setJdbcUrl(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getPoolSize() {
        return poolSize;
    }

    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    public int getMinPoolSize() {
        return minPoolSize;
    }

    public void setMinPoolSize(int minPoolSize) {
        this.minPoolSize = minPoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getBorrowConnectionTimeout() {
        return borrowConnectionTimeout;
    }

    public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
        this.borrowConnectionTimeout = borrowConnectionTimeout;
    }

    public int getReapTimeout() {
        return reapTimeout;
    }

    public void setReapTimeout(int reapTimeout) {
        this.reapTimeout = reapTimeout;
    }

    public int getMaxIdleTime() {
        return maxIdleTime;
    }

    public void setMaxIdleTime(int maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
    }

    public int getMaintenanceInterval() {
        return maintenanceInterval;
    }

    public void setMaintenanceInterval(int maintenanceInterval) {
        this.maintenanceInterval = maintenanceInterval;
    }

    public int getLoginTimeout() {
        return loginTimeout;
    }

    public void setLoginTimeout(int loginTimeout) {
        this.loginTimeout = loginTimeout;
    }

    public String getTestQuery() {
        return testQuery;
    }

    public void setTestQuery(String testQuery) {
        this.testQuery = testQuery;
    }
}

數據源2

@Component
@PropertySource("classpath:db.properties")
public class SecondDbConfigBean {

    @Value("${ps.datasource.two.uniqueResourceName}")
    private String uniqueResourceName;
    @Value("${ps.datasource.two.driverClassName}")
    private String driverClassName;
    @Value("${ps.datasource.two.jdbcUrl}")
    private String jdbcUrl;
    @Value("${ps.datasource.two.username}")
    private String username;
    @Value("${ps.datasource.two.password}")
    private String password;
    @Value("${ps.datasource.two.poolSize}")
    private int poolSize;
    @Value("${ps.datasource.two.minPoolSize}")
    private int minPoolSize;
    @Value("${ps.datasource.two.maxPoolSize}")
    private int maxPoolSize;
    @Value("${ps.datasource.two.borrowConnectionTimeout}")
    private int borrowConnectionTimeout;
    @Value("${ps.datasource.two.reapTimeout}")
    private int reapTimeout;
    @Value("${ps.datasource.two.maxIdleTime}")
    private int maxIdleTime;
    @Value("${ps.datasource.two.maintenanceInterval}")
    private int maintenanceInterval;
    @Value("${ps.datasource.two.loginTimeout}")
    private int loginTimeout;
    @Value("${ps.datasource.two.testQuery}")
    private String testQuery;

    public String getUniqueResourceName() {
        return uniqueResourceName;
    }

    public void setUniqueResourceName(String uniqueResourceName) {
        this.uniqueResourceName = uniqueResourceName;
    }

    public String getDriverClassName() {
        return driverClassName;
    }

    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }

    public String getJdbcUrl() {
        return jdbcUrl;
    }

    public void setJdbcUrl(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getPoolSize() {
        return poolSize;
    }

    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    public int getMinPoolSize() {
        return minPoolSize;
    }

    public void setMinPoolSize(int minPoolSize) {
        this.minPoolSize = minPoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getBorrowConnectionTimeout() {
        return borrowConnectionTimeout;
    }

    public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
        this.borrowConnectionTimeout = borrowConnectionTimeout;
    }

    public int getReapTimeout() {
        return reapTimeout;
    }

    public void setReapTimeout(int reapTimeout) {
        this.reapTimeout = reapTimeout;
    }

    public int getMaxIdleTime() {
        return maxIdleTime;
    }

    public void setMaxIdleTime(int maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
    }

    public int getMaintenanceInterval() {
        return maintenanceInterval;
    }

    public void setMaintenanceInterval(int maintenanceInterval) {
        this.maintenanceInterval = maintenanceInterval;
    }

    public int getLoginTimeout() {
        return loginTimeout;
    }

    public void setLoginTimeout(int loginTimeout) {
        this.loginTimeout = loginTimeout;
    }

    public String getTestQuery() {
        return testQuery;
    }

    public void setTestQuery(String testQuery) {
        this.testQuery = testQuery;
    }
}

數據源及JTA事務配置

/**
 * 多數據源jta事務支持
 */
@Configuration
@EnableTransactionManagement
public class AtomikosJtaDataSource {
    /**
     * @MethodName firstDataSource
     * @Description 數據源1
     */
    @Bean("firstDataSource")
    public DataSource firstDataSource(FirstDbConfigBean configBean) throws Exception {
        AtomikosDataSourceBean firstDataSourceBean = new AtomikosDataSourceBean();
        // 不能為空,否則報錯:Property 'uniqueResourceName' cannot be null
        firstDataSourceBean.setUniqueResourceName(configBean.getUniqueResourceName());
        DruidXADataSource xaDataSource = new DruidXADataSource();
        xaDataSource.setUrl(configBean.getJdbcUrl());
        xaDataSource.setDriverClassName(configBean.getDriverClassName());
        xaDataSource.setUsername(configBean.getUsername());
        xaDataSource.setPassword(configBean.getPassword());
        firstDataSourceBean.setXaDataSource(xaDataSource);
        firstDataSourceBean.setMaxIdleTime(configBean.getMaxIdleTime());
        firstDataSourceBean.setMinPoolSize(configBean.getMinPoolSize());
        firstDataSourceBean.setPoolSize(configBean.getPoolSize());
        firstDataSourceBean.setMaxPoolSize(configBean.getMaxPoolSize());
        firstDataSourceBean.setTestQuery(configBean.getTestQuery());
        firstDataSourceBean.setReapTimeout(configBean.getReapTimeout());
        firstDataSourceBean.setMaintenanceInterval(configBean.getMaintenanceInterval());
        firstDataSourceBean.setLoginTimeout(configBean.getLoginTimeout());
        firstDataSourceBean.setBorrowConnectionTimeout(configBean.getBorrowConnectionTimeout());
        return firstDataSourceBean;
    }

    /**
     * @MethodName secondDataSource
     * @Description 數據源2
     */
    @Bean("secondDataSource")
    public DataSource secondDataSource(SecondDbConfigBean configBean) throws Exception {
        AtomikosDataSourceBean secondDataSourceBean = new AtomikosDataSourceBean();
        // 不能為空,否則報錯:Property 'uniqueResourceName' cannot be null
        secondDataSourceBean.setUniqueResourceName(configBean.getUniqueResourceName());
        DruidXADataSource xaDataSource = new DruidXADataSource();
        xaDataSource.setUrl(configBean.getJdbcUrl());
        xaDataSource.setDriverClassName(configBean.getDriverClassName());
        xaDataSource.setUsername(configBean.getUsername());
        xaDataSource.setPassword(configBean.getPassword());
        secondDataSourceBean.setXaDataSource(xaDataSource);
        secondDataSourceBean.setMaxIdleTime(configBean.getMaxIdleTime());
        secondDataSourceBean.setMinPoolSize(configBean.getMinPoolSize());
        secondDataSourceBean.setPoolSize(configBean.getPoolSize());
        secondDataSourceBean.setMaxPoolSize(configBean.getMaxPoolSize());
        secondDataSourceBean.setTestQuery(configBean.getTestQuery());
        secondDataSourceBean.setReapTimeout(configBean.getReapTimeout());
        secondDataSourceBean.setMaintenanceInterval(configBean.getMaintenanceInterval());
        secondDataSourceBean.setLoginTimeout(configBean.getLoginTimeout());
        secondDataSourceBean.setBorrowConnectionTimeout(configBean.getBorrowConnectionTimeout());
        return secondDataSourceBean;
    }

    @Bean("firstJdbcTemplate")
    public JdbcTemplate firstJdbcTemplate(DataSource firstDataSource){
        JdbcTemplate jdbcTemplate = new JdbcTemplate();
        jdbcTemplate.setDataSource(firstDataSource);
        return jdbcTemplate;
    }

    @Bean("secondJdbcTemplate")
    public JdbcTemplate secondJdbcTemplate(DataSource secondDataSource){
        JdbcTemplate jdbcTemplate = new JdbcTemplate();
        jdbcTemplate.setDataSource(secondDataSource);
        return jdbcTemplate;
    }


    /**
     * @MethodName transactionManager
     * @Description 事務管理器,包裝了atomikos事務
     */
    @Bean
    public JtaTransactionManager transactionManager() throws Exception {
        JtaTransactionManager transactionManager = new JtaTransactionManager();
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(true);
        userTransactionManager.setTransactionTimeout(3000);
        transactionManager.setUserTransaction(userTransactionManager);
        transactionManager.setAllowCustomIsolationLevels(true);
        return transactionManager;
    }

    /**
     * @MethodName transactionTemplate
     * @Description spring 事務模板,包裝了atomikos事務, 主要用於編程式事務
     */
    @Bean
    public TransactionTemplate transactionTemplate() throws Exception {
        TransactionTemplate transactionTemplate = new TransactionTemplate();
        JtaTransactionManager transactionManager = new JtaTransactionManager();
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(true);
        userTransactionManager.setTransactionTimeout(3000);
        transactionManager.setUserTransaction(userTransactionManager);
        transactionManager.setAllowCustomIsolationLevels(true);
        transactionTemplate.setTransactionManager(transactionManager);

        return transactionTemplate;
    }
}

注意:需要在SpringConfig.java中添加@Import(value = {AtomikosJtaDataSource.class})。

測試

AtomikosService.java

public interface AtomikosService {

    void saveEntity();
}

AtomikosServiceImpl.java

@Service
public class AtomikosServiceImpl implements AtomikosService {

    @Resource(name = "firstJdbcTemplate")
    private JdbcTemplate firstJdbcTemplate;

    @Resource(name = "secondJdbcTemplate")
    private JdbcTemplate secondJdbcTemplate;

    @Transactional
    @Override
    public void saveEntity() {
        String userCode = UUID.randomUUID().toString();
        String sql_1 = "insert into os_user(user_name, age) values(?, ?)";
        firstJdbcTemplate.update(sql_1, userCode, 23);
        String sql_2 = "insert into os_order(order_id, user_code) values(?, ?)";
        secondJdbcTemplate.update(sql_2, UUID.randomUUID().toString(), userCode);
        //int a = 1/0;
    }
}
@RunWith(SpringRunner.class)
@WebAppConfiguration
@ContextHierarchy({
        @ContextConfiguration(classes = SpringConfig.class),
        @ContextConfiguration(classes = SpringMVCConfig.class)
})
public class JtaTest {


    @Autowired
    private AtomikosService atomikosService;

    @Test
    public void testAtomikos() {
        atomikosService.saveEntity();
    }
}

可能大家也發現了,上面的代碼中注入了多個JdbcTemplate實例,麻煩地很。我們可以這樣解決:使用Map<String, JdbcTemplate> jdbcTemplateMap = applicationContext.getBeansOfType(JdbcTemplate.class);將所有的實例起來,用的時候根據beanName取下。這是最簡單粗暴的方式。

消息事務+最終一致性

所謂的消息事務就是基於消息中間件的兩階段提交,本質上是對消息中間件的一種特殊利用,它是將本地事務和發消息放在了一個分布式事務里,保證要么本地操作成功成功並且對外發消息成功,要么兩者都失敗,開源的RocketMQ就支持這一特性。

基於消息中間件的兩階段提交往往用在高並發場景下,將一個分布式事務拆成一個消息事務(A系統的本地操作+發消息)+B系統的本地操作,其中B系統的操作由消息驅動,只要消息事務成功,那么A操作一定成功,消息也一定發出來了,這時候B會收到消息去執行本地操作,如果本地操作失敗,消息會重投,直到B操作成功,這樣就變相地實現了A與B的分布式事務。

缺點:

  • 不是嚴格一致而是最終一致
  • 存在一定的風險,如上如果A執行成功,B始終執行不成功,那就完蛋了。

優點:

  • 性能大幅度提升。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM