leveldb - Handling Concurrent Writes


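leveldb handles concurrent writes with a neat use of a time window. While one writer is busy writing to the log, the writers that arrive in that window queue up, and the next leader writes all of their data as a single batch. The code is worth a close read: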

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  // A begin 
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;
  // A end

  // B begin 
  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }
  // B end

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
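To see the pattern outside of leveldb, here is a minimal sketch of the same leader/follower group commit using only the C++ standard library. It is not leveldb's code: the class GroupCommitLog, its in-memory log_ vector (a stand-in for log_->AddRecord plus the memtable insert), and the helper RunConcurrentWriters are hypothetical names for this illustration, and the sketch has no batch size limit and no sync handling.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical leader/follower group commit modeled on DBImpl::Write.
// log_ stands in for log_->AddRecord + memtable insert (an assumption).
class GroupCommitLog {
 public:
  void Write(const std::string& record) {
    Writer w;
    w.record = record;

    std::unique_lock<std::mutex> lock(mu_);
    writers_.push_back(&w);
    // Like section B: sleep until a leader has written our record for us,
    // or until we reach the front of the queue and must lead.
    w.cv.wait(lock, [&] { return w.done || &w == writers_.front(); });
    if (w.done) return;
    assert(&w == writers_.front());

    // Leader: take every queued record (no size or sync limits here).
    std::vector<std::string> group;
    Writer* last_writer = &w;
    for (Writer* q : writers_) {
      group.push_back(q->record);
      last_writer = q;
    }

    // Unlock for the slow write. We stay at writers_.front(), so no other
    // thread passes the head check and touches log_ meanwhile.
    lock.unlock();
    log_.insert(log_.end(), group.begin(), group.end());
    lock.lock();
    ++group_commits_;

    // Pop the group; mark each follower done and wake it.
    while (true) {
      Writer* ready = writers_.front();
      writers_.pop_front();
      if (ready != &w) {
        ready->done = true;
        ready->cv.notify_one();
      }
      if (ready == last_writer) break;
    }
    // Wake the new head so it can lead the next group.
    if (!writers_.empty()) writers_.front()->cv.notify_one();
  }

  // Only meaningful once every Write() call has returned.
  std::size_t RecordCount() const { return log_.size(); }
  std::size_t GroupCommits() const { return group_commits_; }

 private:
  struct Writer {
    std::string record;
    bool done = false;
    std::condition_variable cv;
  };

  std::mutex mu_;
  std::deque<Writer*> writers_;
  std::vector<std::string> log_;
  std::size_t group_commits_ = 0;
};

// Usage helper: `threads` threads each call Write() `per_thread` times.
void RunConcurrentWriters(GroupCommitLog& log, int threads, int per_thread) {
  std::vector<std::thread> pool;
  for (int t = 0; t < threads; ++t) {
    pool.emplace_back([&log, t, per_thread] {
      for (int i = 0; i < per_thread; ++i) {
        log.Write(std::to_string(t) + ":" + std::to_string(i));
      }
    });
  }
  for (std::thread& th : pool) th.join();
}
```

As in DBImpl::Write, the leader drops the mutex only for the slow write; exclusivity during that window comes from staying at the front of writers_, not from holding the lock. Under contention GroupCommits() typically ends up smaller than RecordCount(), which is the whole point of the technique.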

Suppose six writers, w1, w2, w3, w4, w5, and w6, issue write requests concurrently.

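In section B, w1 wins the race for mutex_ and acquires the lock. w1 pushes itself onto the writers_ queue; at this moment the queue contains only w1, so w1 is the head and proceeds straight to BuildBatchGroup. When execution reaches mutex_.Unlock() (just before log_->AddRecord), mutex_ is released. It is safe to release it here because w1 stays at the front of writers_ until it pops itself at the end, so no other writer can satisfy the queue-head condition and enter the log/memtable phase. Meanwhile w2 through w6 compete for the lock: each one that gets it in section B appends itself to writers_, finds that it is not the head, and calls w.cv.Wait(), which releases the lock again. The queue might then look like (w1, w3, w5, w2, w4), with w1 still at the front and w6 not yet enqueued.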
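The head check in section B can be modeled single-threaded to show the queue state in this scenario. WriterQueue and EnqueueAndCheckHead are hypothetical names; a real leveldb writer that is not the head would block in w.cv.Wait() rather than simply getting false back.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical single-threaded model of section B: push the writer onto the
// queue, then report whether it is the head (i.e. may proceed as leader).
// Assumes writer names are unique.
class WriterQueue {
 public:
  bool EnqueueAndCheckHead(const std::string& name) {
    writers_.push_back(name);
    return writers_.front() == name;
  }
  const std::deque<std::string>& writers() const { return writers_; }

 private:
  std::deque<std::string> writers_;
};
```

Enqueuing w1, w3, w5, w2, w4 in that order returns true only for w1. Since w1 does not leave the front until its final pop loop, every later writer fails the check, which is why w1 can release mutex_ during its log write.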

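w1 then writes to the log and the memtable. When it is done, it calls mutex_.Lock() to reacquire the lock. From this point on, threads trying to enter section B cannot obtain the lock, so the queue cannot change.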

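Next, w1 enters the while (true) loop and pops itself off the queue. Because ready == &w, it sends no signal, and because ready == last_writer, the loop ends immediately. Execution then reaches the "Notify new head of write queue" block, which wakes w3, now at the front of the queue.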
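The pop-and-notify loop at the end of DBImpl::Write can be modeled on a deque of writer names. CompleteGroup and CompletionResult are hypothetical names for this sketch; adding a name to signaled stands in for setting its status and done flag and calling cv.Signal().

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical model of the final while (true) loop plus the
// "Notify new head of write queue" step.
struct CompletionResult {
  std::vector<std::string> signaled;  // followers marked done and woken
  std::string new_head;               // writer woken to lead next ("" if none)
};

// Precondition: last_writer is somewhere in `writers`, at or after `self`.
CompletionResult CompleteGroup(std::deque<std::string>& writers,
                               const std::string& self,
                               const std::string& last_writer) {
  CompletionResult result;
  while (true) {
    std::string ready = writers.front();
    writers.pop_front();
    if (ready != self) result.signaled.push_back(ready);  // the leader skips itself
    if (ready == last_writer) break;
  }
  if (!writers.empty()) result.new_head = writers.front();
  return result;
}
```

For w1's step, calling it on (w1, w3, w5, w2, w4) with self and last_writer both "w1" signals nobody and reports w3 as the new head. For w3's later step, (w3, w5, w2, w4, w6, w9, w8) with last_writer "w4" signals w5, w2, and w4 and reports w6.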

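When w3 wakes up, w.cv.Wait() reacquires mutex_ before returning. w3 finds that it is now at the front of the queue, leaves the wait loop, and (after MakeRoomForWrite) enters BuildBatchGroup. That function walks the queue from the head and appends the batches of the writers behind it to form one combined updates batch. Here it merges w3, w5, w2, and w4 into one batch and sets last_writer to w4, the writer at the tail of the queue at that moment. (The actual BuildBatchGroup can stop before the tail: it does not add a sync write to a group led by a non-sync write, and it caps the group at 1 MB, or at the leader's size plus 128 KB when the leader's own batch is 128 KB or smaller.) Once w3 reaches mutex_.Unlock() and releases the lock, new calls to DBImpl::Write can modify the queue again, so it might become, for example, (w3, w5, w2, w4, w6, w9, w8).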
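The rule that decides where a group ends can be sketched on its own. This is a simplified model of leveldb's BuildBatchGroup using hypothetical names (PendingWrite, GroupEnd): it only computes the index of last_writer, skips the actual batch appending, and assumes every queued writer carries a non-NULL batch.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a queued Writer.
struct PendingWrite {
  std::size_t bytes;  // plays the role of WriteBatchInternal::ByteSize(batch)
  bool sync;
};

// Returns the index of the last writer folded into the group led by queue[0].
// Assumes queue is non-empty.
std::size_t GroupEnd(const std::vector<PendingWrite>& queue) {
  const PendingWrite& first = queue[0];
  std::size_t size = first.bytes;
  // Cap the group at 1 MB; if the leader's write is small (<= 128 KB),
  // only allow 128 KB of growth so the small write is not slowed down much.
  std::size_t max_size = 1 << 20;
  if (size <= (128 << 10)) max_size = size + (128 << 10);

  std::size_t last = 0;
  for (std::size_t i = 1; i < queue.size(); ++i) {
    // Do not fold a sync write into a group led by a non-sync write.
    if (queue[i].sync && !first.sync) break;
    size += queue[i].bytes;
    if (size > max_size) break;  // the group would grow too big
    last = i;
  }
  return last;
}
```

With four small non-sync writes the whole queue joins the group (index 3); a sync write behind a non-sync leader ends the group just before it.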

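The log/memtable block (from log_->AddRecord through WriteBatchInternal::InsertInto) writes the combined w3, w5, w2, w4 batch to the log and the memtable. In the while (true) loop that follows, w3 pops itself without signaling, then pops w5, w2, and w4, setting each one's status and done flag and signaling its condition variable; once it pops w4, which equals last_writer, the loop exits. Each of those followers wakes in section B, sees that w.done is true, and returns w.status without writing anything itself. Finally, the "Notify new head of write queue" block wakes w6, the new head, and the process repeats as described above.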
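Inside the combined batch, each writer's entries receive consecutive sequence numbers in queue order: DBImpl::Write stamps the group with last_sequence + 1 via SetSequence and then advances last_sequence by Count(updates). The helper below is a hypothetical model of that arithmetic (AssignSequences is not a leveldb function), assuming each writer's batch holds at least one entry.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical model: given the entry count of each merged writer's batch,
// in queue order, return the [first, last] sequence range each writer gets
// and advance last_sequence past the whole group. Assumes every count >= 1.
std::vector<std::pair<std::uint64_t, std::uint64_t>> AssignSequences(
    std::uint64_t& last_sequence, const std::vector<std::uint64_t>& counts) {
  std::vector<std::pair<std::uint64_t, std::uint64_t>> ranges;
  std::uint64_t next = last_sequence + 1;  // the combined batch starts here
  for (std::uint64_t n : counts) {
    ranges.emplace_back(next, next + n - 1);
    next += n;
  }
  last_sequence = next - 1;  // same as last_sequence += total entry count
  return ranges;
}
```

For example, if last_sequence is 100 and w3, w5, w2, w4 carry 2, 1, 3, and 1 entries, they get 101-102, 103, 104-106, and 107, and last_sequence becomes 107.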

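This is how several concurrent writes are merged into one batch that is written to the log and memtable together: one log append, and at most one logfile_->Sync(), per group instead of per writer.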

  

