RocksDB事務的隔離性分析【原創】


Rocksdb事務隔離性指的是多線程並發事務使用時候,事務與事務之間的隔離性,通過加鎖機制來實現,本文重點剖析Read Commited隔離級別下,Rocksdb的加鎖機制。

  1. Rocksdb事務相關類族

Rocksdb的事務相關的類圖如下圖所示。主要有兩個類族,Transaction和DB,默認采用PessimisticTransaction,而PessimisticTransaction內部的加鎖機制通過TransactionLockMgr來實現的。

 

TransactionLockMgr內部維護了LockMap。TransactionLockMgr根據每個記錄的Key計算hash值,再對num_stripes取模,在LockMap中的向量Std::vector<LockMapStripe>定位LockMapStripe,這樣減少實體鎖的競爭激烈程度,相當於鎖分解。

 

LockMap的數據成員如下

Size_t num_stripes          LockMapStripe個數,默認16個

Std::vector<LockMapStripe>   LockMapStripe數組

 

LockMapStripe的數據成員如下

std::shared_ptr<TransactionDBMutex>  stripe_mutex :   實體鎖

std::shared_ptr<TransactionDBCondVar>  stripe_cv :     實體條件變量

std::unordered_map<std::string, LockInfo>  keys :       具有相同Key hash值的每條記錄的加鎖信息,std::string為記錄的Key值。

 

LockInfo的數據成員如下

bool exclusive :                     排它鎖,還是共享鎖

uint64_t expiration_time :            鎖的過期時間

autovector<TransactionID>  txn_ids :   這把鎖阻塞的事務ID列表

 

2. Rocksdb事務流程分析

 

上述流程,是應用創建TransactionDB,然后Put一條記錄,再Commit的協作流程圖,在Put階段調用TransactionLockMgr的TryLock方法,Commit階段調用TransactionLockMgr的UnLock方法。

        TransactionLockMgr::TryLock內部的主要邏輯在AcquireLocked函數中,TransactionLockMgr::UnLock內部的主要邏輯在UnlockKey函數中,下面具體分析這兩個函數。

AcquireLocked 上鎖邏輯如下圖所示:

 

3. 總結分析

    1. 應用開啟事務后,修改類操作是寫入到PessimisticTransaction內部的WriteBatch對象,並不是直接寫入實際的存儲中,在commit階段,會調用DBimp的WriteImpl方法把WriteBatch對象寫入實際的存儲中(memtable、sstable文件)。

    2. Rocksdb支持的鎖粒度是記錄級別,粒度還是比較細的,但是記錄鎖並不是實體鎖,而是在內存中維護了每條記錄的鎖狀態。當前事務根據內存中每條記錄的鎖狀態來執行加鎖邏輯。

 

4.  源碼附錄:

 Status TransactionLockMgr::AcquireLocked(LockMap* lock_map,

                                         LockMapStripe* stripe,

                                         const std::string& key,    //記錄的Key值

Env* env,

                                         LockInfo&& txn_lock_info,  //當前事務鎖信息

                                         uint64_t* expire_time,     //鎖的過期時間

                                         autovector<TransactionID>* txn_ids)

 {

  Status result;

  auto stripe_iter = stripe->keys.find(key);  // 檢查這條記錄的Key是否已經被加鎖了。

  if (stripe_iter != stripe->keys.end()) {       // 這條記錄的Key已經被之前事務加過鎖

    LockInfo& lock_info = stripe_iter->second;

    if (lock_info.exclusive || txn_lock_info.exclusive) {   //之前事務或者當前事務加的是排他鎖,

      if (lock_info.txn_ids.size() == 1 &&

          lock_info.txn_ids[0] == txn_lock_info.txn_ids[0]) {  //之前加鎖的事務就是當前事務

        lock_info.exclusive = txn_lock_info.exclusive;

        lock_info.expiration_time = txn_lock_info.expiration_time;

      } else {       //之前加鎖的事務不是當前事務

        if (IsLockExpired(txn_lock_info.txn_ids[0], lock_info, env,

                          expire_time)) {   // 之前事務加的鎖已經過期,可以清除

          lock_info.txn_ids = txn_lock_info.txn_ids;

          lock_info.exclusive = txn_lock_info.exclusive;

          lock_info.expiration_time = txn_lock_info.expiration_time;

        } else { 

          result = Status::TimedOut(Status::SubCode::kLockTimeout);

          *txn_ids = lock_info.txn_ids;   // 返回之前事務列表

        }

      }

    } else {   //當前事務加的是共享鎖

      lock_info.txn_ids.push_back(txn_lock_info.txn_ids[0]);

      lock_info.expiration_time =

          std::max(lock_info.expiration_time, txn_lock_info.expiration_time);

    }

  } else {  // 這條記錄的Key沒有被之前事務加過鎖

    if (max_num_locks_ > 0 &&

        lock_map->lock_cnt.load(std::memory_order_acquire) >= max_num_locks_) {

      result = Status::Busy(Status::SubCode::kLockLimit);

    } else {

      // 當前事務執行加鎖操作

      stripe->keys.emplace(key, std::move(txn_lock_info));

      if (max_num_locks_) {

        lock_map->lock_cnt++;

      }

    }

  }

  return result;

}

 

UnlockKey邏輯相對簡單一些,主要是刪除加鎖的記錄,並且喚醒被阻塞的事務。

void TransactionLockMgr::UnLockKey(const PessimisticTransaction* txn,

                                   const std::string& key,

                                   LockMapStripe* stripe, LockMap* lock_map,

                                   Env* env) {

  TransactionID txn_id = txn->GetID();

  auto stripe_iter = stripe->keys.find(key);

  if (stripe_iter != stripe->keys.end()) {

    auto& txns = stripe_iter->second.txn_ids;

    auto txn_it = std::find(txns.begin(), txns.end(), txn_id);

    // Found the key we locked.  unlock it.

    if (txn_it != txns.end()) {

      if (txns.size() == 1) {

        stripe->keys.erase(stripe_iter);

      } else {

        auto last_it = txns.end() - 1;

        if (txn_it != last_it) {

          *txn_it = *last_it;

        }

        txns.pop_back();

      }

       if (max_num_locks_ > 0) {

        // Maintain lock count if there is a limit on the number of locks.

        assert(lock_map->lock_cnt.load(std::memory_order_relaxed) > 0);

        lock_map->lock_cnt--;

      }

    }

  } else {

    // This key is either not locked or locked by someone else.  This should

    // only happen if the unlocking transaction has expired.

    assert(txn->GetExpirationTime() > 0 &&

           txn->GetExpirationTime() < env->NowMicros());

  }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM