在Linux上leveldb的安裝和使用中我們寫了這么一段測試代碼,內容以及輸出結果如下:
#include <iostream> #include <string> #include <assert.h> #include "leveldb/db.h" using namespace std; int main(void) { leveldb::DB *db; leveldb::Options options; options.create_if_missing = true; // open leveldb::Status status = leveldb::DB::Open(options,"/tmp/testdb", &db); assert(status.ok()); string key = "name"; string value = "chenqi"; // write status = db->Put(leveldb::WriteOptions(), key, value); assert(status.ok()); // read status = db->Get(leveldb::ReadOptions(), key, &value); assert(status.ok()); cout<<value<<endl; // delete status = db->Delete(leveldb::WriteOptions(), key); assert(status.ok()); status = db->Get(leveldb::ReadOptions(),key, &value); if(!status.ok()) { cerr<<key<<" "<<status.ToString()<<endl; } else { cout<<key<<"==="<<value<<endl; } // close delete db; return 0; }
chenqi name NotFound:
Leveldb的讀數據入口為db文件夾下db_impl.cc文件中的DBImpl::Get函數,首先獲取當前的版本號,然后依次在三個數據源memtable,immutable table,和sst表中進行查找,返回之前再判斷一下是否需要啟動Compact任務.
Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { Status s; MutexLock l(&mutex_); SequenceNumber snapshot; if (options.snapshot != NULL) { snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_; } else { //獲取版本號 snapshot = versions_->LastSequence(); } //三個查找源,memtable,immutable,sstable MemTable* mem = mem_; MemTable* imm = imm_; Version* current = versions_->current(); mem->Ref(); if (imm != NULL) imm->Ref(); current->Ref(); bool have_stat_update = false; Version::GetStats stats; // Unlock while reading from files and memtables { mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { //在memtable中查找 // Done } else if (imm != NULL && imm->Get(lkey, value, &s)) {//在imutable中查找 // Done } else { s = current->Get(options, lkey, value, &stats); //在磁盤文件中查找,當前Version have_stat_update = true; } mutex_.Lock(); } //判斷是否需要調度Compact if (have_stat_update && current->UpdateStats(stats)) { MaybeScheduleCompaction(); } mem->Unref(); if (imm != NULL) imm->Unref(); current->Unref(); return s; }
首先,按照leveldb代碼的慣例線上鎖,然后生成一個SequenceNumber作為標記, 后續不管線程會不會被切出去, 結果都要相當於在這個時間點瞬間完成,memtable、immemtable以及Version都由於采用了引用計數, 因此要Ref().快照建立完了, 接下來的操作只會有單純的讀, 可以把鎖暫時釋放.查詢的順序是先找memtable, 再immemtable, 最后是SSTable.這里調用了db文件夾下dbformat.cc中的LookupKey::LookupKey, 源碼內容如下:
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { size_t usize = user_key.size(); size_t needed = usize + 13; // A conservative estimate char* dst; if (needed <= sizeof(space_)) { dst = space_; } else { dst = new char[needed]; } start_ = dst; dst = EncodeVarint32(dst, usize + 8); kstart_ = dst; memcpy(dst, user_key.data(), usize); dst += usize; EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); dst += 8; end_ = dst; }
這個類主要的功能是把輸入的key轉換成用於查詢的key. 比如key是"Sherry", 實際在數據庫中的表達可能會是"6Sherry", 6是長度. 這樣比對key是否相等時速度會更快.LookupKey格式 = 長度 + key + SequenceNumber + type,這里用了兩個tricks:
1.在棧上分配一個200長度的數組, 如果運行時發現長度不夠用再從堆上new一個, 可以極大避免內存分配
2.黑科技函數"EncodeVarint32", 一般key的長度不可能用滿32bit. 大量很短的Key卻要用32bit來描述長度無疑是很浪費的. 這個函數讓小數值用更少的空間, 代價是最糟要多花一字節(8bit).EncodeVarint32的代碼出現在util文件夾下的coding.cc文件里,源碼內容如下:
char* EncodeVarint32(char* dst, uint32_t v) { // Operate on characters as unsigneds unsigned char* ptr = reinterpret_cast<unsigned char*>(dst); static const int B = 128; if (v < (1<<7)) { *(ptr++) = v; } else if (v < (1<<14)) { *(ptr++) = v | B; *(ptr++) = v>>7; } else if (v < (1<<21)) { *(ptr++) = v | B; *(ptr++) = (v>>7) | B; *(ptr++) = v>>14; } else if (v < (1<<28)) { *(ptr++) = v | B; *(ptr++) = (v>>7) | B; *(ptr++) = (v>>14) | B; *(ptr++) = v>>21; } else { *(ptr++) = v | B; *(ptr++) = (v>>7) | B; *(ptr++) = (v>>14) | B; *(ptr++) = (v>>21) | B; *(ptr++) = v>>28; } return reinterpret_cast<char*>(ptr); }
現在回到DBImpl::Get函數,memtable和immutable table都是通過內存中的skiplist進行的,磁盤文件的查找是通過db文件夾下version_set.cc中Version::Get來進行的.源碼內容如下:
Status Version::Get(const ReadOptions& options, const LookupKey& k, std::string* value, GetStats* stats) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); const Comparator* ucmp = vset_->icmp_.user_comparator(); Status s; stats->seek_file = NULL; stats->seek_file_level = -1; FileMetaData* last_file_read = NULL; int last_file_read_level = -1; // We can search level-by-level since entries never hop across // levels. Therefore we are guaranteed that if we find data // in an smaller level, later levels are irrelevant. //查找用戶提供的key可能在的文件,通過各個level的文件的最小值,最大值來判斷 //按層查找 std::vector<FileMetaData*> tmp; FileMetaData* tmp2; for (int level = 0; level < config::kNumLevels; level++) { size_t num_files = files_[level].size(); if (num_files == 0) continue; // Get the list of files to search in this level FileMetaData* const* files = &files_[level][0]; if (level == 0) { // Level-0 files may overlap each other. Find all files that // overlap user_key and process them in order from newest to oldest. tmp.reserve(num_files); for (uint32_t i = 0; i < num_files; i++) { FileMetaData* f = files[i]; if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && ucmp->Compare(user_key, f->largest.user_key()) <= 0) { tmp.push_back(f); } } if (tmp.empty()) continue; std::sort(tmp.begin(), tmp.end(), NewestFirst); files = &tmp[0]; num_files = tmp.size(); } else { // Binary search to find earliest index whose largest key >= ikey. //查找用戶提供的key可能在的文件 uint32_t index = FindFile(vset_->icmp_, files_[level], ikey); if (index >= num_files) { files = NULL; num_files = 0; } else { tmp2 = files[index]; if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) { // All of "tmp2" is past any data for user_key files = NULL; num_files = 0; } else { files = &tmp2; num_files = 1; } } } for (uint32_t i = 0; i < num_files; ++i) { if (last_file_read != NULL && stats->seek_file == NULL) { // We have had more than one seek for this read. Charge the 1st file. stats->seek_file = last_file_read; stats->seek_file_level = last_file_read_level; } FileMetaData* f = files[i]; last_file_read = f; last_file_read_level = level; Saver saver; saver.state = kNotFound; saver.ucmp = ucmp; saver.user_key = user_key; saver.value = value; //在table_cache中查找key對應的value s = vset_->table_cache_->Get(options, f->number, f->file_size, ikey, &saver, SaveValue); if (!s.ok()) { return s; } switch (saver.state) { case kNotFound: break; // Keep searching in other files case kFound: return s; case kDeleted: s = Status::NotFound(Slice()); // Use empty error message for speed return s; case kCorrupt: s = Status::Corruption("corrupted key for ", user_key); return s; } } } return Status::NotFound(Slice()); // Use an empty error message for speed }
FindFile源碼內容如下:
int FindFile(const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files, const Slice& key) { uint32_t left = 0; uint32_t right = files.size(); while (left < right) { uint32_t mid = (left + right) / 2; const FileMetaData* f = files[mid]; if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) { // Key at "mid.largest" is < "target". Therefore all // files at or before "mid" are uninteresting. left = mid + 1; } else { // Key at "mid.largest" is >= "target". Therefore all files // after "mid" are uninteresting. right = mid; } } return right; }
Version::Get函數首先查找key可能存在的sst表,然后調用table_cache->Get進行查找。即對SSTable的查詢就是對table_cache_的查詢, 這個cache是不可取消的, 解決了什么問題呢?
LevelDB的數據庫"文件"是一個文件夾, 里面包含大量的文件. 這是把復雜度甩鍋給操作系統的做法, 但很多系統資源是有限的. 比如, file handle(文件句柄). 一個程序如果開了1W個file handle會浪費大量資源. 這里做個LRU cache, 只有常用的SSTable才會開一個活躍的file handle.
另外就是索引的問題. LSMT是沒有主索引的, 只有在各個SSTable內有微縮版索引. 所以, 最最優的情況下也需要2次硬盤讀寫. 第一張SSTable就存着key, 先讀微型索引, 然后二分法找到具體位置, 再讀value.
TableCache把熱點SSTable的微型索引預先放在內存里, 這樣只要1次硬盤讀取就能取到key. 這個優化對於LSMT的數據庫來說尤為重要, 因為很可能會不止查詢一張SSTable. 情況會劣化非常快.
總結, TableCache既承擔管理資源(file handle)的作用, 又加速索引的讀取.
TableCache的實現在db文件夾下table_cache.cc中,源碼內容如下:
Status TableCache::Get(const ReadOptions& options, uint64_t file_number, uint64_t file_size, const Slice& k, void* arg, void (*saver)(void*, const Slice&, const Slice&)) { Cache::Handle* handle = NULL; Status s = FindTable(file_number, file_size, &handle);//查找table,沒有則新建table結構並插入table_cache if (s.ok()) { Table* t = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table; s = t->InternalGet(options, k, arg, saver); //在table中查找 cache_->Release(handle); } return s; }
該函數流程很簡單,先從table_cache中獲取Table結構,沒有則新建Table結構加入table_cache,然后調用Table::InternalGet在具體的sst表中查找.需要注意的是,在util文件夾下查看cache.cc可以看到
virtual Handle* Insert(const Slice& key, void* value, size_t charge, void (*deleter)(const Slice& key, void* value)) { const uint32_t hash = HashSlice(key); return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter); } virtual Handle* Lookup(const Slice& key) { const uint32_t hash = HashSlice(key); return shard_[Shard(hash)].Lookup(key, hash); }
這個hash table做了兩遍hash, 先把key分片一遍, 然后再扔給真正的hash table cache(有鎖)去lookup.這么做的邏輯是可以減少鎖的使用率和提升並發.
進一步查看table_cache.cc中的Table::InternalGet函數:
Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg, void (*saver)(void*, const Slice&, const Slice&)) { Status s; Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); iiter->Seek(k);//在索引中找,是否存在某個塊可能包含這個key if (iiter->Valid()) { Slice handle_value = iiter->value(); FilterBlockReader* filter = rep_->filter; BlockHandle handle; if (filter != NULL && handle.DecodeFrom(&handle_value).ok() && !filter->KeyMayMatch(handle.offset(), k)) { // Not found } else { //在具體的block中找 Iterator* block_iter = BlockReader(this, options, iiter->value()); block_iter->Seek(k); if (block_iter->Valid()) { (*saver)(arg, block_iter->key(), block_iter->value()); } s = block_iter->status(); delete block_iter; } } if (s.ok()) { s = iiter->status(); } delete iiter; return s; }
Table::Get函數先在table的indexblock中查找該key所處的block,然后利用Bloom Filter來過濾,最后在具體的block中查找。在查找過程中使用了Iterator機制。
總體來說,leveldb中Get操作的流程可以用下圖來說明:
參考文獻:
1.http://blog.csdn.net/joeyon1985/article/details/47154249
2.http://masutangu.com/2017/06/leveldb_1/
3.https://zhuanlan.zhihu.com/jimderestaurant?topic=LevelDB