Seata AT模式解析


AT 模式下,把每個數據庫被當做是一個 Resource,Seata 里稱為 DataSource Resource。業務通過 JDBC 標准接口訪問數據庫資源時,Seata 框架會對所有請求進行攔截,做一些操作。每個本地事務提交時,Seata RM(Resource Manager,資源管理器) 都會向 TC(Transaction Coordinator,事務協調器) 注冊一個分支事務。當請求鏈路調用完成后,發起方通知 TC 提交或回滾分布式事務,進入二階段調用流程。此時,TC 會根據之前注冊的分支事務回調到對應參與者去執行對應資源的第二階段。TC 是怎么找到分支事務與資源的對應關系呢?每個資源都有一個全局唯一的資源 ID,並且在初始化時用該 ID 向 TC 注冊資源。在運行時,每個分支事務的注冊都會帶上其資源 ID。這樣 TC 就能在二階段調用時正確找到對應的資源。

Seata 中有三大模塊,分別是 TM、RM 和 TC。 其中 TM 和 RM 是作為 Seata 的客戶端與業務系統集成在一起,TC 作為 Seata 的服務端獨立部署。

在 Seata 中,分布式事務的執行流程:

  • TM 開啟分布式事務(TM 向 TC 注冊全局事務記錄);
  • 按業務場景,編排數據庫、服務等事務內資源(RM 向 TC 匯報資源准備狀態 );
  • TM 結束分布式事務,事務一階段結束(TM 通知 TC 提交/回滾分布式事務);
  • TC 匯總事務信息,決定分布式事務是提交還是回滾;
  • TC 通知所有 RM 提交/回滾 資源,事務二階段結束。

Seata 會有 4 種分布式事務解決方案,分別是 AT 模式、TCC 模式、Saga 模式和 XA 模式,本文主要介紹AT模式。

  • AT:Auto Transaction,基於支持本地ACID事務的關系型數據庫,對業務無侵入;
  • MT:Manual Transaction,不依賴於底層數據資源的事務支持,需自定義prepare/commit/rollback操作,對業務有侵入;
  • XA:基於數據庫的XA實現,目前最新版seata已實現該模式。
  • TCC:TCC模式,對業務有侵入。

Seata類圖

TC類圖

TC在Seata中其實就是Server端,Client端其實屬於RM和TM,但是為了看起來完整,把Client端的RpcCient和Listener也放在了這里,類圖如下圖所示:

CLient listenner監聽服務端發過來的消息,從而進行相應的操作。

RM類圖

RM相關類圖如下圖所示:

TM類圖

TM相關類圖如下圖所示:

AT模式

AT 模式是一種無侵入的分布式事務解決方案。在 AT 模式下,用戶只需關注自己的“業務 SQL”,用戶的 “業務 SQL” 作為一階段,Seata 框架會自動生成事務的二階段提交和回滾操作。

在分析AT模式前,我們先提問幾個問題:

1、分布式Seata,怎么在系統中集成使用?
2、開始事務前,做了什么初始化工作?
3、AT模式具體怎么實現的?

初始化工作

Seata在GlobalTransactionScanner中進行了TM和RM初始化:

private void initClient() {
    //init TM
    TMClient.init(applicationId, txServiceGroup);

    //init RM
    RMClient.init(applicationId, txServiceGroup);
    
    //Register Spring ShutdownHook
    registerSpringShutdownHook();
}

其中TMClient的初始化如下:

public static void init(String applicationId, String transactionServiceGroup) {
    TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
    tmRpcClient.init();
}

其中初始化了一個TmRpcClient對象,它的init方法中,會啟動與Server的重連接線程(每5秒),以及消息發送線程和超時線程。重連接是,按照配置方式,尋找Server信息,比如采用file形式,那么首先從file.conf中根據分組名稱(service_group)找到集群名稱(cluster_name),再根據集群名稱找到fescar-server集群ip端口列表,然后從ip列表中選擇一個用netty進行連接。

RmClient的初始化如下:

public static void init(String applicationId, String transactionServiceGroup) {
    RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
    rmRpcClient.setResourceManager(DefaultResourceManager.get());
    rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
    rmRpcClient.init();
}

其中,主要工作為:

  • 創建一個RmRpcClient對象
  • RmRpcClient對象設置ResourceManager;
  • RmRpcClient對象設置消息Listener;
  • RmRpcClient對象初始化

初始化階段總結:

1)Spring啟動時,初始化了2個客戶端TmClient、RmClient
2)TmClient與Server通過Netty建立連接並發送消息
3)RmClient與Server通過Netty建立連接,負責接收二階段提交、回滾消息並在回調器(RmHandler)中做處理

因此,使用Seata,需要初始化GlobalTransactionScanner進行初始化工作!

TM全局事務控制

哪些方法需要進行全局事務控制了?Seata定義了注解GlobalTransactional,只要添加了注解的方法,就代表需要分布式事務。

注解對應着攔截器,Seata定義的攔截器為GlobalTransactionalInterceptor。其核心處理邏輯如下:

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

    final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
    final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
    if (globalTransactionalAnnotation != null) {
        return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    } else if (globalLockAnnotation != null) {
        return handleGlobalLock(methodInvocation);
    } else {
        return methodInvocation.proceed();
    }
}

上面方法的邏輯為:如果業務方法上有全局事務GlobalTransactional注解時,就執行全局事務處理handleGlobalTransaction;如果業務方法上有全局鎖GlobalLock注解時,就執行全局鎖處理handleGlobalLock;否則,就按照普通方法執行。

handleGlobalTransaction中,會調用TransactionalTemplate的execute方法,execute方法如下:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // 1.1 get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    try {
        // 2. begin transaction
        beginTransaction(txInfo, tx);
        
        Object rs = null;
        try {
            // 3. Do Your Business
            rs = business.execute();
        } catch (Throwable ex) {
            // 4.the needed business exception to rollback.
            completeTransactionAfterThrowing(txInfo,tx,ex);
            throw ex;
        }

        // 5. everything is fine, commit.
        commitTransaction(tx);

        return rs;
    } finally {
        //6. clear
        triggerAfterCompletion();
        cleanUp();
    }
}

1、創建一個GlobalTransaction,並根據業務TransactionalExecutor獲取到事務需要的相關信息;

2、開始全局事務

開始全局事務的方法為beginTransaction方法,方法內部調用了DefaultGlobalTransaction.begin方法,而DefaultGlobalTransaction.begin方法的內部又調用了TransactionManager的begin方法,

xid = transactionManager.begin(null, null, name, timeout);

TransactionManager接口類的默認實現為DefaultTransactionManager,其begin方法為:

public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
    return response.getXid();
}

方法中,向服務器發起開始全局事務請求,服務會返回一個全局事務標示XID。OK,此時,由TM發起的一個全局事務就開始了!

3、執行業務邏輯

4、回滾全局事務

如果業務都執行沒有全部成功,那么就執行全局事務回滾,方法為completeTransactionAfterThrowing。方法執行邏輯與開始事務方法內部邏輯基本一致:內部調用了DefaultGlobalTransaction.rollback方法,而DefaultGlobalTransaction.rollback方法的內部又調用了TransactionManager的rollback方法,向服務器發起rollback,服務會返回GlobalStatus.

5、提交全局事務

如果業務都執行成功,那么就全局commit,方法為commitTransaction。方法執行邏輯與開始事務方法和回滾事務內部邏輯基本一致:內部調用了DefaultGlobalTransaction.commit方法,而DefaultGlobalTransaction.commit方法的內部又調用了TransactionManager的commit方法,向服務器發起Commit,服務會返回GlobalStatus.

6、清理
完成整個事務后,進行相關資源變量清理。

AT模式一階段

在一階段,Seata 會攔截“業務 SQL”,首先解析 SQL 語義,找到“業務 SQL”要更新的業務數據,在業務數據被更新前,將其保存成“before image”,然后執行“業務 SQL”更新業務數據,在業務數據更新之后,再將其保存成“after image”,最后生成行鎖。以上操作全部在一個數據庫事務內完成,這樣保證了一階段操作的原子性。

實現上,Seata對數據源做了封裝代理,然后對於數據源的操作處理,就由Seata內部邏輯完成了。看一個Demo中的數據源加載配置:

@Bean(name = "order")
public DataSourceProxy masterDataSourceProxy(@Qualifier("originOrder") DataSource dataSource) {
    return new DataSourceProxy(dataSource);
}

從Demo中可以看到,我們使用的是Seata封裝的代理數據源DataSourceProxy。DataSourceProxy初始化時,會進行Resouce注冊:

private void init(DataSource dataSource, String resourceGroupId) {
    this.resourceGroupId = resourceGroupId;
    Connection connection = dataSource.getConnection()
    jdbcUrl = connection.getMetaData().getURL();
    dbType = JdbcUtils.getDbType(jdbcUrl, null);
    DefaultResourceManager.get().registerResource(this);
}

Seata除了對數據庫的DataSource進行了封裝,同樣也對Connection,Statement進行了封裝代理,分別為ConnectionProxy和StatementProxy。

DataSourceProxy中重寫了DataSource的getConnection方法,以此來獲得ConnectionProxy,方法如下:

@Override
public ConnectionProxy getConnection() throws SQLException {
    Connection targetConnection = targetDataSource.getConnection();
    return new ConnectionProxy(this, targetConnection);
}

ConnectionProxy代理了Sql connection。我們重點關於一下其createStatement方法,prepareStatement方法,commit方法,rollback方法以及setAutoCommit方法。

ConnectionProxy的createStatement會返回一個代理的StatementProxy,如下:

@Override
public Statement createStatement() throws SQLException {
    Statement targetStatement = getTargetConnection().createStatement();
    return new StatementProxy(this, targetStatement);
}

ConnectionProxy的prepareStatement同樣會返回一個代理的PreparedStatementProxy,如下:

@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
    PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
    return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}

StatementProxy和PreparedStatementProxy是對執行Sql的Statement的一個封裝,兩者基本一樣,我們看看StatementProxy的execute方法:

@Override
public boolean execute(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
        @Override
        public Boolean execute(T statement, Object... args) throws SQLException {
            return statement.execute((String) args[0]);
        }
    }, sql);
}

可以看到,內部交給了ExecuteTemplate執行,ExecuteTemplate的execute方法如下:

public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                             StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {

    if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    if (sqlRecognizer == null) {
        sqlRecognizer = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                statementProxy.getConnectionProxy().getDbType());
    }
    Executor<T> executor = null;
    if (sqlRecognizer == null) {
        executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
    } else {
        switch (sqlRecognizer.getSQLType()) {
            case INSERT:
                executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case UPDATE:
                executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case DELETE:
                executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            case SELECT_FOR_UPDATE:
                executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                break;
            default:
                executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                break;
        }
    }
    T rs = null;
    try {
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException)ex;
    }
    return rs;
}

方法內部流程如下:

1)先判斷是否開啟了全局事務,如果沒有,不走代理;
2)調用SQLVisitorFactory對目標sql進行解析
3)針對特定類型sql操作(INSERT,UPDATE,DELETE,SELECT_FOR_UPDATE)等進行特殊解析
4)執行sql並返回結果

InsertExecutor,UpdateExecutor,DeleteExecutor均繼承自AbstractDMLBaseExecutor,AbstractDMLBaseExecutor又繼承自BaseTransactionalExecutor,BaseTransactionalExecutor又繼承自Executor。其中execute方法實現為:

@Override
public Object execute(Object... args) throws Throwable {
    if (RootContext.inGlobalTransaction()) {
        String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
    }

    if (RootContext.requireGlobalLock()) {
        statementProxy.getConnectionProxy().setGlobalLockRequire(true);
    } else {
        statementProxy.getConnectionProxy().setGlobalLockRequire(false);
    }
    return doExecute(args);
}

其中doExecute方法如下:

@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

其中executeAutoCommitFalse方法為:

protected T executeAutoCommitFalse(Object[] args) throws Throwable {
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    prepareUndoLog(beforeImage, afterImage);
    return result;
}

其中首先獲取執行Sql前的beforeImage,然后執行Sql,執行晚,獲取執行后的afterImage,並將執行前后Image賦給ConnectionProxy。注意插入,刪除,更新SQl語句,獲取beforeImage和afterImage的方法不同,因此定義了InsertExecutor,UpdateExecutor,DeleteExecutor。

執行完Sql,此時需要提交,ConnectionProxy的commit方法如下:

@Override
public void commit() throws SQLException {
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
    } else {
        targetConnection.commit();
    }
}

方法邏輯為:

1)如果處於全局事務中,則調用processGlobalTransactionCommit()處理全局事務提交
2)如果加了全局鎖注解,加全局鎖並提交
3)如果沒有對應注釋,按直接進行事務提交

重點關於一下processGlobalTransactionCommit代碼:

private void processGlobalTransactionCommit() throws SQLException {
    try {
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e);
    }

    try {
        if (context.hasUndoLog()) {
            UndoLogManager.flushUndoLogs(this);
        }
        targetConnection.commit();
    } catch (Throwable ex) {
        report(false);
        if (ex instanceof SQLException) {
            throw new SQLException(ex);
        }
    }
    report(true);
    context.reset();
}

流程分為如下幾步:

1、注冊分支事務register(),並將branchId分支id綁定到上下文中:

private void register() throws TransactionException {
    Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
            null, context.getXid(), null, context.buildLockKeys());
    context.setBranchId(branchId);
}

2、如果包含undolog,則將之前綁定到上下文中的undolog進行入庫:

UndoLogManager.flushUndoLogs(this);

前面提到,SQl執行時,會生成beforeImage和afterImage,最尾一個Unlog賦給ConneectionProxy,此時這里就是將這些Unlog保存到數據庫,SQl語句為:

private static String INSERT_UNDO_LOG_SQL = "INSERT INTO " + UNDO_LOG_TABLE_NAME +
    " (branch_id, xid, context, rollback_info, log_status, log_created, log_modified)" +
    " VALUES (?, ?, ?, ?, ?, now(), now())";

3、提交本地事務;

4、如果操作失敗,report()中通過RM提交Branch失敗消息,如果成功,report()提交Branch成功消息,其中Report方法如下:

private void report(boolean commitDone) throws SQLException {
    int retry = REPORT_RETRY_COUNT;
    while (retry > 0) {
        try {
          DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                    (commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed), null);
            return;
        } catch (Throwable ex) {
            retry--;
            if (retry == 0) {
                throw new SQLException("Failed to report branch status " + commitDone, ex);
            }
        }
    }
}

AT模式二階段

AT是分為兩個階段的,第一階段,就是各個階段本地提交操作;第二階段會根據第一階段的情況決定是進行全局提交還是全局回滾操作。

全局提交回滾操作由TM發起,具體為,如果Branch執行沒有出現異常,那么就表明各個Branch均執行成功,即進行全局提交,如果某個Branch執行時出現異常,那么就需要進行全局回滾。代碼即為TransactionalTemplate的execute的第4步和5步,如下:

try {
    // 3. Do Your Business
    rs = business.execute();
} catch (Throwable ex) {
    // 4.the needed business exception to rollback.
    completeTransactionAfterThrowing(txInfo,tx,ex);
    throw ex;
}

// 5. everything is fine, commit.
commitTransaction(tx);

return rs;

Seata Serer 收到TM發送的提交或者回滾請求后,就會向各個RM 發送消息,讓他們執行相應的提交或者回滾操作。 RM接收消息是通過RMHandlerAT,在初始化RmListrenner時,指定了RMHandlerAT,其中處理方法為:

@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
    BranchCommitResponse response = new BranchCommitResponse();
    exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
        @Override
        public void execute(BranchCommitRequest request, BranchCommitResponse response)
            throws TransactionException {
            doBranchCommit(request, response);
        }
    }, request, response);
    return response;
}

@Override
public BranchRollbackResponse handle(BranchRollbackRequest request) {
    BranchRollbackResponse response = new BranchRollbackResponse();
    exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
        @Override
        public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
            throws TransactionException {
            doBranchRollback(request, response);
        }
    }, request, response);
    return response;
}

二階段提交

二階段如果是提交的話,因為“業務 SQL”在一階段已經提交至數據庫, 所以 Seata 框架只需將一階段保存的快照數據和行鎖刪掉,完成數據清理即可。

如果所有Branch RM都執行成功了,那么就進行全局Commit。因為此時我們不用回滾,而每個Branch本地數據庫操作已經完成了,那么我們其實主要做的事情就是把本地的Undolog刪了即可,看看Seata內部實現邏輯。

doBranchCommit方法內部會調用ResourceManager的branchCommit方法:

BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,applicationData);

根據Seata SPI,其實就是執行的DataSourceManager類的branchCommit,如下:

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}

DataSourceManager中調用了asyncWorker來異步提交,看下AsyncWorker中branchCommit方法:

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid
            + "] will be handled by housekeeping later.");
    }
    return BranchStatus.PhaseTwo_Committed;
}

方法中往ASYNC_COMMIT_BUFFER緩沖List中新增了一個二階段提交的context,那么真正branchCommit在哪里了?

其實是在AsyncWorker的init()方法中,Init中會起一個定時線程,每一秒執行一次,線程中的主要邏輯為:

1)先按resourceId(也就是數據連接)對提交操作進行分組,一個數據庫的可以一起操作,提升效率;
2) 根據resourceId找到對應DataSourceProxy,並獲取一個普通的數據庫連接getPlainConnection(),估計這本身不需要做代理操作,故用了普通的數據庫連接;
3)調用UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn)刪除undolog

二階段回滾

二階段如果是回滾的話,Seata 就需要回滾一階段已經執行的“業務 SQL”,還原業務數據。回滾方式便是用“before image”還原業務數據;但在還原前要首先要校驗臟寫,對比“數據庫當前業務數據”和 “after image”,如果兩份數據完全一致就說明沒有臟寫,可以還原業務數據,如果不一致就說明有臟寫,出現臟寫就需要轉人工處理。

AT 模式的一階段、二階段提交和回滾均由 Seata 框架自動生成,用戶只需編寫“業務 SQL”,便能輕松接入分布式事務,AT 模式是一種對業務無任何侵入的分布式事務解決方案。

如果某個RM Branch執行失敗了,那么就要進行全局回滾。RMHandlerAT收到回滾消息,進行回滾處理,內部會調用DataSourceManager的branchRollback方法:

@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException();
    }
    try {
        UndoLogManager.undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;

}

交由UndoLogManager執行undo操作,undo中主要分為以下幾步:

1、首先去數據庫查詢,是否有undolog,sql語句為:

private static String SELECT_UNDO_LOG_SQL = "SELECT * FROM " + UNDO_LOG_TABLE_NAME WHERE branch_id = ? AND xid = ? FOR UPDATE";

2、數據庫中有如有undolog,那么查出來構造BranchUndoLog對象,一條log對應一個對象;

3、對於每一個BranchUndoLog對象,內部可能包含多個SQLUndoLog,遍歷BranchUndoLog中的SQLUndoLog List,分別進行處理;

for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
    TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
    sqlUndoLog.setTableMeta(tableMeta);
    AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
            dataSourceProxy.getDbType(),
            sqlUndoLog);
    undoExecutor.executeOn(conn);
}

處理過程就是,根實際操作反着來,比如我們插入一條數據,那么就刪除這條數據;如果修改了一條數據,那么就給他修改回去。相應的undoExecutor分別為:MySQLUndoDeleteExecutor,MySQLUndoInsertExecutor,MySQLUndoUpdateExecutor。因為我們第一階段執行本地操作時,保存了beforeImage和afterImage,所以反着構建SQl時,可以拿到原來的數據。

 

吃水不忘挖井人:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM