所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
seata的at模式主要實現邏輯是數據源代理,而數據源代理將基於如MySQL和Oracle等關系事務型數據庫實現,基於數據庫的隔離級別為read committed。換而言之,本地事務的支持是seata實現at模式的必要條件,這也將限制seata的at模式的使用場景。
官方文檔給出了非常好的圖來說明at模式下,全局鎖與隔離相關的邏輯:https://seata.io/zh-cn/docs/dev/mode/at-mode.html
寫隔離
首先,我們理解一下寫隔離的流程
分支事務1-開始 | V 獲取 本地鎖 | V 獲取 全局鎖 分支事務2-開始 | | V 釋放 本地鎖 V 獲取 本地鎖 | | V 釋放 全局鎖 V 獲取 全局鎖 | V 釋放 本地鎖 | V 釋放 全局鎖
如上所示,一個分布式事務的鎖獲取流程是這樣的
1)先獲取到本地鎖,這樣你已經可以修改本地數據了,只是還不能本地事務提交
2)而后,能否提交就是看能否獲得全局鎖
3)獲得了全局鎖,意味着可以修改了,那么提交本地事務,釋放本地鎖
4)當分布式事務提交,釋放全局鎖。這樣就可以讓其它事務獲取全局鎖,並提交它們對本地數據的修改了。
可以看到,這里有兩個關鍵點
1)本地鎖獲取之前,不會去爭搶全局鎖
2)全局鎖獲取之前,不會提交本地鎖
這就意味着,數據的修改將被互斥開來。也就不會造成寫入臟數據。全局鎖可以讓分布式修改中的寫數據隔離。
beforeImage校驗全局鎖
在將StatementProxy的時候我們提到過,在執行業務sql之前。會生成一個前置數據鏡像,也就是beforeImage方法。
那么,beforeImage方法中將會生成一個 select [字段] from [表] where [條件] for update 這樣的sql來查詢鏡像數據。seata的數據源代理將會對select for update這樣的語句進行代理,代理中將會檢驗一下全局鎖是否沖突,如下所示
while (true) { try { // 執行sql rs = statementCallback.execute(statementProxy.getTargetStatement(), args); // 構建數據行 TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList); // 構建鎖KEY String lockKeys = buildLockKey(selectPKRows); if (StringUtils.isNullOrEmpty(lockKeys)) { break; } if (RootContext.inGlobalTransaction()) { // 校驗全局鎖 statementProxy.getConnectionProxy().checkLock(lockKeys); } else if (RootContext.requireGlobalLock()) { statementProxy.getConnectionProxy().appendLockKey(lockKeys); } else { throw new RuntimeException("Unknown situation!"); } break; } catch (LockConflictException lce) { if (sp != null) { conn.rollback(sp); } else { conn.rollback(); } // 鎖沖突,重試 lockRetryController.sleep(lce); } }
如代碼所示,select for update會做一次全局鎖校驗(checkLock會去調用Server端)。如果出現鎖沖突,那么不斷進行重試。
這樣依賴,select for update所在的本地事務只要等待全局鎖釋放,由於已經占了本地鎖,所以可以順利獲取全局鎖。而后,進行入update等業務操作,然后提交順利提交本地事務,
seata也表明,默認的事務隔離級別是read uncommitted。那么要實現read committed的話,就可以使用select for update來實現,邏輯和這里是一樣的,其實就是通過占用本地鎖,然后重試等待全局鎖來達到讀寫隔離的目的。
分支事務register占用鎖
在看分支事務register的時候,我們只是簡單地掃了掃BranchSession的創建,然后添加到GlobalSession中。
這其中忽略了一個要點,就是在branch的register過程,會進行全局鎖的獲取操作。客戶端會講tablename和數據行的primary key給構造成lock key傳輸到Server端。而Server端將會根據這個lock key來判斷是否能夠占用全局鎖
我們看看seata關於database方式的實現,跟進LockStoreDataBaseDao的acquireLock(List<LockDO> lockDOs)方法
方法很長,刪減以后邏輯其實很簡單。就是構造一個checkLock的sql,查查看是否已經有相關的數據。如果沒有則進行doAcquireLock占用操作,占用操作也很簡單就是進行數據插入。
前面checkLock提到的調用Server端的校驗,其實也就是構造並執行一下checkLock看看有沒數據而已
@Override public boolean acquireLock(List<LockDO> lockDOs) { // ... try { // ... // 獲取checkLock的sql語句 String checkLockSQL = LockStoreSqls.getCheckLockableSql(lockTable, sj.toString(), dbType); ps = conn.prepareStatement(checkLockSQL); // ... // 查詢是否有占用的數據 rs = ps.executeQuery(); String currentXID = lockDOs.get(0).getXid(); while (rs.next()) { String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID); if (!StringUtils.equals(dbXID, currentXID)) { canLock &= false; break; } // ... } if (!canLock) { conn.rollback(); return false; } // ... if (unrepeatedLockDOs.size() == 1) { LockDO lockDO = unrepeatedLockDOs.get(0); // 進行占用操作 if (!doAcquireLock(conn, lockDO)) { // ... } } else { // 進行占用操作 if (!doAcquireLocks(conn, unrepeatedLockDOs)) { // ... } } conn.commit(); return true; } catch (SQLException e) { // ... } finally { // ... } }
那么checkLockSql和doAcquireLock分開兩個步驟是否會有並發問題呢?
理論上不會有的,正如我們前面一直提到的,要先獲取本地鎖,再來查詢獲取全局鎖。所以,當本地鎖還沒有獲取的時候,不會去獲取全局鎖。也就不需要考慮並發問題
如果占用全局鎖失敗怎么辦呢?客戶端會進行鎖沖突的判斷,然后進行重試操作。
分支事務釋放全局鎖
而分支事務在從GlobalSession中remove的時候會去unlock全局鎖,如下GlobalSession中的代碼
@Override public void removeBranch(BranchSession branchSession) throws TransactionException { for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onRemoveBranch(this, branchSession); } branchSession.unlock(); remove(branchSession); }
從unlock一路跟進LockStoreDataBaseDao的unlock(String xid, Long branchId)會看看
@Override public boolean unLock(String xid, Long branchId) { Connection conn = null; PreparedStatement ps = null; try { conn = logStoreDataSource.getConnection(); conn.setAutoCommit(true); // 批量刪除的sql構造並執行 String batchDeleteSQL = LockStoreSqls.getBatchDeleteLockSqlByBranch(lockTable, dbType); ps = conn.prepareStatement(batchDeleteSQL); ps.setString(1, xid); ps.setLong(2, branchId); ps.executeUpdate(); } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(ps, conn); } return true; }
其實就是去刪除之前doAcquireLock方法insert進去的數據,就算解鎖了
@GlobalLock
有的方法它可能並不需要@GlobalTransactional的事務管理,但是我們又希望它對數據的修改能夠加入到seata機制當中。那么這時候就需要@GlobalLock了。
加上了@GlobalLock,在事務提交的時候就回去checkLock校驗一下全局鎖。
private void processLocalCommitWithGlobalLocks() throws SQLException { // 全局鎖校驗 checkLock(context.buildLockKeys()); try { // 提交本地事務 targetConnection.commit(); } catch (Throwable ex) { throw new SQLException(ex); } context.reset(); }
可以看到,在本地事務提交之前會調用checkLock校驗全局鎖,和之前在事務中的寫隔離一樣的邏輯。也一樣的,如果出現鎖沖突的話進行重試操作
總結
本文簡單看了幾個全局鎖的場景,可以感覺到只要遵循本地鎖、全局鎖的獲取和釋放的邏輯順序,將數據讀寫的操作納入seata的管理里面就可以基本做到維持數據一致性。