[已完結]CMU數據庫(15-445)實驗2-B+樹索引實現(下)


[已完結]CMU數據庫(15-445)實驗2-B+樹索引實現(下)

4. Index_Iterator實現#

這里就是需要實現迭代器的一些操作,比如begin、end、isend等等

下面是對於IndexIterator的構造函數

其中idx表示當前page中的第幾個tuple

INDEXITERATOR_TYPE::IndexIterator(LeafPage *leftmost_leaf, int idx, BufferPoolManager *buffer_pool_manager)
    : curr_page(leftmost_leaf), curr_index(idx), bpm(buffer_pool_manager) {}

1. 首先我們來看begin函數的實現#

  1. 利用key值找到葉子結點
  2. 然后獲取當前key值的index就是begin的位置
Page *page = FindLeafPage(KeyType{}, true);  // leftmost_leaf pinned
  LeafPage *leftmost_leaf = reinterpret_cast<LeafPage *>(page->GetData());
  buffer_pool_manager_->UnpinPage(leftmost_leaf->GetPageId(), false);
  return INDEXITERATOR_TYPE(leftmost_leaf, 0, buffer_pool_manager_);

2. end函數的實現#

  1. 找到最開始的結點
  2. 然后一直向后遍歷直到nextPageId=-1結束
  3. 這里注意需要重載!===

end函數

// find the right most page
  Page *page = FindLeafPage(KeyType{}, true);  // page pinned
  LeafPage *leaf = reinterpret_cast<LeafPage *>(page->GetData());

  while (leaf->GetNextPageId() != INVALID_PAGE_ID) {
    page_id_t next_page_id = leaf->GetNextPageId();
    buffer_pool_manager_->UnpinPage(leaf->GetPageId(), false);  // page unpinned

    Page *next_page = buffer_pool_manager_->FetchPage(next_page_id);  // next_page pinned
    leaf = reinterpret_cast<LeafPage *>(next_page->GetData());
  }

  return INDEXITERATOR_TYPE(leaf, leaf->GetSize(), buffer_pool_manager_);

==和 !=函數

這里注意在!= 那里不能寫成itr != *this

INDEX_TEMPLATE_ARGUMENTS
bool INDEXITERATOR_TYPE::operator==(const IndexIterator &itr) const {
  return itr.curr_page == curr_page && itr.curr_index == curr_index;
}

INDEX_TEMPLATE_ARGUMENTS
bool INDEXITERATOR_TYPE::operator!=(const IndexIterator &itr) const { return !(itr == *this); }

3. 重載++和*(解引用符號)#

  1. 重載++

簡單的index++然后設置nextPageId即可

INDEX_TEMPLATE_ARGUMENTS
INDEXITERATOR_TYPE &INDEXITERATOR_TYPE::operator++() {
  curr_index++;
  if (curr_index == curr_page->GetSize() && curr_page->GetNextPageId() != INVALID_PAGE_ID) {
    page_id_t next_pid = curr_page->GetNextPageId();
    Page *next_page = bpm->FetchPage(next_pid);  // pined page

    LeafPage *next_node = reinterpret_cast<LeafPage *>(next_page->GetData());
    curr_page = next_node;
    bpm->UnpinPage(next_page->GetPageId(), false);
    curr_index = 0;
  }
  return *this;
}
  1. 重載*

return array[index]即可

const MappingType &INDEXITERATOR_TYPE::operator*() { return curr_page->GetItem(curr_index); }

5. 並發機制的實現#

0. 首先復習一下讀寫🔒機制#

  1. 讀操作是可以多個進程之間共享latch的而寫操作則必須互斥
  2. 加入MaxReader數就是為了防止等待的⌛️寫進程飢餓

首先來看如果沒有🔒機制多線程會發生什么問題

  1. 線程T1想要刪除44。
  2. 線程T2 想要查找41

image-20210126184533688

  1. 假設T2在執行到D位置的時候又切換到線程T1
  2. 這個時候T1進行重新分配,會把41借到I結點上
  3. T1執行完成切換回T2這時候T2再去原來的執行尋找41就會找不到

image-20210126184727498

就會出現下面的情況。❓

image-20210126184901306

由此我們需要讀寫🔒的存在

  1. 對於find操作

由於我們是只讀操作,所以我們到下一個結點的時候就可以釋放上一個結點的Latch

image-20210126185917549

剩下的操作都是一樣的

對於delete則不一樣

因為我們需要寫操作

這里我們不能釋放結點A的Latch。因為我們的刪除操作可能會合並根節點。

image-20210126190112632

到D的時候。我們會發現D中的38刪除之后不需要進行合並,所以對於A和B的寫Write是可以安全釋放了

image-20210126190229333

對於Insert操作

這里我們就可以安全的釋放掉A的鎖。因為B中還有空位,我們插入是不會對A造成影響的

image-20210126190452937

當我們執行到D這里發現D中已經滿了。所以此時我們不會釋放B的鎖,因為我們會對B進行寫操作

image-20210126190613339

上面的算法雖然是正確的但是有瓶頸問題。由於只有一個線程可以獲得寫Latch。而插入和刪除的時候都需要對頭結點加寫Latch。所以多線程在有許多個插入或者刪除操作的時候,性能就會大打折扣
img

這里要引入樂觀🔒

樂觀的假設大部分操作是不需要進行合並和分裂的。因此在我們向下的時候都是讀Latch而不是寫Latch。只有在葉子結點才是write Latch

  1. 從上到下都是讀Latch。而且逐步釋放
  2. 到葉子結點需要修改的時候才為寫Latch。這個刪除是安全的所以直接結束

image-20210126192408392

  1. 當我們到最后一步發現不安全的時候。則需要像上面我們沒有引入樂觀🔒的時候一樣。重新執行一遍

image-20210126192548748

延遲更新父結點

這里用一個🌟來標記這里需要被更新但是還沒有執行

image-20210126195848104

這個時候我們執行其他操作也是正確的比如查找31

image-20210126200003320

這里我們執行insert 33

當執行到結點C的時候。因為這個時候有另一個線程持有了write Latch。所以這個時候🌟操作要執行。隨后在插入33

img

最后一點補充關於掃描操作的

  1. 線程1在C結點上持有write Latch
  2. 線程2已經掃描完了結點B想要獲得結點C的read Latch

這時候會發生問題,因為線程2無法拿到read Latch

這里有幾種解決方法

  1. 可以等到T1的寫操作完成
  2. 可以重新執行T2
  3. 可以直接讓線程T2停止搶得這個Latch。

img

注意這里的LatchLock並不一樣

img

6. 輔助函數分析

1. 輔助函數UnlockUnpinPages的實現#

  1. 如果是讀操作則釋放read鎖
  2. 否則釋放write鎖
INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::
UnlockUnpinPages(Operation op, Transaction *transaction) {
  if (transaction == nullptr) {
    return;
  }

  for (auto page:*transaction->GetPageSet()) {
    if (op == Operation::READ) {
      page->RUnlatch();
      buffer_pool_manager_->UnpinPage(page->GetPageId(), false);
    } else {
      page->WUnlatch();
      buffer_pool_manager_->UnpinPage(page->GetPageId(), true);
    }
  }
  transaction->GetPageSet()->clear();

  for (const auto &page_id: *transaction->GetDeletedPageSet()) {
    buffer_pool_manager_->DeletePage(page_id);
  }
  transaction->GetDeletedPageSet()->clear();

  // if root is locked, unlock it

  node_mutex_.unlock();
  }

四個自帶的解鎖和上鎖操作

/** Acquire the page write latch. */
inline void WLatch() { rwlatch_.WLock(); }

/** Release the page write latch. */
inline void WUnlatch() { rwlatch_.WUnlock(); }

/** Acquire the page read latch. */
inline void RLatch() { rwlatch_.RLock(); }

/** Release the page read latch. */
inline void RUnlatch() { rwlatch_.RUnlock(); }

這里的rwlatch是自己實現的讀寫鎖類下面來探究一下這個類

由於c++ 並發編程我現在還不太會。。。所以就簡單看一下啦后面學完並發編程再補充

  1. WLock函數

    1. 首先獲取一個鎖
    2. 用一個記號writer_entered表示是否有寫操作
    3. 如果之前已經有了現在的操作就需要等(這個線程處於阻塞狀態)
    4. 當前如果有其他線程執行讀操作。則仍需要阻塞(別人讀的時候你不能寫)
    void WLock() {
      std::unique_lock<mutex_t> latch(mutex_);
      while (writer_entered_) {
        reader_.wait(latch);
      }
      writer_entered_ = true;
      while (reader_count_ > 0) {
        writer_.wait(latch);
      }
    }
    
  2. WunLock函數

    1. 寫標記置為false
    2. 然后通知所有的線程
    void WUnlock() {
      std::lock_guard<mutex_t> guard(mutex_);
      writer_entered_ = false;
      reader_.notify_all();
    }
    
  3. RLock函數

    1. 如果當前有人在寫或者已經有最多的人讀了則阻塞
    2. 否則只需要讓讀的計數++

    因為是允許多個線程一起讀這樣並不會出錯

    void RLock() {
      std::unique_lock<mutex_t> latch(mutex_);
      while (writer_entered_ || reader_count_ == MAX_READERS) {
        reader_.wait(latch);
      }
      reader_count_++;
    }
    
  4. RUnLatch函數

    1. 計數--
    2. 如果當前有人在寫並且無人讀的話需要通知所有其他線程
    3. 如果在計數--之前達到了最大讀數,釋放這個鎖之后需要通知其他線程,現在又可以讀了。
    void RUnlock() {
      std::lock_guard<mutex_t> guard(mutex_);
      reader_count_--;
      if (writer_entered_) {
        if (reader_count_ == 0) {
          writer_.notify_one();
        }
      } else {
        if (reader_count_ == MAX_READERS - 1) {
          reader_.notify_one();
        }
      }
    }
    

7. 並發索引實現

1. FindLeafPageRW的實現

1. 1 整體思路

對於並發控制的實現,采用最簡單的latch crabing方法實現,也就是上面講的那種方法, 這種方法需要在找葉子結點的時候,從根節點到葉子結點的過程需要逐步加鎖,然后檢測是否能夠釋放。由於我們的插入和刪除操作都需要先找到葉子結點,所以之前使用的無鎖版本的FindLeafPage函數在並發條件下就並不適用了。因此這里需要實現一個逐步加鎖 + 逐步釋放的新函數

  1. 整體思路和之前的findLeafPage幾乎一樣,只是多了幾次判斷
  2. 如果是讀操作,那則直接加鎖,然后對上一層釋放鎖
  3. 如果是寫操作,釋放鎖之前則要判斷一下是否安全。
INDEX_TEMPLATE_ARGUMENTS
Page *BPLUSTREE_TYPE::FindLeafPageRW(const KeyType &key, bool left_most, enum OpType op, Transaction *transaction) {
  Page *page = buffer_pool_manager_->FetchPage(root_page_id_);  // now root page is pin
  BPlusTreePage *node = reinterpret_cast<BPlusTreePage *>(page->GetData());
  while (!node->IsLeafPage()) {
    if (op == OpType::Read) {
      page->RLatch();
      UnlatchAndUnpin(op,transaction);
    } else {
      // else is write op
      page->WLatch();
      if (IsSafe(node, op)) {
        UnlatchAndUnpin(op, transaction);
      }
    }
    transaction->AddIntoPageSet(page);
    InternalPage *internal_node = reinterpret_cast<InternalPage *>(node);
    page_id_t next_page_id = left_most ? internal_node->ValueAt(0) : internal_node->Lookup(key, comparator_);
    Page *next_page = buffer_pool_manager_->FetchPage(next_page_id);  // next_level_page pinned
    BPlusTreePage *next_node = reinterpret_cast<BPlusTreePage *>(next_page);
    page = next_page;
    node = next_node;
  }
  return page;
}

1.2 判斷是否安全的函數

  1. 如果是插入操作,則只要當前node的size處於安全狀態即 + 1 之后不會產生分裂,則為安全
  2. 如果是刪除狀態。則只要當前node的size - 1 之后不會重分配或者合並,則為安全
  3. 對於根節點需要進行特殊判斷,如果這個根節點是葉子結點則為安全(這種情況隨便刪)。否則根節點的大小必須大於2(因為如果等於2 ,減去1之后還是1。則是一個沒有有效key值的結點,不安全)
INDEX_TEMPLATE_ARGUMENTS
template <typename N>
bool BPLUSTREE_TYPE::IsSafe(N *node, enum OpType op) {
  // insert
  if (op == OpType::Insert) {
    return node->GetSize() < maxSize(node);
  }

  // remove
  if (node->IsRootPage()) {
    // If root is a leaf node, no constraint
    // If root is an internal node, it must have at least two pointers;
    if (node->IsLeafPage()) {
      return true;
    }

    return node->GetSize() > 2;
  }

  return node->GetSize() > minSize(node);
}

1.3 把釋放鎖和unpin操作合並

這兩個操都要對page做,與其多寫幾行不如寫個函數給他合並在一起做。

transaction->GetPageSet(); 就是之前訪問過的page集合

INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::UnlatchAndUnpin(enum OpType op,Transaction *transaction) const {
  if (transaction == nullptr) {
    return;
  }

  auto pages = transaction->GetPageSet();
  for (auto page : *pages) {
    page_id_t page_id = page->GetPageId();

    if (op == OpType::Read) {
      page->RUnlatch();
      buffer_pool_manager_->UnpinPage(page_id, false);
    } else {
      page->WUnlatch();
      buffer_pool_manager_->UnpinPage(page_id, true);
    }
  }

  pages->clear();
}

2. 支持並發的讀寫操作

其實只需要之前博客1、2的非並發版本上做一些小小的改動

2.1 支持並發讀

2.2 支持並發寫

這里要支持插入和刪除兩種寫操作

1. 插入

  1. 根據實驗提示,首先需要獲取對於根節點的鎖。我個人認為是為了防止下面這種情況發生

    如果理解的有問題,歡迎大家指出,互相討論

    consider the following case

    txn A read "A" from tree but not hold mutex (if "A" not in the tree)

    before A crab page0 latch , txnB crab page0 and insert "A" into the tree then unlatch page0

    txnA crab page0 get false result

  2. FindLeafRW替換之前的FindLeaf函數即可

  3. UnLatchAndPin替換之前簡單的unpin操作。

完整代碼就不貼了,在之前的insert上改一下就行了

2. 刪除

  1. 對於刪除首先要在remove上做和insert一樣的處理
  2. 在核心函數CoalesceOrRedistribute中對兄弟結點做修改之前,先加寫鎖結束之后釋放寫鎖就ok


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM