所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
在閱讀seata自動配置相關的內容的時候,我們說過。客戶端會初始化一個RMClient的RPC客戶端,且同時會添加一個監聽器RmMessageListener,監聽器將監聽來自seata的server發送的RPC消息。我們再回顧一下這段代碼
public static void init(String applicationId, String transactionServiceGroup) { RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup); rmRpcClient.setResourceManager(DefaultResourceManager.get()); rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get())); rmRpcClient.init(); }
RmMessageListener監聽到的消息將交付給TransactionMessageHandler處理
這里的DefaultRMHandler.get()將返回一個DefaultRMHandler的單例對象,DefaultRMHandler簡介實現TransactionMessageHandler,並且組合了三種handler
1)RMHandlerAT
2) RMHandlerTCC
3) RMHandlerSaga
分別對應不同的事務模式,三種處理器和DefaultRMHandler一樣都繼承了AbstractRMHandler。而AbstractRMHandler包含了兩個核心方法,doBranchCommit和doBranchRollback,分別用於二階段分支事務的提交和分支事務的回滾。
分支事務提交doBranchCommit
跟進AbstractRMHandler的doBranchCommit方法
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); // 選擇對應的ResourceManager,調用commit BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData); response.setXid(xid); response.setBranchId(branchId); response.setBranchStatus(status); }
doBranchCommit方法將會獲取到對應的ResourceManager,我們以AT模式為例。AT模式將會獲取到DataSourceManager這個ResourceManager。
我們跟進DataSourceManager的branchCommit方法
private ResourceManagerInbound asyncWorker; @Override public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData); }
branchCommit方法中交付給了一個異步線程處理,我們跟進AsyncWorker的branchCommit
@Override public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) { LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later."); } return BranchStatus.PhaseTwo_Committed; }
異步線程中使用了一個BlockQueue來排隊處理,將會有一個Scheduler定時從BlockQueue中獲取poll出來,然后進行undoLog的批量刪除
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids, branchIds, conn);
小結:所謂分支事務的二階段提交,其實就是異步刪除undoLog。因為一階段的時候已經提交了本地事務,所以二階段就非常地快速。
分支事務回滾doBranchRollback
和doBranchCommit,先跟進AbstractRMHandler的doBranchRollback方法
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); // 選擇ResourceManager,調用rollback BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData); response.setXid(xid); response.setBranchId(branchId); response.setBranchStatus(status); }
可以看到,一樣是調用ResourceManager的方法。我們同樣以DataSourceManager為例,跟進DataSourceManager的branchRollback方法
@Override public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { // 選擇Resource DataSourceProxy dataSourceProxy = get(resourceId); try { // undo補償 UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId); } catch (TransactionException te) { if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) { return BranchStatus.PhaseTwo_RollbackFailed_Unretryable; } else { return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } } return BranchStatus.PhaseTwo_Rollbacked; }
在閱讀DataSourceProxy的時候說過,DataSourceProxy將會作為一個Resource注冊到ResourceManager當中。
而在分支事務回滾的時候,將會獲取到該Resource,也就是DataSourceProxy。並且執行對應數據源的undo補償操作。
我們跟進undo方法,看看補償操作,方法較長,這里縮減掉一些內容
@Override public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException { // ... for (; ; ) { try { //... // 查找undoLog selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL); selectPST.setLong(1, branchId); selectPST.setString(2, xid); rs = selectPST.executeQuery(); boolean exists = false; while (rs.next()) { // ... // 反編碼 UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer); BranchUndoLog branchUndoLog = parser.decode(rollbackInfo); try { List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs(); for (SQLUndoLog sqlUndoLog : sqlUndoLogs) { //... // 執行器處理undo AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog); undoExecutor.executeOn(conn); } } } if (exists) { // 刪除undoLog deleteUndoLog(xid, branchId, conn); conn.commit(); } else { //... } return; } // ... } }
我們看到,首先是查詢出分支事務的undoLog。然后反序列化出undoLog數據對象,且丟給執行器去執行。執行完畢刪除undoLog,且提交事務。
由此可見,rollback就是把StatementProxy准備地undoLog拿出來,然后進行反向地補償操作。
總結
本文我們簡單地閱讀了一下seata客戶端在二階段分支事務的commit和rollback操作做了啥。commit主要就是把undoLog刪除,rollback則是獲取了undoLog然后對數據進行反向生成。
到這里,seata的客戶端代碼閱讀部分就結束了。我們從自動配置 -> 切面 -> 數據源代理 -> 監聽器這么一個流程閱讀下來可以發現AT模式的核心要點就是在於數據源代理,由undoLog做反向補償操作。
后續,將開始server端的代碼閱讀...