seata 源碼解析(圖解_秒懂_史上最全)


文章很長,而且持續更新,建議收藏起來,慢慢讀! Java 高並發 發燒友社群:瘋狂創客圈(總入口) 奉上以下珍貴的學習資源:


推薦:入大廠 、做架構、大力提升Java 內功 的 精彩博文

入大廠 、做架構、大力提升Java 內功 必備的精彩博文 2021 秋招漲薪1W + 必備的精彩博文
1:Redis 分布式鎖 (圖解-秒懂-史上最全) 2:Zookeeper 分布式鎖 (圖解-秒懂-史上最全)
3: Redis與MySQL雙寫一致性如何保證? (面試必備) 4: 面試必備:秒殺超賣 解決方案 (史上最全)
5:面試必備之:Reactor模式 6: 10分鍾看懂, Java NIO 底層原理
7:TCP/IP(圖解+秒懂+史上最全) 8:Feign原理 (圖解)
9:DNS圖解(秒懂 + 史上最全 + 高薪必備) 10:CDN圖解(秒懂 + 史上最全 + 高薪必備)
11: 分布式事務( 圖解 + 史上最全 + 吐血推薦 ) 12:seata AT模式實戰(圖解+秒懂+史上最全)
13:seata 源碼解讀(圖解+秒懂+史上最全) 14:seata TCC模式實戰(圖解+秒懂+史上最全)

Java 面試題 30個專題 , 史上最全 , 面試必刷 阿里、京東、美團... 隨意挑、橫着走!!!
1: JVM面試題(史上最強、持續更新、吐血推薦) 2:Java基礎面試題(史上最全、持續更新、吐血推薦
3:架構設計面試題 (史上最全、持續更新、吐血推薦) 4:設計模式面試題 (史上最全、持續更新、吐血推薦)
17、分布式事務面試題 (史上最全、持續更新、吐血推薦) 一致性協議 (史上最全)
29、多線程面試題(史上最全) 30、HR面經,過五關斬六將后,小心陰溝翻船!
9.網絡協議面試題(史上最全、持續更新、吐血推薦) 更多專題, 請參見【 瘋狂創客圈 高並發 總目錄

SpringCloud 精彩博文
nacos 實戰(史上最全) sentinel (史上最全+入門教程)
SpringCloud gateway (史上最全) 更多專題, 請參見【 瘋狂創客圈 高並發 總目錄

seata AT模式源碼解讀( 圖解+秒懂+史上最全)

閱讀此文之前,請先閱讀 :

分布式事務( 圖解 + 史上最全 + 吐血推薦 )

seata AT模式實戰(圖解+秒懂+史上最全)

參考鏈接
系統架構知識圖譜(一張價值10w的系統架構知識圖譜)

https://www.processon.com/view/link/60fb9421637689719d246739

秒殺系統的架構

https://www.processon.com/view/link/61148c2b1e08536191d8f92f

seata基礎知識

Seata 中有三大模塊,分別是 TM、RM 和 TC。其中 TM 和 RM 是作為 Seata 的客戶端與業務系統集成在一起,TC 作為 Seata 的服務端獨立部署。

角色划分:
TM:

事務管理,開啟、提交、回滾分布式事務
RM:

資源管理,注冊、匯報、執資源,負責接收TC發過來的提交、回滾消息,並作出提交,回滾操作
TC:

事務管理器服務功能,存儲事務日志、補償異常事務等、集中管理事務全局鎖(全局行鎖)

事務執行整體流程:

圖解:Seata AT分布式事務的執行流程

先從官網借一張圖,回顧AT模式的角色和流程
at模式的整體流程

  1. TM 開啟分布式事務(TM 向 TC 注冊全局事務記錄);
  2. 按業務場景,編排數據庫、服務等事務內資源(RM 向 TC 匯報資源准備狀態);
  3. TM 結束分布式事務,事務一階段結束(TM 通知 TC 提交 / 回滾分布式事務);
  4. TC 匯報事務信息,決定分布式事務是提交還是回滾;
  5. TC 通知所有 RM 提交 / 回滾資源,事務二階段結束。

AT 模式對應於阿里雲的全局事務服務(Global Transaction Service,簡稱 GTS)。

分布式事務的執行流程整體圖

在這里插入圖片描述

10WQPS秒殺的分布式事務 執行流程

在這里插入圖片描述

TM&RM啟動

springboot-starter 啟動

spring.factories

在這里插入圖片描述

SeataAutoConfiguration


@ComponentScan(
    basePackages = {"io.seata.spring.boot.autoconfigure.properties"}
)
@ConditionalOnProperty(
    prefix = "seata",
    name = {"enabled"},
    havingValue = "true",
    matchIfMissing = true
)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    public SeataAutoConfiguration() {
    }

    @Bean({"springApplicationContextProvider"})
    @ConditionalOnMissingBean(
        name = {"springApplicationContextProvider"}
    )
    public SpringApplicationContextProvider springApplicationContextProvider() {
        return new SpringApplicationContextProvider();
    }

    @Bean({"failureHandler"})
    @ConditionalOnMissingBean({FailureHandler.class})
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({"springApplicationContextProvider", "failureHandler"})
    @ConditionalOnMissingBean({GlobalTransactionScanner.class})
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }

        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

    @Bean({"seataAutoDataSourceProxyCreator"})
    @ConditionalOnProperty(
        prefix = "seata",
        name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"},
        havingValue = "true",
        matchIfMissing = true
    )
    @ConditionalOnMissingBean({SeataAutoDataSourceProxyCreator.class})
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(), seataProperties.getExcludesForAutoProxying());
    }
}

配置事務分組名稱

這里有一個配置項SeataProperties,用於配置事務分組名稱,即讀取如下配置:

@EnableConfigurationProperties(SpringCloudAlibabaConfiguration.class)
public class SeataProperties {

    public String getTxServiceGroup() {
        if (txServiceGroup == null) {
            txServiceGroup = springCloudAlibabaConfiguration.getTxServiceGroup();
        }
        return txServiceGroup;
    }

SpringCloudAlibabaConfiguration 如何加載分組

  • 首先查找配置的分組名稱
  • 沒有,則使用默認的分組名稱
/**
 * The type Spring cloud alibaba configuration.
 *
 * @author slievrly
 */
@ConfigurationProperties(prefix = StarterConstants.SEATA_SPRING_CLOUD_ALIBABA_PREFIX)
public class SpringCloudAlibabaConfiguration implements ApplicationContextAware {


    /**
     * Gets tx service group.
     *
     * @return the tx service group
     */
    public String getTxServiceGroup() {
     if (txServiceGroup == null) {
     String applicationId = getApplicationId();
     if (applicationId == null) {
     LOGGER.warn("{} is null, please set its value", SPRING_APPLICATION_NAME_KEY);
     }
     txServiceGroup = applicationId + DEFAULT_SPRING_CLOUD_SERVICE_GROUP_POSTFIX;
     }
     return txServiceGroup;
    }

如果沒有配置,則使用spring.application.name+ -seata-service-group生成一個名稱,
所以如果不配置spring.application.name啟動會報錯

上面用到的常量 ,seata的配置前綴

public static final String SEATA_SPRING_CLOUD_ALIBABA_PREFIX = "spring.cloud.alibaba.seata";

有了applicationId, txServiceGroup之后則創建一個io.seata.spring.annotation.GlobalTransactionScanner對象,主要看它的initClient()

GlobalTransactionScanner 初始化

通過Spring 自動調用的 InitializingBean 的 生命周期函數 afterPropertiesSet 初始化


 */
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware,
    DisposableBean {

    private static final long serialVersionUID = 1L;

    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionScanner.class);

    private static final int AT_MODE = 1;
    private static final int MT_MODE = 2;

    private static final int ORDER_NUM = 1024;
    private static final int DEFAULT_MODE = AT_MODE + MT_MODE;

    private static final Set<String> PROXYED_SET = new HashSet<>();

    private MethodInterceptor interceptor;

    private final String applicationId;
    private final String txServiceGroup;
    private final int mode;
    private final boolean disableGlobalTransaction = ConfigurationFactory.getInstance().getBoolean(
     ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);

    private final FailureHandler failureHandlerHook;

   

    /**
     * Instantiates a new Global transaction scanner.
     *
     * @param applicationId     the application id
     * @param txServiceGroup     the tx service group
     * @param mode     the mode
     * @param failureHandlerHook the failure handler hook
     */
    public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode,
     FailureHandler failureHandlerHook) {
     setOrder(ORDER_NUM);
     setProxyTargetClass(true);
     this.applicationId = applicationId;
     this.txServiceGroup = txServiceGroup;
     this.mode = mode;
     this.failureHandlerHook = failureHandlerHook;
    }

    @Override
    public void destroy() {
     ShutdownHook.getInstance().destroyAll();
    }

    private void initClient() {
     if (LOGGER.isInfoEnabled()) {
     LOGGER.info("Initializing Global Transaction Clients ... ");
     }
     if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
     throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
     }
     //init TM   //init TM register TM success
     TMClient.init(applicationId, txServiceGroup);
     if (LOGGER.isInfoEnabled()) {
     LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
     }
     //init RM
     RMClient.init(applicationId, txServiceGroup);
     if (LOGGER.isInfoEnabled()) {
     LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
     }

     if (LOGGER.isInfoEnabled()) {
     LOGGER.info("Global Transaction Clients are initialized. ");
     }


     //注冊鈎子事件,封裝銷毀操作
     registerSpringShutdownHook();

    }

    private void registerSpringShutdownHook() {
     if (applicationContext instanceof ConfigurableApplicationContext) {
     ((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
     ShutdownHook.removeRuntimeShutdownHook();
     }
     ShutdownHook.getInstance().addDisposable(TmRpcClient.getInstance(applicationId, txServiceGroup));
     ShutdownHook.getInstance().addDisposable(RmRpcClient.getInstance(applicationId, txServiceGroup));
    }

  
    private boolean existsAnnotation(Class<?>[] classes) {
     if (CollectionUtils.isNotEmpty(classes)) {
     for (Class<?> clazz : classes) {
     if (clazz == null) {
     continue;
     }
     Method[] methods = clazz.getMethods();
     for (Method method : methods) {
     GlobalTransactional trxAnno = method.getAnnotation(GlobalTransactional.class);
     if (trxAnno != null) {
     return true;
     }

     GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
     if (lockAnno != null) {
     return true;
     }
     }
     }
     }
     return false;
    }

    private MethodDesc makeMethodDesc(GlobalTransactional anno, Method method) {
     return new MethodDesc(anno, method);
    }

    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource)
     throws BeansException {
     return new Object[]{interceptor};
    }
    //InitializingBean實現方法,spring自動調用
    @Override
    public void afterPropertiesSet() {
     if (disableGlobalTransaction) {
     if (LOGGER.isInfoEnabled()) {
     LOGGER.info("Global transaction is disabled.");
     }
     return;
     }

     //初始化
     initClient();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
     this.applicationContext = applicationContext;
     this.setBeanFactory(applicationContext);
    }
}

可以看到初始化了TMClientRMClient,所以對於一個服務既可以是TM角色也可以是RM角色,至於什么時候是TM或者RM則要看在一次全局事務中@GlobalTransactional注解標注在哪。

TMClient初始化

TM的一個作用就是開啟全局事務,實際應用時在需要開啟事務的方法上加注解@GlobalTransactional,TMClient初始化主要完成以下三件事:

  • 創建連接池
  • 創建客戶端Netty,並啟動
  • 創建並啟動用於檢測的線程池

public class TMClient {

    /**
     * Init.
     *
     * @param applicationId     the application id
     * @param transactionServiceGroup the transaction service group
     */
    public static void init(String applicationId, String transactionServiceGroup) {
     TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
     tmRpcClient.init();
    }

}

獲取 Netty RPC實例

TM和RM的初始化,初始化話的工作重點:就是連接TC的過程。

public final class TmRpcClient extends AbstractRpcRemotingClient {
 /**
     * Gets instance.
     *
     * @return the instance
     */
    public static TmRpcClient getInstance() {
     if (null == instance) {
     synchronized (TmRpcClient.class) {
     if (null == instance) {
     NettyClientConfig nettyClientConfig = new NettyClientConfig();
     final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
     nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
     KEEP_ALIVE_TIME, TimeUnit.SECONDS,
     new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
     new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
     nettyClientConfig.getClientWorkerThreads()),
     RejectedPolicies.runsOldestTaskPolicy());
     instance = new TmRpcClient(nettyClientConfig, null, messageExecutor);
     }
     }
     }
     return instance;
    }
    

初始化 Netty RPC實例 TmRpcClient

 @Sharable
public final class TmRpcClient extends AbstractRpcRemotingClient {


    @Override
    public void init() {
     if (initialized.compareAndSet(false, true)) {
     enableDegrade = CONFIG.getBoolean(ConfigurationKeys.SERVICE_PREFIX + ConfigurationKeys.ENABLE_DEGRADE_POSTFIX);
     super.init();
     }
    }

Netty RPC客戶端的繼承關系

在這里插入圖片描述

AbstractRpcRemotingClient發起連接

在這里插入圖片描述

1)啟動ScheduledExecutorService定時執行器,每10秒嘗試進行一次重連TC

2)重連時,先從file.conf中根據分組名稱(service_group)找到集群名稱(cluster_name)

3)再根據集群名稱找到fescar-server集群ip端口列表

4)從ip列表中選擇一個用netty進行連接

 @Override
    public void init() {
     clientBootstrap.setChannelHandlers(new ClientHandler());
     clientBootstrap.start();

     //啟動ScheduledExecutorService定時執行器,每10  秒嘗試進行一次重連TC
     timerExecutor.scheduleAtFixedRate(new Runnable() {
     @Override
     public void run() {
     clientChannelManager.reconnect(getTransactionServiceGroup());
     }
     }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
     if (NettyClientConfig.isEnableClientBatchSendRequest()) {
     //用於多數據合並,減少通信次數
     mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
     MAX_MERGE_SEND_THREAD,
     KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
     new LinkedBlockingQueue<>(),
     new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
     mergeSendExecutorService.submit(new MergedSendRunnable());
     }
     super.init();
    }

RpcClientBootstrap#setChannelHandlers

上面的客戶端,調用了 引導類(啟動類) 的設置 處理器

clientBootstrap.setChannelHandlers(new ClientHandler());

pcClientBootstrap的方法


   protected void setChannelHandlers(ChannelHandler... handlers) {
     if (null != handlers) {
     this.channelHandlers = handlers;
     }

    }

基礎的處理器:

 @Sharable
    class ClientHandler extends AbstractHandler {
     ClientHandler() {
     super(AbstractRpcRemotingClient.this);
     }

     public void dispatch(RpcMessage request, ChannelHandlerContext ctx) {
     if (AbstractRpcRemotingClient.this.clientMessageListener != null) {
     String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
     AbstractRpcRemotingClient.this.clientMessageListener.onMessage(request, remoteAddress);
     }

     }

     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     if (msg instanceof RpcMessage) {
     RpcMessage rpcMessage = (RpcMessage)msg;
     if (rpcMessage.getBody() == HeartbeatMessage.PONG) {
     if (AbstractRpcRemotingClient.LOGGER.isDebugEnabled()) {
     AbstractRpcRemotingClient.LOGGER.debug("received PONG from {}", ctx.channel().remoteAddress());
     }

     } else if (rpcMessage.getBody() instanceof MergeResultMessage) {
     MergeResultMessage results = (MergeResultMessage)rpcMessage.getBody();
     MergedWarpMessage mergeMessage = (MergedWarpMessage)AbstractRpcRemotingClient.this.mergeMsgMap.remove(rpcMessage.getId());

     for(int i = 0; i < mergeMessage.msgs.size(); ++i) {
     int msgId = (Integer)mergeMessage.msgIds.get(i);
     MessageFuture future = (MessageFuture)AbstractRpcRemotingClient.this.futures.remove(msgId);
     if (future == null) {
     if (AbstractRpcRemotingClient.LOGGER.isInfoEnabled()) {
     AbstractRpcRemotingClient.LOGGER.info("msg: {} is not found in futures.", msgId);
     }
     } else {
     future.setResultMessage(results.getMsgs()[i]);
     }
     }

     } else {
     super.channelRead(ctx, msg);
     }
     }
     }

     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     if (!AbstractRpcRemotingClient.this.messageExecutor.isShutdown()) {
     if (AbstractRpcRemotingClient.LOGGER.isInfoEnabled()) {
     AbstractRpcRemotingClient.LOGGER.info("channel inactive: {}", ctx.channel());
     }

     AbstractRpcRemotingClient.this.clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));
     super.channelInactive(ctx);
     }
     }

RpcClientBootstrap.start()方法

最后我們看一下clientBootstrap.start()方法:

就是使用本地的配置來初始化netty的bootstrap。這些配置在file.conf這個文件中。


    @Override
    public void start() {

     //defaultEventExecutorGroup初始化
     if (this.defaultEventExecutorGroup == null) {
     this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
     new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
     nettyClientConfig.getClientWorkerThreads()));
     }

     //對連接的配置
     this.bootstrap.group(this.eventLoopGroupWorker).channel(
     nettyClientConfig.getClientChannelClazz()).option(
     ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
     ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
     ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,
     nettyClientConfig.getClientSocketRcvBufSize());

     if (nettyClientConfig.enableNative()) {
     if (PlatformDependent.isOsx()) {
     if (LOGGER.isInfoEnabled()) {
     LOGGER.info("client run on macOS");
     }
     } else {
     //非mac系統則配置epoll模式/ 邊沿觸發 /和TCP快速確認機制
     //邊沿觸發 ,請參見尼恩 視頻  selector 底層原理
     //當TCP套接口的ACK策略處於QUICKACK模式時,意味着TCP套接口將嘗試立即回復對端ACK確認報文。
     bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
     .option(EpollChannelOption.TCP_QUICKACK, true);
     }
     }

     bootstrap.handler(
     new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) {
     ChannelPipeline pipeline = ch.pipeline();
     pipeline.addLast(
     new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
     nettyClientConfig.getChannelMaxWriteIdleSeconds(),
     nettyClientConfig.getChannelMaxAllIdleSeconds()))
     .addLast(new ProtocolV1Decoder())
     .addLast(new ProtocolV1Encoder());
     if (null != channelHandlers) {
     addChannelPipelineLast(ch, channelHandlers);
     }
     }
     });

     if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
     LOGGER.info("RpcClientBootstrap has started");
     }
    }

如果文件類型的配置,netty的配置,在file.conf里邊

在這里插入圖片描述

在這里插入圖片描述

getTransactionServiceGroup()

SeataAutoConfiguration

@ComponentScan(
    basePackages = {"io.seata.spring.boot.autoconfigure.properties"}
)
@ConditionalOnProperty(
    prefix = "seata",
    name = {"enabled"},
    havingValue = "true",
    matchIfMissing = true
)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
....

    @Bean
    @DependsOn({"springApplicationContextProvider", "failureHandler"})
    @ConditionalOnMissingBean({GlobalTransactionScanner.class})
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
     if (LOGGER.isInfoEnabled()) {
     LOGGER.info("Automatically configure Seata");
     }

     return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

  
}

seataProperties.getApplicationId(),

seataProperties.getTxServiceGroup()

@ConfigurationProperties(prefix = StarterConstants.SEATA_SPRING_CLOUD_ALIBABA_PREFIX)
public class SpringCloudAlibabaConfiguration implements ApplicationContextAware {

    private static final Logger LOGGER = LoggerFactory.getLogger(SpringCloudAlibabaConfiguration.class);
    private static final String SPRING_APPLICATION_NAME_KEY = "spring.application.name";
    private static final String DEFAULT_SPRING_CLOUD_SERVICE_GROUP_POSTFIX = "-seata-service-group";
    private String applicationId;
    private String txServiceGroup;
    private ApplicationContext applicationContext;

public class StarterConstants {
    private static final int MAP_CAPACITY = 64;
    public static final String SEATA_PREFIX = "seata";
    public static final String SEATA_SPRING_CLOUD_ALIBABA_PREFIX = "spring.cloud.alibaba.seata";

注意:上面方法中的2個參數正是來自我們服務中的application.yml文件,代碼如下:


spring:
    application:
     name: seata-seckill-demo
    cloud:
     alibaba:
     seata:
     tx-service-group: my_test_tx_group

初始化 GlobalTransactionScanner

    public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode, FailureHandler failureHandlerHook) {
     this.disableGlobalTransaction = ConfigurationFactory.getInstance().getBoolean("service.disableGlobalTransaction", false);
     this.setOrder(1024);
     this.setProxyTargetClass(true);
     this.applicationId = applicationId;
     this.txServiceGroup = txServiceGroup;
     this.mode = mode;
     this.failureHandlerHook = failureHandlerHook;
    }

再傳遞給 TMClient 、RMClient

 //init TM   //init TM register TM success
     TMClient.init(applicationId, txServiceGroup);
     if (LOGGER.isInfoEnabled()) {
     LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
     }
     //init RM
     RMClient.init(applicationId, txServiceGroup);

NettyClientChannelManager 的reconnect 方法

我們首先看一下上面的clientChannelManager.reconnect方法

這個方法在一個定時執行器中,會定時去執行。這段代碼在NettyClientChannelManager類,

  void reconnect(String transactionServiceGroup) {
     List<String> availList = null;
     try {
     availList = getAvailServerList(transactionServiceGroup);
     } catch (Exception e) {
     LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
     return;
     }
     if (CollectionUtils.isEmpty(availList)) {
     String serviceGroup = RegistryFactory.getInstance()
     .getServiceGroup(transactionServiceGroup);
     LOGGER.error("no available service '{}' found, please make sure registry config correct", serviceGroup);
     return;
     }
     for (String serverAddress : availList) {
     try {
     acquireChannel(serverAddress);
     } catch (Exception e) {
     LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
     }
     }
    }

getAvailServerList

上面的getAvailServerList是通過transactionServiceGroup這個屬性,來查找seata-server集群地址列表,。

邏輯就是通過key (group name )拼接出vgroupMapping.group name,然后找到這個屬性值(default),表示默認的集群,然后去默認的nacos集群,查找seata-server服務列表。

    private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
     List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance()
     .lookup(transactionServiceGroup);
     if (CollectionUtils.isEmpty(availInetSocketAddressList)) {
     return Collections.emptyList();
     }

     return availInetSocketAddressList.stream()
     .map(NetUtil::toStringAddress)
     .collect(Collectors.toList());
    }
    
    default String getServiceGroup(String key) {
     Configuration config = ConfigurationFactory.getInstance();
     return config.getConfig("service.vgroupMapping." + key);
    }

在這里插入圖片描述

拿到default之后,再根據這個,和seata-server, 去 nacos 獲取 seata-server服務列表

充分體現nacos 注冊中心的特點。

在這里插入圖片描述

NettyClientChannelManager 的acquireChannel方法

上面獲取的availList(seata-server集群地址列表)如果不空,則調用方法acquireChannel。acquireChannel方法首先判斷連接是否存在,不存在,則創建連接:

  void reconnect(String transactionServiceGroup) {
     List<String> availList = null;
     try {
     availList = getAvailServerList(transactionServiceGroup);
     } catch (Exception e) {
     LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
     return;
     }
     if (CollectionUtils.isEmpty(availList)) {
     String serviceGroup = RegistryFactory.getInstance()
     .getServiceGroup(transactionServiceGroup);
     LOGGER.error("no available service '{}' found, please make sure registry config correct", serviceGroup);
     return;
     }
     for (String serverAddress : availList) {
     try {
     acquireChannel(serverAddress);
     } catch (Exception e) {
     LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
     }
     }
    }
   /**
     * Acquire netty client channel connected to remote server.
     *
     * @param serverAddress server address
     * @return netty channel
     */
    Channel acquireChannel(String serverAddress) {
     Channel channelToServer = channels.get(serverAddress);

     //當前  channel  已經存在連接,直接返回
     if (channelToServer != null) {
     channelToServer = getExistAliveChannel(channelToServer, serverAddress);
     if (null != channelToServer) {
     return channelToServer;
     }
     }
     if (LOGGER.isInfoEnabled()) {
     LOGGER.info("will connect to " + serverAddress);
     }
     channelLocks.putIfAbsent(serverAddress, new Object());
     synchronized (channelLocks.get(serverAddress)) {
     return doConnect(serverAddress);
     }
    }

NettyClientChannelManager 的doConnect

通過nettyClientKeyPool.borrowObject方法就是從連接池中獲取一個連接,seata在這里使用的連接池是commons-pool,

 
    private Channel doConnect(String serverAddress) {
     Channel channelToServer = channels.get(serverAddress);
     if (channelToServer != null && channelToServer.isActive()) {
     return channelToServer;
     }
     Channel channelFromPool;
     try {
     NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
     NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
     
     //TM和RM的初始化流程都要走這段代碼,如果是RM,則要set一下ResourceIds,還記得這個嗎?看下面RM部分的講解
     if (null != previousPoolKey && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
     RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
     ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
     }
     channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
     channels.put(serverAddress, channelFromPool);
     } catch (Exception exx) {
     LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);
     throw new FrameworkException("can not register RM,err:" + exx.getMessage());
     }
     return channelFromPool;
    }

上面nettyClientKeyPool.borrowObject方法就是從連接池中獲取一個連接,seata在這里使用的連接池是commons-pool,可以看看 commons-pool 的源碼。

AbstractNettyRemoting的init方法

回到 AbstractRpcRemotingClient

1)啟動ScheduledExecutorService定時執行器,每10秒嘗試進行一次重連TC

2)重連時,先從file.conf中根據分組名稱(service_group)找到集群名稱(cluster_name)

3)再根據集群名稱找到fescar-server集群ip端口列表

4)從ip列表中選擇一個用netty進行連接

public abstract class AbstractRpcRemotingClient extends AbstractRpcRemoting
    implements RegisterMsgListener, ClientMessageSender {
    @Override
    public void init() {
     clientBootstrap.start();
     //啟動ScheduledExecutorService定時執行器,每5秒嘗試進行一次重連TC
     timerExecutor.scheduleAtFixedRate(new Runnable() {
     @Override
     public void run() {
     clientChannelManager.reconnect(getTransactionServiceGroup());
     }
     }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
     if (NettyClientConfig.isEnableClientBatchSendRequest()) {
     //用於多數據合並,減少通信次數
     mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
     MAX_MERGE_SEND_THREAD,
     KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
     new LinkedBlockingQueue<>(),
     new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
     mergeSendExecutorService.submit(new MergedSendRunnable());
     }
     super.init();
    }
    }

super.init()方法,這個方法在父類AbstractNettyRemoting,代碼如下:

    /**
     * Init.
     */
    public void init() {
     timerExecutor.scheduleAtFixedRate(new Runnable() {
     @Override
     public void run() {
     for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
     if (entry.getValue().isTimeout()) {
     futures.remove(entry.getKey());
     entry.getValue().setResultMessage(null);
     if (LOGGER.isDebugEnabled()) {
     LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
     }
     }
     }

     nowMills = System.currentTimeMillis();
     }
     }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
    }

這個方法非常簡單,定時任務不斷檢測消息發送結果,如果是超時3秒,則移除消息,然后把消息結果置為空。

所有的異步發送的消息,都放在 futures:


    private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
     throws TimeoutException {
     if (channel == null) {
     LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
     return null;
     }
     final RpcMessage rpcMessage = new RpcMessage();
     rpcMessage.setId(getNextMessageId());
     rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
     rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC);
     rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR);
     rpcMessage.setBody(msg);

     final MessageFuture messageFuture = new MessageFuture();
     messageFuture.setRequestMessage(rpcMessage);
     messageFuture.setTimeout(timeout);
     futures.put(rpcMessage.getId(), messageFuture);

     if (address != null) {
     /*
     The batch send.
     Object From big to small: RpcMessage -> MergedWarpMessage -> AbstractMessage
     @see AbstractRpcRemotingClient.MergedSendRunnable
     */
     if (NettyClientConfig.isEnableClientBatchSendRequest()) {
     ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
     BlockingQueue<RpcMessage> basket = map.get(address);
     if (basket == null) {
     map.putIfAbsent(address, new LinkedBlockingQueue<>());
     basket = map.get(address);
     }
     basket.offer(rpcMessage);
     if (LOGGER.isDebugEnabled()) {
     LOGGER.debug("offer message: {}", rpcMessage.getBody());
     }
     if (!isSending) {
     synchronized (mergeLock) {
     mergeLock.notifyAll();
     }
     }
     } else {
     // the single send.
     sendSingleRequest(channel, msg, rpcMessage);
     if (LOGGER.isDebugEnabled()) {
     LOGGER.debug("send this msg[{}] by single send.", msg);
     }
     }
     } else {
     sendSingleRequest(channel, msg, rpcMessage);
     }
     if (timeout > 0) {
     try {
     return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
     } catch (Exception exx) {
     LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), address, msg);
     if (exx instanceof TimeoutException) {
     throw (TimeoutException) exx;
     } else {
     throw new RuntimeException(exx);
     }
     }
     } else {
     return null;
     }
    }

RM初始化

RM的客戶端初始化

RM的初始化跟TM基本一樣,我們從RMClient.init(applicationId, txServiceGroup)方法講起

RMClient.init(applicationId, txServiceGroup)

public class RMClient {

    /**
     * Init.
     *
     * @param applicationId     the application id
     * @param transactionServiceGroup the transaction service group
     */
    public static void init(String applicationId, String transactionServiceGroup) {
     RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);

     //資源管理器ResourceManager

     rmRpcClient.setResourceManager(DefaultResourceManager.get());

     //消息回調監聽器,rmHandler用於接收TC在二階段發出的提交或者回滾請求
     rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get(), rmRpcClient));
     rmRpcClient.init();
    }

}

DefaultResourceManager 的Spi 實現

此處用到了seata Spi拓展機制,可插拔

public class DefaultResourceManager implements ResourceManager {

    /**
     * all resource managers
     */
    protected static Map<BranchType, ResourceManager> resourceManagers
     = new ConcurrentHashMap<>();

    private DefaultResourceManager() {
     initResourceManagers();
    }

    /**
     * Get resource manager.
     *
     * @return the resource manager
     */
    public static DefaultResourceManager get() {
     return SingletonHolder.INSTANCE;
    }


    protected void initResourceManagers() {
     //init all resource managers
     List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
     if (CollectionUtils.isNotEmpty(allResourceManagers)) {
     for (ResourceManager rm : allResourceManagers) {
     resourceManagers.put(rm.getBranchType(), rm);
     }
     }
    }

根據Seata Spi 加載ResourceManager實現類

io.seata.core.model.ResourceManager

io.seata.rm.datasource.DataSourceManager
io.seata.rm.datasource.xa.ResourceManagerXA

在這里插入圖片描述

ResourceManager是seata的重要組件之一,RM負責管理分支數據資源的事務。

它接口定義如下,實現ResourceManagerInbound以及ResourceManagerOutbound接口

public interface ResourceManager extends ResourceManagerInbound, ResourceManagerOutbound {

   // 注冊一個resource至事務管理器上
   void registerResource(Resource resource);

   // 從事務管理器上取消注冊一個resource
   void unregisterResource(Resource resource);

   // 獲取所有管理的resource
   // @return resourceId -> Resource Map
   Map<String, Resource> getManagedResources();

   // 獲取此事務管理器的分支類型,有AT自動和TCC手動類型
   BranchType getBranchType();
}

ResourceManagerInbound接口提供給TC進行rpc調用的方法

public interface ResourceManagerInbound {

   // TM通知RM提交事務
   BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException;

   // TM通知RM回滾事務
   BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException;
}

提供rpc請求至TC

public interface ResourceManagerOutbound {

   // 請求注冊分支resource
   Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws
     TransactionException;

   // 報告分支狀態
   void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status, String applicationData) throws TransactionException;

   // 鎖住query
   boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)
     throws TransactionException;
}

AbstractResourceManager

AbstractResourceManager實現ResourceManager提供模板方法

public abstract class AbstractResourceManager implements ResourceManager {

// 創建BranchRegisterRequest請求,通過RmRpcClient客戶端使用netty進行rpc調用,請求至TC,返回唯一的分支Id數據,
// 超時或報錯拋出TransactionException
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
try {
    BranchRegisterRequest request = new BranchRegisterRequest();
    request.setXid(xid);
    request.setLockKey(lockKeys);
    request.setResourceId(resourceId);
    request.setBranchType(branchType);
    request.setApplicationData(applicationData);

    BranchRegisterResponse response = (BranchRegisterResponse) RmRpcClient.getInstance().sendMsgWithResponse(request);
    if (response.getResultCode() == ResultCode.Failed) {
     throw new TransactionException(response.getTransactionExceptionCode(), "Response[" + response.getMsg() + "]");
    }
    return response.getBranchId();
} catch (TimeoutException toe) {
    throw new TransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
} catch (RuntimeException rex) {
    throw new TransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
}
}

// 創建BranchReportRequest請求,通過RmRpcClient客戶端使用netty進行rpc調用,請求至TC,返回唯一的分支Id數據,
// 超時或報錯拋出TransactionException
@Override
public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status, String applicationData) throws TransactionException {
try {
    BranchReportRequest request = new BranchReportRequest();
    request.setXid(xid);
    request.setBranchId(branchId);
    request.setStatus(status);
    request.setApplicationData(applicationData);

    BranchReportResponse response = (BranchReportResponse) RmRpcClient.getInstance().sendMsgWithResponse(request);
    if (response.getResultCode() == ResultCode.Failed) {
     throw new TransactionException(response.getTransactionExceptionCode(), "Response[" + response.getMsg() + "]");
    }
} catch (TimeoutException toe) {
    throw new TransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
} catch (RuntimeException rex) {
    throw new TransactionException(TransactionExceptionCode.BranchReportFailed, "Runtime", rex);
}
}

// 默認返回false
public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys) throws TransactionException {
return false;
}

// 需子類實現
public void unregisterResource(Resource resource) {
throw new NotSupportYetException("unregister a resource");
}

// 調用RmRpcClient客戶端,創建netty連接,進行rpc調用注冊至全局tc
public void registerResource(Resource resource) {
RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}
}

DefaultResourceManager

DefaultResourceManager是虛擬的ResourceManager,適配所有的ResourceManager,所有方法調用都委派給對應負責的ResourceManager處理。

public class DefaultResourceManager implements ResourceManager {

   // 所有的ResourceManager緩存
   protected static Map<BranchType, ResourceManager> resourceManagers
     = new ConcurrentHashMap<>();
   // 構造方法初始化
   private DefaultResourceManager() {
     initResourceManagers();
   }
   // 單例模式
   public static DefaultResourceManager get() {
     return SingletonHolder.INSTANCE;
   }
   public static void mockResourceManager(BranchType branchType, ResourceManager rm) {
     resourceManagers.put(branchType, rm);
   }
   // 初始化加載所有的ResourceManager,此處目前只有DataResourceManager和TCCResourceManager
   protected void initResourceManagers() {
     //init all resource managers
     List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
     if (CollectionUtils.isNotEmpty(allResourceManagers)) {
     for (ResourceManager rm : allResourceManagers) {
     resourceManagers.put(rm.getBranchType(), rm);
     }
     }
   }
   @Override
   public BranchStatus branchCommit(BranchType branchType, String xid, long branchId,
     String resourceId, String applicationData)
     throws TransactionException {
     return getResourceManager(branchType).branchCommit(branchType, xid, branchId, resourceId, applicationData);
   }
   @Override
   public BranchStatus branchRollback(BranchType branchType, String xid, long branchId,
     String resourceId, String applicationData)
     throws TransactionException {
     return getResourceManager(branchType).branchRollback(branchType, xid, branchId, resourceId, applicationData);
   }
   @Override
   public Long branchRegister(BranchType branchType, String resourceId,
     String clientId, String xid, String applicationData, String lockKeys)
     throws TransactionException {
     return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData,
     lockKeys);
   }
   @Override
   public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,
     String applicationData) throws TransactionException {
     getResourceManager(branchType).branchReport(branchType, xid, branchId, status, applicationData);
   }
   @Override
   public boolean lockQuery(BranchType branchType, String resourceId,
     String xid, String lockKeys) throws TransactionException {
     return getResourceManager(branchType).lockQuery(branchType, resourceId, xid, lockKeys);
   }
   @Override
   public void registerResource(Resource resource) {
     getResourceManager(resource.getBranchType()).registerResource(resource);
   }
   @Override
   public void unregisterResource(Resource resource) {
     getResourceManager(resource.getBranchType()).unregisterResource(resource);
   }
   @Override
   public Map<String, Resource> getManagedResources() {
     Map<String, Resource> allResource = new HashMap<String, Resource>();
     for (ResourceManager rm : resourceManagers.values()) {
     Map<String, Resource> tempResources = rm.getManagedResources();
     if (tempResources != null) {
     allResource.putAll(tempResources);
     }
     }
     return allResource;
   }
   public ResourceManager getResourceManager(BranchType branchType) {
     ResourceManager rm = resourceManagers.get(branchType);
     if (rm == null) {
     throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());
     }
     return rm;
   }
   private static class SingletonHolder {
     private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
   }
}

DataSourceManager

DataSourceManager繼承AbstractResourceManager,管理數據庫自動resouce的注冊,提交以及回滾等

public class DataSourceManager extends AbstractResourceManager implements Initialize {

   private ResourceManagerInbound asyncWorker;
   private Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();
   public void setAsyncWorker(ResourceManagerInbound asyncWorker) {
     this.asyncWorker = asyncWorker;
   }

   @Override
   public boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)
     throws TransactionException {
     try {
    // 創建全球鎖GlobalLockQueryRequest
     GlobalLockQueryRequest request = new GlobalLockQueryRequest();
     request.setXid(xid);
     request.setLockKey(lockKeys);
     request.setResourceId(resourceId);
     
     GlobalLockQueryResponse response = null;
    // 如果當前線程context已經是在全球事務處理中,則發送請求
     if (RootContext.inGlobalTransaction()) {
     response = (GlobalLockQueryResponse) RmRpcClient.getInstance().sendMsgWithResponse(request);
     } else if (RootContext.requireGlobalLock()) {
    // 或則開啟了本地事務控制,能夠獲取到本地線程事務對象,進行負載均衡發送請求
     response = (GlobalLockQueryResponse) RmRpcClient.getInstance().sendMsgWithResponse(loadBalance(),
     request, NettyClientConfig.getRpcRequestTimeout());
     } else {
     throw new RuntimeException("unknow situation!");
     }

     if (response.getResultCode() == ResultCode.Failed) {
     throw new TransactionException(response.getTransactionExceptionCode(),
     "Response[" + response.getMsg() + "]");
     }
     return response.isLockable();
     } catch (TimeoutException toe) {
     throw new TransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
     } catch (RuntimeException rex) {
     throw new TransactionException(TransactionExceptionCode.LockableCheckFailed, "Runtime", rex);
     }

   }

   // 負載均衡,獲取注冊中心的所有socket地址列表,返回負載均衡下的address
   private String loadBalance() {
     InetSocketAddress address = null;
     try {
     List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().lookup(
     TmRpcClient.getInstance().getTransactionServiceGroup());
     address = LoadBalanceFactory.getInstance().select(inetSocketAddressList);
     } catch (Exception ignore) {
     LOGGER.error(ignore.getMessage());
     }
     if (address == null) {
     throw new FrameworkException(NoAvailableService);
     }
     return NetUtil.toStringAddress(address);
   }

   public DataSourceManager() {
   }

   // 實例化異步處理器,提供異步刪除undo日志的方法
   public void init() {
     AsyncWorker asyncWorker = new AsyncWorker();
     asyncWorker.init();
     initAsyncWorker(asyncWorker);
   }

   // 注冊DataSourceProxy resource,放入緩存,同時告知TC進行注冊
   public void registerResource(Resource resource) {
     DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;
     dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
     super.registerResource(dataSourceProxy);
   }

   // 根據resourceId獲取數據庫的DataSource
   public DataSourceProxy get(String resourceId) {
     return (DataSourceProxy) dataSourceCache.get(resourceId);
   }

   // 提交成功,調用asyncWorker提交成功
   public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
     return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
   }

   // 事務回滾
   public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
     DataSourceProxy dataSourceProxy = get(resourceId);
     if (dataSourceProxy == null) {
     throw new ShouldNeverHappenException();
     }
     try {
    // 委派給UndoLogManager回滾已經提交的數據,將當前resouce的dataSourceProxy傳入參數
     UndoLogManager.undo(dataSourceProxy, xid, branchId);
     } catch (TransactionException te) {
     if (LOGGER.isInfoEnabled()){
     LOGGER.info("branchRollback failed reason [{}]", te.getMessage());
     }
     if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
     return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
     } else {
     return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
     }
     }
     return BranchStatus.PhaseTwo_Rollbacked;

   }
   @Override
   public Map<String, Resource> getManagedResources() {
     return dataSourceCache;
   }

   // 此為AT自動模式管理器
   public BranchType getBranchType() {
     return BranchType.AT;
   }
}

異步AsyncWorker

采用異步方式,提高效率

DataSourceManager事務提交委派給AsyncWorker進行異步提交的。
因為都成功了,無需回滾成功的數據,只需要刪除生成的操作日志就行,采用異步方式,提高效率。

public class AsyncWorker implements ResourceManagerInbound {


private static ScheduledExecutorService timerExecutor;

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
     String applicationData) throws TransactionException {
if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
  LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid
     + "] will be handled by housekeeping later.");
}
return BranchStatus.PhaseTwo_Committed;
}

// 初始化
public synchronized void init() {
LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
// 創建定時器,每一秒定時doBranchCommits
timerExecutor = new ScheduledThreadPoolExecutor(1,
  new NamedThreadFactory("AsyncWorker", 1, true));
timerExecutor.scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
     try {
     doBranchCommits();
     } catch (Throwable e) {
     LOGGER.info("Failed at async committing ... " + e.getMessage());

     }
  }
}, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}

// 分支提交具體方法
private void doBranchCommits() {
if (ASYNC_COMMIT_BUFFER.size() == 0) {
  return;
}
// 獲取需要2步執行的數據Phase2Context,並根據ResourceId進行分類
Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
  Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
  List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
  if (contextsGroupedByResourceId == null) {
     contextsGroupedByResourceId = new ArrayList<>();
     mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
  }
  contextsGroupedByResourceId.add(commitContext);
}

// 遍歷Map.Entry<String, List<Phase2Context>>
for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
  Connection conn = null;
  try {
     try {
// 獲取DataSourceManager
     DataSourceManager resourceManager = (DataSourceManager)DefaultResourceManager.get()
     .getResourceManager(BranchType.AT);
// 更加resourceId獲取DataSourceProxy
     DataSourceProxy dataSourceProxy = resourceManager.get(entry.getKey());
     if (dataSourceProxy == null) {
     throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
     }
// 創建連接
     conn = dataSourceProxy.getPlainConnection();
     } catch (SQLException sqle) {
     LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
     continue;
     }
    // 將緩存中的xid和branchId放入數組set中
     List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
     Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
     Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
    // 獲取當前resourceId下需要執行的commitContext
     for (Phase2Context commitContext : contextsGroupedByResourceId) {
     xids.add(commitContext.xid);
     branchIds.add(commitContext.branchId);
     int maxSize = xids.size() > branchIds.size() ? xids.size() : branchIds.size();
    // 如果xid或branchId數組set中有一個等於批量操作1000條,就調用批量刪除
     if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
     try {
     // 調用UndoLogManager刪除日志
     UndoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
     } catch (Exception ex) {
     LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
     }
     xids.clear();
     branchIds.clear();
     }
     }

     if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
     return;
     }
     // 批量刪除最后不滿1000的數據
     try {
     UndoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
     } catch (Exception ex) {
     LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
     }

  } finally {
     if (conn != null) {
     try {
     conn.close();
     } catch (SQLException closeEx) {
     LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
     }
     }
  }
}
}

// 不支持回滾
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
     String applicationData) throws TransactionException {
throw new NotSupportYetException();

}
}

UndoLogManager

UndoLogManager批量刪除undo_log表中日志的邏輯,創建sql,然后批量設置參數,最后批量執行

public static void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection conn) throws SQLException {
int xidSize = xids.size();
int branchIdSize = branchIds.size();
String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize);
PreparedStatement deletePST = null;
try {
    deletePST = conn.prepareStatement(batchDeleteSql);
    int paramsIndex = 1;
    for (Long branchId : branchIds) {
     deletePST.setLong(paramsIndex++, branchId);
    }
    for (String xid : xids) {
     deletePST.setString(paramsIndex++, xid);
    }
    int deleteRows = deletePST.executeUpdate();
    if (LOGGER.isDebugEnabled()) {
     LOGGER.debug("batch delete undo log size " + deleteRows);
    }
} catch (Exception e) {
    if (!(e instanceof SQLException)) {
     e = new SQLException(e);
    }
    throw (SQLException)e;
} finally {
    if (deletePST != null) {
     deletePST.close();
    }
}

}

Rm netty Channel 啟動

1)啟動ScheduledExecutorService定時執行器,每5秒嘗試進行一次重連TC

2)重連時,先從file.conf中根據分組名稱(service_group)找到集群名稱(cluster_name)

3)再根據集群名稱找到fescar-server集群ip端口列表

4)從ip列表中選擇一個用netty進行連接

@Sharable
public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {


   public static RmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {
     RmNettyRemotingClient rmNettyRemotingClient = getInstance();
     rmNettyRemotingClient.setApplicationId(applicationId);
     rmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);
     return rmNettyRemotingClient;
   }


   @Override
   public void init() {
     // registry processor
     registerProcessor();
     // CAS 保證原子性
     if (initialized.compareAndSet(false, true)) {
     	super.init();
							}
			}

AbstractNettyRemotingClient 初始化

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
 implements RemotingClient {
 @Override
 public void init() {
     //啟動ScheduledExecutorService定時執行器,每10秒嘗試進行一次重連TC
     timerExecutor.scheduleAtFixedRate(new Runnable() {
     @Override
     public void run() {
     clientChannelManager.reconnect(getTransactionServiceGroup());
     }
     }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
     //用於多數據合並,減少通信次數
     if (NettyClientConfig.isEnableClientBatchSendRequest()) {
     mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
     MAX_MERGE_SEND_THREAD,
     KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
     new LinkedBlockingQueue<>(),
     new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
     mergeSendExecutorService.submit(new MergedSendRunnable());
     }
     super.init();
     clientBootstrap.start();
 }
 }

在RMClient初始化時,啟動了RMHandlerAT接收TC在二階段發出的提交或者回滾請求

at模式的整體流程

為DataSource生成代理的DataSourceProxy

要使用AT模式,必須向spring ioc注入DataSourceProxy

  @Bean
  public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){
     return new DataSourceProxy(druidDataSource);
  }

如果使用了seata-spring-boot-start.jar這個包,就不需要手動向spring ioc注入DataSourceProxy。這個包里面配置了spring boot的自動裝配 一個 SeataAutoDataSourceProxyCreator

自動裝配 SeataAutoDataSourceProxyCreator

SeataAutoConfiguration.seataAutoDataSourceProxyCreator

seata的加載入口位於io.seata.spring.boot.autoconfigure.SeataAutoConfiguration:

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class SeataAutoConfiguration {
.....

    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, 
				    name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"},
								havingValue = "true", matchIfMissing = true)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
 
     	return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying());
    }
}

SeataAutoDataSourceProxyCreator 何方神聖?

看一下最后一個方法中返回了一個SeataAutoDataSourceProxyCreator,這個對象是用來干嘛的呢?

不是很奇怪這里並沒有配置DataSourceProxy?
它繼承了AbstractAutoProxyCreator.

AbstractAutoProxyCreator是aop里面一個把目標對象轉換成代理對象的一個后置處理器。
在spring中,只要把后置處理器的bean定義給到ioc容器,BeanFactory就調用后置處理器的各種方法參與到bean的生命周期的各個步驟中。

來看一下SeataAutoDataSourceProxyCreator,它的shouldSkip是說這個后置處理器只會對DataSource對象生成其代理對象,它用到的橫切關注點邏輯SeataAutoDataSourceProxyAdvice。

public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
	private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
	private final String[] excludes;
	private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());

	public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {
					this.excludes = excludes;
					setProxyTargetClass(!useJdkProxy);
	}

	@Override
	protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
		if (LOGGER.isInfoEnabled()) {
					LOGGER.info("Auto proxy of [{}]", beanName);
		}
					return new Object[]{advisor};
	}

	// 這個方法里面確定對那些Bean不起作用
	// 非DataSource的都會不起作用
	@Override
	protected boolean shouldSkip(Class<?> beanClass, String beanName) {
			return SeataProxy.class.isAssignableFrom(beanClass) ||
						!DataSource.class.isAssignableFrom(beanClass) ||
			Arrays.asList(excludes).contains(beanClass.getName());
	}
}

基類AbstractAutoProxyCreator 創建動態代理

public Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException {
Object cacheKey = this.getCacheKey(beanClass, beanName);
if (beanName == null || !this.targetSourcedBeans.contains(beanName)) {
		if (this.advisedBeans.containsKey(cacheKey)) {
								return null;
		}

		if (this.isInfrastructureClass(beanClass) || this.shouldSkip(beanClass, beanName)) {
			this.advisedBeans.put(cacheKey, Boolean.FALSE);
			return null;
		}
}

if (beanName != null) {
	TargetSource targetSource = this.getCustomTargetSource(beanClass, beanName);
		if (targetSource != null) {
			this.targetSourcedBeans.add(beanName);
								
		//獲取攔截器
		Object[] specificInterceptors = this.getAdvicesAndAdvisorsForBean(beanClass, beanName, targetSource);
								
		// 創建代理
	Object proxy = this.createProxy(beanClass, beanName, specificInterceptors, targetSource);
		this.proxyTypes.put(cacheKey, proxy.getClass());
		return proxy;
	}
}

return null;
}


SeataAutoDataSourceProxyAdvice

它用到的橫切關注點邏輯SeataAutoDataSourceProxyAdvice。

SeataAutoDataSourceProxyAdvice里面的invoke方法,一旦調用DataSource的方法,就會把它替換成DataSourceProxy對象。

ostProcessBeforeInitialization為DataSource生成代理的DataSource。

public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
			DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
			Method method = invocation.getMethod();
			Object[] args = invocation.getArguments();
			Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
			if (m != null) {
							return m.invoke(dataSourceProxy, args);
			} else {
							return invocation.proceed();
			}
}

@Override
public Class<?>[] getInterfaces() {
			return new Class[]{SeataProxy.class};
}

}

DataSourceProxy初始化

DataSourceProxy初始化的時候向server注冊RM資源管理器

	public DataSourceProxy(DataSource targetDataSource) {
					this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
	}

	public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
					super(targetDataSource);
					init(targetDataSource, resourceGroupId);
	}

DefaultResourceManager的registerResource

DataSourceProxy的init方法里面調用了DefaultResourceManager的registerResource

private void init(DataSource dataSource, String resourceGroupId) {
this.resourceGroupId = resourceGroupId;
try (Connection connection = dataSource.getConnection()) {
		jdbcUrl = connection.getMetaData().getURL();
		dbType = JdbcUtils.getDbType(jdbcUrl);
		if (JdbcConstants.ORACLE.equals(dbType)) {
						userName = connection.getMetaData().getUserName();
		}
} catch (SQLException e) {
		throw new IllegalStateException("can not init dataSource", e);
}
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
		tableMetaExcutor.scheduleAtFixedRate(() -> {
						try (Connection connection = dataSource.getConnection()) {
										TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
														.refresh(connection, DataSourceProxy.this.getResourceId());
						} catch (Exception ignore) {
						}
		}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}
}

DefaultResourceManager的registerResource方法,首先根據resource的branchType選擇一個ResourceManager.

resource是DataSourceProxy,它的branchType是BranchType.AT,BranchType.AT對應的ResourceManager是DataSourceManager。

@Override
public void registerResource(Resource resource) {
     getResourceManager(resource.getBranchType()).registerResource(resource);
}

DataSourceManager的registerResource

DataSourceManager的registerResource方法,最終調用了父類的registerResource,父類就是AbstractResourceManager。

@Override
public void registerResource(Resource resource) {
				DataSourceProxy dataSourceProxy = (DataSourceProxy)resource;
				dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
				super.registerResource(dataSourceProxy);
}

AbstractResourceManager的registerResource

AbstractResourceManager的registerResource方法,調用RmRpcClient中的registerResource方法去了。

@Override
public void registerResource(Resource resource) {
				RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}

RmRpcClient的registerResource

RmRpcClient的registerResource方法,配置的seata server可能是單機或者集群,集群的話需要向每個sever都注冊一下。

public void registerResource(String resourceGroupId, String resourceId) {
			if (getClientChannelManager().getChannels().isEmpty()) {
				getClientChannelManager().reconnect(transactionServiceGroup);
				return;
			}
			synchronized (getClientChannelManager().getChannels()) {
							for (Map.Entry<String, Channel> entry : getClientChannelManager().getChannels().entrySet()) {
									String serverAddress = entry.getKey();									Channel rmChannel = entry.getValue();
											if (LOGGER.isInfoEnabled()) {
						LOGGER.info("will register resourceId:{}", resourceId);
											}
						sendRegisterMessage(serverAddress, rmChannel, resourceId);
							}
			}
}

sendRegisterMessage里面生成了RegisterRMRequest對象,然后把RegisterRMRequest對象傳給sendAsyncRequestWithoutResponse方法。

public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) {
RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
message.setResourceIds(resourceId);
try {
				super.sendAsyncRequestWithoutResponse(channel, message);
} catch (FrameworkException e) {
				if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {
					getClientChannelManager().releaseChannel(channel, serverAddress);
		     		if (LOGGER.isInfoEnabled()) {
						LOGGER.info("remove not writable channel:{}", channel);
					}
				} else {
			LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, resourceId, e);
				}
} catch (TimeoutException e) {
				LOGGER.error(e.getMessage());
}
}

sendAsyncRequestWithoutResponse里面調用了sendAsyncRequest

    protected Object sendAsyncRequestWithoutResponse(Channel channel, Object msg) throws
     TimeoutException {
     return sendAsyncRequest(null, channel, msg, 0);
    }

sendAsyncRequest方法發送請求

sendAsyncRequest方法里面,第一步生成了RpcMessage 和MessageFuture 對象;第二步真正調用server;第三步,等待server返回結果。重要的是第二步,又分有沒有開啟多線程去處理發送消息,如果有,就把RpcMessage 直接放到阻塞隊列里面,等待線程處理,沒有的話直接調用sendSingleRequest方法。

   private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
     throws TimeoutException {
     if (channel == null) {
     LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
     return null;
     }
     final RpcMessage rpcMessage = new RpcMessage();
     rpcMessage.setId(getNextMessageId());
     rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
     rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC);
     rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR);
     rpcMessage.setBody(msg);

     final MessageFuture messageFuture = new MessageFuture();
     messageFuture.setRequestMessage(rpcMessage);
     messageFuture.setTimeout(timeout);
     futures.put(rpcMessage.getId(), messageFuture);

     if (address != null) {
     /*
     The batch send.
     Object From big to small: RpcMessage -> MergedWarpMessage -> AbstractMessage
     @see AbstractRpcRemotingClient.MergedSendRunnable
     */
     if (NettyClientConfig.isEnableClientBatchSendRequest()) {
     ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
     BlockingQueue<RpcMessage> basket = map.get(address);
     if (basket == null) {
     map.putIfAbsent(address, new LinkedBlockingQueue<>());
     basket = map.get(address);
     }
     basket.offer(rpcMessage);
     if (LOGGER.isDebugEnabled()) {
     LOGGER.debug("offer message: {}", rpcMessage.getBody());
     }
     if (!isSending) {
     synchronized (mergeLock) {
     mergeLock.notifyAll();
     }
     }
     } else {
     // the single send.
     sendSingleRequest(channel, msg, rpcMessage);
     if (LOGGER.isDebugEnabled()) {
     LOGGER.debug("send this msg[{}] by single send.", msg);
     }
     }
     } else {
     sendSingleRequest(channel, msg, rpcMessage);
     }
     if (timeout > 0) {
     try {
     return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
     } catch (Exception exx) {
     LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), address, msg);
     if (exx instanceof TimeoutException) {
     throw (TimeoutException) exx;
     } else {
     throw new RuntimeException(exx);
     }
     }
     } else {
     return null;
     }
    }

sendSingleRequest,這里真正調用了 channel.writeAndFlush把數據發送出去。

    private void sendSingleRequest(Channel channel, Object msg, RpcMessage rpcMessage) {
     ChannelFuture future;
     channelWritableCheck(channel, msg);
     future = channel.writeAndFlush(rpcMessage);
     future.addListener(new ChannelFutureListener() {
     @Override
     public void operationComplete(ChannelFuture future) {
     if (!future.isSuccess()) {
     MessageFuture messageFuture = futures.remove(rpcMessage.getId());
     if (messageFuture != null) {
     messageFuture.setResultMessage(future.cause());
     }
     destroyChannel(future.channel());
     }
     }
     });
    }

數據源代理

seata不止會代理數據源,還會對Connection,Statement做代理封裝。對sql解析發生在StatementProxy中.

  1 public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
  2 
  3     @Override
  4     public boolean execute(String sql) throws SQLException {
  5     this.targetSQL = sql;
  6     return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
  7     @Override
  8     public Boolean execute(T statement, Object... args) throws SQLException {
  9     return statement.execute((String) args[0]);
 10     }
 11     }, sql);
 12     }
 13 }
 14 
 15 public class ExecuteTemplate{
 16     
 17    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
 18     StatementProxy<S> statementProxy,
 19     StatementCallback<T, S> statementCallback,
 20     Object... args) throws SQLException {
 21     if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
 22     // 未開啟全局事務時,正常執行
 23     return statementCallback.execute(statementProxy.getTargetStatement(), args);
 24     }
 25     //解析SQL
 26     if (sqlRecognizer == null) {
 27     sqlRecognizer = SQLVisitorFactory.get(
 28     statementProxy.getTargetSQL(),
 29     statementProxy.getConnectionProxy().getDbType());
 30     }
 31     Executor<T> executor = null;
 32     if (sqlRecognizer == null) {
 33     executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
 34     } else {
 35     //對不同的SQL類型特殊處理
 36     switch (sqlRecognizer.getSQLType()) {
 37     case INSERT:
 38     executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
 39     break;
 40     case UPDATE:
 41     executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
 42     break;
 43     case DELETE:
 44     executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
 45     break;
 46     case SELECT_FOR_UPDATE:
 47     executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
 48     break;
 49     default:
 50     executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
 51     break;
 52     }
 53     }
 54     T rs = null;
 55     try {
 56     //真正執行業務邏輯
 57     rs = executor.execute(args);
 58     } catch (Throwable ex) {
 59     if (!(ex instanceof SQLException)) {
 60     // Turn other exception into SQLException
 61     ex = new SQLException(ex);
 62     }
 63     throw (SQLException)ex;
 64     }
 65     return rs;
 66     }
 67 }
 68 
 69 
 70 public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
 71 
 72     //接下來執行到這里
 73     @Override
 74     public T doExecute(Object... args) throws Throwable {
 75     AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
 76     if (connectionProxy.getAutoCommit()) {
 77     return executeAutoCommitTrue(args);
 78     } else {
 79     return executeAutoCommitFalse(args);
 80     }
 81     }
 82 
 83     protected T executeAutoCommitFalse(Object[] args) throws Exception {
 84     //業務SQL執行前快照
 85     TableRecords beforeImage = beforeImage();
 86     //真正執行業務SQL
 87     T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
 88     //業務SQL執行后快照
 89     TableRecords afterImage = afterImage(beforeImage);
 90     //准備快照
 91     prepareUndoLog(beforeImage, afterImage);
 92     return result;
 93     }
 94     
 95     
 96     protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
 97     if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
 98     return;
 99     }
100     ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
101     TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
102     String lockKeys = buildLockKey(lockKeyRecords);
103     connectionProxy.appendLockKey(lockKeys);
104     SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
105     connectionProxy.appendUndoLog(sqlUndoLog);
106     }
107 }

RM分布式事務的第一階段

全局事務的初始化

問題:

@GlobalTransactional注解的方法,是如何初始化

SeataSeckillServiceImpl.doSeckill

在這里插入圖片描述

AbstractAutoProxyCreator

GlobalTransactionScanner實現了AbstractAutoProxyCreator

/**
 * The type Global transaction scanner.
 *
 * @author slievrly
 */
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware,
    DisposableBean {

AbstractAutoProxyCreator就比較復雜了,它Spring實現AOP的一種方式。

本質上是一個BeanPostProcessor,他在bean初始化之前,調用內部的createProxy方法,創建一個bean的AOP代理bean並返回。

但是它不是把所有的bean都增強,選取哪些bean做增強呢?

選取的策略是根據 getAdvicesAndAdvisorsForBean 方法返回的Advices/Advisors來確定的。

GlobalTransactionScanner的 getAdvicesAndAdvisorsForBean 方法,如下:

在這里插入圖片描述

其實總體的邏輯基本就清晰了,GlobalTransactionScanner掃描有注解的bean,做AOP增強。

wrapIfNecessary

GlobalTransactionScannerde 的wrapIfNecessary這里面做了兩個事情,

1)根據配置判斷,到底用的是TCC模式,還是其他模式,會放置不同的interceptor。這些interceptor會在getAdvicesAndAdvisorsForBean中返回。

2)如果Bean不是代理類,則走Spring默認的AOP的Wrap;否則調用getAdvicesAndAdvisorsForBean獲取要使用的Advices/Advisors,其實就是用第一步中配置的interceptor。

GlobalTransactionScanner 的wrapIfNecessary使用到 getAdvicesAndAdvisorsForBean:

這個方法從名字上就已經知道作用了,並不是所有的bean都會被增強。哪些需要被增強,還看對應的Advices和Advisors具體要攔截哪些Bean。

    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
     if (disableGlobalTransaction) {
     return bean;
     }
     try {
     synchronized (PROXYED_SET) {
     if (PROXYED_SET.contains(beanName)) {
     return bean;
     }
     interceptor = null;
     //是否TCC
     //check TCC proxy
     if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
     //TCC代理的Bean有 sofa:reference/dubbo:reference/本地TCC
     //使用TccActionInterceptor作為Advices/Advisors
     //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
     interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
     } else {
     Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
     Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

     if (!existsAnnotation(new Class[]{serviceInterface})
     && !existsAnnotation(interfacesIfJdk)) {
     return bean;
     }

     if (interceptor == null) {

     ////使用GlobalTransactionalInterceptor作為Advices/Advisors
     if (globalTransactionalInterceptor == null) {
     globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
     ConfigurationCache.addConfigListener(
     ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
     (ConfigurationChangeListener)globalTransactionalInterceptor);
     }
     interceptor = globalTransactionalInterceptor;
     }
     }

     LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
     //不是代理類則走Spring的默認wrap,是代理則用上面配置的interceptor代理
     if (!AopUtils.isAopProxy(bean)) {
     bean = super.wrapIfNecessary(bean, beanName, cacheKey);
     } else {
     AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
     Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
     for (Advisor avr : advisor) {
     advised.addAdvisor(0, avr);
     }
     }
     PROXYED_SET.add(beanName);
     return bean;
     }
     } catch (Exception exx) {
     throw new RuntimeException(exx);
     }
    }

Spring中Bean的關鍵初始化過程

我們看其他的方法前,先回顧一下Spring中Bean的關鍵初始化過程:

實例化 -> 屬性注入 -> postProcessBeforeInitialization -> afterPropertiesSet/init方法 -> postProcessAfterInitialization

屬性注入這一步和我們講事務沒關系,忽略。

class AbstractAutoProxyCreator的方法

 public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
     if (bean != null) {
     Object cacheKey = this.getCacheKey(bean.getClass(), beanName);
     if (this.earlyProxyReferences.remove(cacheKey) != bean) {
     return this.wrapIfNecessary(bean, beanName, cacheKey);
     }
     }

     return bean;
    }

以上的bean初始化場景為:

singletonFactory.getObject()實例化Bean的時候,最終調用getEarlyBeanReference來實例化Bean,

DefaultSingletonBeanRegistry的方法

 public Object getSingleton(String beanName, ObjectFactory<?> singletonFactory) {
   
     try {
     singletonObject = singletonFactory.getObject();
     newSingleton = true;
     }

GlobalTransactionalInterceptor 事務攔截器

TM的一個作用就是開啟全局事務,實際應用時在需要開啟事務的方法上加注解@GlobalTransactional,與之相關的,有一個攔截器,io.seata.spring.annotation.GlobalTransactionalInterceptor:

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {


    /**
     * Instantiates a new Global transactional interceptor.
     *
     * @param failureHandler the failure handler
     */
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
     this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
     this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
     DEFAULT_DISABLE_GLOBAL_TRANSACTION);
    }


    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
     Class<?> targetClass =
     methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
     Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
     if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
     final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
     final GlobalTransactional globalTransactionalAnnotation =
     getAnnotation(method, targetClass, GlobalTransactional.class);
     final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
     boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
     if (!localDisable) {
     if (globalTransactionalAnnotation != null) {
     // //全局事務開始
     return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
     } else if (globalLockAnnotation != null) {
     
     ////全局鎖
     return handleGlobalLock(methodInvocation);
     }
     }
     }
     return methodInvocation.proceed();
    }

如果啟用seata的分布式事務且有注解 @GlobalTransactional, 則執行 handleGlobalTransaction():

根據注解開啟 aop切面

根據@GlobalTransactional注釋的方法,通過GlobalTransactionalInterceptor過濾器加入cglib切面,並new TransactionalTemplate開啟事務

postProcessAfterInitialization:299, AbstractAutoProxyCreator
  public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
     if (bean != null) {
     Object cacheKey = this.getCacheKey(bean.getClass(), beanName);
     if (this.earlyProxyReferences.remove(cacheKey) != bean) {
     return this.wrapIfNecessary(bean, beanName, cacheKey);
     }
     }

     return bean;
    }
wrapIfNecessary:223, GlobalTransactionScanner
  protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
     if (this.disableGlobalTransaction) {
     return bean;
     } else {
     try {
     synchronized(PROXYED_SET) {
     if (PROXYED_SET.contains(beanName)) {
     return bean;
     } else {
     this.interceptor = null;
     if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, this.applicationContext)) {
     this.interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
     } else {
     Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
     Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
																												
																												#沒有注解,則pass
     if (!this.existsAnnotation(new Class[]{serviceInterface}) && !this.existsAnnotation(interfacesIfJdk)) {
     return bean;
     }

     if (this.interceptor == null) {
     if (this.globalTransactionalInterceptor == null) {
																																# 實例化  GlobalTransactionalInterceptor
     this.globalTransactionalInterceptor = new GlobalTransactionalInterceptor(this.failureHandlerHook);
     ConfigurationCache.addConfigListener("service.disableGlobalTransaction", new ConfigurationChangeListener[]{(ConfigurationChangeListener)this.globalTransactionalInterceptor});
     }

     this.interceptor = this.globalTransactionalInterceptor;
     }
     }

在這里插入圖片描述

TransactionalTemplate事務模板

 /**
     * Execute object.
     *
     * @param business the business
     * @return the object
     * @throws TransactionalExecutor.ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws Throwable {
     // 1 get transactionInfo
     TransactionInfo txInfo = business.getTransactionInfo();
     if (txInfo == null) {
     throw new ShouldNeverHappenException("transactionInfo does not exist");
     }
     // 1.1 get or create a transaction
     GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

     // 1.2 Handle the Transaction propatation and the branchType
     Propagation propagation = txInfo.getPropagation();
     SuspendedResourcesHolder suspendedResourcesHolder = null;
     try {
     switch (propagation) {
     case NOT_SUPPORTED:
     suspendedResourcesHolder = tx.suspend(true);
     return business.execute();
     case REQUIRES_NEW:
     suspendedResourcesHolder = tx.suspend(true);
     break;
     case SUPPORTS:
     if (!existingTransaction()) {
     return business.execute();
     }
     break;
     case REQUIRED:
     break;
     case NEVER:
     if (existingTransaction()) {
     throw new TransactionException(
     String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
     ,RootContext.getXID()));
     } else {
     return business.execute();
     }
     case MANDATORY:
     if (!existingTransaction()) {
     throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
     }
     break;
     default:
     throw new TransactionException("Not Supported Propagation:" + propagation);
     }


     try {

     // 2. begin transaction
     beginTransaction(txInfo, tx);

     Object rs = null;
     try {

     // Do Your Business
     rs = business.execute();

     } catch (Throwable ex) {

     // 3.the needed business exception to rollback.
     completeTransactionAfterThrowing(txInfo, tx, ex);
     throw ex;
     }

     // 4. everything is fine, commit.
     commitTransaction(tx);

     return rs;
     } finally {
     //5. clear
     triggerAfterCompletion();
     cleanUp();
     }
     } finally {
     tx.resume(suspendedResourcesHolder);
     }

    }

濃縮一下:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. 獲取或者創建一個全局事務
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    // 1.1 獲取事務信息
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    try {
        // 2. 開始全局事務
        beginTransaction(txInfo, tx);
        Object rs = null;
        try {
            // 執行業務邏輯
            rs = business.execute();
        } catch (Throwable ex) {
            // 3.rollback全局事務
            completeTransactionAfterThrowing(txInfo,tx,ex);
            throw ex;
        }
        // 4. commit全局事務
        commitTransaction(tx);
        return rs;
    } finally {
        //5. 清理
        triggerAfterCompletion();
        cleanUp();
    }
}

TransactionalTemplate事務模板execute方法中主要有以下幾個步驟:

execute方法的邏輯我們應該非常的熟悉,這和JDBC的API非常的相似。同樣是經歷:begin -> commit || rollback,這樣一個邏輯。

步驟主要分為如下幾個:

1)獲取或者創建一個全局事務;

2)begin全局事務;

3)異常rollback事務;

4)正常commit事務;

下面,我們將逐步閱讀對應步驟的代碼

首先咱們關注 開啟事務 方法 beginTransaction:

beginTransaction最終調用了DefaultGlobalTransaction的begin方法

    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
     try {
     triggerBeforeBegin();
     tx.begin(txInfo.getTimeOut(), txInfo.getName());
     triggerAfterBegin();
     } catch (TransactionException txe) {
     throw new TransactionalExecutor.ExecutionException(tx, txe,
     TransactionalExecutor.Code.BeginFailure);

     }
    }

真正執行事務開始的地方: 獲取xid

    @Override
    public void begin(int timeout, String name) throws TransactionException {

     //此處的角色判斷有關鍵的作用
     //表明當前是——全局事務的發起者(Launcher)  還是參與者(Participant)
     //如果在分布式事務的下游系統方法中也加上GlobalTransactional注解
     //那么它的角色就是Participant,即會忽略后面的begin就退出了
     //而判斷是發起者(Launcher)還是參與者(Participant)是根據當前上下文是否已存在XID來判斷
     //- 沒有XID的就是Launcher
     //- 已經存在XID的就是Participant

     if (role != GlobalTransactionRole.Launcher) {
     assertXIDNotNull();
     if (LOGGER.isDebugEnabled()) {
     LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
     }
     return;
     }
     //開始新事物, xid 必須 為空
     assertXIDNull();
     if (RootContext.getXID() != null) {
     throw new IllegalStateException();
     }
     xid = transactionManager.begin(null, null, name, timeout);
     status = GlobalStatus.Begin;
     RootContext.bind(xid);
     if (LOGGER.isInfoEnabled()) {
     LOGGER.info("Begin new global transaction [{}]", xid);
     }

    }

方法開頭處if (role != GlobalTransactionRole.Launcher)對 role 的判斷有關鍵的作用,表明當前是全局事務的發起者(Launcher)還是參與者(Participant)。

如果在分布式事務的下游系統方法中也加上@GlobalTransactional注解,那么它的角色就是 Participant,會忽略后面的 begin 直接 return,而判斷是 Launcher 還是 Participant 是根據當前上下文是否已存在 XID 來判斷,沒有 XID 的就是 Launcher,已經存在 XID的就是 Participant。

由此可見,全局事務的創建只能由 Launcher 執行,而一次分布式事務中也只有一個Launcher 存在。

如果Launcher 開始新事務, xid 必須 為空.

DefaultTransactionManager負責 TM 與 TC 通訊

接下來:

通過transactionManager.begin() 方法通過 TmRpcClient 與server通信並生成一個xid,再將將xid綁定到Root上下文中。

DefaultTransactionManager負責 TM 與 TC 通訊,發送 begin、commit、rollback 指令。

/**
 * The type Default transaction manager.
 *
 * @author sharajava
 */
public class DefaultTransactionManager implements TransactionManager {

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
     throws TransactionException {
     GlobalBeginRequest request = new GlobalBeginRequest();
     request.setTransactionName(name);
     request.setTimeout(timeout);
     GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
     if (response.getResultCode() == ResultCode.Failed) {
     throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
     }
     return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
     GlobalCommitRequest globalCommit = new GlobalCommitRequest();
     globalCommit.setXid(xid);
     GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
     return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
     GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
     globalRollback.setXid(xid);
     GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
     return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
     GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
     queryGlobalStatus.setXid(xid);
     GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);
     return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {
     GlobalReportRequest globalReport = new GlobalReportRequest();
     globalReport.setXid(xid);
     globalReport.setGlobalStatus(globalStatus);
     GlobalReportResponse response = (GlobalReportResponse) syncCall(globalReport);
     return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
     try {
     return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
     } catch (TimeoutException toe) {
     throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
     }
    }
}


這里是 GlobalBeginRequest 請求,是begin指令。

GlobalBeginRequest 消息類型的說明

public class MessageType {

    /**
     * The constant TYPE_GLOBAL_BEGIN.
     */
    public static final short TYPE_GLOBAL_BEGIN = 1;
    /**
     * The constant TYPE_GLOBAL_BEGIN_RESULT.
     */
    public static final short TYPE_GLOBAL_BEGIN_RESULT = 2;
    /**
     * The constant TYPE_GLOBAL_COMMIT.
     */
    public static final short TYPE_GLOBAL_COMMIT = 7;
 ...
 }

至此拿到TC返回的 XID 表示一個全局事務創建成功。

業務代碼執行business.execute()

全局事務創建后,就開始執行 business.execute(),

對應於秒殺服務而言,執行的是加了@GlobalTransactional 注解的原來的方法。

public class SeataSeckillServiceImpl {

    @Autowired
    private SeataDemoOrderFeignClient stockFeignClient;
    @Autowired
    private SeataDemoStockFeignClient orderFeignClient;

    /**
     * 減庫存,下訂單
     */
    @GlobalTransactional  //開啟全局事務(重點) 使用 seata 的全局事務
    public void doSeckill(@RequestBody SeckillDTO dto) {

     stockFeignClient.addOrder(dto);
     orderFeignClient.minusStock(dto);
    }
}

至此拿到TC返回的XID一個全局事務就開啟了,全局事務創建后,就開始執行business.execute(),即我們的業務代碼,進入RM處理流程

TM分支事務的第一階段

圖解 AT 模式一階段分支事務流程

由於seata代理了數據源,sql解析undolog是在代理數據源中完成的。

一階段中分支事務的具體工作有:

  1. 根據需要執行的 SQLUPDATEINSERTDELETE)類型生成相應的 SqlRecognizer
  2. 進而生成相應的 SqlExecutor
  3. 接着便進入核心邏輯查詢數據的前后快照,例如圖中標紅的部分,拿到修改數據行的前后快照之后,將二者整合生成 UndoLog,並嘗試將其和業務修改在同一事務中提交。

整個流程的流程圖如下:

img

值得注意的是,本地事務提交前必須先向服務端注冊分支,分支注冊信息中包含由表名和行主鍵組成的全局鎖,如果分支注冊過程中發現全局鎖正在被其他全局事務鎖定則拋出全局鎖沖突異常,客戶端需要循環等待,直到其他全局事務釋放鎖之后該本地事務才能提交。Seata 以這樣的機制保證全局事務間的寫隔離。

分支事務注冊與事務提交

img

Seata AT 的工作流程

工作流程總覽

img

概括來講,AT 模式的工作流程分為兩階段。一階段進行業務 SQL 執行,並通過 SQL 攔截、SQL 改寫等過程生成修改數據前后的快照(Image),並作為 UndoLog 和業務修改在同一個本地事務中提交

如果一階段成功那么二階段僅僅異步刪除剛剛插入的 UndoLog;如果二階段失敗則通過 UndoLog 生成反向 SQL 語句回滾一階段的數據修改。其中關鍵的 SQL 解析和拼接工作借助了 Druid Parser 中的代碼,這部分本文並不涉及,感興趣的小伙伴可以去翻看源碼,並不是很復雜

RM的一階段提交

AT模式的一階段流程由 數據源代理+SQL識別器 的方式實現

首先回憶jdbc的執行流程

	//通過數據源獲取連接
	Connection connection = dataSource.getConnection();
	// 獲得 聲明
	PrepareStatement pst = connection.prepareStatement();
	// 執行SQL語句
	pst.executeUpdate();
	// 提交事務
	connection.commit();

一階段加載

在這里插入圖片描述

在一階段,Seata 攔截“業務 SQL”:

  1. 解析 SQL 語義,找到“業務 SQL”要更新的業務數據,在業務數據被更新前,將其保存成“before image”(前置鏡像);
  2. 執行“業務 SQL”更新業務數據;
  3. 在業務數據更新之后,其保存成"after image” (后置鏡像),最后生成行鎖。

以上操作全部在一個數據庫事務內完成,這樣保證了一階段操作的原子性。

Seata AT 模式客戶端部分

Seata 中主要針對 java.sql 包下的 DataSource、Connection、Statement、PreparedStatement 四個接口進行了再包裝,包裝類分別為 DataSourceProxy、ConnectionProxy、StatementProxy、PreparedStatementProxy,很好一一對印,其功能是在 SQL 語句執行前后、事務 commit 或者 rollbakc 前后進行一些與 Seata 分布式事務相關的操作,例如分支注冊、狀態回報、全局鎖查詢、快照存儲、反向 SQL 生成等。

下圖來源於 Seata 官方文檔: 數據源代理部分 —— 三類 Proxy

AT模式對 DataSource,Connection,Statement 都做了代理

  • dataSource 被DataSourceProxy代理, dataSource.getConnection 獲得的對象是 ConnectionProxy 對象, connection.prepareStatement 獲得的是 PreparedStatementProxy 對象
  • prepareStatement.executeUpdate() 做了特殊了處理, 通過Duird數據源提供的API創建Seata的SQL識別器,SQL識別器提供了識別SQL語句的功能,用於支持Executor創建前置鏡像,后置鏡像。
  • executor 構建前置鏡像, 執行業務SQL,構建后置鏡像, 通過前置鏡像和后置鏡像,XID等數據構建回滾日志對象,添加到ConnectionProxy的上下文
  • connectionProxy.commit, 注冊分支事物, 根據connectionProxy的上下文對象將回滾日志生成SQL,執行回滾日志SQL,真實連接提交,如果配置了一階段提交報告(client.rm.reportSuccessEnable=true,默認是false),則向TC發送一階段提交完成的請求

prepareStatement.executeUpdate


public class PreparedStatementProxy extends AbstractPreparedStatementProxy
    implements PreparedStatement, ParametersHolder {

    @Override
    public boolean execute() throws SQLException {
     return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    }

    @Override
    public ResultSet executeQuery() throws SQLException {
     return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
    }

    @Override
    public int executeUpdate() throws SQLException {
     return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
    }

ExecuteTemplate.executeUpdate

AT 模式下,真正分支事務開始是在 StatementProxy 和 PreparedStatementProxy 的 execute、executeQuery、executeUpdate 等具體執行方法中,這些方法均實現自 Statement 和 PreparedStatement 的標准接口,而方法體內調用了 ExecuteTemplate.execute 做方法攔截,

img

AT 模式下,真正分支事務開始是在 StatementProxy 和 PreparedStatementProxy 的 execute、executeQuery、executeUpdate 等具體執行方法中,這些方法均實現自 Statement 和 PreparedStatement 的標准接口,而方法體內調用了 ExecuteTemplate.execute 做方法攔截,下面我們來看看這個方法的實現:

public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                    StatementProxy<S> statementProxy,
                                                    StatementCallback<T, S> statementCallback,
                                                    Object... args) throws SQLException {
    
    // 如果不是處於全局事務中,即上游沒有 xid 傳遞下來
    // 或者沒有 GlobalLock 修飾,該數據操作不需要納入 Seata 框架下進行管理
    // 則直接執行這個 SQL                                                        
    if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    if (sqlRecognizer == null) {
        sqlRecognizer = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                statementProxy.getConnectionProxy().getDbType());
    }
    Executor<T> executor = null;
    if (sqlRecognizer == null) {
        executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
    } else {

        // 通過 SQL 的類型,生成不同的執行器
        // 1.3.0 支持Mysql,Oracle,PGSql 的插入執行器
        switch (sqlRecognizer.getSQLType()) {
            case INSERT:
                executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case UPDATE:
                executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case DELETE:
                executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case SELECT_FOR_UPDATE:
                executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            default:
                executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                break;
        }
    }
    T rs = null;
    try {
    // 執行器去執行

        // 調用執行器的 execute 方法,顯然這是一個抽象方法,最后會調到三個具體的執行器實現類之一
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException)ex;
    }
    return rs;
}

下面我們看看這個 executor.execute 方法的實現。

執行器接口 execute 的實現

execute 方法的實現位於 BaseTransactionalExecutor 類中:

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {

@Override
public Object execute(Object... args) throws Throwable {
    
    // 如果處於全局事務中,綁定 xid
    if (RootContext.inGlobalTransaction()) {
        String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
    }
    
  // 設置全局鎖的狀態
  statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
  

    // 調用抽象方法 doExecute
    return doExecute(args);
}

BaseTransactionalExecutor 類中 execute 方法主要做了一些與全局事務相關的狀態值的設定,繼續追蹤進入 doExecute 方法的實現。

AbstractDMLBaseExecutor 執行器基類

終於進入正題,doExecute 方法位於 AbstractDMLBaseExecutor 類中,該類繼承自上文中的 BaseTransactionalExecutor。

doExecute 方法體內先拿到具體的連接代理對象 connectionProxy,然后根據 Commit 標識進行不同方法的調用,但翻看代碼實現時發現,其實 executeCommitTrue 方法就是先把 Commit 標識改成 false 然后再調用 executeCommitFalse 方法。

@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    
    // 判斷當前連接是否開啟了自動提交, 這里看executeAutoCommitFalse的部分。
		// 開啟自動提交的部分關掉自動提交,然后調用了下面的部分,然后恢復自動提交為true
		
    if (connectionProxy.getCommit()) {
        return executeCommitTrue(args);
    } else {
        return executeCommitFalse(args);
    }
}

executeCommitTrue 方法體中有一個無限循環,這么做的意義是,一旦分支注冊時拋出鎖沖突異常,則需要一直等待直到別的全局事務釋放該全局鎖之后才能提交自己的修改,否則一直阻塞等待。

protected T executeCommitTrue(Object[] args) throws Throwable {
    T result = null;
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    LockRetryController lockRetryController = new LockRetryController();
    try {
        
        // 先將 Commit 標識改成 false,只允許手動提交
        connectionProxy.setCommit(false);
        
        // 進入一個無限循環
        while (true) {
            try {
                
                // 調用 executeCommitFalse 方法
                result = executeCommitFalse(args);
                
                // 如果分支成功,則 commit,提交本地事務,該方法也是代理方法,下文會敘述
                connectionProxy.commit();
                break;
            } catch (LockConflictException lockConflict) {
                
                // 如果全局鎖沖突,可能是已經有別的事務拿到了要修改行的全局鎖,則回滾
                connectionProxy.getTargetConnection().rollback();

                // 然后 sleep 一段時間,不要立即重試
                lockRetryController.sleep(lockConflict);
            }
        }

    } catch (Exception e) {

        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("exception occur", e);
        throw e;
    } finally {
        connectionProxy.setCommit(true);
    }
    return result;
}

下面我們仔細看一下 executeCommitFalse 方法的邏輯,它是實現 AT 模式的關鍵步驟。

其中,beforeImage 是一個抽象方法,針對 INSERT、UPDATE、DELETE 有不同的實現,因為需要將這三種不同的 SQL 解析為相應的 SELECT 語句,查詢操作前數據的快照;同樣的 afterImage 也是一個抽象方法,來查詢操作后數據的快照;statementCallback.execute 語句真正執行 SQL;prepareUndoLog 整合 beforeImage 和 afterImage 生成 UndoLog 對象。


// 執行自動提交

/**
* Execute auto commit false t.
*
* @param args the args
* @return the t
* @throws Exception the exception
*/
protected T executeAutoCommitFalse(Object[] args) throws Exception {
		if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && getTableMeta().getPrimaryKeyOnlyName().size() > 1)
		{
						throw new NotSupportYetException("multi pk only support mysql!");
		}
	 // beforeImage 是一個抽象方法,針對 INSERT、UPDATE、DELETE 有不同的實現
  	// 抽象方法, 子類Mysql,Oracle,PGSql 會知道如何構建前置鏡像
		TableRecords beforeImage = beforeImage();
		// 執行業務SQL
		T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
	  // 原理同 beforeImage
 	// 通過前置鏡像構建后置鏡像
		TableRecords afterImage = afterImage(beforeImage);
		  // 整合 beforeImage 和 afterImage 生成 UndoLog
 // 通過前置鏡像和后置鏡像生成回滾日志,插入到代理連接的上下文
		prepareUndoLog(beforeImage, afterImage);
		return result;
}

protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
		// 如果前置鏡像為空,並且后置鏡像也是空,就不用構建回滾日志了

		if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
						return;
		}

		ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

		TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
		String lockKeys = buildLockKey(lockKeyRecords);
		// 添加lockKey
		connectionProxy.appendLockKey(lockKeys);
		// 構建回滾日志
		SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);

		// 將回滾日志添加到代理連接的上下文中
		connectionProxy.appendUndoLog(sqlUndoLog);
}

本地connectionProxy.commit() 代理提交

executeCommitFalse 執行過后,會調用 connectionProxy.commit() 做事務提交,我們看看該代理方法的實現。

ConnectionProxy 復寫的 commit 方法
該 commit 方法實現自 Connection 接口的 commit 方法:

@Override
public void commit() throws SQLException {
    
    // 針對分支事務處理
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
    } 

    // 針對 GlobalLock 的處理
    else if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
    } else {
        targetConnection.commit();
    }
}

執行一階段本地事務提交

分支事務:代理連接的一階段提交

如果是分支事務,調用 processGlobalTransactionCommit 方法進行提交

private void processGlobalTransactionCommit() throws SQLException {
		try {
		  // 調用 RM 注冊分支事務,包括行記錄的主鍵作為全局鎖
			register();
		} catch (TransactionException e) {
		  // 如果報鎖沖突異常,則 executeCommitTrue 會循環等待
		  recognizeLockKeyConflictException(e, context.buildLockKeys());
		}
		try {
		  // 分支注冊成功不拋異常,則將 UndoLog 插入數據庫
			// 插入回滾日志						UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
			// 真實連接提交
			 // 將業務修改和 UndoLog 一並提交
		   targetConnection.commit();
		} catch (Throwable ex) {
		LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
	        // 匯報分支狀態為一階段失敗,默認失敗會重試五次
	        report(false);
				throw new SQLException(ex);
		}
		// 是否報告一階段提交完成,默認為false
		if (IS_REPORT_SUCCESS_ENABLE) {
		  // 匯報分支狀態為一階段成功 
			report(true);
		}
		context.reset();
}

第一階段本地事務相關的問題

GlobalLock 的具體作用

如果是用 GlobalLock 修飾的業務方法,雖然該方法並非某個全局事務下的分支事務,但是它對數據資源的操作也需要先查詢全局鎖,如果存在其他 Seata 全局事務正在修改,則該方法也需等待。

所以,如果想要 Seata 全局事務執行期間,數據庫不會被其他事務修改,則該方法需要強制添加 GlobalLock 注解,來將其納入 Seata 分布式事務的管理范圍。

功能有點類似於 Spring 的 @Transactional 注解,如果你希望開啟事務,那么必須添加該注解,如果你沒有添加那么事務功能自然不生效,業務可能出 BUG;

Seata 也一樣,如果你希望某個不在全局事務下的 SQL 操作不影響 AT 分布式事務,那么必須添加 GlobalLock 注解。

public class ConnectionProxy extends AbstractConnectionProxy {

private void processLocalCommitWithGlobalLocks() throws SQLException {
    
    // 查詢這些主鍵是不是被其他全局事務鎖住,如果有就拋出鎖沖突異常
    checkLock(context.buildLockKeys());
    try {

        // 否則,提交事務,因為該方法的修改並不影響已存在的 Seata 分布式事務
        targetConnection.commit();
    } catch (Throwable ex) {
        throw new SQLException(ex);
    }
    context.reset();
}

檢查鎖

public class ConnectionProxy extends AbstractConnectionProxy {
  

    public void checkLock(String lockKeys) throws SQLException {
        if (!StringUtils.isBlank(lockKeys)) {
            try {
                boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT, this.getDataSourceProxy().getResourceId(), this.context.getXid(), lockKeys);
                if (!lockable) {
                    throw new LockConflictException();
                }
            } catch (TransactionException var3) {
                this.recognizeLockKeyConflictException(var3, lockKeys);
            }

        }
    }
    

匯報狀態

    /**
 * abstract ResourceManager
 *
 * @author zhangsen
 */
public abstract class AbstractResourceManager implements ResourceManager {


    
       /**
     * report branch status
     *
     * @param branchType      the branch type
     * @param xid             the xid
     * @param branchId        the branch id
     * @param status          the status
     * @param applicationData the application data
     * @throws TransactionException
     */
    @Override
    public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status, String applicationData) throws TransactionException {
        try {
            BranchReportRequest request = new BranchReportRequest();
            request.setXid(xid);
            request.setBranchId(branchId);
            request.setStatus(status);
            request.setApplicationData(applicationData);

            BranchReportResponse response = (BranchReportResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
            if (response.getResultCode() == ResultCode.Failed) {
                throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
            }
        } catch (TimeoutException toe) {
            throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
        } catch (RuntimeException rex) {
            throw new RmTransactionException(TransactionExceptionCode.BranchReportFailed, "Runtime", rex);
        }
    }
    

RM是如何加入到全局事務中的呢?

答案是: seata數據源代理,
通過DataSourceProxy才能在業務代碼的事務提交時,seata通過這個切入點,來給TC發送RM的處理結果

RM是加入到全局事務中的具體步驟

1.獲取business-service傳來的XID
2.綁定XID到當前上下文中
3.執行業務邏輯sql
4.向TC創建本次RM的Netty連接
5.向TC發送分支事務的相關信息
6.獲得TC返回的branchId
7.記錄Undo Log數據
8.向TC發送本次事務PhaseOne階段的處理結果
9.從當前上下文中解綁XID

連接代理類ConnectionProxy的核心代碼

//部分代碼
public class ConnectionProxy extends AbstractConnectionProxy {
	@Override
    public void commit() throws SQLException {
     try {
     LOCK_RETRY_POLICY.execute(() -> {
     doCommit();
     return null;
     });
     } catch (SQLException e) {
     throw e;
     } catch (Exception e) {
     throw new SQLException(e);
     }
    }

    private void doCommit() throws SQLException {
    	//如果當前是全局事務,則執行全局事務的提交
    	//判斷是不是全局事務,就是看當前上下文是否存在XID
     if (context.inGlobalTransaction()) {
     processGlobalTransactionCommit();
     } else if (context.isGlobalLockRequire()) {
     processLocalCommitWithGlobalLocks();
     } else {
     targetConnection.commit();
     }
    }

    private void processLocalCommitWithGlobalLocks() throws SQLException {
     checkLock(context.buildLockKeys());
     try {
     targetConnection.commit();
     } catch (Throwable ex) {
     throw new SQLException(ex);
     }
     context.reset();
    }

    private void processGlobalTransactionCommit() throws SQLException {
     try {
     	//首先是向TC注冊RM,拿到TC分配的branchId
     register();
     } catch (TransactionException e) {
     recognizeLockKeyConflictException(e, context.buildLockKeys());
     }
     try {
     	//寫入undolog
     UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
     //提交本地事務,可以看到寫入undolog和業務數據是在同一個本地事務中
     targetConnection.commit();
     } catch (Throwable ex) {
     	//向TC發送RM的事務處理失敗的通知
     LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
     report(false);
     throw new SQLException(ex);
     }
     //向TC發送rm的事務處理成功的通知
     if (IS_REPORT_SUCCESS_ENABLE) {
     report(true);
     }
     context.reset();
    }
	//注冊RM,構建request通過netty向TC發送指令
    //將返回的branchId存在上下文中
    private void register() throws TransactionException {
     if (!context.hasUndoLog() || context.getLockKeysBuffer().isEmpty()) {
     return;
     }
     Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
     null, context.getXid(), null, context.buildLockKeys());
     context.setBranchId(branchId);
    }
}

由於業務代碼本身的事務提交被ConnectionProxy代理,所以在提交本地事務時,實際執行的是ConnectionProxy的commit方法

RM如何綁定 XID到上下文

springboot 場景的TransactionPropagationIntercepter完成了bind和unbind XID到上下文中。

public class TransactionPropagationIntercepter extends HandlerInterceptorAdapter {


    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationIntercepter.class);


    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
     String xid = RootContext.getXID();
     String rpcXid = request.getHeader(RootContext.KEY_XID);

     if (LOGGER.isDebugEnabled()) {	
     LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
     }
     if (rpcXid != null) {
     RootContext.bind(rpcXid);
     if (LOGGER.isDebugEnabled()) {
     LOGGER.debug("bind[{}] to RootContext", rpcXid);
     }
     }


     return true;
    }

    @Override
    public void postHandle(
     HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) {
     XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
    }


}
/**
 * Auto bean add for spring context if in springboot env.
 *
 * @author wangxb
 */
@Configuration
public class HttpAutoConfiguration implements WebMvcConfigurer {


    @Override
    public void addInterceptors(InterceptorRegistry registry) {
     registry.addInterceptor(new TransactionPropagationIntercepter());
    }

}

至此一階段事務完成

全局事務二階段提交

圖解:TM全局事務二階段 Commit 流程

對服務端來說,等到一階段完成未拋異常,全局事務的發起方會向服務端申請提交這個全局事務,服務端根據 xid 查詢出該全局事務后加鎖並關閉這個全局事務,目的是防止該事務后續還有分支繼續注冊上來,同時將其狀態從 Begin 修改為 Committing

緊接着,判斷該全局事務下的分支類型是否均為 AT 類型,若是則服務端會進行異步提交,因為 AT 模式下一階段完成數據已經落地。服務端僅僅修改全局事務狀態為 AsyncCommitting,然后會有一個定時線程池去存儲介質(File 或者 Database)中查詢出待提交的全局事務日志進行提交,如果全局事務提交成功則會釋放全局鎖並刪除事務日志。整個流程如下圖所示:

img

對客戶端來說,先是接收到服務端發送的 branch commit 請求,然后客戶端會根據 resourceId 找到相應的 ResourceManager,接着將分支提交請求封裝成 Phase2Context 插入內存隊列 ASYNC_COMMIT_BUFFER,客戶端會有一個定時線程池去查詢該隊列進行 UndoLog 的異步刪除。

一旦客戶端提交失敗或者 RPC 超時,則服務端會將該全局事務狀態置位 CommitRetrying,之后會由另一個定時線程池去一直重試這些事務直至成功。整個流程如下圖所示:

img

圖解:RM分支事務的二階段提交

在這里插入圖片描述

二階段如果順利提交的話,因為“業務 SQL”在一階段已經提交至數據庫,所以 Seata 框架只需將一階段保存的快照數據和行鎖刪掉,完成數據清理即可。

邏輯:RM的二階段提交

AT模式的資源管理器(RMHandlerAT) 接受事物協調者(TC)的分支提交請求

  • 由資源管理器(RMHandlerAT)執行分支提交請求
  • AT模式的資源管理器內部由異步工作器(asyncWorker)執行, 將請求用非阻塞(offer)的方式插入到blockingQueue中
  • asyncWorker內部有一個定時器, 1秒鍾執行一次(在上次執行完之后)。 定時器不停的用非阻塞的(poll)方式從阻塞隊列中獲取數據,然后批量刪除回滾日志

在RMClient初始化時,啟動了RMHandlerAT接收TC在二階段發出的提交或者回滾請求

在RM啟動時創建了與TC通訊的Netty連接,TC在獲取各RM的匯報結果后,就會給各RM發送commit或rollback的指令

io.seata.rm.AbstractRMHandler.handle(BranchCommitRequest request) :


@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
		BranchCommitResponse response = new BranchCommitResponse();
		exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
						@Override
		public void execute(BranchCommitRequest request, BranchCommitResponse response)
			 TransactionException {
					doBranchCommit(request, response);
			}
		}, request, response);
		return response;
}


RM提交分支事務 doBranchCommit

具體看下doBranchCommit的過程:

io.seata.rm.AbstractRMHandler.doBranchCommit():


/**
* Do branch commit.
*
* @param request  the request
* @param response the response
* @throws TransactionException the transaction exception
*/
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
			throws TransactionException {
			String xid = request.getXid();
			long branchId = request.getBranchId();
			String resourceId = request.getResourceId();
			String applicationData = request.getApplicationData();
			if (LOGGER.isInfoEnabled()) {
							LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
			}
			BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
							applicationData);
			response.setXid(xid);
			response.setBranchId(branchId);
			response.setBranchStatus(status);
			if (LOGGER.isInfoEnabled()) {
							LOGGER.info("Branch commit result: " + status);
			}

}

獲取request里的xid 、branchId 、resourceId、applicationData 、branchType,這里的branchType是一個枚舉類型:


package io.seata.core.model;

public enum BranchType {
    AT,
    TCC,
    SAGA,
    XA;

異步提交分支事務

然后執行branchCommit,將需要提交的XID加入list:
io.seata.rm.datasource.AsyncWorker.branchCommit():

public class AsyncWorker implements ResourceManagerInbound {

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
																																	String applicationData) throws TransactionException {
				
				//加入BlockingQueue
																																	
				if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
								LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
				}
				return BranchStatus.PhaseTwo_Committed;
}

異步刪除對應的undo_log記錄

全局提交時,RM只需刪除Undo_log表

img

因為一階段本地事務已經提交了,如果是全局提交只需要異步刪除對應的undo_log記錄即可,所以有如下操作:

/AT模式下,最終是由AsyncWorker執行提交

//通過一個定時任務消費list中的待提交XID
public synchronized void init() {
	LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
	ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
	
	//每秒執行
	
	timerExecutor.scheduleAtFixedRate(() -> {
					try {

									doBranchCommits();

					} catch (Throwable e) {
									LOGGER.info("Failed at async committing ... {}", e.getMessage());

					}
	}, 10, 1000 * 1, TimeUnit.MILLISECONDS);
}

private void doBranchCommits() {
	if (ASYNC_COMMIT_BUFFER.isEmpty()) {
					return;
	}

	Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
	//一次定時任務取出ASYNC_COMMIT_BUFFER中的所有待辦數據
	//以resourceId作為key分組待辦數據,resourceId就是一個數據庫的連接url
	//在前面的日志中可以看到,目的是為了覆蓋應用的多數據源問題
	while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
					Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
					List<Phase2Context> contextsGroupedByResourceId = mappedContexts.computeIfAbsent(commitContext.resourceId, k -> new ArrayList<>());
					contextsGroupedByResourceId.add(commitContext);
	}

	for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
		Connection conn = null;
		DataSourceProxy dataSourceProxy;
		try {
			try {
							DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
											.getResourceManager(BranchType.AT);
											
							// //根據resourceId查找對應dataSourceProxy				
							dataSourceProxy = resourceManager.get(entry.getKey());
							if (dataSourceProxy == null) {
											throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
							}
							conn = dataSourceProxy.getPlainConnection();
			} catch (SQLException sqle) {
							LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
							continue;
			}
			List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
			Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
			Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
			for (Phase2Context commitContext : contextsGroupedByResourceId) {
				xids.add(commitContext.xid);
				branchIds.add(commitContext.branchId);
				int maxSize = Math.max(xids.size(), branchIds.size());

					//1000個一起執行
				if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
								try {
				//刪除相應的undo_log記錄
				UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
																xids, branchIds, conn);
								} catch (Exception ex) {
												LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
								}
								xids.clear();
								branchIds.clear();
				}
			}

			if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
							return;
			}
			
			//剩余未滿1000的,在執行一次

			try {
			
					//刪除undo_log
							UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
											branchIds, conn);
			} catch (Exception ex) {
							LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
			}

			if (!conn.getAutoCommit()) {
							conn.commit();
			}
		} catch (Throwable e) {
			LOGGER.error(e.getMessage(), e);
			try {
							conn.rollback();
			} catch (SQLException rollbackEx) {
							LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
			}
		} finally {
			if (conn != null) {
							try {
											conn.close();
							} catch (SQLException closeEx) {
											LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
							}
			}
		}
	}
}

通過資源管理器提交分支事務

DefaultResourceManager.branchCommit

public class DefaultResourceManager implements ResourceManager {

	@Override
	public BranchStatus branchCommit(BranchType branchType, String xid, long branchId,
																																		String resourceId, String applicationData)
					throws TransactionException {
					return getResourceManager(branchType).branchCommit(branchType, xid, branchId, resourceId, applicationData);
	}

分支事務子類的完成提交

分支事務的子類

在這里插入圖片描述

ResourceManagerXA.branchCommit

/**
 * RM for XA mode.
 *
 * @author sharajava
 */
public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager {


@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
																																	String applicationData) throws TransactionException {
				return finishBranch(true, branchType, xid, branchId, resourceId, applicationData);
}


全局事務的二階段回滾

官方的RM的二階段回滾

在RMClient初始化時,啟動了RMHandlerAT接收TC在二階段發出的提交或者回滾請求

at模式的整體流程

所以,二階段回滾由事物協調者(TC)發起, 微服務的資源管理器執行的操作

img

AT模式由 RMHandlerAT#handle(BranchRollbackRequest request) 處理

  • 通過全局事物ID(xid)和分支事物id(branchId)查詢回滾日志表(undo_log)獲得回滾日志
  • 通過數據庫類型和回滾日志創建執行器(Executor)
  • 由執行器驅動數據回滾, 首先進行數據驗證,驗證通過則回滾

如果相等就不用執行數據回滾,然后對比前置鏡像和當前對象,
如果相等就不用執行數據回滾,
如果后置鏡像和當前對象不相等就拋出臟數據檢查異常,
如果后置鏡像和當前對象相等,執行數據回滾。

  • 如果查詢到了回滾日志, 刪除回滾日志。 如果沒查詢到回滾日志, 插入一條狀態全局事物已完成的回滾日志 。

圖解TC二階段 Rollback 流程

回滾相對復雜一些,如果發起方一階段拋異常會向服務端請求回滾該全局事務,服務端會根據 xid 查詢出這個全局事務,加鎖關閉事務使得后續不會再有分支注冊上來,並同時更改其狀態 BeginRollbacking,接着進行同步回滾以保證數據一致性。除了同步回滾這個點外,其他流程同提交時相似,如果同步回滾成功則釋放全局鎖並刪除事務日志,如果失敗則會進行異步重試。整個流程如下圖所示:

img

圖解RM二階段 Rollback 流程

客戶端接收到服務端的 branch rollback 請求,先根據 resourceId 拿到對應的數據源代理,然后根據 xidbranchId 查詢出 UndoLog 記錄,反序列化其中的 rollback 字段拿到數據的前后快照,我們稱該全局事務為 A

根據具體 SQL 類型生成對應的 UndoExecutor,校驗一下數據 UndoLog 中的前后快照是否一致或者前置快照和當前數據(這里需要 SELECT 一次)是否一致,如果一致說明不需要做回滾操作,如果不一致則生成反向 SQL 進行補償,在提交本地事務前會檢測獲取數據庫本地鎖是否成功,如果失敗則說明存在其他全局事務(假設稱之為 B)的一階段正在修改相同的行,但是由於這些行的主鍵在服務端已經被當前正在執行二階段回滾的全局事務 A 鎖定,因此事務 B 的一階段在本地提交前嘗試獲取全局鎖一定是失敗的,等到獲取全局鎖超時后全局事務 B 會釋放本地鎖,這樣全局事務 A 就可以繼續進行本地事務的提交,成功之后刪除本地 UndoLog 記錄。整個流程如下圖所示:

img

二階段回滾

在這里插入圖片描述

二階段如果是回滾的話,Seata 就需要回滾一階段已經執行的“業務 SQL”,還原業務數據。回滾方式便是用“before image”還原業務數據;但在還原前要首先要校驗臟寫 ,對比”數據庫當前業務數據”和"after image”,如果兩份數據完全一致就說明沒有臟寫, 可以還原業務數據,如果不一致就說明有臟寫,出現臟寫就需要轉人工處理

AbstractRMHandler.handle(BranchRollbackRequest request)


@Override
public BranchRollbackResponse handle(BranchRollbackRequest request) {
BranchRollbackResponse response = new BranchRollbackResponse();
	exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
		@Override
		public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
						throws TransactionException {
						doBranchRollback(request, response);
		}
	}, request, response);
	return response;
}

/**
* Do branch rollback.
*
* @param request  the request
* @param response the response
* @throws TransactionException the transaction exception
*/
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
		throws TransactionException {
		String xid = request.getXid();
		long branchId = request.getBranchId();
		String resourceId = request.getResourceId();
		String applicationData = request.getApplicationData();
		if (LOGGER.isInfoEnabled()) {
						LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
		}
		BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
						applicationData);
		response.setXid(xid);
		response.setBranchId(branchId);
		response.setBranchStatus(status);
		if (LOGGER.isInfoEnabled()) {
						LOGGER.info("Branch Rollbacked result: " + status);
		}
}

DataSourceManager 執行回滾

public class DataSourceManager extends AbstractResourceManager implements Initialize {


@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
																							String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
		throw new ShouldNeverHappenException();
}
try {
		UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
		StackTraceLogger.info(LOGGER, te,
						"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
						new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
		if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
						return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
		} else {
						return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
		}
}
return BranchStatus.PhaseTwo_Rollbacked;

}

AbstractUndoLogManager 執行回滾

public abstract class AbstractUndoLogManager implements UndoLogManager

/**
* Undo.
*
* @param dataSourceProxy the data source proxy
* @param xid     the xid
* @param branchId     the branch id
* @throws TransactionException the transaction exception
*/
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;

for (; ; ) {
try {
	conn = dataSourceProxy.getPlainConnection();

	// The entire undo process should run in a local transaction.
	if (originalAutoCommit = conn.getAutoCommit()) {
					conn.setAutoCommit(false);
	}
		//根據Xid查詢出數據
	// Find UNDO LOG
	selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
	selectPST.setLong(1, branchId);
	selectPST.setString(2, xid);
	rs = selectPST.executeQuery();

	boolean exists = false;
	while (rs.next()) {
					exists = true;
					//防重復提交
					// It is possible that the server repeatedly sends a rollback request to roll back
					// the same branch transaction to multiple processes,
					// ensuring that only the undo_log in the normal state is processed.
					int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
					if (!canUndo(state)) {
									if (LOGGER.isInfoEnabled()) {
													LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
									}
									return;
					}

					String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
					Map<String, String> context = parseContext(contextString);
					byte[] rollbackInfo = getRollbackInfo(rs);

					String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
					UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
									: UndoLogParserFactory.getInstance(serializer);
					BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

					try {
									// put serializer name to local
									setCurrentSerializer(parser.getName());
									List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
									if (sqlUndoLogs.size() > 1) {
													Collections.reverse(sqlUndoLogs);
									}
									//反解析出回滾SQL並執行
									for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
													TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
																	conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
													sqlUndoLog.setTableMeta(tableMeta);
													AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
																	dataSourceProxy.getDbType(), sqlUndoLog);
													undoExecutor.executeOn(conn);
									}
					} finally {
									// remove serializer name
									removeCurrentSerializer();
					}
	}

	// If undo_log exists, it means that the branch transaction has completed the first phase,
	// we can directly roll back and clean the undo_log
	// Otherwise, it indicates that there is an exception in the branch transaction,
	// causing undo_log not to be written to the database.
	// For example, the business processing timeout, the global transaction is the initiator rolls back.
	// To ensure data consistency, we can insert an undo_log with GlobalFinished state
	// to prevent the local transaction of the first phase of other programs from being correctly submitted.
	// See https://github.com/seata/seata/issues/489

	if (exists) {
					deleteUndoLog(xid, branchId, conn);
					conn.commit();
					if (LOGGER.isInfoEnabled()) {
									LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
													State.GlobalFinished.name());
					}
	} else {
					insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
					conn.commit();
					if (LOGGER.isInfoEnabled()) {
									LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
													State.GlobalFinished.name());
					}
	}

	return;
} catch (SQLIntegrityConstraintViolationException e) {
	// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
	if (LOGGER.isInfoEnabled()) {
					LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
	}
} catch (Throwable e) {
	if (conn != null) {
					try {
									conn.rollback();
					} catch (SQLException rollbackEx) {
									LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
					}
	}
	throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
					.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
									branchId, e.getMessage()), e);

} finally {
	try {
					if (rs != null) {
									rs.close();
					}
					if (selectPST != null) {
									selectPST.close();
					}
					if (conn != null) {
									if (originalAutoCommit) {
													conn.setAutoCommit(true);
									}
									conn.close();
					}
	} catch (SQLException closeEx) {
					LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
	}
}
}
}

參考文檔:

seata 官方文檔地址:

http://seata.io/zh-cn/docs/overview/what-is-seata.html

https://www.cnblogs.com/babycomeon/p/11504210.html

https://www.cnblogs.com/javashare/p/12535702.html

https://blog.csdn.net/chen_kkw/article/details/94757874

https://blog.csdn.net/qq853632587/article/details/111356009

https://blog.csdn.net/qq_35721287/article/details/103573862

https://www.cnblogs.com/anhaogoon/p/13033986.html

https://blog.51cto.com/u_15072921/2606182

https://blog.csdn.net/weixin_45661382/article/details/105539999

https://blog.csdn.net/f4761/article/details/89077400

https://blog.csdn.net/qq_27834905/article/details/107353159

https://zhuanlan.zhihu.com/p/266584169


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM