LevelDB的源碼閱讀(四) Compaction操作


leveldb的數據存儲采用LSM的思想,將隨機寫入變為順序寫入,記錄寫入操作日志,一旦日志被以追加寫的形式寫入硬盤,就返回寫入成功,由后台線程將寫入日志作用於原有的磁盤文件生成新的磁盤數據.Leveldb在內存中維護一個數據結構memtable,采用skiplist來實現,保存當前寫入的數據,當數據達到一定規模后變為不可寫的內存表immutable table.新的寫入操作會寫入新的memtable,而immutable table會被后台線程寫入到數據文件.Leveldb的數據文件是按層存放的,默認配置的最高層級是7,即level0,level1,…,level7.內存中的immutable總是寫入level0,除level0之外的各個層leveli的所有數據文件的key范圍都是互相不相交的.當滿足一定條件時,leveli的數據文件會和leveli+1的數據文件進行merge,產生新的leveli+1層級的文件,這個磁盤文件的merge過程和immutable的dump過程叫做Compaction,在leveldb中是由一個單獨的后台線程來完成的.

進行Compaction操作的條件如下:

1.產生了新的immutable table需要寫入數據文件

2.某個level的數據規模過大

3.某個文件被無效查詢的次數過多(在文件i中查詢key,沒有找到key,這次查詢稱為文件i的無效查詢)

4.手動compaction

滿足以上條件會啟動Compaction過程,接下來分析詳細的Compaction過程.

 Leveldb進行Compaction的入口函數是db文件夾下db_impl.cc文件中的DBImpl::MaybeScheduleCompaction,該函數在每次leveldb進行讀寫操作時都有可能被調用.源碼內容如下:

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (bg_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == NULL &&
             manual_compaction_ == NULL &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    bg_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);  //新建后台任務並進行調度
  }
}

首先調用db文件夾下version_set.h中的NeedsCompaction()判斷是否需要啟動Compact任務.源碼內容如下:

// Returns true iff some level needs a compaction.
  bool NeedsCompaction() const {
    Version* v = current_;
    return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
  }

version_set.cc中compaction_score_ 的計算如下:

void VersionSet::Finalize(Version* v) {
  // Precomputed best level for next compaction
  int best_level = -1;
  double best_score = -1;

  for (int level = 0; level < config::kNumLevels-1; level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      score = v->files_[level].size() /
          static_cast<double>(config::kL0_CompactionTrigger);
    } else {
      // Compute the ratio of current size to size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
    }

    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }

  v->compaction_level_ = best_level;
  v->compaction_score_ = best_score;
}

注意,這里同時預計算了進行compaction的最佳level.

確認需要啟動compaction之后,調用util文件夾下env_posix.cc文件中的PosixEnv::Schedule函數啟動Compact過程.

void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  PthreadCall("lock", pthread_mutex_lock(&mu_));

  // Start background thread if necessary
  if (!started_bgthread_) {
    started_bgthread_ = true;
    PthreadCall(
        "create thread",
        pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));
  }

  // If the queue is currently empty, the background thread may currently be
  // waiting.
  if (queue_.empty()) {
    PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  }

  // Add to priority queue
  queue_.push_back(BGItem());
  queue_.back().function = function;
  queue_.back().arg = arg;

  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}

 如果沒有后台線程,則創建后台線程,否則新建一個后台執行任務BGItem壓入后台線程任務隊列,然后調用PosixEnv::BGThreadWrapper喚醒后台線程: 

static void* BGThreadWrapper(void* arg) {
    reinterpret_cast<PosixEnv*>(arg)->BGThread();
    return NULL;
  }

 BGThreadWrapper調用PosixEnv::BGThread,不斷地從后台任務隊列中拿到任務,然后執行任務

void PosixEnv::BGThread() {
  while (true) {
    // Wait until there is an item that is ready to run
    PthreadCall("lock", pthread_mutex_lock(&mu_));
    while (queue_.empty()) {
      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
    }

    void (*function)(void*) = queue_.front().function;
    void* arg = queue_.front().arg;
    queue_.pop_front();

    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    (*function)(arg);
  }
}

回到DBImpl::MaybeScheduleCompaction,方便理解起見這里再重復一遍源碼

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (bg_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == NULL &&
             manual_compaction_ == NULL &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    bg_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);  //新建后台任務並進行調度
  }
}

之前分析了env_->Schedule進行的調度過程,現在來分析實際進行后台任務的DBImpl::BGWork.DBImpl::BGWork在db文件夾下db_impl.cc文件中.

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

DBImpl::BGWork調用DBImpl::BackgroundCall(),合並完成后可能導致有的level的文件數過多,因此會再次調用MaybeScheduleCompaction()判斷是否需要繼續進行合並.

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(bg_compaction_scheduled_);
  if (shutting_down_.Acquire_Load()) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }

  bg_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  bg_cv_.SignalAll();
}

DBImpl::BackgroundCall()調用 BackgroundCompaction(),在BackgroundCompaction()中分別完成三種不同的Compaction:對Memtable進行合並、 trivial Compaction(直接將文件移動到下一層)以及一般的合並,調用DoCompactionWork()實現.

void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != NULL) {
    CompactMemTable();//1、對Memtable進行合並  
    return;
  }

  Compaction* c;
  bool is_manual = (manual_compaction_ != NULL);//manual_compaction默認為NULL,則is_manual默認為false  
  InternalKey manual_end;
  if (is_manual) { //取得手動compaction對象
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == NULL);
    if (c != NULL) {
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level,
        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {   //取得自動compaction對象
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == NULL) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {//2、IsTrivialMove 返回 True,trivial Compaction,則直接將文件移入 level + 1 層即可
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number),
        c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(),
        versions_->LevelSummary(&tmp));
  } else { //3、一般的合並  
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact); //進行compaction
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();      // input的文件引用計數減少1
    DeleteObsoleteFiles();   //刪除無用文件
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.Acquire_Load()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log,
        "Compaction error: %s", status.ToString().c_str());
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;   //標記手動compaction任務完成
    if (!status.ok()) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = NULL;
  }
}

 首行mutex_.AssertHeld(),Mutex的AssertHeld函數實現默認為空,在很多函數的實現內有調用,其作用如下: 

As you have observed it does nothing in the default implementation. The function seems to be a placeholder for checking whether a particular thread holds a mutex and optionally abort if it doesn’t. This would be equivalent to the normal asserts we use for variables but applied on mutexes.
I think the reason it is not implemented yet is we don’t have an equivalent light weight function to assert whether a thread holds a lock in pthread_mutex_t used in the default implementation. Some platforms which has that capability could fill this implementation as part of porting process. Searching online I did find some implementation for this function in the windows port of leveldb. I can see one way to implement it using a wrapper class over pthread_mutex_t and setting some sort of a thread id variable to indicate which thread(s) currently holds the mutex, but it will have to be carefully implemented given the race conditions that can arise.

Memtable的合並

Compaction首先檢查imm_,及時將已寫滿的memtable寫入磁盤sstable文件,對Memtable的合並,調用DBImpl::CompactMemTable()完成:

void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != NULL);//imm_不能為空
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  Status s = WriteLevel0Table(imm_, &edit, base);//將Memtable轉化為.sst文件,寫入level0 sst table,並寫入到edit中
  base->Unref();  
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);//應用edit中記錄的變化,來生成新的版本 
  }

  if (s.ok()) {
      // Commit to the new state
    imm_->Unref();
    imm_ = NULL;
    has_imm_.Release_Store(NULL);
    DeleteObsoleteFiles();  
  } else {
    RecordBackgroundError(s);
  }
}

其中CompactMemTable()主要調用了兩個函數:WriteLevel0Table()和versions_->LogAndApply()

CompactMemTable()首先調用WriteLevel0Table(),源碼內容如下:

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
                                Version* base) {
  mutex_.AssertHeld();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();//獲取新生成的.sst文件的編號
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();//用於遍歷Memtable中的數據

  Status s;
  {
    mutex_.Unlock();
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);//創建.sst文件,並將其相關信息記錄在meta中
    mutex_.Lock();
  }

  delete iter;  //iter用完之后一定要刪除
  pending_outputs_.erase(meta.number);

  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    if (base != NULL) {
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);//為合並的輸出文件選擇合適的level
    }
    edit->AddFile(level, meta.number, meta.file_size,meta.smallest, meta.largest);//將生成的.sst文件加入到該level
  }
  return s;
}

WriteLevel0Table()首先調用BuildTable()將Immutable Memtable中所有的數據寫入到一個.sst文件中,並將.sst文件的信息(文件編號,Key值范圍,文件大小)記錄到變量meta中.由於Memtable是基於Skiplist的,是一個有序表,因此在寫入.sst文件時,Key值也是從小到大來排列的.可以發現,將Memtable中的數據轉換為SSTable時,是將所有記錄都寫入SSTable的,要刪除的記錄也一樣.刪除操作會在更高level的Compaction中完成.因此level 0中可能會存在Key值相同的記錄. 

Status BuildTable(const std::string& dbname,
                  Env* env,
                  const Options& options,
                  TableCache* table_cache,
                  Iterator* iter,
                  FileMetaData* meta) {
  Status s;
  meta->file_size = 0;
  iter->SeekToFirst();  
  std::string fname = TableFileName(dbname, meta->number);//獲得新建表名字
  if (iter->Valid()) {
    WritableFile* file;    
    s = env->NewWritableFile(fname, &file);   //建立新的表文件,后續寫入數據
    if (!s.ok()) {
      return s;
    }
    TableBuilder* builder = new TableBuilder(options, file); //建立TableBuilder
    meta->smallest.DecodeFrom(iter->key());
    for (; iter->Valid(); iter->Next()) {    //將key/value對加入builder
      Slice key = iter->key();
      meta->largest.DecodeFrom(key);
      builder->Add(key, iter->value());
    }

    // Finish and check for builder errors
    s = builder->Finish(); //構建indexhandler,metahandler,寫入文件
    if (s.ok()) {
      meta->file_size = builder->FileSize();
      assert(meta->file_size > 0);
    }
    delete builder;

    // Finish and check for file errors
    if (s.ok()) {
      s = file->Sync();  //寫入文件
    }
    if (s.ok()) {
      s = file->Close();
    }
    delete file;
    file = NULL;

    if (s.ok()) {
      // Verify that the table is usable
      Iterator* it = table_cache->NewIterator(ReadOptions(),
                                              meta->number,
                                              meta->file_size); //將表結構加入表緩存
      s = it->status();
      delete it;
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (s.ok() && meta->file_size > 0) {
    // Keep it
  } else {
    env->DeleteFile(fname);
  }
  return s;
}

 該函數利用iter向TableBuilder中加入key/value對,然后寫入文件並同步,將新生成的Table結構加入tablecache以備后用.

table_builder文件在table文件夾下,其中TableBuilder::Add函數流程如下:

void TableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->num_entries > 0) {
    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  }

  if (r->pending_index_entry) {//新的block開始
    assert(r->data_block.empty());
    r->options.comparator->FindShortestSeparator(&r->last_key, key);
    std::string handle_encoding;
    r->pending_handle.EncodeTo(&handle_encoding);
    r->index_block.Add(r->last_key, Slice(handle_encoding));
    r->pending_index_entry = false;
  }
  //計算filter
  if (r->filter_block != NULL) {
    r->filter_block->AddKey(key);
  }
  //加入blockbuilder
  r->last_key.assign(key.data(), key.size());
  r->num_entries++;
  r->data_block.Add(key, value);
  // block大於配置的尺寸(默認為4k)則結束該block,輸出后開啟新的Block。
  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  if (estimated_block_size >= r->options.block_size) {
    Flush();
  }
}

 將Block結構寫入文件的TableBuilder::WriteBlock函數流程如下:

void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    type: uint8
  //    crc: uint32
  assert(ok());
  Rep* r = rep_;
  Slice raw = block->Finish(); //取得block格式化數據

  Slice block_contents;
    //獲取是否壓縮配置選項
  CompressionType type = r->options.compression;
  // TODO(postrelease): Support more compression options: zlib?
  switch (type) {
    case kNoCompression:
      block_contents = raw;
      break;

    case kSnappyCompression: {
      std::string* compressed = &r->compressed_output;
      if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
          compressed->size() < raw.size() - (raw.size() / 8u)) {
        block_contents = *compressed;
      } else {
        // Snappy not supported, or compressed less than 12.5%, so just
        // store uncompressed form
        block_contents = raw;
        type = kNoCompression;
      }
      break;
    }
  }
  //進行壓縮后,然后寫入文件,blockdata+type+crc32
  WriteRawBlock(block_contents, type, handle);
  r->compressed_output.clear();
  block->Reset();
}

而TableBuilder::Finish的函數定義如下:

Status TableBuilder::Finish() {
  Rep* r = rep_;
  Flush();//將block數據寫入,可能不是滿的block
  assert(!r->closed);
  r->closed = true;

  BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;

  // Write filter block
  if (ok() && r->filter_block != NULL) {
    WriteRawBlock(r->filter_block->Finish(), kNoCompression,
                  &filter_block_handle);
  }

  // Write metaindex block
  if (ok()) {
    BlockBuilder meta_index_block(&r->options);
    if (r->filter_block != NULL) {
      // Add mapping from "filter.Name" to location of filter data
      std::string key = "filter.";
      key.append(r->options.filter_policy->Name());
      std::string handle_encoding;
      filter_block_handle.EncodeTo(&handle_encoding);
      meta_index_block.Add(key, handle_encoding);
    }

    // TODO(postrelease): Add stats and other meta blocks
    WriteBlock(&meta_index_block, &metaindex_block_handle);
  }

  // Write index block
  if (ok()) {
    if (r->pending_index_entry) {
      r->options.comparator->FindShortSuccessor(&r->last_key);
      std::string handle_encoding;
      r->pending_handle.EncodeTo(&handle_encoding);
      r->index_block.Add(r->last_key, Slice(handle_encoding));
      r->pending_index_entry = false;
    }
    WriteBlock(&r->index_block, &index_block_handle);
  }

  // Write footer
  if (ok()) {
    Footer footer;
    footer.set_metaindex_handle(metaindex_block_handle);
    footer.set_index_handle(index_block_handle);
    std::string footer_encoding;
    footer.EncodeTo(&footer_encoding);
    r->status = r->file->Append(footer_encoding);
    if (r->status.ok()) {
      r->offset += footer_encoding.size();
    }
  }
  return r->status;
}

以上代碼中調用的flush源碼內容如下:

void TableBuilder::Flush() {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->data_block.empty()) return;
  assert(!r->pending_index_entry);
  WriteBlock(&r->data_block, &r->pending_handle);
  if (ok()) {
    r->pending_index_entry = true;
    r->status = r->file->Flush();
  }
  if (r->filter_block != NULL) {
    r->filter_block->StartBlock(r->offset);
  }
}

然后WriteLevel0Table()調用PickLevelForMemTableOutput()為Memtable合並的輸出文件選擇合適的level,並調用edit->AddFile()將生成的.sst文件加入到該level中.

WriteLevel0Table()結束后,CompactMemTable()調用db文件夾下version_set.cc文件中的versions_->LogAndApply()基於當前版本和更改edit來得到一個新版本.之后會對versions_->LogAndApply()進行分析.

Trivial Compaction

由之前的分析可知,is_manual默認為false,會調用PickCompaction()來選出要進行合並的level和相應的輸入文件.當c->IsTrivialMove()滿足時,則直接將文件移動到下一level.

  c = versions_->PickCompaction();

  Status status;
  if (c == NULL) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);  //將文件從該層刪除
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,   //將該文件加入到下一level
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);  //應用更改,創建新的Version
  } 

首先調用db文件夾下version_set.cc文件中的VersionSet::PickCompaction()為接下來的Compaction操作准備輸入數據,由之前對Compaction的數據結構分析可知,Compaction操作有兩種觸發方式:某一level的文件數太多和某一文件的查找次數超過允許值,在進行合並時,將優先考慮文件數過多的情況. 

Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  int level;

  const bool size_compaction = (current_->compaction_score_ >= 1);//文件數過多
  const bool seek_compaction = (current_->file_to_compact_ != NULL);//某一文件的查找次數太多
  if (size_compaction) {//文件數太多優先考慮
    level = current_->compaction_level_;  //要進行Compaction的level
    c = new Compaction(level);
    //每一層有一個compact_pointer,用於記錄compaction key,這樣可以進行循環compaction
    for (size_t i = 0; i < current_->files_[level].size(); i++) { //從待合並的level中選擇合適的文件完成合並操作
      FileMetaData* f = current_->files_[level][i];  //level層中的第i個文件
      if (compact_pointer_[level].empty() || //compact_pointer_中記錄的是下次合並的起始Key值,為空時都可以進行合並
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { //或者f的最大Key值大於起始值
        c->inputs_[0].push_back(f);//則該文件可以參與合並,將其加入到level輸入文件中
        break;
      }
    }
    if (c->inputs_[0].empty()) { //若level輸入為空,則將level的第一個文件加入到輸入中
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  } else if (seek_compaction) {//然后考慮查找次數過多的情況
    level = current_->file_to_compact_level_;
    c = new Compaction(level);
    c->inputs_[0].push_back(current_->file_to_compact_);//將待合並的文件作為level層的輸入
  } else {
    return NULL;
  }

  c->input_version_ = current_;
  c->input_version_->Ref();

  //level 0中的Key值是可以重復的,因此Key值范圍可能相互覆蓋,把所有重疊都找出來,一起做compaction
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);//待合並的level層的文件的Key值范圍
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }
  SetupOtherInputs(c);//獲取待合並的level+1層的輸入
  return c;
}

 然后判斷是否為trivial Compaction,當為trivial Compaction時,只需要簡單的將level層的文件移動到level +1 層即可

bool Compaction::IsTrivialMove() const {
  return (num_input_files(0) == 1 &&   //level層只有1個文件
          num_input_files(1) == 0 &&   //level+1層沒有文件
          TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);//level+2層文件總大小不超過最大覆蓋范圍,否則會導致后面的merge需要很大的開銷
}

最終完成完成Compaction操作

c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,f->smallest, f->largest);
status = versions_->LogAndApply(c->edit(), &mutex_);  

一般的合並

一般的合並調用DBImpl::DoCompactionWork()完成,compact是調用VersionSet::PickCompacttion()得到的,與之前的trivial Compaction相同.不同level之間,可能存在Key值相同的記錄,但是記錄的seq不同.由之前的分析可知,最新的數據存放在較低的level中,其對應的seq也一定比level+1中的記錄的seq要大,因此當出現相同Key值的記錄時,只需要記錄第一條記錄,后面的都可以丟棄.level 0中也可能存在Key值相同的數據,其后面的seq也不同.數據越新,其對應的seq越大,且記錄在level 0中的記錄是按照user_key遞增,seq遞減的方式存儲的,則相同user_key對應的記錄是聚集在一起的,且按照seq遞減的方式存放的.在更高層的Compaction時,只需要處理第一條出現的user_key相同的記錄即可,后面的相同user_key的記錄都可以丟棄.因此合並后的level +1層的文件中不會存在Key值相同的記錄.刪除記錄的操作也會在此時完成,刪除數據的記錄會被丟棄,而不會被寫入到更高level的文件中. 

Status DBImpl::DoCompactionWork(CompactionState* compact) {
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->number_;
  }
  mutex_.Unlock();
  //生成iterator:遍歷要compaction的數據
  Iterator* input = versions_->MakeInputIterator(compact->compaction);//用於遍歷待合並的每一個文件
  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
    if (has_imm_.NoBarrier_Load() != NULL) {  //immutable memtable的優先級最高
      mutex_.Lock();
      if (imm_ != NULL) {   //當imm_非空時,合並Memtable
        CompactMemTable();
        bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
      }
      mutex_.Unlock();
    }

    Slice key = input->key();
    if (compact->compaction->ShouldStopBefore(key) &&   //是否需要停止Compaction,中途輸出compaction的結果,避免compaction結果和level N+2 files有過多的重疊
        compact->builder != NULL) {
      status = FinishCompactionOutputFile(compact, input);
    }

    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||    //獲取當前的user_key和sequence
          user_comparator()->Compare(ikey.user_key,
          Slice(current_user_key)) != 0) { //可能存在Key值相同但seq不同的記錄
        // 此時是這個Key第一次出現
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;//則將其seq設為最大值,表示第一次出現
      }

      if (last_sequence_for_key <= compact->smallest_snapshot) {//表示key已經出現過,否則seq應為KMaxSequenceNumber
        drop = true;    // (A)   //之前已經存在Key值相同的記錄,丟棄
      } else if (ikey.type == kTypeDeletion &&   //要刪除該記錄
              ikey.sequence <= compact->smallest_snapshot &&  //記錄的序號比數據庫之前的最小序號還小
              compact->compaction->IsBaseLevelForKey(ikey.user_key)) { //高的level中沒有數據
        drop = true;   //此時要丟棄該記錄
      }
      last_sequence_for_key = ikey.sequence;//上次出現的記錄對應的sequence,用於判斷后面出現相同Key值的情況
    }

    if (!drop) {   //如果不需要丟棄該記錄
      if (compact->builder == NULL) {
        status = OpenCompactionOutputFile(compact);//若需要,則創建一個.sst文件,用於存放合並后的數據
      }
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());//將記錄寫入.sst文件

      if (compact->builder->FileSize() >=
          compact->compaction->MaxOutputFileSize()) {   //當.sst文件超過最大值時
        status = FinishCompactionOutputFile(compact, input);//完成Compaction輸出文件
      }
    }
    input->Next();  //處理下一個文件
  }

  if (status.ok() && compact->builder != NULL) {
    status = FinishCompactionOutputFile(compact, input);
  }
  if (status.ok()) {
    status = input->status();
  }
  delete input;
  input = NULL;
  
 //更新compaction的一些統計數據
  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  for (int which = 0; which < 2; which++) {
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
    }
  }
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }

  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);

  if (status.ok()) {
    status = InstallCompactionResults(compact);//完成合並
  }
  if (!status.ok()) {
    RecordBackgroundError(status);
  }
  VersionSet::LevelSummaryStorage tmp;
  Log(options_.info_log,
      "compacted to: %s", versions_->LevelSummary(&tmp));
  return status;

}

 首先將可以留下的記錄寫入到.sst文件中,並將相關信息保存在變量compact中,然后調用InstallCompactionResults()將所做的改動加入到VersionEdit中,再調用LogAndApply()來得到新的版本. 

Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  mutex_.AssertHeld();
  Log(options_.info_log,  "Compacted %d@%d + %d@%d files => %lld bytes",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1,
      static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  compact->compaction->AddInputDeletions(compact->compaction->edit());
  const int level = compact->compaction->level();
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    compact->compaction->edit()->AddFile(
        level + 1,
        out.number, out.file_size, out.smallest, out.largest);
  }
  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}

 

LogAndApply()

在上面三種不同的Compaction操作中,最終當對當前版本的更改VersionEdit全部完成后,都會調用VersionSet::LogAndApply()來應用更改,創建新版本.edit中保存了level和level+1層要刪除和增加的文件.

Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {

  Version* v = new Version(this);  //創建一個新Version
  {
    Builder builder(this, current_);//基於當前Version創建一個builder變量
    builder.Apply(edit);//將edit中記錄的要增加、刪除的文件加入到builder類中
    builder.SaveTo(v);//然后將edit中的記錄保存到新創建的Version中,這樣就得到了一個新的版本
  }
  Finalize(v);//根據各層文件數來判斷是否還需要進行Compaction

  std::string new_manifest_file;
  Status s;
  if (descriptor_log_ == NULL) {   //只會在第一次調用時進入
    assert(descriptor_file_ == NULL);
    new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);//創建一個新的Manifest文件
    edit->SetNextFile(next_file_number_);
    s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    if (s.ok()) {
      descriptor_log_ = new log::Writer(descriptor_file_);
      s = WriteSnapshot(descriptor_log_);//快照,系統開始時完整記錄數據庫的所有信息
    }
  }
  {
    mu->Unlock();
    if (s.ok()) {
      std::string record;
      edit->EncodeTo(&record);
      s = descriptor_log_->AddRecord(record);//將數據庫的變化記錄到Manifest文件中
      if (s.ok()) {
        s = descriptor_file_->Sync();
      }
    }
    if (s.ok() && !new_manifest_file.empty()) {
      s = SetCurrentFile(env_, dbname_, manifest_file_number_);
    }
    mu->Lock();
  }

  if (s.ok()) {
    AppendVersion(v);  //將新得到的Version插入到所有Version形成的雙向鏈表的尾部
    log_number_ = edit->log_number_;
    prev_log_number_ = edit->prev_log_number_;
  }
  }
  return s;
}

為了重啟之后能恢復數據庫之前的狀態,就需要將數據庫的歷史變化信息記錄下來,這些信息都是記錄在Manifest文件中的.為了節省空間和時間,leveldb采用的是在系統開始完整的所有數據庫的信息(WriteSnapShot()),以后則只記錄數據庫的變化,即VersionEdit中的信息(descriptor_log_->AddRecord()).恢復時,只需要根據Manifest中的信息就可以一步步的恢復到上次的狀態.

VersionSet::LogAndApply首先創建一個新的Version,然后調用builder.Apply(edit)將edit中所有要刪除、增加的文件編號記錄下來,其源碼如下:

  // Apply all of the edits in *edit to the current state.
  void Apply(VersionEdit* edit) {
    // 更新每一層下次合並的起始Key值
    for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
      const int level = edit->compact_pointers_[i].first;
      vset_->compact_pointer_[level] =
          edit->compact_pointers_[i].second.Encode().ToString();
    }
    //將所有要刪除的文件加入到levels_[level].deleted_files變量中
    const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
    for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
         iter != del.end();++iter) {
      const int level = iter->first;
      const uint64_t number = iter->second;
      levels_[level].deleted_files.insert(number);
    }
    // 將所有新增加的文件加入到levels_[level].added_files中
    for (size_t i = 0; i < edit->new_files_.size(); i++) {
      const int level = edit->new_files_[i].first;
      FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
      f->refs = 1;
      f->allowed_seeks = (f->file_size / 16384);
      if (f->allowed_seeks < 100) f->allowed_seeks = 100;
      levels_[level].deleted_files.erase(f->number);
      levels_[level].added_files->insert(f);
    }
  }

然后VersionSet::LogAndApply再調用builder.SaveTo(v)將更改保存到新的Version中,其源碼如下:

  void SaveTo(Version* v) {
    BySmallestKey cmp;
    cmp.internal_comparator = &vset_->icmp_;
    for (int level = 0; level < config::kNumLevels; level++) {
      const std::vector<FileMetaData*>& base_files = base_->files_[level];//當前Version中原有的各個level的.sst文件
      std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
      std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
      const FileSet* added = levels_[level].added_files;//對應level新增加的文件
      v->files_[level].reserve(base_files.size() + added->size());
      for (FileSet::const_iterator added_iter = added->begin();
           added_iter != added->end();++added_iter) {
        // 將原有文件中編號比added小的加入到新的Version
        for (std::vector<FileMetaData*>::const_iterator bpos
                 = std::upper_bound(base_iter, base_end, *added_iter, cmp);
             base_iter != bpos;++base_iter) {
          MaybeAddFile(v, level, *base_iter);
        }
        MaybeAddFile(v, level, *added_iter);//再將新增的文件依次加入到新的Version
      }
      for (; base_iter != base_end; ++base_iter) {
        MaybeAddFile(v, level, *base_iter);//再將原有文件中剩余的部分加入到新的Version
      }
    }
  }

bpos = std::upper_bound(base_iter,base_end,*added_iter,cmp); // 返回base_iter到base_end之間,第一個大於*added_iter的iter.假設原有文件的編號為1、3、4、6、8,新增文件的編號為2、5、7,則第一次循環時,bpos為3對應的迭代器,因此base_iter只遍歷一個元素,即將編號1加入到新的Version中.總體對新增文件來說,就是首先加入base中編號比它小的,然后再將其加入,然后再繼續比那里下一個新增文件,因此最終得到的文件編號順序是 1、2、3、4、5、6、7、8,即每一層的.sst文件都是按照編號從小到大排列的.這樣就得到了新的Version的每一層的所有文件.

 

參考文獻:

1.http://blog.csdn.net/u012658346/article/details/45787233

2.http://blog.csdn.net/u012658346/article/details/45788939

3.http://blog.csdn.net/joeyon1985/article/details/47154249

4.http://www.blogjava.net/sandy/archive/2012/03/15/leveldb6.html

5.http://www.pandademo.com/2016/04/compaction-of-sstable-leveldb-part-1-source-dissect-9/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM