分布式事務解決方案--Seata源碼解析


前言

分布式項目只要有業務交互就會涉及到分布式事務問題,事務通常分為三步:創建事務、執行事務、提交事務或回滾事務,單機模式下只要一個事務可以依賴數據庫的事務實現,而分布式事務往往涉及到多個項目多個數據庫的同步更新操作,此時就需要有一套分布式事務解決方案,否則就會出現分布式系統數據不一致的問題。所以分布式事務解決就是要將多個事務當作一個事務來執行,要么全部提交成功要么全部回滾。

一、常用分布式解決方案

1、基於XA的二階段提交

XA是X/Open組織提出的二階段提交的分布式事務規范,XA規范核心角色是事務管理器(Transaction Manager) 和 資源管理器 (Resource Manager),資源管理器就是本地的事物,事務管理器負責協調各個資源管理器事務的提交和回滾。

 

 

二階段提交主要將事務拆分成功兩個階段

1、事務准備階段:事務管理器通知各個資源管理器准備事務,各個資源管理器接受到通知之后在本地執行事務但是不提交,並且寫好本地的redo和undo日志,然后將事務執行結果上報給事務管理器

2、事務提交階段:事務管理器根據各個資源管理器上報的結果決定是提交事務還是回滾事務,然后通知各個資源管理器,各個資源管理器本地提交事務或回滾事務,然后再上報提交或回滾的結果給事務管理器

 

二階段提交方案的問題

1、第一階段基本上不會有什么問題,如果有問題的話直接回滾即可,一般會出問題的可能是第二階段

2、第二階段事務管理器下發通知之后,各個資源管理器就開始執行提交或回滾,但是此時還是會出現提交或回滾失敗的問題。比如以下場景:

事務管理器通知提交,資源管理器1接收到提交指令提交本地事務;資源管理器2由於宕機沒有接收到提交指令而沒有提交事務,此時就會出現事務提交不一致的問題了。

3、異常問題處理方案

a.本地資源管理器在執行事務時需要記錄redo和undo日志

b.當資源管理器宕機重啟時,根據本地redo和undo日志,詢問事務管理器當前事務的執行狀態,然后繼續提交或回滾事務

c.當事務管理器宕機時,如果指令未下發,可以通過記錄事務日志的方式根據本地日志進行下發指令;如果指令已經下發,那么各個資源管理器已經提交或回滾事務了,也沒有問題

d.第二階段由於只需要執行提交和回滾命令,而事務中的業務實際已經執行完成,所以提交或回滾命令的執行時間是非常短的,所以出現問題的概率本身就不是很大

4、二階段提交還有一個問題是整體流程較長,就會導致各個資源管理器占用鎖的時間就變長了,必須等待事務提交完成才會釋放鎖資源,所以高並發情況下不建議采用,否則會導致大量的請求被阻塞在鎖競爭上

 

2、基於TCC的事務補償方案

事務分成事務執行、事務提交和事務回滾,TCC本質就是實現了事務的三步,包括事務執行(Try)、事務提交(Confirm)、事務回滾(Cancel),TCC核心思想是針對每一個操作都注冊一個對應的確認和取消操作。

事務開始時,業務應用會向事務協調器注冊啟動事務。之后業務應用會調用所有服務的try接口,完成一階段准備。之后事務協調器會根據try接口返回情況,決定調用confirm接口或者cancel接口。如果接口調用失敗,會進行重試。
TCC方案讓應用自己定義數據庫操作的粒度,使得降低鎖沖突、提高吞吐量成為可能。 當然TCC方案也有不足之處,集中表現在以下兩個方面:

a.對應用的侵入性強。業務邏輯的每個分支都需要實現try、confirm、cancel三個操作,應用侵入性較強,改造成本高。

b.實現難度較大。需要按照網絡狀態、系統故障等不同的失敗原因實現不同的回滾策略。為了滿足一致性的要求,confirm和cancel接口必須實現冪等。

上述原因導致TCC方案大多被研發實力較強、有迫切需求的大公司所采用。微服務倡導服務的輕量化、易部署,而TCC方案中很多事務的處理邏輯需要應用自己編碼實現,復雜且開發量大。
 

3、基於MQ的最終一致性方案

基於MQ的一致性相當於是異步實現的分布式事務,將事務分成了兩個不關聯的本地事務,基於MQ進行同步,根據最終結果實現一致性從而達到分布式事務。但是最終一致性方案的一致性會有所延遲,並不能保證實時的一致性,另外還需要保證MQ是高可用且不會丟失消息的情況,否則可能會丟失事務的一部分。

 

二、基於Seata實現的分布式事務解決方案

Seata是阿里開源的分布式事務解決方案,Seata的前身是阿里開源的分布式事務中間件Fescar,后來更名為Seata,意思是Simple Extensible Autonomous Transaction Architecture,是一套一站式分布式事務解決方案。

Seata是基於XA的二階段提交方案演變而來的,核心角色分成了三個:

Transcation Coordinator(TC):事務協調器,負責維護全局事務的狀態,以及協調全局事務的提交和回滾

Transcation Manager(TM):控制全局事務的邊界,負責開啟一個全局事務,並發起最終的全局事務的提交或回滾

Resource Manager(RM):資源管理器,控制分支事務,負責分支注冊、狀態匯報,並接收事務協調器的指令,驅動分支(本地)事務的提交和回滾。

一次完整的分布式事務過程如下:

1、TM向TC申請一個全局事務,TC創建一個新的全局事務,並有一個全局事務唯一ID為XID

2、XID在微服務調用鏈路的上下文中用於傳播

3、RM向TC注冊分支事務,並由XID對應的全局事務管理

4、TM向TC發起全局事務XID的提交或回滾決議

5、TC調用XID管理的所有分支事務進行提交或回滾

Seata和XA有點類似都是進行了兩個階段,但是有所不同,

1、XA的資源管理器本質是數據庫,所以比較依賴數據庫的事務;而Seata的資源管理器是第三方包的形式嵌入在應用程序中,更加的靈活

2、XA的事務占有鎖資源需要等到第二階段完成才會釋放;而Seata是在第一階段就將所有的事務進行提交,因為90%以上的請求都會執行成功,所以第二階段回滾的概率一般不會太高,所以將提交事務的工作放在第一階段,可以保證雖然全局事務沒有結束,但是分支事務可以提前先提交好,如果真的出現異常情況,再通過第二階段回滾即可,這樣就極大的建設了各個分支事務的資源占有時間,提高了整體吞吐量。

 

三、Seata的實現細節

3.1、Seata的事務如何回滾?

Seata是通過中間件的方式嵌入應用程序的,Seata的 JDBC 數據源代理通過對業務 SQL 的解析,把業務數據在更新前后的數據鏡像組織成回滾日志,利用 本地事務 的 ACID 特性,將業務數據的更新和回滾日志的寫入在同一個 本地事務中提交。這樣就可以保證任何提交的業務數據的更新一定有相應的回滾日志存在。基於這樣的邏輯Seata可以在第一階段就將分支事務直接提交,而不用擔心后續的回滾。

到了第二階段,如果全局事務是提交,那么分支事務無需再提交事務,只需要異步將回滾日志刪除即可,而這個異步操作的效率比較高;如果全局事務是回滾,那么分支事務就根據XID找到對應的回滾日志進行回滾操作即可。

3.2、分布式事務的傳播

全局事務是由於事務協調器發起,實際就是調用鏈路的第一個發起方,全局事務發起之后會產生全局事務XID,而為了讓其他分支事務拿到XID,就需要XID在服務調用鏈路上進行傳播。所以服務調用機制就需要提高一套可以透傳XID的機制,比如HTTP請求的header、dubbo的Filter+RpcContext、本地方法的ThreadLocal等

 

四、Seata使用案例

業務場景:模擬下單業務,需要扣庫存,扣用戶余額,創建訂單三步,假設有訂單服務、庫存服務、賬戶服務,那么下單業務就需要調用庫存服務和訂單服務,而訂單服務又需要調用賬戶服務

案例采用Seata官方提供的Demo

1、maven依賴如下:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~  Copyright 1999-2018 Alibaba Group Holding Ltd.
  ~
  ~  Licensed under the Apache License, Version 2.0 (the "License");
  ~  you may not use this file except in compliance with the License.
  ~  You may obtain a copy of the License at
  ~
  ~       http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~  Unless required by applicable law or agreed to in writing, software
  ~  distributed under the License is distributed on an "AS IS" BASIS,
  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~  See the License for the specific language governing permissions and
  ~  limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>io.seata</groupId>
        <artifactId>seata-samples</artifactId>
        <version>1.1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>seata-samples-dubbo</artifactId>
    <packaging>jar</packaging>
    <name>seata-samples-dubbo ${project.version}</name>
    <dependencies>
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo</artifactId>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>dubbo-registry-nacos</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.spring</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.spring</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
        </dependency>

        <!--<dependency>-->
            <!--<groupId>org.apache.dubbo</groupId>-->
            <!--<artifactId>dubbo-remoting-etcd3</artifactId>-->
        <!--</dependency>-->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <version>2.8.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
View Code

 

2、數據庫初始化業務數據表腳本如下:

DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT 0,
  PRIMARY KEY (`id`),
  UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT 0,
  `money` int(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL,
  `money` int(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
View Code

 

3、數據庫初始化seata日志表腳步如下:

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

 

4、定義服務和實現

賬戶服務接口和實現

public interface AccountService {

    /**
     * 余額扣款
     *
     * @param userId 用戶ID
     * @param money  扣款金額
     */
    void debit(String userId, int money);
}

/** 賬戶服務 */
public class AccountServiceImpl implements AccountService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AccountService.class);

    private JdbcTemplate jdbcTemplate;

    /**
     * Sets jdbc template.
     *
     * @param jdbcTemplate the jdbc template
     */
    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void debit(String userId, int money) {
        LOGGER.info("Account Service ... xid: " + RootContext.getXID());
        LOGGER.info("Deducting balance SQL: update account_tbl set money = money - {} where user_id = {}",money,userId);

        jdbcTemplate.update("update account_tbl set money = money - ? where user_id = ?", new Object[] {money, userId});
        LOGGER.info("Account Service End ... ");
    }
}
View Code

 

庫存服務接口和實現

public interface StorageService {

    /**
     * 扣減庫存
     *
     * @param commodityCode 商品編號
     * @param count         扣減數量
     */
    void deduct(String commodityCode, int count);
}

/** 庫存服務*/
public class StorageServiceImpl implements StorageService {

    private static final Logger LOGGER = LoggerFactory.getLogger(StorageService.class);

    private JdbcTemplate jdbcTemplate;

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void deduct(String commodityCode, int count) {
        LOGGER.info("Storage Service Begin ... xid: " + RootContext.getXID());
        LOGGER.info("Deducting inventory SQL: update storage_tbl set count = count - {} where commodity_code = {}",
            count, commodityCode);

        jdbcTemplate.update("update storage_tbl set count = count - ? where commodity_code = ?",
            new Object[] {count, commodityCode});
        LOGGER.info("Storage Service End ... ");
    }
}
View Code

 

訂單服務接口和實現

public interface OrderService {
    /**
     * 創建訂單
     *
     * @param userId        用戶ID
     * @param commodityCode 商品編號
     * @param orderCount    訂購數量
     * @return 生成的訂單 order
     */
    Order create(String userId, String commodityCode, int orderCount);
}

/** 訂單服務*/
public class OrderServiceImpl implements OrderService {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderService.class);

    private AccountService accountService;

    private JdbcTemplate jdbcTemplate;

    @Override
    public Order create(String userId, String commodityCode, int orderCount) {
        LOGGER.info("Order Service Begin ... xid: " + RootContext.getXID());

        // 計算訂單金額
        int orderMoney = calculate(commodityCode, orderCount);

        // 從賬戶余額扣款
        accountService.debit(userId, orderMoney);

        final Order order = new Order();
        order.userId = userId;
        order.commodityCode = commodityCode;
        order.count = orderCount;
        order.money = orderMoney;

        KeyHolder keyHolder = new GeneratedKeyHolder();

        LOGGER.info("Order Service SQL: insert into order_tbl (user_id, commodity_code, count, money) values ({}, {}, {}, {})" ,userId ,commodityCode ,orderCount ,orderMoney );

        jdbcTemplate.update(new PreparedStatementCreator() {

            @Override
            public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
                PreparedStatement pst = con.prepareStatement(
                        "insert into order_tbl (user_id, commodity_code, count, money) values (?, ?, ?, ?)",
                        PreparedStatement.RETURN_GENERATED_KEYS);
                pst.setObject(1, order.userId);
                pst.setObject(2, order.commodityCode);
                pst.setObject(3, order.count);
                pst.setObject(4, order.money);
                return pst;
            }
        }, keyHolder);

        order.id = keyHolder.getKey().longValue();

        LOGGER.info("Order Service End ... Created " + order);

        return order;
    }

    /**
     * Sets account service.
     *
     * @param accountService the account service
     */
    public void setAccountService(AccountService accountService) {
        this.accountService = accountService;
    }

    /**
     * Sets jdbc template.
     *
     * @param jdbcTemplate the jdbc template
     */
    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    private int calculate(String commodityId, int orderCount) {
        return 200 * orderCount;
    }

}
View Code

 

下單業務服務接口和實現

public interface BusinessService {

    /**
     * 用戶訂購商品
     *
     * @param userId        用戶ID
     * @param commodityCode 商品編號
     * @param orderCount    訂購數量
     */
    void purchase(String userId, String commodityCode, int orderCount);

}

public class BusinessServiceImpl implements BusinessService {

    private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);

    private StorageService storageService;
    private OrderService orderService;

    @Override
    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        storageService.deduct(commodityCode, orderCount); orderService.create(userId, commodityCode, orderCount); //throw new RuntimeException("xxx");
    }

    public void setStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    public void setOrderService(OrderService orderService) {
        this.orderService = orderService;
    }

}

 

下單業務是分布式事務的發起者,所以方法上需要添加全局事務注解@GlobalTransactional 表示該方法是一個全局事務,那么該方法內部的執行不管是調用遠程方法還是本地方法都會添加到全局事務中

5、本案例采用dubbo的RPC調用,各個服務提供方和服務消費方配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations" value="classpath:jdbc.properties"/>
    </bean>

    <bean name="accountDataSource" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="url" value="${jdbc.account.url}"/>
        <property name="username" value="${jdbc.account.username}"/>
        <property name="password" value="${jdbc.account.password}"/>
        <property name="driverClassName" value="${jdbc.account.driver}"/>
        <property name="initialSize" value="0" />
        <property name="maxActive" value="180" />
        <property name="minIdle" value="0" />
        <property name="maxWait" value="60000" />
        <property name="validationQuery" value="Select 'x' from DUAL" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <property name="testWhileIdle" value="true" />
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <property name="minEvictableIdleTimeMillis" value="25200000" />
        <property name="removeAbandoned" value="true" />
        <property name="removeAbandonedTimeout" value="1800" />
        <property name="logAbandoned" value="true" />
        <property name="filters" value="mergeStat" />
    </bean>

    <bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
        <constructor-arg ref="accountDataSource" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="accountDataSourceProxy" />
    </bean>

    <dubbo:application name="dubbo-demo-account-service" >
        <dubbo:parameter key="qos.enable" value="false"/>
    </dubbo:application>
<!--    <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />-->
    <!--support etcd -->
    <!--<dubbo:registry address="etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService" />-->
    <!--support zk-->
    <dubbo:registry address="zookeeper://localhost:2181" />
    <!--support nacos-->
    <!--<dubbo:registry address="nacos://127.0.0.1:8848"/>-->
    <dubbo:protocol name="dubbo" port="20881"/>
    <dubbo:service interface="io.seata.samples.dubbo.service.AccountService" ref="service" timeout="10000"/>

    <bean id="service" class="io.seata.samples.dubbo.service.impl.AccountServiceImpl">
        <property name="jdbcTemplate" ref="jdbcTemplate"/>
    </bean>

    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-account-service"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>
</beans>
View Code

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations" value="classpath:jdbc.properties"/>
    </bean>

    <bean name="orderDataSource" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="url" value="${jdbc.order.url}"/>
        <property name="username" value="${jdbc.order.username}"/>
        <property name="password" value="${jdbc.order.password}"/>
        <property name="driverClassName" value="${jdbc.order.driver}"/>
        <property name="initialSize" value="0"/>
        <property name="maxActive" value="180"/>
        <property name="minIdle" value="0"/>
        <property name="maxWait" value="60000"/>
        <property name="validationQuery" value="Select 'x' from DUAL"/>
        <property name="testOnBorrow" value="false"/>
        <property name="testOnReturn" value="false"/>
        <property name="testWhileIdle" value="true"/>
        <property name="timeBetweenEvictionRunsMillis" value="60000"/>
        <property name="minEvictableIdleTimeMillis" value="25200000"/>
        <property name="removeAbandoned" value="true"/>
        <property name="removeAbandonedTimeout" value="1800"/>
        <property name="logAbandoned" value="true"/>
        <property name="filters" value="mergeStat"/>
    </bean>

    <bean id="orderDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
        <constructor-arg ref="orderDataSource"/>
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="orderDataSourceProxy"/>
    </bean>

    <dubbo:application name="dubbo-demo-order-service">
        <dubbo:parameter key="qos.enable" value="false"/>
    </dubbo:application>
<!--    <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />-->
    <!--support etcd -->
    <!--<dubbo:registry address="etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService" />-->
    <!--support zk-->
    <dubbo:registry address="zookeeper://localhost:2181" />
    <!--support nacos-->
    <!--<dubbo:registry address="nacos://127.0.0.1:8848"/> -->
    <dubbo:protocol name="dubbo" port="20883"/>
    <dubbo:service interface="io.seata.samples.dubbo.service.OrderService" ref="service" timeout="10000"/>

    <dubbo:reference id="accountService" check="false" interface="io.seata.samples.dubbo.service.AccountService"/>

    <bean id="service" class="io.seata.samples.dubbo.service.impl.OrderServiceImpl">
        <property name="jdbcTemplate" ref="jdbcTemplate"/>
        <property name="accountService" ref="accountService"/>
    </bean>

    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-order-service"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>
</beans>
View Code

 

<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~  Copyright 1999-2018 Alibaba Group Holding Ltd.
  ~
  ~  Licensed under the Apache License, Version 2.0 (the "License");
  ~  you may not use this file except in compliance with the License.
  ~  You may obtain a copy of the License at
  ~
  ~       http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~  Unless required by applicable law or agreed to in writing, software
  ~  distributed under the License is distributed on an "AS IS" BASIS,
  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~  See the License for the specific language governing permissions and
  ~  limitations under the License.
  -->

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations" value="classpath:jdbc.properties"/>
    </bean>

    <bean name="storageDataSource" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="url" value="${jdbc.storage.url}"/>
        <property name="username" value="${jdbc.storage.username}"/>
        <property name="password" value="${jdbc.storage.password}"/>
        <property name="driverClassName" value="${jdbc.storage.driver}"/>
        <property name="initialSize" value="0" />
        <property name="maxActive" value="180" />
        <property name="minIdle" value="0" />
        <property name="maxWait" value="60000" />
        <property name="validationQuery" value="Select 'x' from DUAL" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />
        <property name="testWhileIdle" value="true" />
        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <property name="minEvictableIdleTimeMillis" value="25200000" />
        <property name="removeAbandoned" value="true" />
        <property name="removeAbandonedTimeout" value="1800" />
        <property name="logAbandoned" value="true" />
        <property name="filters" value="mergeStat" />
    </bean>

    <bean id="storageDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
        <constructor-arg ref="storageDataSource" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="storageDataSourceProxy" />
    </bean>

    <dubbo:application name="dubbo-demo-storage-service"  >
        <dubbo:parameter key="qos.enable" value="false"/>
    </dubbo:application>
<!--    <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />-->
    <!--support etcd -->
    <!--<dubbo:registry address="etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService" />-->
    <!--support zk-->
    <dubbo:registry address="zookeeper://localhost:2181" />
    <!--support nacos-->
    <!--<dubbo:registry address="nacos://127.0.0.1:8848"/> -->
    <dubbo:protocol name="dubbo" port="20882" />
    <dubbo:service interface="io.seata.samples.dubbo.service.StorageService" ref="service" timeout="10000"/>

    <bean id="service" class="io.seata.samples.dubbo.service.impl.StorageServiceImpl">
        <property name="jdbcTemplate" ref="jdbcTemplate"/>
    </bean>

    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-storage-service"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>
</beans>
View Code

 

<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~  Copyright 1999-2018 Alibaba Group Holding Ltd.
  ~
  ~  Licensed under the Apache License, Version 2.0 (the "License");
  ~  you may not use this file except in compliance with the License.
  ~  You may obtain a copy of the License at
  ~
  ~       http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~  Unless required by applicable law or agreed to in writing, software
  ~  distributed under the License is distributed on an "AS IS" BASIS,
  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~  See the License for the specific language governing permissions and
  ~  limitations under the License.
  -->

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <dubbo:application name="dubbo-demo-app">
        <dubbo:parameter key="qos.enable" value="false" />
        <dubbo:parameter key="qos.accept.foreign.ip" value="false" />
        <dubbo:parameter key="qos.port" value="33333" />
    </dubbo:application>
<!--    <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />-->
    <!--support etcd -->
    <!--<dubbo:registry address="etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService" />-->
    <!--support zk-->
    <dubbo:registry address="zookeeper://localhost:2181" />
    <!--support nacos-->
    <!-- <dubbo:registry address="nacos://127.0.0.1:8848"/> -->

    <dubbo:reference id="orderService" check="false" interface="io.seata.samples.dubbo.service.OrderService"/>
    <dubbo:reference id="storageService" check="false" interface="io.seata.samples.dubbo.service.StorageService"/>

    <bean id="business" class="io.seata.samples.dubbo.service.impl.BusinessServiceImpl">
        <property name="orderService" ref="orderService"/>
        <property name="storageService" ref="storageService"/>
    </bean>

    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-app"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>

</beans>
View Code

 

6、啟動Seata服務,Seata服務下載地址為:https://github.com/seata/seata/releases, 下載完成之后解壓,並進入bin目錄,執行如下腳本開啟seata服務

1 sh seata-server.sh -p 8091 -h 127.0.0.1 -m file

 

7、分別啟動庫存服務、訂單服務、賬戶服務

 1 public class DubboStorageServiceStarter {
 2     /**
 3      * 1. Storage service is ready . A seller add 100 storage to a sku: C00321
 4      *
 5      * @param args the input arguments
 6      */
 7     public static void main(String[] args) {
 8         ClassPathXmlApplicationContext storageContext = new ClassPathXmlApplicationContext(
 9             new String[]{"spring/dubbo-storage-service.xml"});
10         storageContext.getBean("service");
11         JdbcTemplate storageJdbcTemplate = (JdbcTemplate) storageContext.getBean("jdbcTemplate");
12         storageJdbcTemplate.update("delete from storage_tbl where commodity_code = 'C00321'");
13         storageJdbcTemplate.update("insert into storage_tbl(commodity_code, count) values ('C00321', 100)");
14         new ApplicationKeeper(storageContext).keep();
15     }
16 }
View Code

 

public class DubboOrderServiceStarter {
    /**
     * The entry point of application.
     *
     * @param args the input arguments
     */
    public static void main(String[] args) {
        /**
         *  3. Order service is ready . Waiting for buyers to order
         */
        ClassPathXmlApplicationContext orderContext = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-order-service.xml"});
        orderContext.getBean("service");
        new ApplicationKeeper(orderContext).keep();
    }
}
View Code

 

public class DubboAccountServiceStarter {
    /**
     * 2. Account service is ready . A buyer register an account: U100001 on my e-commerce platform
     *
     * @param args the input arguments
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext accountContext = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-account-service.xml"});
        accountContext.getBean("service");
        JdbcTemplate accountJdbcTemplate = (JdbcTemplate) accountContext.getBean("jdbcTemplate");
        accountJdbcTemplate.update("delete from account_tbl where user_id = 'U100001'");
        accountJdbcTemplate.update("insert into account_tbl(user_id, money) values ('U100001', 999)");

        new ApplicationKeeper(accountContext).keep();
    }
}
View Code

 

8、啟動業務服務,調用下單方法

public class DubboBusinessTester {
    /**
     * The entry point of application.
     *
     * @param args the input arguments
     */
    public static void main(String[] args) {
        /**
         *  4. The whole e-commerce platform is ready , The buyer(U100001) create an order on the sku(C00321) , the count is 2
         */
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
            new String[]{"spring/dubbo-business.xml"});
        final BusinessService business = (BusinessService) context.getBean("business");
        business.purchase("U100001", "C00321", 2);
    }
}

9、模擬測試

1、各個服務均正常情況

測試結果:業務數據正常

2、庫存服務異常

測試結果:庫存服務回滾,全局事務中其他事務未執行

3、訂單服務異常

測試結果:數據庫數據先更新成功然后全部回滾,說明各個事務都提交成功然后又執行了回滾操作

4、賬戶服務異常

測試結果:庫存數據顯扣除然后由於賬戶服務異常導致訂單服務回滾,從而全局事務回滾,而庫存數據也恢復

5、業務服務異常

測試流程:業務服務調用訂單服務、庫存服務、賬戶服務全部執行成功,最后處理業務時拋異常,此時數據庫數據已經更新成功了

測試結果:庫存服務、訂單服務、賬戶服務一切正常數據庫數據全部更新成功,然后由於業務服務異常導致全部數據回滾

6、業務服務異常,Seata服務宕機

測試流程:業務服務調用訂單服務、庫存服務、賬戶服務全部執行成功,最后處理業務時拋異常,此時數據庫數據已經更新成功了,並且Seata服務測試掛了,再重啟時能否再回滾呢?

測試結果:Seata重啟之后,各個客戶端會再次向Seata注冊事務並上報事務狀態,Seata根據事務狀態判斷全局事務是提交還是回滾,所以最終結果為回滾成功

7、全局事務回滾時,其他事務變更事務場景

測試流程:由於Seata的分支事務會在第一階段直接提交,那么此時其他事務理論上就可以任意訪問和更新數據了,此時如果全局事務再回滾,那么已經被修改的數據還可以回滾么?

測試結果:其他分支事務提交成功;全局事務回滾數據恢復

五、Seata的實現原理

以上述為例,架構圖如下圖示: 

 

全局事務協調器TC就是Seata Server服務,事務管理器TM也就是事務發起者就是Business服務,資源管理器RM就分別是操作數據庫的Storage、Order和Account服務。

5.1、Seata源碼解析

5.1.1、初始化

從配置文件入手,使用Seata時需要配置兩個bean分別是DataSourceProxy和GlobalTransactionScanner,配置如下:

<bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
        <constructor-arg ref="accountDataSource" />
 </bean>
1 <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
2         <constructor-arg value="dubbo-demo-account-service"/>
3         <constructor-arg value="my_test_tx_group"/>
4     </bean>

 

從字面以上看DataSourceProxy意思就是數據源的代理,構造函數中需要傳入需要被代理的數據源,比如阿里的DruidDataSource;GlobalTransactionScanner意思是全局事務掃描器,那么應該就是用來掃描項目中配置的所有全局事務。所以了解Seata就需要從這兩個類開始着手。

5.1.2、DataSourceProxy初始化

DataSourceProxy構造函數保護了一個DataSource對象,意思是要代理這個DataSource對象,構造函數源碼如下:

public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
        if (targetDataSource instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
            targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
        }
        /** 設置目標數據源DataSource*/
        this.targetDataSource = targetDataSource;
        /** 執行初始化init方法*/
        init(targetDataSource, resourceGroupId);
    }

private void init(DataSource dataSource, String resourceGroupId) {
        /** 設置資源小組ID*/
        this.resourceGroupId = resourceGroupId;
        /** 設置JDBC參數*/
        try (Connection connection = dataSource.getConnection()) {
            jdbcUrl = connection.getMetaData().getURL();
            dbType = JdbcUtils.getDbType(jdbcUrl);
            if (JdbcConstants.ORACLE.equals(dbType)) {
                userName = connection.getMetaData().getUserName();
            }
        } catch (SQLException e) {
            throw new IllegalStateException("can not init dataSource", e);
        }
        /** 注冊當前數據源*/
        DefaultResourceManager.get().registerResource(this);
        /** 判斷是否開啟定時任務檢查表的元數據*/
        if (ENABLE_TABLE_META_CHECKER_ENABLE) {
            tableMetaExcutor.scheduleAtFixedRate(() -> {
                try (Connection connection = dataSource.getConnection()) {
                    TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                        .refresh(connection, DataSourceProxy.this.getResourceId());
                } catch (Exception ignore) {
                }
            }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
        }

        //Set the default branch type to 'AT' in the RootContext.
        /** 設置全局分支類型,默認是AT類型*/
        RootContext.setDefaultBranchType(this.getBranchType());
    }

 

從DataSourceProxy初始化的源碼來看主要是配置數據源,JDBC相關的熟悉,並且設置全局事務的類型默認為AT類型。另外有一步比較重要就是注冊資源到資源管理器,說明Seata資源管理器管理的資源就是DataSourceProxy對象,因為DataSourceProxy類實現了資源Resource接口,而資源管理器是ResourceManager對象,在Seata中就是RM角色,那么就可以在看下資源管理器是怎么來的?如何注冊資源的?

5.1.3、資源管理器初始化

DefaultResourceManager.get()方法可以返回一個資源管理器,源碼如下:

1 private static class SingletonHolder {
2         private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
3     }
4     
5     public static DefaultResourceManager get() {
6         /** 返回資源管理器的單例對象*/
7         return SingletonHolder.INSTANCE;
8     }

 

資源管理器采用單例模式初始化,返回了DefaultResourceManager類的對象,那么再看下DefaultResourceManager的初始化過程,源碼如下:

 1 /** 存儲不同模式的資源管理器對象,支持模式為:AT、TCC、SAGA、XA*/
 2     protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();
 3 
 4     /** 初始化資源管理器*/
 5     private DefaultResourceManager() {
 6         initResourceManagers();
 7     }
 8 
 9     /** 初始化資源管理器列表*/
10     protected void initResourceManagers() {
11         /** 加載所有的資源管理器對象*/
12         List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
13         if (CollectionUtils.isNotEmpty(allResourceManagers)) {
14             for (ResourceManager rm : allResourceManagers) {
15                 resourceManagers.put(rm.getBranchType(), rm);
16             }
17         }
18     }

 

其中EnhancedServerLoader.loadAll(ResourceManager.class)這一行實際是采用了SPI機制來加載資源管理器對象,SPI機制就是從/resource/META-INF/services目錄下尋找ResourceManager接口的實現類,然后進行加載。所以我們就需要從該目錄下尋找ResourceManager的實現類,既然應用項目中沒有配置SPI,那么肯定就是Seata提供的jar包中配置了,打開seata-all.jar,果然發現了ResourceManager的SPI配置,如下圖:

 

 

並且除了ResourceManager配置,還有很多其他接口的配置,所以可以看出seata中大量的使用了SPI機制。那么此時打開ResourceManager配置文件,內容如下:

io.seata.rm.datasource.DataSourceManager
io.seata.rm.datasource.xa.ResourceManagerXA
io.seata.rm.tcc.TCCResourceManager
io.seata.saga.rm.SagaResourceManager

 

可以看出配置了不同模式下的資源管理器,正好和Seata支持的AT、TCC、SAGA、XA四種模式對應上,而默認的AT模式資源管理器就是DataSourceManager類的對象。

總結:Seata通過SPI機制加載不同模式下的資源管理器對象,存在Map中,DataSourceProxy注冊資源時根據模式從Map中獲取對應類型的資源管理器即可,默認AT模式就是DataSourceManager對象。

5.1.4、資源管理器注冊資源

現在有了資源管理器了,那么就需要再注冊下資源,也就是調用ResourceManager的registerResource(Resource resource)方法,以AT模式為例,就分析下DataSourceManager的registerResource方法,源碼如下:

 1 /** 資源緩存*/
 2     private final Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();
 3 
 4     /** 注冊資源*/
 5     public void registerResource(Resource resource) {
 6         DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;
 7         /** 將資源加入緩存中*/
 8         dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
 9         /** 調用父類的registerResource方法*/
10         super.registerResource(dataSourceProxy);
11     }

 

加了一步緩存,然后再調用父類AbstractResourceManager的registerResource方法,源碼如下:

1 public void registerResource(Resource resource) {
2         RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
3     }

 

代碼只有一行但是比較長,實際就是通過RmNettyRemotingClinet類的靜態方法getInstance()獲取一個單例對象,然后調用單例對象的registerResource方法,RmNettyRemotingClient.getInstance()從字面意思就可以很好理解, 獲取一個代表資源管理器(RM)的Netty客戶端實例,實際就是RmNettyRemotingClient對象,源碼如下:

 1 /** 獲取表示資源管理器的Netty客戶端對象*/
 2     public static RmNettyRemotingClient getInstance() {
 3         if (instance == null) {
 4             synchronized (RmNettyRemotingClient.class) {
 5                 if (instance == null) {
 6                     NettyClientConfig nettyClientConfig = new NettyClientConfig();
 7                     /** 創建一個線程池*/
 8                     final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
 9                             nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
10                             KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
11                             new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(),
12                                     nettyClientConfig.getClientWorkerThreads()), new ThreadPoolExecutor.CallerRunsPolicy());
13                     /** 通過構造器創建RmNettyRemotingClient對象*/
14                     instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
15                 }
16             }
17         }
18         return instance;
19     }
20 
21     private RmNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
22                                   ThreadPoolExecutor messageExecutor) {
23         /**
24          * nettyClientConfig:Netty客戶端配置
25          * eventExecutorGroup:事件線程池組
26          * messageExecutor:消息線程池
27          * TransactionRole.RMROLE:表示當前客戶端角色是RM
28          * */
29         super(nettyClientConfig, eventExecutorGroup, messageExecutor, TransactionRole.RMROLE);
30     }
31 
32     public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
33                                        ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
34         super(messageExecutor);
35         /** 設置角色,分別為 TM、RM、SERVER*/
36         this.transactionRole = transactionRole;
37         /** 初始化Netty客戶端*/
38         clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
39         /** 設置Netty客戶端處理器*/
40         clientBootstrap.setChannelHandlers(new ClientHandler());
41         clientChannelManager = new NettyClientChannelManager(
42                 new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
43     }

 

這里實際就是創建了一個Netty的客戶端對象,並且當前客戶端對象的角色是RM也就是資源管理器,客戶端處理業務的處理器是ClientHandler對象

此時Netty客戶端已經創建了,下一步就應該是需要和Netty服務器(TC)進行連接了,那么估計邏輯就在registerResource方法中了,RmNettyRemotingClient的registerResource源碼如下:

public void registerResource(String resourceGroupId, String resourceId) {

        // Resource registration cannot be performed until the RM client is initialized
        if (StringUtils.isBlank(transactionServiceGroup)) {
            return;
        }

        if (getClientChannelManager().getChannels().isEmpty()) {
            getClientChannelManager().reconnect(transactionServiceGroup);
            return;
        }
        synchronized (getClientChannelManager().getChannels()) {
            for (Map.Entry<String, Channel> entry : getClientChannelManager().getChannels().entrySet()) {
                String serverAddress = entry.getKey();
                Channel rmChannel = entry.getValue();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("will register resourceId:{}", resourceId);
                }
                /** 發送注冊資源的消息*/
                sendRegisterMessage(serverAddress, rmChannel, resourceId);
            }
        }
    }

    public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) {
        /** 設置應用ID和事務服務組*/
        RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
        /** 設置資源ID*/
        message.setResourceIds(resourceId);
        try {
            /** 向Netty服務器發送消息*/
            super.sendAsyncRequest(channel, message);
        } catch (FrameworkException e) {
            if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {
                getClientChannelManager().releaseChannel(channel, serverAddress);
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("remove not writable channel:{}", channel);
                }
            } else {
                LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, resourceId, e);
            }
        }
    }
View Code

 

大致流程就是構建一個注冊資源的消息,然后發送給Netty服務器即可

總結下資源管理器注冊資源的流程:

1、首先獲取代表資源管理器RM的Netty客戶端對象

2、構造代表注冊資源的消息體對象

3、Netty客戶端將注冊資源的消息發送給Netty服務器請求注冊資源

5.1.5、資源管理器處理器

資源管理器向資源協調器(TC)注冊了資源之后就需要對資源進行管理,由資源協調器向資源管理器發送命令,也就是Netty服務器向Netty客戶端發送命令,所以Netty客戶端就需要對服務器發送的命令進行處理,通過Netty客戶端初始化過程可知處理的邏輯在ClientHandler中,所以就再分析ClientHandler的channelRead方法即可,源碼如下:

public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            /** 將Netty服務器發送的消息封裝成RpcMessage對象,調用processMessage方法*/
            processMessage(ctx, (RpcMessage) msg);
        }

 

 1 protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
 2         /** 獲取消息體*/
 3         Object body = rpcMessage.getBody();
 4         if (body instanceof MessageTypeAware) {
 5             /** 消息體轉換為MessageTypeAware對象*/
 6             MessageTypeAware messageTypeAware = (MessageTypeAware) body;
 7             /** 根據消息體類型編碼TypeCode獲取對應的執行器*/
 8             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
 9             if (pair != null) {
10                 if (pair.getSecond() != null) {
11                     try {
12                         pair.getSecond().execute(() -> {
13                             try {
14                                 /** 獲取第一個執行器處理消息*/
15                                 pair.getFirst().process(ctx, rpcMessage);
16                             } catch (Throwable th) {
17                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
18                             } finally {
19                                 MDC.clear();
20                             }
21                         });
22                     } catch (RejectedExecutionException e) {
23                       //......
24                     }
25                 } else {
26                    //......
27                 }
28             } else {
29                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
30             }
31         } else {
32             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
33         }
34     }

 

方法比較長但是邏輯比較簡單,一共就兩步,第一步是獲取消息對應的類型,然后找到該類型的處理器;第二步是直接執行對應消息類型的處理器的process方法即可,處理器的頂級接口是RemotingProcessor,表示遠程消息處理器,實現類比較多,基本上根據不同的消息類型就會有不同的消息處理器,並且不同消息有不同的MessageTypeAware實現類,也就是意味着不同類型的消息有不同的消息體結構,並且有不同的消息處理器,消息體類型定義在MessageType類中,對應關系如下:

消息類型 用途 MessageTypeAware RemotingProcessor 接收處理方
short TYPE_GLOBAL_BEGIN = 1 全局事務開啟

GlobalBeginRequest

ServerOnRequestProcessor Server
short TYPE_GLOBAL_BEGIN_RESULT = 2 全局事務開啟結果 GlobalBeginResponse ClientOnResponseProcessor Client
short TYPE_BRANCH_COMMIT = 3 分支事務提交 BranchCommitRequest RmBranchCommitProcessor Client
short TYPE_BRANCH_COMMIT_RESULT = 4 分支事務提交結果 BranchCommitResponse ServerOnResponseProcessor Server
short TYPE_BRANCH_ROLLBACK = 5 分支事務回滾 BranchRollbackRequest RmBranchRollbackProcessor Client
short TYPE_BRANCH_ROLLBACK_RESULT = 6 分支事務回滾結果 BranchRollbackResponse ServerOnResponseProcessor Server
short TYPE_GLOBAL_COMMIT = 7 全局事務提交 GlobalCommitRequest ServerOnRequestProcessor Server
short TYPE_GLOBAL_COMMIT_RESULT = 8 全局事務提交結果 GlobalCommitResponse ClientOnResponseProcessor Client
short TYPE_GLOBAL_ROLLBACK = 9 全局事務回滾 GlobalRollbackRequest ServerOnRequestProcessor Server
short TYPE_GLOBAL_ROLLBACK_RESULT = 10 全局事務回滾結果 GlobalRollbackResponse ClientOnResponseProcessor Client
short TYPE_BRANCH_REGISTER = 11 分支事務注冊 BranchRegisterRequest ServerOnRequestProcessor Server
short TYPE_BRANCH_REGISTER_RESULT = 12 分支事務注冊結果 BranchRegisterResponse ClientOnResponseProcessor Client
short TYPE_BRANCH_STATUS_REPORT = 13 分支事務狀態報告 BranchReportRequest ServerOnRequestProcessor Server
short TYPE_BRANCH_STATUS_REPORT_RESULT = 14 分支事務狀態報告結果 BranchReportResponse ClientOnResponseProcessor Client
short TYPE_GLOBAL_STATUS = 15 獲取全局事務狀態 GlobalStatusRequest ServerOnRequestProcessor Server
short TYPE_GLOBAL_STATUS_RESULT = 16 獲取全局事務狀態結果 GlobalStatusResponse ClientOnResponseProcessor Client
short TYPE_GLOBAL_REPORT = 17 全局事務報告 GlobalReportRequest ServerOnRequestProcessor Server
short TYPE_GLOBAL_REPORT_RESULT = 18 全局事務報告結果 GlobalReportResponse ClientOnResponseProcessor Client
short TYPE_GLOBAL_LOCK_QUERY = 21 全局事務查詢鎖 GlobalLockQueryRequest ServerOnRequestProcessor Server
short TYPE_GLOBAL_LOCK_QUERY_RESULT = 22 全局事務查詢鎖結果 GlobalLockQueryResponse ClientOnResponseProcessor Client
short TYPE_SEATA_MERGE = 59 seata合並 MergedWarpMessage ServerOnRequestProcessor Server
short TYPE_SEATA_MERGE_RESULT = 60 seata合並結果 MergedResultMessage ClientOnResponseProcessor Client
short TYPE_REG_CLT = 101 注冊TM RegisterTMRequest RegTmProcessor Server
short TYPE_REG_CLT_RESULT = 102 注冊TM結果 RegisterTMResponse ClientOnResponseProcessor Client
short TYPE_REG_RM = 103 注冊RM RegisterRMRequest RegRmProcessor Server
short TYPE_REG_RM_RESULT = 104 注冊RM結果 RegisterRMResponse ClientOnResponseProcessor Client
short TYPE_RM_DELETE_UNDOLOG = 111 RM刪除undolog UndoLogDeleteRequest RmUndoLogProcessor Client
short TYPE_HEARTBEAT_MSG = 120 心跳 HeartbeatMessage ServerHeartbeatProcessor 和 ClientHeartbeatProcessor Server 和 Client

Seata將服務器和客戶端之間通信的消息類型和處理器全部獨立開,很符合設計原則中的單一職責原則和開閉原則。所以不同狀態的處理邏輯就可以直接看對應的處理器即可。接下來就針對一個完整的全局事務工作流程來分析seata的工作流程

5.1.6、注冊RM

注冊RM邏輯是由客戶端發送給服務器,對於的處理器是RegRmProcessor,邏輯如下:

 1 public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
 2  onRegRmMessage(ctx, rpcMessage);
 3     }
 4 
 5     private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
 6         RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
 7         String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
 8         boolean isSuccess = false;
 9         String errorInfo = StringUtils.EMPTY;
10         try {
11             if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
12                 /** 調用ChannelManager的registerRMChannel方法注冊RM客戶端*/
13                 ChannelManager.registerRMChannel(message, ctx.channel());
14                 Version.putChannelVersion(ctx.channel(), message.getVersion());
15                 isSuccess = true;
16                 if (LOGGER.isDebugEnabled()) {
17                     LOGGER.debug("checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
18                 }
19             }
20         } catch (Exception exx) {
21             isSuccess = false;
22             errorInfo = exx.getMessage();
23             LOGGER.error("RM register fail, error message:{}", errorInfo);
24         }
25         RegisterRMResponse response = new RegisterRMResponse(isSuccess);
26         if (StringUtils.isNotEmpty(errorInfo)) {
27             response.setMsg(errorInfo);
28         }
29         /** 回復RM注冊結果消息 */
30         remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
31     }

 

 注冊RM的工作交給了ChannelManager來負責,registerRMChannel邏輯如下:

 1 public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
 2             throws IncompatibleVersionException {
 3         Version.checkVersion(resourceManagerRequest.getVersion());
 4         /** 獲取DBKey,實際就是jdbcUrl*/
 5         Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
 6         RpcContext rpcContext;
 7         /** 如果channel沒有認證過*/
 8         if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
 9             /** 創建RpcContext上下文信息用於存儲RM和資源信息*/
10             rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
11                     resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
12                     resourceManagerRequest.getResourceIds(), channel);
13             rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
14         } else {
15             rpcContext = IDENTIFIED_CHANNELS.get(channel);
16             rpcContext.addResources(dbkeySet);
17         }
18         if (dbkeySet == null || dbkeySet.isEmpty()) { return; }
19         /** 緩存RM信息,采用多個集合存儲RM信息*/
20         for (String resourceId : dbkeySet) {
21             String clientIp;
22             ConcurrentMap<Integer, RpcContext> portMap = CollectionUtils.computeIfAbsent(RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>())
23                     .computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>())
24                     .computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());
25 
26             rpcContext.holdInResourceManagerChannels(resourceId, portMap);
27             updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
28         }
29     }

 

ChannelManager采用了多層集合存儲了RM的信息,數據結構如下:

/**
     * resourceId -> applicationId -> ip -> port -> RpcContext
     */
    private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<ConcurrentMap<Integer, RpcContext>>>> RM_CHANNELS = new ConcurrentHashMap<>();

5.1.7、注冊TM

注冊TM的邏輯和注冊RM的邏輯幾乎一摸一樣,只不過采用了不同的數據結構來存儲,存儲TM的數據結構如下:

/**
     * ip+appname,port
     */
    private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>= new ConcurrentHashMap<>();

目前為止已經注冊了TM和RM,那么接下來就需要發現全局事務、注冊全局事務了,而這些工作很顯然就需要GlobalTransactionScanner來實現了

5.1.8、GlobalTransactionScanner(***)

GlobalTransactionScanner從字面意思是全局事務掃描器,定義如下:

1 public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean

 

繼承了AbstractAutoProxyCreator用於給修飾的bean進行動態代理創建,實現了InitalizingBean用於初始化,實現了ApplicationContextAware用於注入ApplicationContext對象,實現DisposableBean用於銷毀,那就先從初始化方法afterPropertiesSet開始分析。

5.1.8.1.GlobalTransactionScanner初始化

public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)this);
            return;
        }
        /** 開啟初始化狀態*/
        if (initialized.compareAndSet(false, true)) {
            /** 初始化客戶端 */ initClient();
        }
    }

    private void initClient() {
        /** 參數校驗*/
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }
        /** 初始化TM客戶端 */
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
        /** 初始化RM客戶端 */
        RMClient.init(applicationId, txServiceGroup);
        /** 注冊Spring銷毀鈎子函數 */
        registerSpringShutdownHook();
    }

 

核心邏輯就兩個,分別是初始化TM客戶端和RM客戶端,本質就是創建Netty客戶端和Netty服務器也就是TC服務器創建連接,以TM客戶端為例,init方法邏輯如下:

public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
        /** 創建Netty客戶端 */
        TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
        /** Netty客戶端初始化 */ tmNettyRemotingClient.init();
    }

    public void init() {
        /** 注冊消息處理器*/ registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            /** 調用父類初始化方法*/
            super.init();
        }
    }

    private void registerProcessor() {
        /** 注冊不同消息體的處理器*/
        ClientOnResponseProcessor onResponseProcessor =
                new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        // 2.registry heartbeat message processor
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }

    public void init() {
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                /** 客戶端重連*/
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
        if (NettyClientConfig.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                    MAX_MERGE_SEND_THREAD,
                    KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
                    new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        /** 客戶端監聽程序開啟 */ clientBootstrap.start();
    }

 

邏輯如下:

1.創建Netty客戶端表示TM客戶端

2.注冊消息處理器,針對不同的消息類型注冊不同的業務處理器

3.連接TC服務器並開始監聽服務器消息

而RM的初始化過程基本上整體邏輯和TM的初始化邏輯一致,不同的是監聽的消息和處理器不一樣而已。

5.1.8.2.動態代理

GlobalTransactionScanner繼承了AbstractAutoProxyCreator,而AbstractAutoProxyCreator是用於給bean創建動態代理使用的,通過wrapIfNecessary方法給bean創建動態代理,那么就可以直接看GlobalTransactionScanner的wrapIfNecessary方法即可,邏輯如下:

@Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
                /** 如果代理已存在,直接返回*/
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                /** 檢查是否是TCC模式的代理*/
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)interceptor);
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    /** 判斷目標類和目標接口是否存在@GlobalTransactional 或 @GlobalLock注解 */
                    if (!existsAnnotation(new Class[]{serviceInterface})
                            && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (globalTransactionalInterceptor == null) {
                        /** 創建方法攔截器 GlobalTransactionalInterceptor*/
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }

                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    /** 調用父類的創建代理的方法繼續添加代理的增強邏輯 */
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

private boolean existsAnnotation(Class<?>[] classes) {
        if (CollectionUtils.isNotEmpty(classes)) {
            for (Class<?> clazz : classes) {
                if (clazz == null) {
                    continue;
                }
                /** 判斷是否存在@GlobalTransactional注解 */
                GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
                if (trxAnno != null) {
                    return true;
                }
                Method[] methods = clazz.getMethods();
                for (Method method : methods) {
                    trxAnno = method.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }
                    /** 判斷是否存在@GlobalLock注解*/
                    GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                    if (lockAnno != null) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

 

整體邏輯就是判斷當前bean是否存在@GlobalTransactional注解或@GlobalLock注解,如果存在就創建方法攔截器GlobalTransactionalInterceptor,然后創建代理的工作交給父類AbstractAutoProxyCreator, 所以得出結論是被@GlobalTransactional注解修飾的方法執行的時候會走方法攔截器GlobalTransactionalIntercetor的invoke方法,邏輯如下:

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            /** 獲取GlobalTransactional注解對象 */
            final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);
            /** 獲取GlobalLock注解對象 */
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
                if (globalTransactionalAnnotation != null) {
                    /** 如果存在GlobalTransactional注解,就執行handleGlobalTransaction方法*/
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                    /** 如果存在GlobalLock注解,就執行handleGlobalLock方法*/
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        /** 執行原方法邏輯*/
        return methodInvocation.proceed();
    }

 

如果存在@GlobalTransactional注解最終會執行handleGlobalTransaction方法處理,邏輯如下:

 

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                   final GlobalTransactional globalTrxAnno) throws Throwable {
        boolean succeed = true;
        try {
            /** 調用事務模版的execute方法執行 */
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }

                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                @Override
                public TransactionInfo getTransactionInfo() {
                    // reset the value of timeout
                    int timeout = globalTrxAnno.timeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                        timeout = defaultGlobalTransactionTimeout;
                    }

                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                    transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    succeed = false;
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    succeed = false;
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

    public Object execute(TransactionalExecutor business) throws Throwable {
        /** 1.獲取當前事務*/
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        /** 2.獲取當前全局事務,如果存在就表示當前角色為參與者*/
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();

        /** 3.獲取事務傳播機制 */
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    /** 如果當前事務存在,就暫停事務*/
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    return business.execute();
                case REQUIRES_NEW:
                    /** 如果當前事務存在,就開啟新事務*/
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    break;
                case SUPPORTS:
                    /** 如果當前事務不存在,就不使用事務*/
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    break;
                case REQUIRED:
                    /** 默認機制,如果存在就加入;如果不存在就創建新事務*/
                    break;
                case NEVER:
                    /** 不使用事務,如果事務存在就拋異常*/
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                                String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                        , tx.getXid()));
                    } else {
                        return business.execute();
                    }
                case MANDATORY:
                    /** 使用事務,如果事務不存在就拋異常*/ 
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }

            /** 如果事務不存在,就創建新事務,當前角色為LAUNCHER表示事務發起者*/
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

            try {
                /** 開啟事務,必須以Launcher角色開啟*/ beginTransaction(txInfo, tx);

                Object rs;
                try {
                    /** 執行業務邏輯*/
                    rs = business.execute();
                } catch (Throwable ex) {
                    /** 回滾事務*/ completeTransactionAfterThrowing(txInfo, tx, ex); throw ex;
                }
                /** 提交事務*/ commitTransaction(tx); return rs;
            } finally {
                /** 清理緩存信息*/
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            // If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) {
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

 

核心邏輯是根據不同事務傳播機制進行不同處理,然后開啟全局事務並執行業務,根據業務執行結果進行提交或回滾事務。

5.1.9、全局事務開啟

通過5.1.8.2得知全局事務的開啟是通過beginTransaction方法開啟的,邏輯如下:

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            /** 觸發開啟事務之前的鈎子邏輯*/
            triggerBeforeBegin();
            /** 調用GlobalTransaction的begin方法開啟事務 */
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            /** 觸發開啟事務之后的鈎子邏輯*/
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.BeginFailure);
        }
    }

 

調用GlobalTransaction的begin方法開啟事務,GlobalTransaction對象是5.1.8.2中GlobalTransactionContext.createNew方法創建,實際就是創建了一個DefaultGlobalTransaction對象,所以開啟事務的邏輯在DefaultGlobalTransaction的begin方法中,邏輯如下:

 1 /** DefaultGlobalTransaction 開啟事務*/
 2     public void begin(int timeout, String name) throws TransactionException {
 3         /** 當前角色必須是Launcher才可以開啟事務*/
 4         if (role != GlobalTransactionRole.Launcher) {
 5             assertXIDNotNull();
 6             if (LOGGER.isDebugEnabled()) {
 7                 LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
 8             }
 9             return;
10         }
11         assertXIDNull();
12         /** 判斷當前事務是否存在,如果存在則拋異常*/
13         String currentXid = RootContext.getXID();
14         if (currentXid != null) {
15             throw new IllegalStateException("Global transaction already exists," +
16                     " can't begin a new global transaction, currentXid = " + currentXid);
17         }
18         /** 調用TransactionManager的begin方法開啟事務,並獲取XID*/
19         xid = transactionManager.begin(null, null, name, timeout);
20         /** 綁定全局事務ID並標記事務狀態*/
21         status = GlobalStatus.Begin;
22         RootContext.bind(xid);
23         if (LOGGER.isInfoEnabled()) {
24             LOGGER.info("Begin new global transaction [{}]", xid);
25         }
26     }

 

發現最終開啟事務是通過TransactionManager類的begin方法實現的,邏輯就是創建開啟全局事務請求對象發送給TC服務器,返回全局事務的ID,源碼如下:

 1 /** DefaultTransactionManager 開啟事務*/
 2     @Override
 3     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
 4             throws TransactionException {
 5         /** 創建開啟全局事務的請求消息對象*/
 6         GlobalBeginRequest request = new GlobalBeginRequest();
 7         request.setTransactionName(name);
 8         request.setTimeout(timeout);
 9         /** 發送開啟全局事務消息並獲取開啟結果 */
10         GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
11         if (response.getResultCode() == ResultCode.Failed) {
12             throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
13         }
14         /** 返回全局事務ID*/
15         return response.getXid();
16     }

 

根據5.1.5的消息對應關系可知,TC服務器處理開啟事務消息的處理器是ServerOnRequestProcessor,最終被AbstractTCInboundHandler的handle方法處理,現在被@GlobalTransactional注解修飾的方法已經開啟全局事務了,那么接下來就需要進行分支事務的注冊以及分支事務執行了。

TC服務器處理開啟全局事務的邏輯是先創建開啟事務結果GlobalBeginResponse對象,然后調用doGlobalBegin方法處理,邏輯如下:

 1  protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
 2             throws TransactionException {
 3         /**
 4          * 調用core.begin開啟事務,並持久化
 5          * 給response設置全局事務編號XID*/
 6         response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
 7                 request.getTransactionName(), request.getTimeout()));
 8         if (LOGGER.isInfoEnabled()) {
 9             LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
10                     rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
11         }
12     }
13 
14     /** DefaultCode的begin方法 */
15     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
16             throws TransactionException {
17         /** 創建全局會話*/
18         GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
19                 timeout);
20         /** 日志打印*/
21         MDC.put(RootContext.MDC_KEY_XID, session.getXid());
22         /** 添加事務生命周期監聽器*/
23         session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
24         /** 開啟事務*/
25         session.begin();
26 
27         /** 發送事務開啟事件*/
28         eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
29                 session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));
30         /** 返回全局事務ID*/
31         return session.getXid();
32     }

 

最終創建了一個GlobalSession對象並調用GlobalSession的begin方法開啟事務,開啟之后會激活Session生命周期監聽器的onBegin方法,監聽器通過sessionHolder.getRootSessionManager()獲取,SessionManager有多種實現方式,可以采用Redis、File和DB幾種方式來存儲,默認采用數據庫方式存儲,所以這里就分析數據庫方式存儲的實現方式,實現為DataBaseSessionManager,通過TransactionStoreManager的writeSession方法進行全局事務持久化,邏輯如下:

 1 public boolean writeSession(LogOperation logOperation, SessionStorable session) {
 2         if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
 3             /** 全局事務添加*/
 4             return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
 5         } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
 6             /** 全局事務更新*/
 7             return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
 8         } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
 9             /** 全局事務移除*/
10             return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
11         } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
12             /** 添加分支事務*/
13             return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
14         } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
15             /** 更新分支事務*/
16             return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
17         } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
18             /** 移除分支事務*/
19             return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
20         } else {
21             throw new StoreException("Unknown LogOperation:" + logOperation.name());
22         }
23     }

 

當全局事務和分支事務增刪改時都會記錄日志。

5.1.10、分支事務注冊

由5.1.2可以seata針對數據進行了代理,采用了DataSourceProxy代理類來代理DataSource,而獲取數據庫連接的方法getConnection就被你DataSourceProxy重寫了,返回了Connection的代理類ConnectionProxy,代碼如下:

/** DataSourceProxy 獲取數據庫連接*/
@Override 
public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
       /** 創建Connection對象的代理*/
        return new ConnectionProxy(this, targetConnection);
    }

那么說明數據庫事務的提交和回滾都是由ConnectionProxy來實現的。先看提交事務的commit方法,邏輯如下:

 1 @Override
 2     public void commit() throws SQLException {
 3         try {
 4             LOCK_RETRY_POLICY.execute(() -> {
 5                 /** 提交事務*/
 6  doCommit();
 7                 return null;
 8             });
 9         } catch (SQLException e) {
10             if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
11                 /** 異常回滾事務*/
12  rollback();
13             }
14             throw e;
15         } catch (Exception e) {
16             throw new SQLException(e);
17         }
18     }

 

 1 private void doCommit() throws SQLException {
 2         /** 如果在全局事務中*/
 3         if (context.inGlobalTransaction()) {
 4  processGlobalTransactionCommit();
 5         }
 6         /** 如果在全局鎖中 */
 7         else if (context.isGlobalLockRequire()) {
 8             processLocalCommitWithGlobalLocks();
 9         } else {
10             /** 沒有事務直接調用目標Connection的commit方法提交事務*/
11             targetConnection.commit();
12         }
13     }

 

 1 /** 處理全局事務的分支事務提交*/
 2     private void processGlobalTransactionCommit() throws SQLException {
 3         try {
 4             /** 注冊分支事務*/
 5  register();
 6         } catch (TransactionException e) {
 7             recognizeLockKeyConflictException(e, context.buildLockKeys());
 8         }
 9         try {
10             /** 清理undoLog*/
11             UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
12             /** 目標連接提交事務*/
13             targetConnection.commit();
14         } catch (Throwable ex) {
15             LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
16             /** 上報分支事務狀態*/
17             report(false);
18             throw new SQLException(ex);
19         }
20         if (IS_REPORT_SUCCESS_ENABLE) {
21             /** 上報分支事務狀態*/
22             report(true);
23         }
24         /** 上下文重置*/
25         context.reset();
26     }

 邏輯清晰先調用register方法注冊分支事務,然后調用被代理連接的commit方法提交事務,所以提交事務還是原連接的邏輯,核心是register方法的注冊邏輯,邏輯如下:

1 private void register() throws TransactionException {
2         if (!context.hasUndoLog() || !context.hasLockKey()) {
3             return;
4         }
5         /** 調用資源管理器的branchRegister方法注冊分支事務 */
6         Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
7                 null, context.getXid(), null, context.buildLockKeys());
8         context.setBranchId(branchId);
9     }

調用了資源管理器的branchRegister方法注冊分支事務,最終調用AbstractResourceManager的branchRegister方法,源碼如下:

 1 public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
 2         try {
 3             /** 創建注冊分支請求消息對象*/
 4             BranchRegisterRequest request = new BranchRegisterRequest();
 5             request.setXid(xid);
 6             request.setLockKey(lockKeys);
 7             request.setResourceId(resourceId);
 8             request.setBranchType(branchType);
 9             request.setApplicationData(applicationData);
10             /** 發送注冊分支消息給TC,並獲取注冊分支的結果,得到分支ID*/
11             BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
12             if (response.getResultCode() == ResultCode.Failed) {
13                 throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
14             }
15             return response.getBranchId();
16         } catch (TimeoutException toe) {
17             throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
18         } catch (RuntimeException rex) {
19             throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
20         }
21     }

邏輯比較簡單,就是創建注冊分支請求的消息發送給TC服務器,然后獲取注冊分支事務的結果得到分支事務ID直接返回即可。TC服務器處理分支事務注冊核心邏輯最終調用AbstractCore的branchRegister方法,邏輯如下:

 1 public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
 2                                String applicationData, String lockKeys) throws TransactionException {
 3         GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
 4         return SessionHolder.lockAndExecute(globalSession, () -> {
 5             globalSessionStatusCheck(globalSession);
 6             globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
 7             /** 創建分支會話 */
 8             BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
 9                     applicationData, lockKeys, clientId);
10             /** 日志打印*/
11             MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
12             branchSessionLock(globalSession, branchSession);
13             try {
14                 /** 全局會話添加分支會話*/
15                 globalSession.addBranch(branchSession);
16             } catch (RuntimeException ex) {
17                 branchSessionUnlock(branchSession);
18                 throw new BranchTransactionException(FailedToAddBranch, String
19                         .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
20                                 branchSession.getBranchId()), ex);
21             }
22             if (LOGGER.isInfoEnabled()) {
23                 LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
24                         globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
25             }
26             /** 返回分支會話的分支ID*/
27             return branchSession.getBranchId();
28         });
29     }

 

全局會話GlobalSession內部采用ArrayList<BranchSession> branchSessions存儲分支會話記錄,並且通過持久化的方式記錄添加分支的記錄

5.1.11、分支事務提交

分支事務提交的消息編碼為MessageType.TYPE_BRANCH_COMMIT = 4,有興趣同學可以直接搜索對應的處理器即可,大致交互流程差不多。

5.1.12、全局事務提交

全局事務的提交由TM來負責,由5.1.8.2可以最終都TransactionTemplate的commitTransaction方法實現,邏輯如下:

 1 private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
 2         try {
 3             /** 觸發提交事務前置處理*/
 4             triggerBeforeCommit();
 5             /** 全局事務提交*/
 6             tx.commit();
 7             /** 觸發提交事務后置處理*/
 8             triggerAfterCommit();
 9         } catch (TransactionException txe) {
10             // 4.1 Failed to commit
11             throw new TransactionalExecutor.ExecutionException(tx, txe,
12                     TransactionalExecutor.Code.CommitFailure);
13         }
14     }

由DefaultlGlobalTransaction的commit實現,邏輯如下:

 1 public void commit() throws TransactionException {
 2         /** 提交事務必須是Launcher角色,不可用是Participant角色*/
 3         if (role == GlobalTransactionRole.Participant) {
 4             return;
 5         }
 6         assertXIDNotNull();
 7         int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
 8         try {
 9             while (retry > 0) {
10                 try {
11                     /** 調用TransactionManager的commit方法提交事務*/
12                     status = transactionManager.commit(xid);
13                     break;
14                 } catch (Throwable ex) {
15                     LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
16                     retry--;
17                     if (retry == 0) {
18                         throw new TransactionException("Failed to report global commit", ex);
19                     }
20                 }
21             }
22         } finally {
23             if (xid.equals(RootContext.getXID())) {
24                 suspend();
25             }
26         }
27     }

最終到DefaultTransactionManager的commit方法實現,邏輯如下:

1 public GlobalStatus commit(String xid) throws TransactionException {
2         GlobalCommitRequest globalCommit = new GlobalCommitRequest();
3         globalCommit.setXid(xid);
4        /** 發送全局事務提交請求,接收全局事務提交結果,返回提交狀態*/
5         GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
6         return response.getGlobalStatus();
7     }

實際就是由TM向TC發送一個提交全局事務的請求,獲取TC返回的結果;同理全局事務的回滾邏輯也是TM向TC發送一個全局事務回滾的請求,獲取TC返回的結果即可。

而TC接收到提交事務的通知之后,從當前持久化日志中獲取全局事務信息,並獲取到所有分支事務信息,遍歷通知分支事務提交事務。

全局事務回滾的邏輯和全局事務提交的邏輯如出一轍。

5.1.13、心跳

客戶端和服務器保持着心跳保證互相確認對方沒有掉線,實現原理就是通過Netty的讀寫空閑檢測機制實現,客戶端發現寫空閑時間超過配置的超時時間就會主動發送心跳消息給服務器來保活。

5.2、Seata的整體工作流程

總結整體實現流程如下:

5.2.1、初始化過程

1、啟動Seata Server,Seata Server是一個Netty服務器,默認監聽2181端口

2、應用項目啟動,通過配置的GlobalTransacationScanner對象,在初始化該bean的時候執行init方法,分別初始化TM客戶端和RM客戶端,分別是TmNettyRemotingClient和RmNettyRemotingClient對象

3、TM客戶端和RM客戶端和TC服務器保持通信,並通過心跳機制保持有效長連接

4、RM初始化的時候會將自己管理的資源信息注冊給TC服務器,也就是負責的數據源信息,TC采用Map<Channel,RpcContext>存儲TM和RM的客戶端信息

5、客戶端和服務器初始化時都會注冊監聽的事件編號並綁定對應的事件處理器

6、通過代理模式分別給數據源DataSource創建代理對象DataSourceProxy,獲取的Connection對象為代理對象ConnectionProxy

5.2.2、開啟事務流程

1、通過GlobalTransactionScanner給被@GlobalTransactional注解修飾的方法創建動態代理,創建了一個方法攔截器GlobalTransactionInterceptor對象

2、被@GlobalTransactional注解修飾的方法執行時走方法攔截器GlobalTransactionalInterceptor對象的invoke方法處理增強邏輯

3、根據事務傳播機制選擇是否走事務,如果走事務就開啟全局事務,由當前的TM向TC發送開啟全局事務的請求

4、TC接收開啟事務請求,根據配置決定采用哪種持久化方式,可以采用Redis、DB、File等方式持久化全局事務,並返回全局事務ID

5、TM獲取到全局事務ID,存入上下文RootContext中,本質是通過ThreadLocal存儲

6、TM對應業務方調用分布式服務將XID傳遞過去,Dubbo采用RpcContext、HTTP采用Header的方式

7、RM對應業務方被調用,獲取XID,先注冊RM到TC,然后執行本地事務並提交,並且記錄undoLog到數據庫,

8、RM將本地事務提交結果通過brancheReport上報給TC

9、TC接收分支上報結果,先更新全局事務和分支事務狀態,然后進行持久化

5.2.3、提交事務流程

1、如果業務執行成功,TM創建全局事務提交請求並發送給TC

2、TC接收全局事務提交請求,更新持久化的全局事務狀態

3、TC有個定時任務,發送刪除UndoLog的請求給所有RM用於清理指定時間段內的UndoLog

4、RM接收到刪除UndoLog請求,刪除指定時間段內的UndoLog,默認是刪除超過7天的

5.2.4、回滾事務流程

1、如果業務執行失敗,TM創建全局事務回滾請求並發送給TC

2、TC接收全局事務回滾請求,更新持久化的全局事務狀態

3、TC根據全局事務獲取全部分支事務,遍歷通知分支事務回滾事務,僅通知分支事務提交成功的,對於分支事務提交失敗的直接刪除;

4、分支事務從數據庫查詢undoLog日志,並執行undoLog日志中的命令進行回滾操作

5、分支事務回滾成功返回分支回滾狀態給TC服務器

6、回滾成功則TC刪除分支事務,

7、如果回滾失敗則會將回滾任務加入隊列重試回滾

5.2.5、整體交互流程圖

5.2.6、實現細節梳理

1、服務器注冊TM和RM的邏輯處理?

業務系統啟動時通過GlobalTransactionScanner初始化方法afterPropertiesSet,初始化TMClient和RMClient
實際是分別創建兩個Netty客戶端TmNettyRemotingClient和RmNettyRemotingClient對象和TC服務器創建連接,並分別發送TM和RM注冊的指令給TC
TC處理TM和RM注冊的邏輯主要如下:
1.創建RpcContext對象封裝TM客戶端;
2.創建客戶端Channel和RpcContext存儲已經認證的<Channel,RpcContext> IDENTIFIED_CHANNELS集合中緩存;
3.客戶端認證ID為 applicationId:clientIp
4.TC采用ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS 存儲所有TM信息,key是客戶端認證ID,value是端口號和RpcContext的關聯信息
5.TC采用ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>>> RM_CHANNELS 存儲所有RM信息
存儲RM信息的Map有多層,每一層的key依次為resourceId -> applicationId -> ip -> port -> RpcContext
6.所以TM和RM注冊的過程實際就是在緩存集合中存儲對應的RpcContext的過程

2、服務器注冊全局事務的邏輯處理?

被@GlobalTransactional注解修飾的方法執行時會被方法攔截器攔截,會發起全局事務開啟請求,獲取服務器返回的XID,綁定到全局上下文RootContext中,
全局上下文RootContext采用key為TX_XID存儲XID,實際就是采用ThreadLocal<Map<key,value>>形式存儲
服務器接收TM發起全局事務請求之后處理邏輯如下:
1.創建GlobalSession對象表示全局事務
2.根據持久化配置選擇redis、file、DB來存儲全局事務信息
3.持久化全局事務信息包括XID、status、transactionId、transactionName、transactionServiceGroup、beginTime、applicationId等
4.XID=ipAddress:port:transactionId
5.返回全局事務開啟結果,返回XID

3、服務器注冊分支事務的邏輯處理?

1.服務器接收分支注冊請求,先鎖住全局事務會話GlobalSession
2.根據GlobalSession創建分支事務會話BranchSession
3.GlobalSession采用ArrayList存儲branchSession
4.將分支事務信息存儲到分支事務表中
5.返回分支事務注冊結果並返回分支事務ID

4、分支事務上報狀態的邏輯處理?

Seata采用DataSourceProxy代理DataSource,並通過ConnectionProxy代理Connection,並PreparedStatement也采用了代理PreparedStatementProxy,
分支執行數據庫操作時從上下文RootContext獲取XID,如果存在全局事務XID,則執行完SQL語句之后,執行ConnectionProxy的commit方法,commit方法邏輯如下:
1.ConnectionProxy會先注冊分支事務,獲取分支事務ID;
2.記錄undoLog存儲在數據庫
3.然后提交本地事務
4.上報本地事務提交的狀態
5.TC服務器更新分支事務上報的狀態

5、undoLog的使用?

本地事務執行完成成會注冊分支事務,並且本地記錄undoLog,當全局事務提交成功,TC會采用定時任務通知RM刪除undoLog,如果全局事務回滾,TC會通知RM進行本地事務回滾,本地事務從數據庫中查詢undoLog進行數據回滾
undoLog日志清理策略有三種:
1、TC服務器會創建定時任務每天通知一次所以RM刪除7天以前的undoLog;
2、RM有定時任務1秒執行一次,刪除所有二階段提交成功的分支事務相關的undoLog
3、分支事務回滾后執行完undo邏輯之后會刪除undoLog

6、TC如何存儲全局事務信息?

TC可以采用redis、file、DB等方式存儲全局事務xixi

7、全局事務XID如何傳遞?

業務發起方發起全局事務獲取全局事務XID,調用RM服務時,會通過請求將XID傳遞給服務提供者,服務提供者通過Filter或Interceptor獲取XID並通過ThreadLocal存在本地的RootContext上下文中
如Dubbo采用RpcContext傳遞,HTTP請求采用Header傳遞

8、異常回滾的邏輯處理?

8.1、分支事務正常,業務系統異常導致回滾?

業務系統TM會發起全局事務回滾請求給TC,TC再遍歷所有提交成功的分支事務,發生分支事務回滾通知

8.2、分支事務異常導致全局事務回滾?

分支事務異常會上報狀態給TC,當TM發起全局事務回滾時,TC遍歷所有提交成功的分支事務,發生分支事務回滾通知;執行失敗的分支事務不需要再進行回滾;

8.3、全局事務回滾時,分支事務回滾失敗如果解決?

分支事務有undoLog記錄,當分支事務回滾失敗時,TC會每秒重試通知一次回滾

8.4、全局事務回滾時,TC異常斷開如何恢復?

TC重啟之后會再次通知需要回滾的分支事務進行回滾

8.5、全局事務回滾時,分支事務相關數據已經被修改如何回滾?

當分支事務需要回滾時,如果此時數據以及被其他事務修改,那么此時就會回滾失敗,提示當前數據和期望數據不一致。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM