leveldb源碼分析--插入刪除流程


由於網絡上對leveldb的分析文章都比較豐富,一些基礎概念和模型都介紹得比較多,所以本人就不再對這些概念以專門的篇幅進行介紹,本文主要以代碼流程注釋的方式。

首先我們從db的插入和刪除開始以對整個體系有一個感性的認識,首先看插入:

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;        //leveldb中不管單個插入還是多個插入都是以WriteBatch的方式進行的
  batch.Put(key, value);
  return Write(opt, &batch);
}

Delete也類似,只是調用了WriteBatch 的 Delete(key), 這樣再內部會以不同的形式編碼傳遞至下一步進行處理。具體的WriteBatch的實現和編碼方式在稍后的文章中進行介紹。Delete和Put都調用了Write,,這里的Write是在DBImpl::Write中通過虛函數的形式實現對其調用的,我們接着看Write的流程

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;
    /* 
產生一個Writer對象,然后保存必要的鎖、batch、和同步寫的相關信息
*/
  MutexLock l(&mutex_);
  writers_.push_back(&w);   // 上鎖,然后放入待寫的隊列中
  while (!w.done && &w != writers_.front()) {
/* 這里設計比較特別,需要跟后面的 BuildBatchGroup結合起來看,這里大致的意思是
   一直等待到這次寫完成或者這次寫被放在隊列的最前面,BuildBatchGroup會將隊列  
   里所有sync設置相同的寫請求組成一個WriteBatch進行寫入,這里的寫請求有可能在
   別的線程完成寫操作了,而是否在隊列首的判斷是有可能此刻沒有其他線程在寫循環中,
   或者本次寫請求和前面的寫請求的同步設置不一致,那么這種情況就需要自己進入該線
   程完成寫的操作。
*/
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // 這個函數的主要作用是清理內存表和外存(磁盤)的表使內存表騰出空間插入新的數據
// 這里的設計比較復雜設計到leveldb 的很多核心設計,我們這里先大致了解其功能
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

     {
      mutex_.Unlock();BuildBatchGroup
   // 這里講組裝好的batch內容寫入log,並根據同步設置判斷是否同步到磁盤
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
    // 寫入內存表,這里采用了一個遍歷WriteBatch完成插入的方式,稍后分析
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
// 如果同步錯誤則記錄相應錯誤信息.
        RecordBackgroundError(status);
      }
    }
// 刪除在BuildBatch里面設置的零時Batch的內容 
if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
// 喚醒所有等待寫入的線程
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue,因為可能請求時不在隊首而進入了等待狀態,
// 這樣喚醒他使其成為新的隊首寫線程,進行MakeRoomForWrite等一系列操作
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

所以從流程可以清晰的看到插入刪除的流程主要為:

1. 將這條KV記錄以順序寫的方式追加到log文件末尾;

2. 將這條KV記錄插入內存中的Memtable中,在插入過程中如果剛好后台進程在compaction會短暫停頓以為后台進程compaction騰出時間及cpu

這里涉及到一次磁盤讀寫操作和內存SkipList的插入操作,但是這里的磁盤寫時文件的順序追加寫入效率是很高的,所以並不會導致寫入速度的降低;

而且從流程分析我們知道,在插入(刪除)過程中如果多線程同時進行,那么這些操作將會將操作的同步設置相同的相鄰的操作合並為一個批插入,這樣可以使整個系統的總吞吐量更大。所以一次插入記錄操作只會等待一次磁盤文件追加寫和內存SkipList插入操作,這是為何leveldb寫入速度如此高效的根本原因。

我們這里講插入和刪除以等同的方式進行了介紹,可能有的朋友會覺得奇怪,刪除不是需要查找到插入的原始記錄的么?而leveldb進行了一個巧妙的將隨機讀寫,轉換為順序讀寫的方式,那就是其並不存在立即刪除的操作,而是與插入操作相同將插入操作插入的是Key:Value值改為刪除操作插入的是“Key:刪除標記”,並不真正立即去刪除記錄,而是后台Compaction的時候才去做對應的真正的刪除操作,這又極大的提高了leveldb的效率。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM