五、connectionProxy分支事務注冊


所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

 

正文

上一篇關於DataSourceProxy的文章中,我們看到了一個DataSourceProxy在構造的時候將作為Resource注冊到ResourceManager並通過RPC注冊到Server端。

而我們知道Connection表示的是與DataSource的通信連接,seata對Connection也進行了代理。DataSourceProxy的getConnection方法中將獲取到該代理類。我們看看DataSourceProxy的getConnection方法

@Override
public ConnectionProxy getConnection() throws SQLException {
    Connection targetConnection = targetDataSource.getConnection();
    // 實例化一個代理類
    return new ConnectionProxy(this, targetConnection);
}

可以看到,從DataSourceProxy中get出來的Connection並不是原始對象,而是經過ConnectionProxy代理包裝以后的對象。

ConnectionProxy不僅包含了Connection,也包含了當前的DataSourceProxy對象,從而間接包含DataSource。

 

ConnectionProxy的類圖

了解ConnectionProxy的方法之前,我們先來看看它的UML類圖結構

類圖比較簡單,AbstractConnectionProxy直接實現Connection接口,且包含了DataSourceProxy和原始Connection的成員變量

public abstract class AbstractConnectionProxy implements Connection {
    /**
     * 數據源代理對象
     */
    protected DataSourceProxy dataSourceProxy;
    /**
     * 原始Connection對象
     */
    protected Connection targetConnection;

    // 省略
}   

ConnectionProxy直接繼承於AbstractConnectionProxy,主要擴展了commit方法和rollback方法。

 

commit注冊分支事務

我們跟進ConnectionProxy的commit方法

@Override
public void commit() throws SQLException {
    try {
        // 重試
        LOCK_RETRY_POLICY.execute(() -> {
            // 提交
            doCommit();
            return null;
        });
    } catch (SQLException e) {
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}

commit方法加上了一個重試執行,跟進doCommit

private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
    } else {
        targetConnection.commit();
    }
}

這里區分了三種情況,如果沒有全局事務,沒有全局鎖,那么就直接調用原始的commit,提交本地事務即可。

我們這里以全局事務為例,進入processGlobalTransactionCommit看看

private void processGlobalTransactionCommit() throws SQLException {
    try {
        // 注冊分支事務
 register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }

    try {
        // 如果由undoLog
        if (context.hasUndoLog()) {
            // 插入到undo_log表中
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        }
        // 提交本地事務
 targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        // 廣播本地事務提交異常
        report(false);
        throw new SQLException(ex);
    }
    // 如果分支事務正常注冊、且本地事務提交,同時開啟了通知
    if (IS_REPORT_SUCCESS_ENABLE) {
        // 廣播分支事務提交正常
        report(true);
    }
    // 清除當前ConnectionProxy中的xid、branchId、undoLog buffer之類的,本次提交結束
 context.reset();
}

processGlobalTransactionCommit方法首先將RPC到Server端注冊分支事務,Server端返回一個branchId到當前ConnectionContext當中。

如果注冊分支事務失敗?直接向上拋出異常,由發起全局事務的地方捕獲,並進行全局事務的回滾。

如果分支事務順利注冊成功,那么就將undo_log插入到本地的undo_log表中,且提交本地事務。

如果本地事務提交失敗,意味着一階段執行失敗,那么就通過report方法通知Server端。

如果成功呢?若開啟通知的話,也通過report方法通知一階段成功。

最后清除當前Connection的上下文事務相關數據,本次結束。

 

最后,再看看report方法吧

private void report(boolean commitDone) throws SQLException {
    int retry = REPORT_RETRY_COUNT;
    while (retry > 0) {
        try {
            DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(), commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
            return;
        } catch (Throwable ex) {
            LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                + commitDone + "] Retry Countdown: " + retry);
            retry--;

            if (retry == 0) {
                throw new SQLException("Failed to report branch status " + commitDone, ex);
            }
        }
    }
}

report中通過ResourceManager把分支事務的執行情況上報,如果本地事務提交,那么就報一階段完成,如果沒有提交,那么就報一階段提交失敗。

 

小結:到這里我們回顧一下ConnectionProxy的commit方法干了啥?其實就是注冊branch到global Transaction當中,作為其中一個分支存在。

且本地事務將會直接提交,而不用等待全局事務提交才提交,這樣對於資源來說減少了鎖定時間。

 

rollback通知

前面我們說commit注冊分支事務,並提交本地事務。那么如果執行失敗呢?將會調用rollback方法,我們再看看rollback做了啥

@Override
public void rollback() throws SQLException {
    // 回滾本地事務
    targetConnection.rollback();
    // 如果在全局事務當中
    if (context.inGlobalTransaction()) {
        // 如果分支事務已經注冊
        if (context.isBranchRegistered()) {
            // 發送一階段失敗通知
            report(false);
        }
    }
    // 清理上下文
    context.reset();
}

rollback中,先是回滾了本地事務。然后判斷是否在全局事務當中,且當前分支事務已經提交,如果是的話發送一階段的回滾到Server端,最后清理上下文。

 

總結

我們看到commit和rollback的實現邏輯里,本地的事務都會預先commit或者rollback,不會等待全局事務一起commit或者rollback。

分支事務如果已經注冊成功,那么將會通過ResourceManager來report給Server端一階段的成功或者失敗的結果。那如果分支事務注冊不成功呢?將會向上拋出異常,由全局事務的發起者捕獲到,並發起全局事務回滾

總得來說,ConnectionProxy里面的commit和rollback盡量在程序正常的情況下進行分支事務的處理,而要保證最終一致性主要還是得由全局事務在Server端的二階段處理邏輯為主。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM