95--分布式事務六-Seata TCC模式-Spring Cloud微服務案例(添加TCC事務)


分布式事務(六)Seata TCC模式-介紹以及案例

TCC 基本原理

TCC 與 Seata AT 事務一樣都是兩階段事務,它與 AT 事務的主要區別為:

  • TCC 對業務代碼侵入嚴重
    每個階段的數據操作都要自己進行編碼來實現,事務框架無法自動處理。
  • TCC 效率更高
    不必對數據加全局鎖,允許多個事務同時操作數據。

a

准備訂單項目案例

新建 seata-tcc 工程

新建 Empty Project:

導入訂單項目,無事務版本

下載項目代碼

  1. 訪問 git 倉庫 https://gitee.com/benwang6/seata-samples
  2. 訪問項目標簽
  3. 下載無事務版本的,然后解壓到seta-tcc工程下

導入項目

  • 在 idea 中按兩下 shift 鍵,搜索 add maven projects,打開 maven 工具:
  • 然后選擇 seata-tcc 工程目錄下的 7 個項目的 pom.xml 導入:

order啟動全局事務,添加“保存訂單”分支事務

在訂單項目中執行添加訂單:

a

添加以下 TCC 事務操作的代碼:

  • Try - 第一階,凍結數據階段,向訂單表直接插入訂單,訂單狀態設置為0(凍結狀態)。

a

  • Confirm - 第二階段,提交事務,將訂單狀態修改成1(正常狀態)。

a

  • Cancel - 第二階段,回滾事務,刪除訂單。

a

order-parent 添加 seata 依賴

<!-- 打開 seata 依賴 -->
        <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-alibaba-seata</artifactId>
          <version>${spring-cloud-alibaba-seata.version}</version>
          <exclusions>
            <exclusion>
              <artifactId>seata-all</artifactId>
              <groupId>io.seata</groupId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>io.seata</groupId>
          <artifactId>seata-all</artifactId>
          <version>${seata.version}</version>
        </dependency>

配置

application.yml

設置全局事務組的組名:

spring:
  application:
    name: order

  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost/seata_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
    username: root
    password: 1234

  # 事務組設置
  cloud:
    alibaba:
      seata:
        tx-service-group: order_tx_group

......


registry.conf 和 file.conf

registry.conf 文件配置

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    # application = "default"
    # weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    cluster = "default"
    timeout = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

file.conf文件配置

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  # order_tx_group 與 yml 中的 “tx-service-group: order_tx_group” 配置一致
  # “seata-server” 與 TC 服務器的注冊名一致
  # 從eureka獲取seata-server的地址,再向seata-server注冊自己,設置group
  vgroupMapping.order_tx_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  order_tx_group.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

OrderMapper 添加更新訂單狀態、刪除訂單

根據前面的分析,訂單數據操作有以下三項:

  • 插入訂單 ---新插入的訂單是凍結狀態的。
  • 修改訂單狀態 ----二階段提交訂單的時候,將訂單的狀態修改為正常。
  • 刪除訂單 ---當事務回滾的時候,刪除已經創建的訂單。

在 OrderMapper 中已經有插入訂單的方法了,現在需要添加修改訂單和刪除訂單的方法(刪除方法從BaseMapper繼承):

package cn.tedu.order.mapper;

import cn.tedu.order.entity.Order;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;


public interface OrderMapper extends BaseMapper {
    void create(Order order);
    //修改訂單狀態 
    void updateStatus(@Param("orderId") Long orderId, @Param("status") Integer status);
    //刪除訂單的方法通過BaseMapper中的方法
}

OrderMapper.xml 中添加 sql:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.order.mapper.OrderMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.order.entity.Order" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="user_id" property="userId" jdbcType="BIGINT" />
        <result column="product_id" property="productId" jdbcType="BIGINT" />
        <result column="count" property="count" jdbcType="INTEGER" />
        <result column="money" property="money" jdbcType="DECIMAL" />
        <result column="status" property="status" jdbcType="INTEGER" />
    </resultMap>
    <insert id="create">
        INSERT INTO `order` (`id`,`user_id`,`product_id`,`count`,`money`,`status`)
        VALUES(#{id}, #{userId}, #{productId}, #{count}, #{money},#{status});
    </insert>
    <update id="updateStatus" >
        UPDATE `order` SET `status`=#{status} WHERE `id`=#{orderId};
    </update>
    <delete id="deleteById">
        DELETE FROM `order` WHERE `id`=#{orderId}
    </delete>

</mapper>

Seata 實現訂單的 TCC 操作方法

  • 第一階段 Try
  • 第二階段
    • Confirm
    • Cancel

第二階段為了處理冪等性問題這里首先添加一個工具類 ResultHolder

這個工具也可以在第二階段 Confirm 或 Cancel 階段對第一階段的成功與否進行判斷,在第一階段成功時需要保存一個標識。

冪等性:就是用戶對於同一操作發起的一次請求或者多次請求的結果是一致的,不會因為多次點擊而產生了了副作用。舉個最簡單的例子,那就是支付,用戶購買商品后支付,支付扣款成功,但是返回結果的時候網絡異常,此時錢已經扣了,用戶再次點擊按鈕,此時會進行第二次扣款,返回結果成功,用戶查詢余額發現多扣了錢,流水記錄也變成了兩條。這就是冪等性問題

ResultHolder可以為每一個全局事務保存一個標識:

package cn.tedu.order.tcc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ResultHolder {
    private static Map<Class<?>, Map<String, String>> map = new ConcurrentHashMap<Class<?>, Map<String, String>>();

    public static void setResult(Class<?> actionClass, String xid, String v) {
        Map<String, String> results = map.get(actionClass);

        if (results == null) {
            synchronized (map) {
                if (results == null) {
                    results = new ConcurrentHashMap<>();
                    map.put(actionClass, results);
                }
            }
        }

        results.put(xid, v);
    }

    public static String getResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            return results.get(xid);
        }

        return null;
    }

    public static void removeResult(Class<?> actionClass, String xid) {
        Map<String, String> results = map.get(actionClass);
        if (results != null) {
            results.remove(xid);
        }
    }
}


Seata 實現 TCC 操作需要定義一個接口,在接口中添加以下方法:

  • Try - prepareCreateOrder() --方法的名稱可以根據實際業務指定
  • Confirm - commit()
  • Cancel - rollback()
package cn.tedu.order.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

@LocalTCC
public interface OrderTccAction {

    /*
    第一階段的方法
    通過注解指定第二階段的兩個方法名

    BusinessActionContext 上下文對象,用來在兩個階段之間傳遞數據
    @BusinessActionContextParameter 注解的參數數據會被存入 BusinessActionContext
     */
    @TwoPhaseBusinessAction(name = "orderTccAction", commitMethod = "commit", rollbackMethod = "rollback")
    boolean prepareCreateOrder(BusinessActionContext businessActionContext,
       @BusinessActionContextParameter(paramName = "orderId") Long orderId,
       @BusinessActionContextParameter(paramName = "userId") Long userId,
       @BusinessActionContextParameter(paramName = "productId") Long productId,
       @BusinessActionContextParameter(paramName = "count") Integer count,
       @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    // 第二階段 - 提交
    boolean commit(BusinessActionContext businessActionContext);

    // 第二階段 - 回滾
    boolean rollback(BusinessActionContext businessActionContext);

}

實現類:

package cn.tedu.order.tcc;

import cn.tedu.order.entity.Order;
import cn.tedu.order.mapper.OrderMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
@Component
@Slf4j
public class OrderTccActionImpl implements OrderTccAction {

    @Autowired
    private OrderMapper orderMapper;
    @Transactional
    @Override
    public boolean prepareCreateOrder(BusinessActionContext businessActionContext, Long orderId, Long userId, Long productId, Integer count, BigDecimal money) {
        //插入訂單 設置狀態為0 --凍結
        orderMapper.create(new Order(orderId,userId,productId,count,money,0));
        log.info("創建訂單第一階段 凍結訂單成功");
        //第一階段成功 添加一個標識
        ResultHolder.setResult(OrderTccAction.class,businessActionContext.getXid(),"p");
        return true;
    }
    @Transactional
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
       //判斷第一階段的成功標記,沒有標記則不執行提交操作
        if (ResultHolder.getResult(OrderTccAction.class,businessActionContext.getXid())==null){
            return true;
        }


        //修改訂單狀態 0---》1 正常狀態的轉變
        //通過訂單id
        Long orderId = Long.parseLong(businessActionContext.getActionContext("orderId").toString());
        orderMapper.updateStatus(orderId,1);
        log.info("創建訂單第二階段 提交訂單 解凍成功");

        //刪除標識 防止一直重復提交
        ResultHolder.removeResult(OrderTccAction.class,businessActionContext.getXid());

        return true;
    }
    @Transactional
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        //第一階段沒有完成的情況下,不必執行回滾
        //因為第一階段有本地事務,事務失敗時已經進行了回滾。
        //如果這里第一階段成功,而其他全局事務參與者失敗,這里會執行回滾
        //冪等性控制:如果重復執行回滾則直接返回
        log.info("創建 order 第二階段回滾,刪除訂單 - "+businessActionContext.getXid());
        //判斷第一階段的成功標記,沒有標記則不執行提交操作
        if (ResultHolder.getResult(OrderTccAction.class,businessActionContext.getXid())==null){
            return true;
        }

        //通過訂單id
        Long orderId = Long.parseLong(businessActionContext.getActionContext("orderId").toString());
        orderMapper.deleteById(orderId);
        log.info("創建訂單第二階段 回滾訂單 刪除成功");
        //刪除標識 防止一直重復提交
        ResultHolder.removeResult(OrderTccAction.class,businessActionContext.getXid());

        return true;
    }
}

在業務代碼中調用 Try 階段方法

業務代碼中不再直接保存訂單數據,而是調用 TCC 第一階段方法prepareCreateOrder(),並添加全局事務注解 @GlobalTransactional

package cn.tedu.order.service;

import cn.tedu.order.entity.Order;
import cn.tedu.order.feign.AccountClient;
import cn.tedu.order.feign.EasyIdGeneratorClient;
import cn.tedu.order.feign.StorageClient;
import cn.tedu.order.mapper.OrderMapper;
import cn.tedu.order.tcc.OrderTccAction;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Random;

@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    EasyIdGeneratorClient easyIdGeneratorClient;
    @Autowired
    private AccountClient accountClient;
    @Autowired
    private StorageClient storageClient;

    @Autowired
    private OrderTccAction orderTccAction;

    @GlobalTransactional //開啟全局事務
    @Override
    public void create(Order order) {
        // 從全局唯一id發號器獲得id
        Long orderId = easyIdGeneratorClient.nextId("order_business");
        order.setId(orderId);
        //不再直接執行數據庫操作
//        orderMapper.create(order);
        //調用TCC 第一階段方法
        // JDK代理調用的 AOP 生成動態代理對象,攔截器會自動創建context 對象
        orderTccAction.prepareCreateOrder(null,
                order.getId(),
                order.getUserId(),
                order.getProductId(),
                order.getCount(),
                order.getMoney());

        // 修改庫存
        //storageClient.decrease(order.getProductId(), order.getCount());

        // 修改賬戶余額
       // accountClient.decrease(order.getUserId(), order.getMoney());

    }
}

啟動 order 進行測試

按順序啟動服務:

  1. Eureka
  2. Seata Server ---該服務需要單獨啟動
  3. Easy Id Generator
  4. Order

調用保存訂單,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

觀察控制台日志:

a

查看數據庫表中的訂單數據:

a

storage添加“減少庫存”分支事務

在庫存項目中執行減少庫存:

a

我們要添加以下 TCC 事務操作的代碼:

  • Try - 第一階,凍結數據階段,將要減少的庫存量先凍結:

a

  • Confirm - 第二階段,提交事務,使用凍結的庫存完成業務數據處理:

a

  • Cancel - 第二階段,回滾事務,凍結的庫存解凍,恢復以前的庫存量:

a

配置

有三個文件需要配置:

  • application.yml
  • registry.conf
  • file.conf

以上三個配置文件和order相應的配置文件完全相同

StorageMapper 添加凍結庫存相關方法

根據前面的分析,庫存數據操作有以下三項:

  • 凍結庫存
  • 凍結庫存量修改為已售出量
  • 解凍庫存

在 StorageMapper 中添加三個方法:

package cn.tedu.storage.mapper;

import cn.tedu.storage.entity.Storage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

public interface StorageMapper extends BaseMapper<Storage> {

    void decrease(Long productId, Integer count);

    // 凍結庫存
    void updateFrozen(@Param("productId") Long productId, @Param("residue") Integer residue, @Param("frozen") Integer frozen);
    // 提交時,把凍結量修改到已售出
    void updateFrozenToUsed(@Param("productId") Long productId, @Param("count") Integer count);
    // 回滾時,把凍結量修改到可用庫存
    void updateFrozenToResidue(@Param("productId") Long productId, @Param("count") Integer count);

}

StorageMapper.xml文件的修改配置

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.storage.mapper.StorageMapper" >
    <resultMap id="BaseResultMap" type="cn.tedu.storage.entity.Storage" >
        <id column="id" property="id" jdbcType="BIGINT" />
        <result column="product_id" property="productId" jdbcType="BIGINT" />
        <result column="total" property="total" jdbcType="INTEGER" />
        <result column="used" property="used" jdbcType="INTEGER" />
        <result column="residue" property="residue" jdbcType="INTEGER" />
    </resultMap>
    <update id="decrease">
      UPDATE storage SET used = used + #{count},residue = residue - #{count} WHERE product_id = #{productId}
    </update>
    <select id="selectById" resultMap="BaseResultMap">
        SELECT * FROM storage WHERE `product_id`=#{productId}
    </select>

    <update id="updateFrozen">
        UPDATE storage SET `residue`=#{residue},`frozen`=#{frozen} WHERE `product_id`=#{productId}
    </update>

    <update id="updateFrozenToUsed">
        UPDATE storage SET `frozen`=`frozen`-#{count}, `used`=`used`+#{count} WHERE `product_id`=#{productId}
    </update>

    <update id="updateFrozenToResidue">
        UPDATE storage SET `frozen`=`frozen`-#{count}, `residue`=`residue`+#{count} WHERE `product_id`=#{productId}
    </update>
</mapper>
 

Seata 實現庫存的 TCC 操作方法

工具類 ResultHolder

復制order服務中的ResultHolder方法即可

添加 TCC 接口,在接口中添加以下方法:

  • Try - prepareDecreaseStorage()
  • Confirm - commit()
  • Cancel - rollback()
package cn.tedu.storage.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface StorageTccAction {
    @TwoPhaseBusinessAction(name = "storageTccAction")
    boolean prepareDecreaseStorage(BusinessActionContext businessActionContext,
            @BusinessActionContextParameter(paramName = "productId") Long productId,
            @BusinessActionContextParameter(paramName = "count") Integer count
    );
    boolean commit(BusinessActionContext businessActionContext);
    boolean rollback(BusinessActionContext businessActionContext);


}

實現類

package cn.tedu.storage.tcc;

import cn.tedu.storage.entity.Storage;
import cn.tedu.storage.mapper.StorageMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Slf4j
public class StorageTccActionImpl implements StorageTccAction {

    @Autowired
    private StorageMapper storageMapper;

    @Transactional
    @Override
    public boolean prepareDecreaseStorage(BusinessActionContext businessActionContext, Long productId, Integer count) {

        log.info("減少庫存第一階段 開始執行凍結操作");
      //先查詢庫存 看是否可以凍結
        Storage storage = storageMapper.selectById(productId);
        if (storage.getResidue()<count){
            throw new  RuntimeException("庫存量不足,無法完成凍結操作");
        }
        //凍結操作
        storageMapper.updateFrozen(storage.getProductId(),storage.getResidue()-count,storage.getFrozen()+count);

        //添加標記
        ResultHolder.setResult(StorageTccAction.class,businessActionContext.getXid(),"p");
        log.info("減少庫存第一階段 執行凍結操作成功");

        return true;
    }
    @Transactional
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        if (ResultHolder.getResult(StorageTccAction.class,businessActionContext.getXid())==null){
            return  true;
        }
        log.info("減少庫存第二階段 開始執行提交操作");

        long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
        int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());

        storageMapper.updateFrozenToUsed(productId,count);
        log.info("減少庫存第二階段 執行提交操作完畢");

        //刪除標識
        ResultHolder.removeResult(StorageTccAction.class, businessActionContext.getXid());
        return false;
    }

    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        //防止重復的回滾
        if (ResultHolder.getResult(StorageTccAction.class,businessActionContext.getXid())==null){
            return  true;
        }
        log.info("減少庫存第二階段 執行回滾操作");
        long productId = Long.parseLong(businessActionContext.getActionContext("productId").toString());
        int count = Integer.parseInt(businessActionContext.getActionContext("count").toString());
        storageMapper.updateFrozenToResidue(productId,count);
        log.info("減少庫存第二階段 執行回滾操作完畢");
        //刪除標識
        ResultHolder.removeResult(StorageTccAction.class, businessActionContext.getXid());
        return true;
    }
}

在業務代碼中調用 Try 階段方法

業務代碼中調用 TCC 第一階段方法prepareDecreaseStorage(),並添加全局事務注解 @GlobalTransactional

package cn.tedu.storage.service;

import cn.tedu.storage.mapper.StorageMapper;
import cn.tedu.storage.tcc.StorageTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class StorageServiceImpl implements StorageService {
    @Autowired
    private StorageMapper storageMapper;
    @Autowired
    private StorageTccAction storageTccAction;


    @Override
    public void decrease(Long productId, Integer count) throws Exception {
//        storageMapper.decrease(productId,count);
        //調用TCC的第一階段方法
        storageTccAction.prepareDecreaseStorage(null,productId,count);
    }
}

啟動 storage 進行測試

按順序啟動服務:

  1. Eureka
  2. Seata Server
  3. Easy Id Generator
  4. Storage
  5. Order

調用保存訂單,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

觀察 storage 的控制台日志:
a

查看數據庫表中的庫存數據:

a

account添加“扣減金額”分支事務

第一階段 Try

以賬戶服務為例,當下訂單時要扣減用戶賬戶金額:

a

假如用戶購買 100 元商品,要扣減 100 元。

TCC 事務首先對這100元的扣減金額進行預留,或者說是先凍結這100元:

a

第二階段 Confirm

如果第一階段能夠順利完成,那么說明“扣減金額”業務(分支事務)最終肯定是可以成功的。當全局事務提交時, TC會控制當前分支事務進行提交,如果提交失敗,TC 會反復嘗試,直到提交成功為止。

當全局事務提交時,就可以使用凍結的金額來最終實現業務數據操作:
a

第二階段 Cancel

如果全局事務回滾,就把凍結的金額進行解凍,恢復到以前的狀態,TC 會控制當前分支事務回滾,如果回滾失敗,TC 會反復嘗試,直到回滾完成為止。

a

多個事務並發的情況

多個TCC全局事務允許並發,它們執行扣減金額時,只需要凍結各自的金額即可:

a

配置

有三個文件需要配置:

  • application.yml
  • registry.conf
  • file.conf

以上三個文件的設置與上面 order 項目的配置完全相同。

AccountMapper 添加凍結金額相關方法

根據前面的分析,庫存數據操作有以下三項:

  • 凍結金額
  • 凍結金額修改為已使用量
  • 解凍金額

在 AccountMapper 中添加三個方法:

package cn.tedu.account.mapper;

import cn.tedu.account.entity.Account;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;

import java.math.BigDecimal;

public interface AccountMapper extends BaseMapper<Account> {
    void decrease(Long userId, BigDecimal money);
    void updateFrozen(@Param("userId") Long userId, @Param("residue") BigDecimal residue, @Param("frozen") BigDecimal frozen);

    void updateFrozenToUsed(@Param("userId") Long userId, @Param("money") BigDecimal money);

    void updateFrozenToResidue(@Param("userId") Long userId, @Param("money") BigDecimal money);
}

AccountMapper.xml文件編寫

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="cn.tedu.account.mapper.AccountMapper">
    <resultMap id="BaseResultMap" type="cn.tedu.account.entity.Account">
        <id column="id" property="id" jdbcType="BIGINT"/>
        <result column="user_id" property="userId" jdbcType="BIGINT"/>
        <result column="total" property="total" jdbcType="DECIMAL"/>
        <result column="used" property="used" jdbcType="DECIMAL"/>
        <result column="residue" property="residue" jdbcType="DECIMAL"/>
        <result column="frozen" property="frozen" jdbcType="DECIMAL"/>
    </resultMap>
    <update id="decrease">
        UPDATE account SET residue = residue - #{money},used = used + #{money} where user_id = #{userId};
    </update>
    <select id="selectById" resultMap="BaseResultMap">
        SELECT * FROM account WHERE `user_id`=#{userId}
    </select>

    <update id="updateFrozen">
        UPDATE account SET `residue`=#{residue},`frozen`=#{frozen} WHERE `user_id`=#{userId}
    </update>

    <update id="updateFrozenToUsed">
        UPDATE account SET `frozen`=`frozen`-#{money}, `used`=`used`+#{money} WHERE `user_id`=#{userId}
    </update>

    <update id="updateFrozenToResidue">
        UPDATE account SET `frozen`=`frozen`-#{money}, `residue`=`residue`+#{money} WHERE `user_id`=#{userId}
    </update>
</mapper>

Seata 實現庫存的 TCC 操作方法

工具類 ResultHolder

復制order服務中的ResultHolder方法即可

添加 TCC 接口,在接口中添加以下方法:

  • Try - prepareDecreaseAccount()
  • Confirm - commit()
  • Cancel - rollback()
package cn.tedu.account.tcc;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

import java.math.BigDecimal;

@LocalTCC
public interface AccountTccAction {
    @TwoPhaseBusinessAction(name = "accountTccAction")
    boolean prepareDecreaseAccount(BusinessActionContext businessActionContext,
               @BusinessActionContextParameter(paramName = "userId") Long userId,
               @BusinessActionContextParameter(paramName = "money")BigDecimal money

               );

    boolean commit(BusinessActionContext businessActionContext);

    boolean rollback(BusinessActionContext businessActionContext);

}

實現類

package cn.tedu.account.tcc;

import cn.tedu.account.entity.Account;
import cn.tedu.account.mapper.AccountMapper;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
@Component
@Slf4j
public class AccountTccActionImpl implements AccountTccAction {
    @Autowired
    private AccountMapper accountMapper;
    @Transactional
    @Override
    public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
        log.info("扣減金額第一階段 ,開始執行凍結金額操作");
        Account account = accountMapper.selectById(userId);
        if (account.getResidue().compareTo(money)<0){
            throw  new  RuntimeException("可用金額不足,金額凍結失敗");
        }
        //執行凍結操作
        accountMapper.updateFrozen(userId,account.getResidue().subtract(money),account.getFrozen().add(money));
        if (Math.random()<0.5){
            throw new RuntimeException("模擬異常");
        }

        //創建標識
        ResultHolder.setResult(getClass(),businessActionContext.getXid(),"p");
        log.info("扣減金額第一階段 ,執行凍結金額完成");
        return false;
    }
    @Transactional
    @Override
    public boolean commit(BusinessActionContext businessActionContext) {
        log.info("扣減金額第二階段 ,開始執行提交操作");
        //防止重復提交
        if (ResultHolder.getResult(getClass(),businessActionContext.getXid())==null){
            return  true;
        }

        long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());

        BigDecimal money = new BigDecimal(businessActionContext.getActionContext("money").toString());


        accountMapper.updateFrozenToUsed(userId,money);
        //刪除標識
        ResultHolder.removeResult(getClass(),businessActionContext.getXid());
        return false;
    }
    @Transactional
    @Override
    public boolean rollback(BusinessActionContext businessActionContext) {
        log.info("扣減金額第二階段 ,開始執行回滾操作");
        //防止重復回滾
        if (ResultHolder.getResult(getClass(),businessActionContext.getXid())==null){
            return  true;
        }
        long userId = Long.parseLong(businessActionContext.getActionContext("userId").toString());

        BigDecimal money = new BigDecimal(businessActionContext.getActionContext("money").toString());

        accountMapper.updateFrozenToResidue(userId,money);
        //刪除標識
        ResultHolder.removeResult(getClass(),businessActionContext.getXid());
        log.info("扣減金額第二階段 ,執行回滾操作完成");
        return false;
    }
}

在業務代碼中調用 Try 階段方法

業務代碼中調用 TCC 第一階段方法prepareDecreaseAccount(),並添加全局事務注解 @GlobalTransactional

package cn.tedu.account.service;

import cn.tedu.account.mapper.AccountMapper;
import cn.tedu.account.tcc.AccountTccAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
@Service
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountMapper accountMapper;

    @Autowired
    private AccountTccAction accountTccAction;

    @Override
    public void decrease(Long userId, BigDecimal money) {
//        accountMapper.decrease(userId,money);
        //調用TCC 第一階段的方法
        accountTccAction.prepareDecreaseAccount(null,userId,money);
    }
}

啟動 account 進行測試

按順序啟動服務:

  1. Eureka
  2. Seata Server ---需要單獨啟動服務器
  3. Easy Id Generator
  4. Storage
  5. Account
  6. Order

調用保存訂單,地址:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

觀察 account 的控制台日志:
a

查看數據庫表中的賬戶數據:

a

全局事務回滾

首先在 account 的第一階段代碼中添加模擬異常:

AccountTccActionImplprepareDecreaseAccount 方法

@Transactional
    @Override
    public boolean prepareDecreaseAccount(BusinessActionContext businessActionContext, Long userId, BigDecimal money) {
        log.info("扣減金額第一階段 ,開始執行凍結金額操作");
        Account account = accountMapper.selectById(userId);
        if (account.getResidue().compareTo(money)<0){
            throw  new  RuntimeException("可用金額不足,金額凍結失敗");
        }
        //執行凍結操作
        accountMapper.updateFrozen(userId,account.getResidue().subtract(money),account.getFrozen().add(money));
        if (Math.random()<0.5){
            throw new RuntimeException("模擬異常");
        }

        //創建標識
        ResultHolder.setResult(getClass(),businessActionContext.getXid(),"p");
        log.info("扣減金額第一階段 ,執行凍結金額完成");
        return false;
    }

重啟 account 后,訪問訂單:
http://localhost:8083/create?userId=1&productId=1&count=10&money=100

查看控制台,可以看到 storage 和 order 的回滾日志,order 的回滾日志如下:

a


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM