七、seata客戶端二階段分支事務的提交和回滾


所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

 

正文

在閱讀seata自動配置相關的內容的時候,我們說過。客戶端會初始化一個RMClient的RPC客戶端,且同時會添加一個監聽器RmMessageListener,監聽器將監聽來自seata的server發送的RPC消息。我們再回顧一下這段代碼

public static void init(String applicationId, String transactionServiceGroup) {
    RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
    rmRpcClient.setResourceManager(DefaultResourceManager.get());
    rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
    rmRpcClient.init();
}

RmMessageListener監聽到的消息將交付給TransactionMessageHandler處理

這里的DefaultRMHandler.get()將返回一個DefaultRMHandler的單例對象,DefaultRMHandler簡介實現TransactionMessageHandler,並且組合了三種handler

1)RMHandlerAT

2) RMHandlerTCC

3) RMHandlerSaga

分別對應不同的事務模式,三種處理器和DefaultRMHandler一樣都繼承了AbstractRMHandler。而AbstractRMHandler包含了兩個核心方法,doBranchCommit和doBranchRollback,分別用於二階段分支事務的提交和分支事務的回滾。

 

分支事務提交doBranchCommit

跟進AbstractRMHandler的doBranchCommit方法

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    // 選擇對應的ResourceManager,調用commit
    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
}

doBranchCommit方法將會獲取到對應的ResourceManager,我們以AT模式為例。AT模式將會獲取到DataSourceManager這個ResourceManager。

我們跟進DataSourceManager的branchCommit方法

private ResourceManagerInbound asyncWorker;
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}

branchCommit方法中交付給了一個異步線程處理,我們跟進AsyncWorker的branchCommit

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid
            + "] will be handled by housekeeping later.");
    }
    return BranchStatus.PhaseTwo_Committed;
}

異步線程中使用了一個BlockQueue來排隊處理,將會有一個Scheduler定時從BlockQueue中獲取poll出來,然后進行undoLog的批量刪除

UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids, branchIds, conn);

小結:所謂分支事務的二階段提交,其實就是異步刪除undoLog。因為一階段的時候已經提交了本地事務,所以二階段就非常地快速。

 

分支事務回滾doBranchRollback

和doBranchCommit,先跟進AbstractRMHandler的doBranchRollback方法

protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();
    // 選擇ResourceManager,調用rollback
    BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
}

可以看到,一樣是調用ResourceManager的方法。我們同樣以DataSourceManager為例,跟進DataSourceManager的branchRollback方法

@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    // 選擇Resource
    DataSourceProxy dataSourceProxy = get(resourceId);

    try {
        // undo補償
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}

在閱讀DataSourceProxy的時候說過,DataSourceProxy將會作為一個Resource注冊到ResourceManager當中。

而在分支事務回滾的時候,將會獲取到該Resource,也就是DataSourceProxy。並且執行對應數據源的undo補償操作。

我們跟進undo方法,看看補償操作,方法較長,這里縮減掉一些內容

@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    // ...
    for (; ; ) {
        try {
            //...

            // 查找undoLog
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                   // ...
                   // 反編碼
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        //...
                        // 執行器處理undo
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(conn);
                    }
                }
            }

            if (exists) {
                // 刪除undoLog
 deleteUndoLog(xid, branchId, conn);
                conn.commit();
            } else {
                //...
            }

            return;
        }
        // ...
    }
}

我們看到,首先是查詢出分支事務的undoLog。然后反序列化出undoLog數據對象,且丟給執行器去執行。執行完畢刪除undoLog,且提交事務。

由此可見,rollback就是把StatementProxy准備地undoLog拿出來,然后進行反向地補償操作。

 

總結

本文我們簡單地閱讀了一下seata客戶端在二階段分支事務的commit和rollback操作做了啥。commit主要就是把undoLog刪除,rollback則是獲取了undoLog然后對數據進行反向生成。

到這里,seata的客戶端代碼閱讀部分就結束了。我們從自動配置 -> 切面 -> 數據源代理 -> 監聽器這么一個流程閱讀下來可以發現AT模式的核心要點就是在於數據源代理,由undoLog做反向補償操作。

后續,將開始server端的代碼閱讀...

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM