所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
在上一篇關於DataSourceProxy的文章中,我們看到了一個DataSourceProxy在構造的時候將作為Resource注冊到ResourceManager並通過RPC注冊到Server端。
而我們知道Connection表示的是與DataSource的通信連接,seata對Connection也進行了代理。DataSourceProxy的getConnection方法中將獲取到該代理類。我們看看DataSourceProxy的getConnection方法
@Override public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); // 實例化一個代理類 return new ConnectionProxy(this, targetConnection); }
可以看到,從DataSourceProxy中get出來的Connection並不是原始對象,而是經過ConnectionProxy代理包裝以后的對象。
ConnectionProxy不僅包含了Connection,也包含了當前的DataSourceProxy對象,從而間接包含DataSource。
ConnectionProxy的類圖
了解ConnectionProxy的方法之前,我們先來看看它的UML類圖結構
類圖比較簡單,AbstractConnectionProxy直接實現Connection接口,且包含了DataSourceProxy和原始Connection的成員變量
public abstract class AbstractConnectionProxy implements Connection { /** * 數據源代理對象 */ protected DataSourceProxy dataSourceProxy; /** * 原始Connection對象 */ protected Connection targetConnection; // 省略 }
ConnectionProxy直接繼承於AbstractConnectionProxy,主要擴展了commit方法和rollback方法。
commit注冊分支事務
我們跟進ConnectionProxy的commit方法
@Override public void commit() throws SQLException { try { // 重試 LOCK_RETRY_POLICY.execute(() -> { // 提交 doCommit(); return null; }); } catch (SQLException e) { throw e; } catch (Exception e) { throw new SQLException(e); } }
commit方法加上了一個重試執行,跟進doCommit
private void doCommit() throws SQLException { if (context.inGlobalTransaction()) { processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); } }
這里區分了三種情況,如果沒有全局事務,沒有全局鎖,那么就直接調用原始的commit,提交本地事務即可。
我們這里以全局事務為例,進入processGlobalTransactionCommit看看
private void processGlobalTransactionCommit() throws SQLException { try { // 注冊分支事務 register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); } try { // 如果由undoLog if (context.hasUndoLog()) { // 插入到undo_log表中 UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); } // 提交本地事務 targetConnection.commit(); } catch (Throwable ex) { LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex); // 廣播本地事務提交異常 report(false); throw new SQLException(ex); } // 如果分支事務正常注冊、且本地事務提交,同時開啟了通知 if (IS_REPORT_SUCCESS_ENABLE) { // 廣播分支事務提交正常 report(true); } // 清除當前ConnectionProxy中的xid、branchId、undoLog buffer之類的,本次提交結束 context.reset(); }
processGlobalTransactionCommit方法首先將RPC到Server端注冊分支事務,Server端返回一個branchId到當前ConnectionContext當中。
如果注冊分支事務失敗?直接向上拋出異常,由發起全局事務的地方捕獲,並進行全局事務的回滾。
如果分支事務順利注冊成功,那么就將undo_log插入到本地的undo_log表中,且提交本地事務。
如果本地事務提交失敗,意味着一階段執行失敗,那么就通過report方法通知Server端。
如果成功呢?若開啟通知的話,也通過report方法通知一階段成功。
最后清除當前Connection的上下文事務相關數據,本次結束。
最后,再看看report方法吧
private void report(boolean commitDone) throws SQLException { int retry = REPORT_RETRY_COUNT; while (retry > 0) { try { DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(), commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null); return; } catch (Throwable ex) { LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done [" + commitDone + "] Retry Countdown: " + retry); retry--; if (retry == 0) { throw new SQLException("Failed to report branch status " + commitDone, ex); } } } }
report中通過ResourceManager把分支事務的執行情況上報,如果本地事務提交,那么就報一階段完成,如果沒有提交,那么就報一階段提交失敗。
小結:到這里我們回顧一下ConnectionProxy的commit方法干了啥?其實就是注冊branch到global Transaction當中,作為其中一個分支存在。
且本地事務將會直接提交,而不用等待全局事務提交才提交,這樣對於資源來說減少了鎖定時間。
rollback通知
前面我們說commit注冊分支事務,並提交本地事務。那么如果執行失敗呢?將會調用rollback方法,我們再看看rollback做了啥
@Override public void rollback() throws SQLException { // 回滾本地事務 targetConnection.rollback(); // 如果在全局事務當中 if (context.inGlobalTransaction()) { // 如果分支事務已經注冊 if (context.isBranchRegistered()) { // 發送一階段失敗通知 report(false); } } // 清理上下文 context.reset(); }
rollback中,先是回滾了本地事務。然后判斷是否在全局事務當中,且當前分支事務已經提交,如果是的話發送一階段的回滾到Server端,最后清理上下文。
總結
我們看到commit和rollback的實現邏輯里,本地的事務都會預先commit或者rollback,不會等待全局事務一起commit或者rollback。
分支事務如果已經注冊成功,那么將會通過ResourceManager來report給Server端一階段的成功或者失敗的結果。那如果分支事務注冊不成功呢?將會向上拋出異常,由全局事務的發起者捕獲到,並發起全局事務回滾。
總得來說,ConnectionProxy里面的commit和rollback盡量在程序正常的情況下進行分支事務的處理,而要保證最終一致性主要還是得由全局事務在Server端的二階段處理邏輯為主。