leveldb源碼分析--SSTable之Compaction


對於compaction是leveldb中體量最大的一部分,也應該是最為復雜的部分,為了便於理解我們首先從一些基本的概念開始。下面是一些從doc/impl.html中翻譯和整理的內容:

Level 0

當日志文件超過一定大小的閾值是 (默認為 1MB):

  • 建立一個新的memtable和日志文件,以后的操作都是用新的memtable和日志文件
  • 后台進行如下操作:
    • 將舊的 memtable寫到SSTable中(過程為先轉為immtable_table,然后遍歷寫入)
    • 廢棄舊的 memtable
    • 刪除舊的 memtable和日志文件
    • 將新的SSTable加到level 0中.

這是doc/impl.html中的說明,但是在源代碼中我們可以看到在MakeRoomForWrite函數中有邏輯,當滿足一些其他條件之后(這里的其他條件不涉及到這個閾值大小)(mem_->ApproximateMemoryUsage() > options_.write_buffer_size) 就會有

log_ = new log::Writer(lfile);
    imm_ = mem_;
    has_imm_.Release_Store(imm_);
    mem_ = new MemTable(internal_comparator_);

  這些操作,及上面描述的操作。而再查找write_buffer_size在Options::Options()  中進行了如下初始化write_buffer_size(4<<20),所以這里不知道是不是文檔過久未更新的原因,所以從代碼來看應該是閾值達到4MB時。而且這里的計算也不是已日志文件為依據的,而是以memtable的內存使用量為依據,當然這里兩個數據應該是相差不大的,只是直觀上來說應該是memtable的內存使用量。

我們再來看看compaction涉及到的一些因素:

過程

當level L中的文件的大小超過閾值時,我們在后台對其進行compact。compaction過程是先在level L+1中查找和該文件存在key 范圍有重疊(overlap)的文件,如果存在重疊的文件就將其作為compaction的輸入文件,在合並完成以后將其刪除。這里需要注意的是level-0比較特殊,因為level-0的文件本身就有可能相互重疊,所以level-0進行compaction時我們同樣選擇level-0中相互重疊的文件。

compaction就是講選擇的文件進行合並輸出為level L+1的SSTable。當文件大小超過2MB的時候我們新生成一個文件;或者當當前文件可以和level L+2中的10個文件都有重疊時,這個條件是為了保證下次compaction level L+1的時候不會選擇太多的 level L+2中的文件。

這個過程中會刪除(邏輯上)舊文件,然后將新的文件加到工作狀態(即加入到version set中)。

compaction每個level的時候我們都是以循環(以key為基准)的方式進行的,即每次compact之后我們記住compact到的key,下一次我們查找包含這個key之后的下一個key的文件,然后進行compact。

compaction會丟棄唄覆蓋的value,丟棄無用的刪除,這里的無用是指在這個key都不在更高所有的level的key range中。

Timing

Level 0的compaction最多從level 0讀取4個1MB(4個4MB?)的文件,以及所有的level 1文件(10MB),也就是我們將讀取14MB,並寫入14BM。
Level > 0的compaction,從level L選擇一個2MB的文件,最壞情況下,將會和levelL+1的12個文件有重合(10:level L+1的總文件大小是level L的10倍;邊界的2:level L的文件范圍通常不會和level L+1的文件對齊)。因此Compaction將會讀26MB,寫26MB。對於100MB/s的磁盤IO來講,compaction將最壞需要0.5秒。
如果磁盤IO更低,比如10MB/s,那么compaction就需要更長的時間5秒。如果user以10MB/s的速度寫入,我們可能生成很多level 0文件(50個來裝載5*10MB的數據)。這將會嚴重影響讀取效率,因為需要merge更多的文件。
解決方法1:為了降低該問題,我們可能想增加log切換的閾值,缺點就是,log文件越大,對應的memtable文件就越大,這需要更多的內存。
解決方法2:當level 0文件太多時,人工降低寫入速度。
解決方法3:降低merge的開銷,如把level 0文件都無壓縮的存放在cache中。

Number of files

對於更高的level我們可以創建更大的文件,而不是2MB,代價就是更多突發性的compaction。或者,我們可以考慮分區,把文件放存放多目錄中來降低這個的開銷。
然而在2011年2月4號,作者做了一個實驗,在ext3文件系統中當當前文件夾含有不同的文件數量時進行100K次打開文件,結果表明現在的文件系統其實可以不需要分區。

 

Files in directory Microseconds to open a file
1000 9
10000 10
100000 16

 

了解了compaction的一些原理和機制以后我們該回到代碼來看看具體的代碼流程是怎么樣的,首先回到DBimpl中的MakeRoomForWrite

Status DBImpl::MakeRoomForWrite(bool force) {
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
          // 當L0的文件數量要達到閾值的時候,我們每次寫入都延遲1ms,
           // 這樣可以為后台的compaction騰出一定的cpu(當后台compaction
         //和當前線程是使用的一個內核的時候)這樣可以降低寫入延遲的方差
          //因為延遲被分攤到多個寫上面,而不是在幾個甚至一個寫的時候
      env_->SleepForMicroseconds(1000);
      allow_delay = false; // 每次寫只允許延遲一次
    } else if (!force &&  //當前mmetable的占用量未達到閾值
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      break;
    } else if (imm_ != NULL) {
      // 上一次memtable的compaction尚未結束,等待后台compaction完成
       // 因為compaction的過程為 mem ->imm 完成后刪除imm
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // level 0的文件數量超過閾值,等待后台compaction完成
      bg_cv_.Wait();
    } else {
      // memtable達到閾值,新生成日志和memtable,並將原先的mem轉化為imm給后台compact
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
    
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;             // Do not force another compaction if have room
      MaybeScheduleCompaction(); //觸發后台compaction
    }
  }
  return s;
}

MaybeScheduleCompaction函數只是簡單判斷后台線程是否已經啟動和一些其他的錯誤判斷,如果未啟動則啟動后台compaction線程。這個compaction線程的實現在DBImpl::BackgroundCall,這個函數也只是簡單的調用實現了compaction實際邏輯的函數BackgroundCompaction,我們這里就來仔細分析一下這個函數

void DBImpl::BackgroundCompaction() {
  if (imm_ != NULL) { //有轉化的memtable,直接將MemTable寫入SSTable即返回
    CompactMemTable();
    return;
  }
  if (is_manual) { //用戶主動(手動)觸發的compaction
    ManualCompaction* m = manual_compaction_;
   //取得進項compact的輸入文件生成compaction類
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == NULL);
    if (c != NULL) {
     //取得level中最大的一個key
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
  } else {
    c = versions_->PickCompaction();
  }
  if (c == NULL) {
  } else if (!is_manual && c->IsTrivialMove()) {
  //如果不是主動觸發的,並且level中的輸入文件與level+1中無重疊,且與level + 2中重疊不大於
  //kMaxGrandParentOverlapBytes = 10 * kTargetFileSize,直接將文件移到level+1中
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);    //寫入version中,稍后分析
  } else {//否則調用DoCompactionWork進行Compact輸入文件
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    CleanupCompaction(compact);      //清理compact過程中的臨時變量
    c->ReleaseInputs();               //清除輸入文件描述符
    DeleteObsoleteFiles();            //刪除無引用的文件
  }
  delete c;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {//如果compaction出錯,也將手動的compaction標記為done
      m->done = true;
    }
    if (!m->done) {//如果沒有完成也僅僅記錄基本狀態,感覺manual的形式未實現完整邏輯
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = NULL;
  }
}

當手動觸發compaction時,具體compaction哪些文件是由 versions_->CompactRange 根據,level, begin, end來計算的,下面我們來看看這個函數的實現,看看是如何取得輸入文件的

Compaction* VersionSet::CompactRange(
    int level,
    const InternalKey* begin,
    const InternalKey* end) {
 //將Level-level中的range與begin,end有重疊的SSTable描述符放入inputs中
  current_->GetOverlappingInputs(level, begin, end, &inputs);
  if (inputs.empty()) {
    return NULL;
  }

  // 一次不能compact過大的量,將前N個已經大於的保存下來,后面的文件描述符從inputs中移除.
  const uint64_t limit = MaxFileSizeForLevel(level);   //kTargetFileSize = 2 * 1048576;
  uint64_t total = 0;
  for (int i = 0; i < inputs.size(); i++) {
    uint64_t s = inputs[i]->file_size;
    total += s;
    if (total >= limit) {
      inputs.resize(i + 1);
      break;
    }
  }
//new一個Compaction類
  Compaction* c = new Compaction(level);
  c->input_version_ = current_;
  c->input_version_->Ref();
  c->inputs_[0] = inputs;
  SetupOtherInputs(c); //嘗試加入level中新的文件,條件為不再與level+1中新的文件重疊
  return c;
}

可以看到在初步得到了應該compaction的文件和范圍以后,代碼還調用了SetupOtherInputs這個函數,他的作用是為了在不影響性能的情況下盡可能多的compaction當前level的文件

void VersionSet::SetupOtherInputs(Compaction* c) {
  GetRange(c->inputs_[0], &smallest, &largest);
//上一層中oeverlap的加入inputs_[1]
  current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
  // 所有inputs的開始結束范圍
  GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

  // 看能否將level中與取出的level+1中的range重疊的也加到inputs中,
  // 而新加的文件的range都在已經加入的level+1的文件的范圍中
  if (!c->inputs_[1].empty()) {
   //取得和level+1的inputs重疊的level中的文件
    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
    const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
    const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
    const int64_t expanded0_size = TotalFileSize(expanded0);
    if (expanded0.size() > c->inputs_[0].size() &&  //level中有新文件加入,所有的大小不大於閾值
        inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
        //kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;
      GetRange(expanded0, &new_start, &new_limit);
     // 取得level+1中與新的level中的輸入文件overlap的文件
      current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
                                     &expanded1);
      if (expanded1.size() == c->inputs_[1].size()) {
      //如果level+1中無新的文件加入,設置為新的inputs和范圍
        smallest = new_start;
        largest = new_limit;
        c->inputs_[0] = expanded0;
        c->inputs_[1] = expanded1; //這里應該是相等的,此句可以省略
        GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
      }
    }
  }
  // 取得level+2 中重疊的文件放入grandparents_
  if (level + 2 < config::kNumLevels) {
    current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
                                   &c->grandparents_);
  }
  
  //記錄本次compact到的key,下次從這個key繼續往后compact
  compact_pointer_[level] = largest.Encode().ToString();
  c->edit_.SetCompactPointer(level, largest);
}

手動compaction時如何獲取選擇輸入文件的邏輯就分析完了,那么leveldb滿足其內部一些閾值條件后觸發的compaction是如何選擇輸入文件的呢?這個邏輯在中,下面我們來仔細的分析一下

Compaction* VersionSet::PickCompaction() {
 //每次compact完成在VersionSet::Finalize中計算每個level中TotalFileSize / MaxBytesForLevel
 // 的值,並且將最大的值最為compaction_score_ ,和compaction_level_
  const bool size_compaction = (current_->compaction_score_ >= 1);  
  //對於每個SSTable會有一個 允許seek的次數 (f->file_size / 16384)超過這么多次會將其設置為
  const bool seek_compaction = (current_->file_to_compact_ != NULL);
  // 這兩種可能導致的compaction中,我們優先compact第一種情況的
  if (size_compaction) {
    level = current_->compaction_level_;
    c = new Compaction(level);
    // 查找第一個包含比上次已經compact的最大key大的key的文件
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // 如果上次已經是最大的key,那么回到第一個文件開始compact
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  } else if (seek_compaction) {//如果是查找導致的,直接將導致compact的文件加入inputs_[0]
    level = current_->file_to_compact_level_;
    c = new Compaction(level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  } else {
    return NULL;
  }
  c->input_version_ = current_;
  c->input_version_->Ref();
  // 如果是level 0 則還需查找level 0中其他和輸入文件重疊的文件
  if (level == 0) {
    GetRange(c->inputs_[0], &smallest, &largest);
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
  }
  SetupOtherInputs(c); //嘗試加入level中新的文件,條件為不再與level+1中新的文件重疊,這個函數已經分析
  return c;
}

選擇好了需要進行Compaction的的文件以后,就該調用實際的Compaction過程了,我們來分析其邏輯,過程比較長但是只要仔細細心的閱讀,其處理的邏輯並不復雜,主要是遍歷所有輸入文件,然后將相同的可以進行合並,以及刪除一些無用的delete操作等。

Status DBImpl::DoCompactionWork(CompactionState* compact) {
    //將snapshot相關的內容記錄到compact信息中
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->number_;
  }
  //遍歷所有inputs文件
  Iterator* input = versions_->MakeInputIterator(compact->compaction);
  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
    // 每次都判斷如果有memtable 需要compact,先compact memtable
    if (has_imm_.NoBarrier_Load() != NULL) {
      if (imm_ != NULL) { 
        CompactMemTable();
        bg_cv_.SignalAll(); // Wakeup 等待空間的線程
      }
    }
    Slice key = input->key();
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != NULL) { //當前(level +1)生成的文件和level + 2中有過多的重疊
      status = FinishCompactionOutputFile(compact, input); //寫當前文件到磁盤
      if (!status.ok()) {
        break;
      }
    }
    // Handle key/value, add to state, etc.
    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      // 解碼錯誤,清除之前的狀態
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key,
                                     Slice(current_user_key)) != 0) {
        // 第一次出現的key,將seq設置為最大標記新key開始
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }
        //因為第一次出現會將last seq設置為最大,表示上一個key的關於seq的比較結束
      if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by an newer entry for same user key
        drop = true; // (A)
      } else if (ikey.type == kTypeDeletion &&  
                 ikey.sequence <= compact->smallest_snapshot &&               //無snapshot引用
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) { //(1)
        // For this user key:
        // (1) there is no data in higher levels
        // 而我們知道在底層的文件中seq會更大,正在被compact的相同的key會稍后標記這個為刪除(ruleA)
        drop = true;
      }
      last_sequence_for_key = ikey.sequence; 
    }

    if (!drop) {
      // 第一次進入compact或者上次文件剛剛寫到磁盤,新建一個文件和table_builder
      if (compact->builder == NULL) {
        status = OpenCompactionOutputFile(compact);
        if (!status.ok()) {
          break;
        }
      }
    //新文件,記錄當前key 為 整個文件的smallest
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      //每遍歷到一個就將其記錄為largest
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());
      // 超過level的閾值大小,將文件寫到磁盤
      if (compact->builder->FileSize() >= compact->compaction->MaxOutputFileSize()) {
        status = FinishCompactionOutputFile(compact, input);
        if (!status.ok()) {
          break;
        }
      }
    }
    input->Next();
  }
//判斷狀態和將未寫到磁盤的數據寫入磁盤
  if (status.ok() && shutting_down_.Acquire_Load()) {
    status = Status::IOError("Deleting DB during compaction");
  }
  if (status.ok() && compact->builder != NULL) {
    status = FinishCompactionOutputFile(compact, input);
  }
  if (status.ok()) {
    status = input->status();
  }
  delete input;
  input = NULL;
  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  for (int which = 0; which < 2; which++) {//計算本次Compaction讀入文件的總大小
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
    }
  }
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }//本次Compaction寫出文件的總大小
  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);
  if (status.ok()) {//記錄統計信息以及將Compaction導致的文件變動記錄到versionedit中
    status = InstallCompactionResults(compact);
  }
  return status;
}

SSTable的Compaction就分析完了,關於Compaction還剩下MemTable的Compaction,或者也可以將其說明為Memtable的dump為SSTable。再分析完上面的SSTable Compaction后你就發現MemTable的Compaction是如此之簡單了,我們簡單羅列一下

void DBImpl::CompactMemTable() {
  Status s = WriteLevel0Table(imm_, &edit, base);
  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);
  }
  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();
    imm_ = NULL;
    has_imm_.Release_Store(NULL);
    DeleteObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}

這個邏輯中就一個主要的函數WriteLevel0Table,其流程如下:

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) { 
  meta.number = versions_->NewFileNumber();
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();
  //新生成一個Table_builder負責寫文件
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);

  // Note that if file_size is zero, the file has been deleted and 
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    if (base != NULL) {
      /* 找到一個當層未overlap 且上冊overlap 不會過多(kMaxGrandParentOverlapBytes)的層返回*/ 
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    //將文件加到versionedit中
    edit->AddFile(level, meta.number, meta.file_size,
                  meta.smallest, meta.largest);
  }
  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
  stats_[level].Add(stats);
  return s;
}

這里有一個唯一需要注意的是——將Memtable dump到磁盤以后並不是如文檔描述的“將新的SSTable加到level 0中.”,而是會用一個函數PickLevelForMemTableOutput選擇一個最高的可以將這個SSTable放入的level中。一般來說會是level 0,但是還是存在一些特殊情況可以將其放到更高的level中,這樣可以降低Compaction的頻率。PickLevelForMemTableOutput的邏輯簡單,請讀者自行閱讀。

至此comaction流程相關的函數就分析完了,本節內容比較多,但是只要靜下心來慢慢品讀理解還是不難的。至此leveldb中剩下的還有recover,new (新建一個數據庫)、snapshot、get相關的代碼沒有分析了。我們在compaction的分析過程中涉及到了很多有關version的類、方法、結構,leveldb的vesion是整個系統極其重要的一環,而且recovery,snapshot,get在一定程度上都會依賴於version的實現,所以接下來的文章准備對version相關的內容進行介紹。敬請期待……


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM