Seata的分布式事務實現原理


Seata分布式事務方案

簡介

Seata是阿里開源的分布式事務解決方案中間件,對業務侵入小,在應用中Seata整體事務邏輯基於兩階段提交的模型,核心概念包含三個角色:

  • TM:事務發起者。用來告訴TC全局事務的開始,提交,回滾。
  • RM:事務資源,每一個RM都會作為一個分支事務注冊在TC。
  • TC:事務協調者,即獨立運行的seata-server,用於接收事務注冊,提交和回滾。

Seata的運行分AT和MT兩種模式。還有其他的模式如SAGA,還未研究。

AT(Auto Transaction)模式

這個模式需要模塊為Java語言,並且數據庫支持本地事務。一個典型的分布式事務過程:

  • TM 向 TC 申請開啟一個全局事務,全局事務創建並生成一個全局唯一的XID。
  • XID 在微服務調用鏈路的上下文中傳播。
  • RM 向 TC 注冊分支事務,將其納入 XID 對應全局事務的管轄。
  • TM 向 TC 發起針對 XID 的全局提交或回滾決議。
  • TC 調度 XID 下管轄的全部分支事務完成提交或回滾請求。

MT(Manual Transaction)模式

這個模式適合其他的場景,因為底層存儲可能沒有事務支持,需要自己實現 prepare、commit和rollback的邏輯

源碼分析

參考 https://juejin.im/post/6844904148089962510

初始化

全局的兩階段提交,實際上是通過對數據源的代理實現的,Seata中的代理數據源對druid數據源做了一層代理

兩階段提交

在需要加全局事務的方法上,加上GlobalTransactional注解,Seata中攔截全局事務的攔截器是GlobalTransactionalInterceptor

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())
        : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

    final GlobalTransactional globalTransactionalAnnotation =
        getAnnotation(method, targetClass, GlobalTransactional.class);
    final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
    if (!disable && globalTransactionalAnnotation != null) {
        return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    } else if (!disable && globalLockAnnotation != null) {
        return handleGlobalLock(methodInvocation);
    } else {
        return methodInvocation.proceed();
    }
}

調用handleGlobalTransaction方法開啟全局事務;否則按普通方法執行。handleGlobalTransaction方法

private Object handleGlobalTransaction(final MethodInvocation methodInvocation, final GlobalTransactional globalTrxAnno) throws Throwable {
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod());
            }

            @Override
            public TransactionInfo getTransactionInfo() {
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        TransactionalExecutor.Code code = e.getCode();
        switch (code) {
            case RollbackDone:
                throw e.getOriginalException();
            case BeginFailure:
                failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case CommitFailure:
                failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackFailure:
                failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackRetrying:
                failureHandler.onRollbackRetrying(e.getTransaction(), e.getCause());
                throw e.getCause();
            default:
                throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));

        }
    }
}

方法中調用了TransactionalTemplate的execute方法

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1 get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // 1.2 Handle the Transaction propatation and the branchType
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                suspendedResourcesHolder = tx.suspend(true);
                return business.execute();
            case REQUIRES_NEW:
                suspendedResourcesHolder = tx.suspend(true);
                break;
            case SUPPORTS:
                if (!existingTransaction()) {
                    return business.execute();
                }
                break;
            case REQUIRED:
                break;
            case NEVER:
                if (existingTransaction()) {
                    throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
                                    ,RootContext.getXID()));
                } else {
                    return business.execute();
                }
            case MANDATORY:
                if (!existingTransaction()) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }


        try {

            // 2. begin transaction
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        tx.resume(suspendedResourcesHolder);
    }
}

該方法中主要有以下幾個步驟:

  1. 獲取事務信息,
  2. 開啟事務
  3. 執行業務方法
  4. 提交事務(沒有拋出異常)
  5. 回滾操作(拋出異常)

beginTransaction最終調用了DefaultGlobalTransaction的begin方法

@Override
public void begin(int timeout, String name) throws TransactionException {
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNull();
    if (RootContext.getXID() != null) {
        throw new IllegalStateException();
    }
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [{}]", xid);
    }
}

通過transactionManager.begin()方法通過TmRpcClient與server通信並生成一個xid,再將將xid綁定到Root上下文中。全局事務攔截成功后還是會執行原業務方法,但是由於seata代理了數據源,sql解析undolog是在代理數據源中完成的。seata不止會代理數據源,還會對Connection,Statement做代理封裝。對sql解析發生在StatementProxy中

@Override
public ResultSet executeQuery(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
}

最終執行了ExecuteTemplate類的execute方法:

public static <T, S extends Statement> T execute(
        List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, 
        StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
    if (!shouldExecuteInATMode()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    if (sqlRecognizers == null) {
        sqlRecognizers = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                statementProxy.getConnectionProxy().getDbType());
    }
    Executor<T> executor;
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        if (sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<>(statementProxy, statementCallback);
                    break;
            }
        } else {
            executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
        }
    }
    T rs;
    try {
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException) ex;
    }
    return rs;
}

先判斷是否存在全局事務,不在全局事務中按普通方法執行,如果在全局事務中則開始解析sql,對不同的DML語句做響應的處理,再調用執行方法。具體流程為:

  1. 先判斷是否開啟了全局事務,如果沒有,不走代理,不解析sql。
  2. 調用SQLVisitorFactory對目標sql進行解析。
  3. 針對特定類型sql操作(INSERT,UPDATE,DELETE,SELECT_FOR_UPDATE)等進行特殊解析。
  4. 執行sql並返回結果。

關鍵點在於特定類型執行器里面的execute方法(下面以InsertExecutor類的execute方法舉例), 調用了父類BaseTransactionalExecutor的execute方法,

@Override
public T execute(Object... args) throws Throwable {
    if (RootContext.inGlobalTransaction()) {
        String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
    }
    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    return doExecute(args);
}

將XID綁定到connectionProxy中並調用了doExecute方法,這里又調用了它的子類的AbstractDMLBaseExecutor的doExecute方法

@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

executeAutoCommitTrue方法中也會將AutoCommit屬性設置為false,對sql進行解析生成undolog,防止在undolog生成之前入庫。

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        connectionProxy.setAutoCommit(false);
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            T result = executeAutoCommitFalse(args);
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}

在將AutoCommit設置為false后會繼續執行AbstractDMLBaseExecutor中的executeAutoCommitFalse(args)

  protected T executeAutoCommitFalse(Object[] args) throws Exception {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

注意這是一個很關鍵的方法,executeAutoCommitFalse中主要分四步執行:

  1. 獲取sql執行前記錄快照beforeImage;
  2. 執行sql;
  3. 獲取sql執行后記錄快照afterimage;
  4. 根據beforeImage,afterImage生成undolog記錄並添加到connectionProxy的上下文中

生成undolog的方法,就是記錄lockKey后,將beforeImage和afterImage都記錄下來

protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
    if (!beforeImage.getRows().isEmpty() || !afterImage.getRows().isEmpty()) {
        ConnectionProxy connectionProxy = this.statementProxy.getConnectionProxy();
        TableRecords lockKeyRecords = this.sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        String lockKeys = this.buildLockKey(lockKeyRecords);
        connectionProxy.appendLockKey(lockKeys);
        SQLUndoLog sqlUndoLog = this.buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }
}

protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
    SQLType sqlType = this.sqlRecognizer.getSQLType();
    String tableName = this.sqlRecognizer.getTableName();
    SQLUndoLog sqlUndoLog = new SQLUndoLog();
    sqlUndoLog.setSqlType(sqlType);
    sqlUndoLog.setTableName(tableName);
    sqlUndoLog.setBeforeImage(beforeImage);
    sqlUndoLog.setAfterImage(afterImage);
    return sqlUndoLog;
}

最終會通過UndoLogManager,對undolog記錄進行undo或delete操作

try {
    // put serializer name to local
    setCurrentSerializer(parser.getName());
    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
    if (sqlUndoLogs.size() > 1) {
        Collections.reverse(sqlUndoLogs);
    }
    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
            conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
        sqlUndoLog.setTableMeta(tableMeta);
        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
            dataSourceProxy.getDbType(), sqlUndoLog);
        undoExecutor.executeOn(conn);
    }
} finally {
    // remove serializer name
    removeCurrentSerializer();
}

但是在這之前,會對當前數據庫中的記錄和afterImage中的記錄進行對比,需要相同才會繼續進行

/**
 * Data validation.
 *
 * @param conn the conn
 * @return return true if data validation is ok and need continue undo, and return false if no need continue undo.
 * @throws SQLException the sql exception such as has dirty data
 */
protected boolean dataValidationAndGoOn(Connection conn) throws SQLException {

    TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
    TableRecords afterRecords = sqlUndoLog.getAfterImage();

    // Compare current data with before data
    // No need undo if the before data snapshot is equivalent to the after data snapshot.
    Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
    if (beforeEqualsAfterResult.getResult()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Stop rollback because there is no data change " +
                    "between the before data snapshot and the after data snapshot.");
        }
        // no need continue undo.
        return false;
    }

    // Validate if data is dirty.
    TableRecords currentRecords = queryCurrentRecords(conn);
    // compare with current data and after image.
    Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
    if (!afterEqualsCurrentResult.getResult()) {

        // If current data is not equivalent to the after data, then compare the current data with the before 
        // data, too. No need continue to undo if current data is equivalent to the before data snapshot
        Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);
        if (beforeEqualsCurrentResult.getResult()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Stop rollback because there is no data change " +
                        "between the before data snapshot and the current data snapshot.");
            }
            // no need continue undo.
            return false;
        } else {
            if (LOGGER.isInfoEnabled()) {
                if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) {
                    LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams());
                }
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("check dirty datas failed, old and new data are not equal," +
                        "tableName:[" + sqlUndoLog.getTableName() + "]," +
                        "oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +
                        "newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");
            }
            throw new SQLException("Has dirty records when undo.");
        }
    }
    return true;
}

從這個可以說明,Seata的分布式事務實際上全局是串行的,對於存在熱點資源的情況下,會導致性能問題。

分支事務注冊與事務提交
業務sql和undolog執行完成后會在代理連接ConnectionProxy中執行commit操作

@Override
public void commit() throws SQLException {
    try {
        LOCK_RETRY_POLICY.execute(() -> {
            doCommit();
            return null;
        });
    } catch (SQLException e) {
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}

private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
    } else {
        targetConnection.commit();
    }
}

如果處於全局事務中則調用processGlobalTransactionCommit處理全局事務提交;
如果加了全局鎖注釋調用 processLocalCommitWithGlobalLocks()加全局鎖並提交;
其他情況直接進行事務提交。

private void processGlobalTransactionCommit() throws SQLException {
    try {
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true);
    }
    context.reset();
}

processGlobalTransactionCommit方法有以下幾個操作步驟:

  1. 注冊分支事務,將branchId分支綁定在上下文中。
  2. 如果包含undolog,則將之前綁定到上下文中的undolog進行入庫;
  3. 提交本地事務;
  4. 如果操作失敗,report()中通過RM提交第一階段失敗消息,如果成功,report()提交第一階段成功消息
private void report(boolean commitDone) throws SQLException {
    if (context.getBranchId() == null) {
        return;
    }
    int retry = REPORT_RETRY_COUNT;
    while (retry > 0) {
        try {
            DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
            return;
        } catch (Throwable ex) {
            LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                + commitDone + "] Retry Countdown: " + retry);
            retry--;

            if (retry == 0) {
                throw new SQLException("Failed to report branch status " + commitDone, ex);
            }
        }
    }
}

由於undolog入庫和業務sql的執行調用了同一個connection,處於同一個事務中,這就保證了業務sql和undolog肯定是成對存在。

總結

Seata的AT模式實現的是一個傳統意義的分布式事務,以自動生成undolog的形式實現了各資源節點的兩段式提交。
這個方案的好處在於對現有基於MySQL, PostgreSQL和Oracle的應用可以快速實現分布式事務並且對現有代碼無需大量改造,但是缺點在於整體是串行的,並且因為undolog的處理會帶來額外損耗,不能解決熱點資源的性能問題。
Seata在每個子模塊中增加undolog表, 利用節點數據庫的單機事務保證子事務和補償信息的原子性, 可以在分布式事務設計中借鑒

其他: 分布式事務的模式有2PC, TCC, SAGA等, 其中SAGA有集中編排和自由編排兩種形式, 分布式事務框架除了Seata, 還有Axon, ServiceComb等.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM