日志結構合並樹LSM代碼解讀


1. LSM樹的由來
  1.1. 索引結構特征
  a. 哈希存儲引擎: 是哈希表的持久化實現,支持增、刪、改以及隨機讀取操作,但不支持順序掃描,對應的存儲系統為key-value存儲系統。
  b. B樹存儲引擎是B樹: 不僅支持單條記錄的增、刪、讀、改操作,還支持順序掃描, 因此B樹是傳統關系型數據庫中索引結構的不二人選。
  但從技術角度:由於磁盤的(磁柱、磁盤、磁道、磁頭)結構與B樹結構的特點,導致傳統B樹索引存在着隨機寫效率的上限挑戰,所以當在那些索引插入頻率遠大於查詢頻率的應用場景下,如歷史記錄表和日志文件來說,B樹索引顯得捉襟見肘了。

 1.2. BTree的隨機寫特點
  一個BTree,對於在沒有緩存的Case情況下, 一個隨機寫分為兩步進行:1. 從磁盤Load目標塊節點到內存,2.修改它並寫回磁盤。所以,BTree在對於隨機key值下的平均“blind-write”操作需要兩次IO操作,其限定了BTree的隨機寫吞吐量。

   1.3. LSM“blind-write”吞吐量 
  既然隨機寫相對昂貴的,LSM采用“有序map的數據分層與延遲寫”(all sorted-map write-deferral)的策略而代替立即寫的操作;同時為了避免數據層數的過多造成對讀的性能的傷害,數據層級之間會周期性地觸發自頂向下地進行合並的操作。隨機寫的平均IO此時用式子表示:(O*R/B), 其中O表示數據的層數,R表示數據記錄的平均大小,B表示Block的大小。如:如果block的大小是16k,數據分層是10層,平均記錄大小100byte,那隨機寫的平均IO次數是0.06,當然這只是理論上的分析方法, 接下來用攜程開源的Key/Value存儲引擎SessionDB中的LSM實現的案例來分析其的基本結構,其實LSM已在許多開源的存儲引擎中存在,如Google的leveldb,Facebook的rocksdb,Hadoop中的HBase等。

2. Log Structured Merge Tree
     LSM樹是一個按數據新鮮度進行分層依次駐落在內存與磁盤的多層數據結構。

 2.1. ActiveMemTable數據結構  

      a. 數據邏輯結構:
         一個含有二級索引的三層結構,其參考圖:

      b. 數據存儲格式樣例如下:


      c. 數據dump條件與流程:
        數據下放條件有2個:a.Key的總規模數上限,如128k; b.Data的總大小上限,如128M。其數據dump下放的流程:1. 首先Check該Table是否達到存儲上限;  2. 如果達到上限,則把整個表標記為Immutable;3. 然后,采用Copy-On-Write技術將整塊Table拷貝到C0隊列中; 4. 最后,重新New一個新的ActiveInMemTable的HashMapTable, 代碼如下:   

boolean success = this.activeInMemTables[shard].put(key, value, timeToLive, createdTime, isDelete);
 
if (!success) { // overflow
    synchronized(activeInMemTableCreationLocks[shard]) {
        success = this.activeInMemTables[shard].put(key, value, timeToLive, createdTime, isDelete); // other thread may have done the creation work
        if (!success) { // move to level queue 0
            this.activeInMemTables[shard].markImmutable(true);
            LevelQueue lq0 = this.levelQueueLists[shard].get(LEVEL0);
            lq0.getWriteLock().lock();
            try {
                lq0.addFirst(this.activeInMemTables[shard]);
            } finally {
                lq0.getWriteLock().unlock();
            }
            @SuppressWarnings("resource")
            HashMapTable tempTable = new HashMapTable(dir, shard, LEVEL0, System.nanoTime());
            tempTable.markUsable(true);
            tempTable.markImmutable(false); //mutable
            tempTable.put(key, value, timeToLive, createdTime, isDelete);
            // switch on
            this.activeInMemTables[shard] = tempTable;
        }
    }

  2.2. C0層數據

     C0屬於一個索引與數據完全存於內存的二維隊列,是一個存放着熱點數據ActiveMemTable的鏈表。
     a. 歸並壓縮條件
         通過周期時間與隊列Table個數兩個維度判斷是否進行歸並壓縮:a. 心跳線程周期性檢查,如2s b. Check下隊列中Table的個數,如是否大於1個,源碼如下:

		while(!stop) {
			try {
				LevelQueue levelQueue0 = levelQueueList.get(SDB.LEVEL0);
				if (levelQueue0 != null && levelQueue0.size() >= DEFAULT_MERGE_WAYS) {
					log.info("Start running level 0 merge thread at " + DateFormatter.formatCurrentDate());
					log.info("Current queue size at level 0 is " + levelQueue0.size());

					long start = System.nanoTime();
					LevelQueue levelQueue1 = levelQueueList.get(SDB.LEVEL1);
					mergeSort(levelQueue0, levelQueue1, DEFAULT_MERGE_WAYS, sdb.getDir(), shard);
					stats.recordMerging(SDB.LEVEL0, System.nanoTime() - start);

					log.info("Stopped running level 0 merge thread at " + DateFormatter.formatCurrentDate());
				} else {
					Thread.sleep(MAX_SLEEP_TIME);
				}
			} catch (Exception ex) {
				log.error("Error occured in the level0 merge dumper", ex);
			}

		}
		this.countDownLatch.countDown();
		log.info("Stopped level 0 merge thread " + this.getName());	

      b. 歸並Compaction與dump過程   

           其合並過程:1. 首先按Key值進行排序; 2. 依次放入C1隊列的隊首;3. 持久化並且放入C1層數據,對於每一個Sorted表,其持久化文件命名規則:shard + "-" + level + "-" + createdTime。

    2.3. C1與C2層數據

       C1與C2層數據都擱置於磁盤上。
    a. 歸並壓縮條件
       通過周期時間與隊列Table個數兩個維度判斷是否進行歸並壓縮:1. 心跳線程周期性檢查,如5s;2. Check下隊列中上一層C1中Table的個數,如是否大於4個,源碼如下:

		while(!stop) {
			try {
				boolean merged = false;
				LevelQueue lq1 = levelQueueList.get(SDB.LEVEL1);
				LevelQueue lq2 = levelQueueList.get(SDB.LEVEL2);
				boolean hasLevel2MapTable = lq2.size() > 0;
				if ((!hasLevel2MapTable && lq1.size() >= DEFAULT_MERGE_WAYS) ||
					(hasLevel2MapTable && lq1.size() >= DEFAULT_MERGE_WAYS - 1)) {
					log.info("Start running level 1 merging at " + DateFormatter.formatCurrentDate());
					log.info("Current queue size at level 1 is " + lq1.size());
					log.info("Current queue size at level 2 is " + lq2.size());

					long start = System.nanoTime();
					mergeSort(lq1, lq2, DEFAULT_MERGE_WAYS, sdb.getDir(), shard);
					stats.recordMerging(SDB.LEVEL1, System.nanoTime() - start);
					merged = true;
					log.info("End running level 1 to 2 merging at " + DateFormatter.formatCurrentDate());
				}

				if (!merged) {
					Thread.sleep(MAX_SLEEP_TIME);
				}
			} catch (Exception ex) {
				log.error("Error occured in the level 1 to 2 merger", ex);
			}
		}
		this.countDownLatch.countDown();
		log.info("Stopped level 1 to 2 merge thread " + this.getName());	

     b. 歸並Compaction過程

      其與C0->C1層數據的合並流程類似,持久化的樣例結構如下:

 2.4 分層結構特征對比
       參考鏈接給出其各數據層的對比結構:

    2.5 寫操作 
       Put操作發生且僅發生在當前活躍的ActiveMapTable,操作涉及一次內存映射文件寫入和一次內存Hashmap的寫入,代碼流程如下: 

{
// 1.write index_metadata
ByteBuffer tempIndexBuf = ByteBuffer.allocate(INDEX_ITEM_LENGTH);
tempIndexBuf.putLong(IMapEntry.INDEX_ITEM_IN_DATA_FILE_OFFSET_OFFSET, tempToAppendDataFileOffset);
tempIndexBuf.putInt(IMapEntry.INDEX_ITEM_KEY_LENGTH_OFFSET, key.length);
tempIndexBuf.putInt(IMapEntry.INDEX_ITEM_VALUE_LENGTH_OFFSET, value.length);
tempIndexBuf.putLong(IMapEntry.INDEX_ITEM_TIME_TO_LIVE_OFFSET, timeToLive);
tempIndexBuf.putLong(IMapEntry.INDEX_ITEM_CREATED_TIME_OFFSET, createdTime);
tempIndexBuf.putInt(IMapEntry.INDEX_ITEM_KEY_HASH_CODE_OFFSET, keyHash);
byte status = 1; // mark in use
if (markDelete) {
  status = (byte) (status + 2); // binary 11
}
if (compressed && !markDelete) {
  status = (byte) (status + 4);
}
tempIndexBuf.put(IMapEntry.INDEX_ITEM_STATUS, status); // mark in use

//2. write local_offset index
int offsetInIndexFile = INDEX_ITEM_LENGTH * (int)tempToAppendIndex;
ByteBuffer localIndexBuffer = this.localIndexMappedByteBuffer.get();
localIndexBuffer.position(offsetInIndexFile);
//indexBuf.rewind();
localIndexBuffer.put(tempIndexBuf);

//3. write key/value
ByteBuffer localDataBuffer = this.localDataMappedByteBuffer.get();
localDataBuffer.position((int)tempToAppendDataFileOffset);
localDataBuffer.put(ByteBuffer.wrap(key));
localDataBuffer.position((int)tempToAppendDataFileOffset + key.length);
localDataBuffer.put(ByteBuffer.wrap(value));

this.hashMap.put(new ByteArrayWrapper(key), new InMemIndex((int)tempToAppendIndex));
return new MMFMapEntryImpl((int)tempToAppendIndex, localIndexBuffer, localDataBuffer);
}

    2.6 讀操作

  輸入一個給定的key值,其訪問流程如下:

  a. 首先會查找C0層:HashMap->Index_Meta->SequentialMapData
  b. 當C0層查找失敗時,然后訪問C1層:BloomFilter->Index_Meta->SortedMapData
  c. 當C1層查找失敗時,最后訪問C2層:BloomFilter->Index_Meta->SortedMapData

 

   參考: 
  1. http://120.52.72.41/paperhub.s3.amazonaws.com/c3pr90ntcsf0/18e91eb4db2114a06ea614f0384f2784.pdf
  2. https://github.com/google/leveldb
  3. https://github.com/facebook/rocksdb
  4. https://github.com/ctriposs/sessdb


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM