所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
上一篇文章中,我們看了看DefaultCoordinator作為分布式事務的協調者,關於全局事務begin的流程。
DefaultCoordinator把begin的核心實現交付給了DefaultCore,DefaultCore將會構造處一個GlobalSession,一份放到內存里面,一份持久化(默認持久化到 ~/sessionStore/root.data)。
總結下來就是begin的時候將會在Server端構造一個GlobalSession。
那么,本文將看看關於全局事務的commit是怎么樣一個流程。
DefaultCore.commit
和begin一樣,DefaultCoordinator把commit全局事務的實現交付給了DefaultCore來做。我們打開DefaultCore的commit方法
@Override public GlobalStatus commit(String xid) throws TransactionException { // 通過XID找到GlobalSession GlobalSession globalSession = SessionHolder.findGlobalSession(xid); if (globalSession == null) { return GlobalStatus.Finished; } // 添加FileTransactionStoreManager globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); // 加鎖 boolean shouldCommit = globalSession.lockAndExcute(() -> { // 關閉session,防止還有分支事務注冊 globalSession.closeAndClean(); if (globalSession.getStatus() == GlobalStatus.Begin) { // 狀態變化 globalSession.changeStatus(GlobalStatus.Committing); return true; } return false; }); // 如果原本不是begin的狀態,返回原本狀態 if (!shouldCommit) { return globalSession.getStatus(); } // 判斷是否可以異步提交 if (globalSession.canBeCommittedAsync()) { // 異步提交 asyncCommit(globalSession); return GlobalStatus.Committed; } else { // 同步提交 doGlobalCommit(globalSession, false); } // 返回狀態 return globalSession.getStatus(); }
commit方法比較長,但是邏輯並不復雜
1)首先,先根據XID找到對應的GlobalSession。
2)其次,將GlobalSession的狀態從begin到committing表示正在執行提交
3)然后,進行提交操作。
findGlobalSession方法將從file或者db中查詢GlobalSession,changeStatus的時候先close了當前全局事務,這樣就不會有新的分支事務注冊進來了。
canBeCommittedAsync將判斷是否可以進行異步提交,判斷標准是什么呢?我們看看該方法
public boolean canBeCommittedAsync() { // 遍歷分支session for (BranchSession branchSession : branchSessions) { // 如果有一個TCC分支返回false if (branchSession.getBranchType() == BranchType.TCC) { return false; } } return true; }
這里判斷的是分支事務的session是TCC類型。
也就是說,如果當前GlobalSession代表的是AT自動事務模式。那么只需要將GlobalSession的狀態改為AsyncCommitting即可,而通知RM端的事情交給DefaultCoordinator的異步線程來做。如果包含TCC分支,那么直接調用doGlobalCommit同步提交。
我們再看看DefaultCoordinator的異步線程怎么處理AT模式的的。
asyncCommitting.scheduleAtFixedRate(() -> { try { handleAsyncCommitting(); } catch (Exception e) { LOGGER.info("Exception async committing ... ", e); } }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); protected void handleAsyncCommitting() { // 獲取所有AsyncCommitting狀態的GlobalSession Collection<GlobalSession> asyncCommittingSessions = SessionHolder.getAsyncCommittingSessionManager().allSessions(); if (CollectionUtils.isEmpty(asyncCommittingSessions)) { return; } // 遍歷session for (GlobalSession asyncCommittingSession : asyncCommittingSessions) { try { if (GlobalStatus.AsyncCommitting != asyncCommittingSession.getStatus()) { continue; } asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); // 調用core的doGlobalCommit通知客戶端 core.doGlobalCommit(asyncCommittingSession, true); } catch (TransactionException ex) { // ... } } }
異步線程默認1秒中執行一次,將從SessionManager中獲取所有AsyncCommitting狀態的GlobalSession。
然后逐個調用DefaultCore的doGlobalCommit方法通知客戶端刪除undoLog。
DefaultCore.rollback
上面看了全局事務的commit,那么rollback呢?我們跟進DefaultCore的rollback代碼
@Override public GlobalStatus rollback(String xid) throws TransactionException { // 找到GlobalSession GlobalSession globalSession = SessionHolder.findGlobalSession(xid); if (globalSession == null) { return GlobalStatus.Finished; } globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); // 加鎖 boolean shouldRollBack = globalSession.lockAndExcute(() -> { // 先關閉 globalSession.close(); // 狀態變更 if (globalSession.getStatus() == GlobalStatus.Begin) { globalSession.changeStatus(GlobalStatus.Rollbacking); return true; } return false; }); if (!shouldRollBack) { return globalSession.getStatus(); } // 直接同步處理 doGlobalRollback(globalSession, false); return globalSession.getStatus(); }
rollback比commit邏輯簡單多了,也不用同步異步的區分。直接都是同步遍歷所有分支事務的session,通知RM進行rollback操作。
總結
全局事務的提交,無非是從file或者db中拿到GlobalSession。不論是同步提交還是異步提交,最終都是拿到所有分支事務的session,然后通知RM去做相應的commit操作。
全局事務的回滾,一樣是拿到GlobalSession。然后通知RM做分支事務的rollback操作。
總得來說,Server端主動推送commit或者rollback通知到RM,如果失敗那么就進行retry。seata在一致性方面選擇了最終一致性。