四、全局事務的commit和rollback


所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

 

正文

上一篇文章中,我們看了看DefaultCoordinator作為分布式事務的協調者,關於全局事務begin的流程。

DefaultCoordinator把begin的核心實現交付給了DefaultCore,DefaultCore將會構造處一個GlobalSession,一份放到內存里面,一份持久化(默認持久化到 ~/sessionStore/root.data)。

總結下來就是begin的時候將會在Server端構造一個GlobalSession。

那么,本文將看看關於全局事務的commit是怎么樣一個流程。

 

DefaultCore.commit

和begin一樣,DefaultCoordinator把commit全局事務的實現交付給了DefaultCore來做。我們打開DefaultCore的commit方法

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // 通過XID找到GlobalSession
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }

    // 添加FileTransactionStoreManager
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    // 加鎖
    boolean shouldCommit = globalSession.lockAndExcute(() -> {
        // 關閉session,防止還有分支事務注冊
        globalSession.closeAndClean();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // 狀態變化
            globalSession.changeStatus(GlobalStatus.Committing);
            return true;
        }
        return false;
    });

    // 如果原本不是begin的狀態,返回原本狀態
    if (!shouldCommit) {
        return globalSession.getStatus();
    }

    // 判斷是否可以異步提交
    if (globalSession.canBeCommittedAsync()) {
        // 異步提交
 asyncCommit(globalSession);
        return GlobalStatus.Committed;
    } else {
        // 同步提交
        doGlobalCommit(globalSession, false);
    }
    // 返回狀態
    return globalSession.getStatus();
}

commit方法比較長,但是邏輯並不復雜

1)首先,先根據XID找到對應的GlobalSession。

2)其次,將GlobalSession的狀態從begin到committing表示正在執行提交

3)然后,進行提交操作。

findGlobalSession方法將從file或者db中查詢GlobalSession,changeStatus的時候先close了當前全局事務,這樣就不會有新的分支事務注冊進來了。

canBeCommittedAsync將判斷是否可以進行異步提交,判斷標准是什么呢?我們看看該方法

public boolean canBeCommittedAsync() {
    // 遍歷分支session
    for (BranchSession branchSession : branchSessions) {
        // 如果有一個TCC分支返回false
        if (branchSession.getBranchType() == BranchType.TCC) {
            return false;
        }
    }
    return true;
}

這里判斷的是分支事務的session是TCC類型。

也就是說,如果當前GlobalSession代表的是AT自動事務模式。那么只需要將GlobalSession的狀態改為AsyncCommitting即可,而通知RM端的事情交給DefaultCoordinator的異步線程來做。如果包含TCC分支,那么直接調用doGlobalCommit同步提交。

我們再看看DefaultCoordinator的異步線程怎么處理AT模式的的。

asyncCommitting.scheduleAtFixedRate(() -> {
    try {
        handleAsyncCommitting();
    } catch (Exception e) {
        LOGGER.info("Exception async committing ... ", e);
    }
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

protected void handleAsyncCommitting() {
    // 獲取所有AsyncCommitting狀態的GlobalSession
    Collection<GlobalSession> asyncCommittingSessions = SessionHolder.getAsyncCommittingSessionManager().allSessions();
    if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
        return;
    }
    // 遍歷session
    for (GlobalSession asyncCommittingSession : asyncCommittingSessions) {
        try {
            if (GlobalStatus.AsyncCommitting != asyncCommittingSession.getStatus()) {
                continue;
            }
            asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            // 調用core的doGlobalCommit通知客戶端
            core.doGlobalCommit(asyncCommittingSession, true);
        } catch (TransactionException ex) {
            // ...
        }
    }
}

異步線程默認1秒中執行一次,將從SessionManager中獲取所有AsyncCommitting狀態的GlobalSession。

然后逐個調用DefaultCore的doGlobalCommit方法通知客戶端刪除undoLog。

 

DefaultCore.rollback

上面看了全局事務的commit,那么rollback呢?我們跟進DefaultCore的rollback代碼

@Override
public GlobalStatus rollback(String xid) throws TransactionException {
    // 找到GlobalSession
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // 加鎖
    boolean shouldRollBack = globalSession.lockAndExcute(() -> {
        // 先關閉
        globalSession.close();
        // 狀態變更
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            globalSession.changeStatus(GlobalStatus.Rollbacking);
            return true;
        }
        return false;
    });
    if (!shouldRollBack) {
        return globalSession.getStatus();
    }
    // 直接同步處理
    doGlobalRollback(globalSession, false);
    return globalSession.getStatus();
}

rollback比commit邏輯簡單多了,也不用同步異步的區分。直接都是同步遍歷所有分支事務的session,通知RM進行rollback操作。

 

總結

全局事務的提交,無非是從file或者db中拿到GlobalSession。不論是同步提交還是異步提交,最終都是拿到所有分支事務的session,然后通知RM去做相應的commit操作。

全局事務的回滾,一樣是拿到GlobalSession。然后通知RM做分支事務的rollback操作。

總得來說,Server端主動推送commit或者rollback通知到RM,如果失敗那么就進行retry。seata在一致性方面選擇了最終一致性。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM