一、seata全局鎖


所有文章

https://www.cnblogs.com/lay2017/p/12485081.html

 

正文

seata的at模式主要實現邏輯是數據源代理,而數據源代理將基於如MySQL和Oracle等關系事務型數據庫實現,基於數據庫的隔離級別為read committed。換而言之,本地事務的支持是seata實現at模式的必要條件,這也將限制seata的at模式的使用場景。

官方文檔給出了非常好的圖來說明at模式下,全局鎖與隔離相關的邏輯:https://seata.io/zh-cn/docs/dev/mode/at-mode.html

寫隔離

首先,我們理解一下寫隔離的流程

分支事務1-開始
| 
V 獲取 本地鎖
| 
V 獲取 全局鎖    分支事務2-開始
|               |
V 釋放 本地鎖     V 獲取 本地鎖
|               |
V 釋放 全局鎖     V 獲取 全局鎖
                |
                V 釋放 本地鎖
                |
                V 釋放 全局鎖

如上所示,一個分布式事務的鎖獲取流程是這樣的

1)先獲取到本地鎖,這樣你已經可以修改本地數據了,只是還不能本地事務提交

2)而后,能否提交就是看能否獲得全局鎖

3)獲得了全局鎖,意味着可以修改了,那么提交本地事務,釋放本地鎖

4)當分布式事務提交,釋放全局鎖。這樣就可以讓其它事務獲取全局鎖,並提交它們對本地數據的修改了。

 

可以看到,這里有兩個關鍵點

1)本地鎖獲取之前,不會去爭搶全局鎖

2)全局鎖獲取之前,不會提交本地鎖

這就意味着,數據的修改將被互斥開來。也就不會造成寫入臟數據。全局鎖可以讓分布式修改中的寫數據隔離。

 

beforeImage校驗全局鎖

在將StatementProxy的時候我們提到過,在執行業務sql之前。會生成一個前置數據鏡像,也就是beforeImage方法。

那么,beforeImage方法中將會生成一個 select [字段] from [表] where [條件] for update 這樣的sql來查詢鏡像數據。seata的數據源代理將會對select for update這樣的語句進行代理,代理中將會檢驗一下全局鎖是否沖突,如下所示

while (true) {
    try {
        // 執行sql
        rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

        // 構建數據行
        TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
        // 構建鎖KEY
        String lockKeys = buildLockKey(selectPKRows);
        if (StringUtils.isNullOrEmpty(lockKeys)) {
            break;
        }

        if (RootContext.inGlobalTransaction()) {
            // 校驗全局鎖
            statementProxy.getConnectionProxy().checkLock(lockKeys);
        } else if (RootContext.requireGlobalLock()) {
            statementProxy.getConnectionProxy().appendLockKey(lockKeys);
        } else {
            throw new RuntimeException("Unknown situation!");
        }
        break;
    } catch (LockConflictException lce) {
        if (sp != null) {
            conn.rollback(sp);
        } else {
            conn.rollback();
        }
        // 鎖沖突,重試
        lockRetryController.sleep(lce);
    }
}

如代碼所示,select for update會做一次全局鎖校驗(checkLock會去調用Server端)。如果出現鎖沖突,那么不斷進行重試。

這樣依賴,select for update所在的本地事務只要等待全局鎖釋放,由於已經占了本地鎖,所以可以順利獲取全局鎖。而后,進行入update等業務操作,然后提交順利提交本地事務,

seata也表明,默認的事務隔離級別是read uncommitted。那么要實現read committed的話,就可以使用select for update來實現,邏輯和這里是一樣的,其實就是通過占用本地鎖,然后重試等待全局鎖來達到讀寫隔離的目的。

 

分支事務register占用鎖

在看分支事務register的時候,我們只是簡單地掃了掃BranchSession的創建,然后添加到GlobalSession中。

這其中忽略了一個要點,就是在branch的register過程,會進行全局鎖的獲取操作。客戶端會講tablename和數據行的primary key給構造成lock key傳輸到Server端。而Server端將會根據這個lock key來判斷是否能夠占用全局鎖

我們看看seata關於database方式的實現,跟進LockStoreDataBaseDao的acquireLock(List<LockDO> lockDOs)方法

方法很長,刪減以后邏輯其實很簡單。就是構造一個checkLock的sql,查查看是否已經有相關的數據。如果沒有則進行doAcquireLock占用操作,占用操作也很簡單就是進行數據插入。

前面checkLock提到的調用Server端的校驗,其實也就是構造並執行一下checkLock看看有沒數據而已

@Override
public boolean acquireLock(List<LockDO> lockDOs) {
    // ...
    try {
        // ...

        // 獲取checkLock的sql語句
        String checkLockSQL = LockStoreSqls.getCheckLockableSql(lockTable, sj.toString(), dbType);
        ps = conn.prepareStatement(checkLockSQL);
        // ...
        // 查詢是否有占用的數據
        rs = ps.executeQuery();
        String currentXID = lockDOs.get(0).getXid();
        while (rs.next()) {
            String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
            if (!StringUtils.equals(dbXID, currentXID)) {
                canLock &= false;
                break;
            }
            // ...
        }

        if (!canLock) {
            conn.rollback();
            return false;
        }

        // ...

        if (unrepeatedLockDOs.size() == 1) {
            LockDO lockDO = unrepeatedLockDOs.get(0);
            // 進行占用操作
            if (!doAcquireLock(conn, lockDO)) {
                // ...
            }
        } else {
            // 進行占用操作
            if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
                // ...
            }
        }
        conn.commit();
        return true;
    } catch (SQLException e) {
        // ...
    } finally {
        // ...
    }
}

那么checkLockSql和doAcquireLock分開兩個步驟是否會有並發問題呢?

理論上不會有的,正如我們前面一直提到的,要先獲取本地鎖,再來查詢獲取全局鎖。所以,當本地鎖還沒有獲取的時候,不會去獲取全局鎖。也就不需要考慮並發問題

如果占用全局鎖失敗怎么辦呢?客戶端會進行鎖沖突的判斷,然后進行重試操作。

 

分支事務釋放全局鎖

而分支事務在從GlobalSession中remove的時候會去unlock全局鎖,如下GlobalSession中的代碼

@Override
public void removeBranch(BranchSession branchSession) throws TransactionException {
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onRemoveBranch(this, branchSession);
    }
    branchSession.unlock();
    remove(branchSession);
}

從unlock一路跟進LockStoreDataBaseDao的unlock(String xid, Long branchId)會看看

@Override
public boolean unLock(String xid, Long branchId) {
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        // 批量刪除的sql構造並執行
        String batchDeleteSQL = LockStoreSqls.getBatchDeleteLockSqlByBranch(lockTable, dbType);
        ps = conn.prepareStatement(batchDeleteSQL);
        ps.setString(1, xid);
        ps.setLong(2, branchId);
        ps.executeUpdate();
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
    return true;
}

其實就是去刪除之前doAcquireLock方法insert進去的數據,就算解鎖了

 

@GlobalLock

有的方法它可能並不需要@GlobalTransactional的事務管理,但是我們又希望它對數據的修改能夠加入到seata機制當中。那么這時候就需要@GlobalLock了。

加上了@GlobalLock,在事務提交的時候就回去checkLock校驗一下全局鎖。

private void processLocalCommitWithGlobalLocks() throws SQLException {
    // 全局鎖校驗
 checkLock(context.buildLockKeys());
    try {
        // 提交本地事務
        targetConnection.commit();
    } catch (Throwable ex) {
        throw new SQLException(ex);
    }
    context.reset();
}

可以看到,在本地事務提交之前會調用checkLock校驗全局鎖,和之前在事務中的寫隔離一樣的邏輯。也一樣的,如果出現鎖沖突的話進行重試操作

 

總結

本文簡單看了幾個全局鎖的場景,可以感覺到只要遵循本地鎖、全局鎖的獲取和釋放的邏輯順序,將數據讀寫的操作納入seata的管理里面就可以基本做到維持數據一致性。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM