This article analyzes the Block Cache implementation in HBase, based on the HBase 0.94.1 source code, and summarizes the core ideas behind its cache design.
1. Overview
The memory of an HBase RegionServer is divided into two parts: one serves as the Memstore, used mainly for writes; the other serves as the BlockCache, used mainly for reads.
- Write requests go to the Memstore first. The RegionServer provides one Memstore per region, and once a Memstore fills up to 64MB a flush to disk is triggered. When the combined size of all Memstores exceeds the limit (heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9), flushes are forced, starting from the largest Memstore, until the total drops below the limit.
- Read requests look in the Memstore first; on a miss they check the BlockCache; on a further miss the data is read from disk and the result is placed into the BlockCache. Because the BlockCache uses an LRU policy, once it reaches its upper limit (heapsize * hfile.block.cache.size * 0.85) the eviction mechanism starts and the oldest blocks are evicted (a minimal sketch of this read path follows the list).
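To make the lookup order concrete, here is a minimal, self-contained sketch of the read path just described. It is not HBase source; the maps and types are hypothetical stand-ins for the Memstore, the BlockCache, and the HFile layer.

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the RegionServer read path (not HBase source).
class ReadPathSketch {
  private final Map<String, String> memstore = new HashMap<String, String>();
  private final Map<String, String> blockCache = new HashMap<String, String>();

  // Stands in for reading a block out of an HFile on disk.
  private String readFromDisk(String key) {
    return "value-of-" + key;
  }

  public String get(String key) {
    String v = memstore.get(key);   // 1. recent, unflushed writes
    if (v != null) return v;
    v = blockCache.get(key);        // 2. previously read blocks
    if (v != null) return v;
    v = readFromDisk(key);          // 3. disk, the slow path
    blockCache.put(key, v);         // cache the result for later reads
    return v;
  }

  public static void main(String[] args) {
    ReadPathSketch rs = new ReadPathSketch();
    System.out.println(rs.get("row1"));  // misses memstore and cache, reads "disk"
    System.out.println(rs.get("row1"));  // now served from the block cache
  }
}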
A RegionServer has one BlockCache and N Memstores. Their combined share of the heap must stay below heapsize * 0.8, otherwise HBase will not start.
Under the default configuration the BlockCache fraction is 0.2 and the Memstore fraction is 0.4. For workloads where read latency matters, the BlockCache can be enlarged and the Memstore shrunk to raise the cache hit rate, as the sizing sketch below illustrates.
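As a back-of-the-envelope illustration of these sizing rules, the sketch below computes the byte budgets for an assumed 8GB RegionServer heap using the defaults quoted above. The property names in the comments are the real HBase configuration keys, but the program itself is illustrative arithmetic, not HBase code.

public class CacheSizingSketch {
  public static void main(String[] args) {
    long heapSize = 8L * 1024 * 1024 * 1024;  // assumed -Xmx8g RegionServer heap
    double blockCacheFactor = 0.2;  // default hfile.block.cache.size
    double memstoreFactor = 0.4;    // default hbase.regionserver.global.memstore.upperLimit

    // HBase refuses to start unless the two factors sum to less than 0.8:
    if (blockCacheFactor + memstoreFactor >= 0.8) {
      throw new IllegalStateException("BlockCache + Memstore must stay below 0.8 of the heap");
    }

    System.out.printf("BlockCache budget: %d MB%n",
        (long) (heapSize * blockCacheFactor) >> 20);
    System.out.printf("Memstore budget:   %d MB%n",
        (long) (heapSize * memstoreFactor) >> 20);
    // LRU eviction kicks in once the cache passes ~85% of its budget:
    System.out.printf("Eviction trigger:  %d MB%n",
        (long) (heapSize * blockCacheFactor * 0.85) >> 20);
  }
}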
An HBase RegionServer keeps three priority queues of cached blocks:
- Single: a block accessed for the first time is placed in this priority queue;
- Multi: a block that has been accessed more than once is promoted from the Single queue to the Multi queue;
- InMemory: a block belonging to an in-memory column family is placed in this queue.
The benefits of tiering the cache this way are:
- First, through the InMemory type of cache, in-memory column families, such as the META table metadata, can be selectively pinned in RegionServer memory;
- Second, distinguishing the Single and Multi types prevents the cache from thrashing under Scan operations, since the blocks accessed only once are the first fed to the eviction algorithm.
Under the default configuration, the BlockCache memory is split among the Single, Multi, and InMemory tiers in the proportions 0.25, 0.50, and 0.25.
Note that the InMemory queue holds the HBase META table metadata, so marking a very large user table as in-memory can push the META table out of the cache and hurt the performance of the whole cluster.
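For completeness, this is roughly how a column family is marked in-memory from the 0.94 client API so that its blocks land in the InMemory tier; the table and family names here are made up for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class InMemoryFamilyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);

    HTableDescriptor table = new HTableDescriptor("hot_table");  // hypothetical table
    HColumnDescriptor family = new HColumnDescriptor("f");       // hypothetical family
    family.setInMemory(true);  // blocks of this family get BlockPriority.MEMORY
    table.addFamily(family);

    admin.createTable(table);
  }
}

The same flag can also be set from the HBase shell with IN_MEMORY => 'true' on the column family.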
2. Source Code Analysis
What follows is a walkthrough of the relevant HBase 0.94.1 source, the org.apache.hadoop.hbase.io.hfile.LruBlockCache class.
2.1 Adding a Block to the Cache
/** Concurrent map (the cache) */
private final ConcurrentHashMap<BlockCacheKey,CachedBlock> map;

/**
 * Cache the block with the specified name and buffer.
 * <p>
 * It is assumed this will NEVER be called on an already cached block. If
 * that is done, an exception will be thrown.
 * @param cacheKey block's cache key
 * @param buf block buffer
 * @param inMemory if block is in-memory
 */
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
  CachedBlock cb = map.get(cacheKey);
  if(cb != null) {
    throw new RuntimeException("Cached an already cached block");
  }
  cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
  long newSize = updateSizeMetrics(cb, false);
  map.put(cacheKey, cb);
  elements.incrementAndGet();
  if(newSize > acceptableSize() && !evictionInProgress) {
    runEviction();
  }
}

/**
 * Cache the block with the specified name and buffer.
 * <p>
 * It is assumed this will NEVER be called on an already cached block. If
 * that is done, it is assumed that you are reinserting the same exact
 * block due to a race condition and will update the buffer but not modify
 * the size of the cache.
 * @param cacheKey block's cache key
 * @param buf block buffer
 */
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
  cacheBlock(cacheKey, buf, false);
}
1) The code assumes that cacheBlock is never called again for a BlockCacheKey that is already cached; doing so throws a RuntimeException;
2) Depending on the inMemory flag, the CachedBlock is created with a different priority: BlockPriority.MEMORY if inMemory is true, BlockPriority.SINGLE otherwise. Note that only these two priorities are assigned at insertion time; BlockPriority.MULTI is assigned only when a cached block is accessed again, as CachedBlock's access method shows (the constructor branch that sets the initial priority is sketched after it):
/**
 * Block has been accessed. Update its local access time.
 */
public void access(long accessTime) {
  this.accessTime = accessTime;
  if(this.priority == BlockPriority.SINGLE) {
    this.priority = BlockPriority.MULTI;
  }
}
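For reference, the branch of the CachedBlock constructor that establishes the initial priority looks roughly as follows (abridged from the same 0.94 codebase; the heap-size accounting is elided here):

public CachedBlock(BlockCacheKey cacheKey, Cacheable buf, long accessTime,
    boolean inMemory) {
  this.cacheKey = cacheKey;
  this.buf = buf;
  this.accessTime = accessTime;
  // ... heap-size accounting elided ...
  if(inMemory) {
    this.priority = BlockPriority.MEMORY;
  } else {
    this.priority = BlockPriority.SINGLE;
  }
}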
3) The BlockCacheKey and the newly created CachedBlock are put into the global ConcurrentHashMap map, and the element count and size metrics are updated;
4) Finally, if the cache size after the insertion exceeds the acceptable threshold (acceptableSize()) and no eviction is already in progress, runEviction() is called to kick off the LRU eviction:
/** Eviction thread */
private final EvictionThread evictionThread;

/**
 * Multi-threaded call to run the eviction process.
 */
private void runEviction() {
  if(evictionThread == null) {
    evict();
  } else {
    evictionThread.evict();
  }
}
Here, EvictionThread is the thread that actually carries out the LRU eviction; it is analyzed in detail below.
2.2 Evicting Blocks from the Cache
The EvictionThread synchronizes with the main thread via wait/notify and carries out the LRU eviction of the block cache.
/*
 * Eviction thread. Sits in waiting state until an eviction is triggered
 * when the cache size grows above the acceptable level.<p>
 *
 * Thread is triggered into action by {@link LruBlockCache#runEviction()}
 */
private static class EvictionThread extends HasThread {
  private WeakReference<LruBlockCache> cache;
  private boolean go = true;

  public EvictionThread(LruBlockCache cache) {
    super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
    setDaemon(true);
    this.cache = new WeakReference<LruBlockCache>(cache);
  }

  @Override
  public void run() {
    while (this.go) {
      synchronized(this) {
        try {
          this.wait();
        } catch(InterruptedException e) {}
      }
      LruBlockCache cache = this.cache.get();
      if(cache == null) break;
      cache.evict();
    }
  }

  public void evict() {
    synchronized(this) {
      this.notify(); // FindBugs NN_NAKED_NOTIFY
    }
  }

  void shutdown() {
    this.go = false;
    interrupt();
  }
}
After the EvictionThread starts, it blocks in wait() until the main thread calls its evict method (triggered through runEviction, as analyzed above), which issues a notify. The thread then wakes up and invokes LruBlockCache's evict method, where the actual eviction happens:
/**
 * Eviction method.
 */
void evict() {

  // Ensure only one eviction at a time
  if(!evictionLock.tryLock()) return;

  try {
    evictionInProgress = true;
    long currentSize = this.size.get();
    long bytesToFree = currentSize - minSize();

    if (LOG.isDebugEnabled()) {
      LOG.debug("Block cache LRU eviction started; Attempting to free " +
        StringUtils.byteDesc(bytesToFree) + " of total=" +
        StringUtils.byteDesc(currentSize));
    }

    if(bytesToFree <= 0) return;

    // Instantiate priority buckets
    BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize());
    BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize());
    BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize());

    // Scan entire map putting into appropriate buckets
    for(CachedBlock cachedBlock : map.values()) {
      switch(cachedBlock.getPriority()) {
        case SINGLE: {
          bucketSingle.add(cachedBlock);
          break;
        }
        case MULTI: {
          bucketMulti.add(cachedBlock);
          break;
        }
        case MEMORY: {
          bucketMemory.add(cachedBlock);
          break;
        }
      }
    }

    PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3);

    bucketQueue.add(bucketSingle);
    bucketQueue.add(bucketMulti);
    bucketQueue.add(bucketMemory);

    int remainingBuckets = 3;
    long bytesFreed = 0;

    BlockBucket bucket;
    while((bucket = bucketQueue.poll()) != null) {
      long overflow = bucket.overflow();
      if(overflow > 0) {
        long bucketBytesToFree = Math.min(overflow,
          (bytesToFree - bytesFreed) / remainingBuckets);
        bytesFreed += bucket.free(bucketBytesToFree);
      }
      remainingBuckets--;
    }

    if (LOG.isDebugEnabled()) {
      long single = bucketSingle.totalSize();
      long multi = bucketMulti.totalSize();
      long memory = bucketMemory.totalSize();
      LOG.debug("Block cache LRU eviction completed; " +
        "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
        "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
        "single=" + StringUtils.byteDesc(single) + ", " +
        "multi=" + StringUtils.byteDesc(multi) + ", " +
        "memory=" + StringUtils.byteDesc(memory));
    }
  } finally {
    stats.evict();
    evictionInProgress = false;
    evictionLock.unlock();
  }
}
1) First acquire the eviction lock with tryLock, which guarantees that only one eviction runs at a time;
2) Compute the current total cache size, currentSize, and the number of bytes that must be freed, bytesToFree = currentSize - minSize(); if bytesToFree is not positive, return without doing anything;
3) Instantiate three BlockBucket queues to hold the Single, Multi, and InMemory blocks respectively; each BlockBucket maintains a CachedBlockQueue that keeps its CachedBlock objects in LRU order for eviction (see the compareTo sketch below);
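The LRU ordering inside each CachedBlockQueue comes from CachedBlock itself, which is Comparable on its access time. Abridged from the same codebase: more recently accessed blocks sort first, so pollLast() hands back the least recently used block.

public int compareTo(CachedBlock that) {
  if(this.accessTime == that.accessTime) return 0;
  return this.accessTime < that.accessTime ? 1 : -1;
}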
4) Scan the global ConcurrentHashMap that records all cached blocks and add each block to the bucket matching its priority;
5) Add the three BlockBuckets to a priority queue ordered by how far each bucket overflows its allotted bucketSize (see BlockBucket's compareTo method, sketched below);
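The relevant parts of BlockBucket look roughly like this (abridged from LruBlockCache in the same codebase): overflow() measures how far the bucket is over its share, and compareTo orders buckets by that overflow, so the priority queue polls the least-overflowing bucket first.

private class BlockBucket implements Comparable<BlockBucket> {

  private CachedBlockQueue queue;
  private long totalSize = 0;
  private long bucketSize;

  public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
    this.bucketSize = bucketSize;
    queue = new CachedBlockQueue(bytesToFree, blockSize);
  }

  public void add(CachedBlock block) {
    totalSize += block.heapSize();
    queue.add(block);
  }

  public long overflow() {
    return totalSize - bucketSize;
  }

  public int compareTo(BlockBucket that) {
    if(this.overflow() == that.overflow()) return 0;
    return this.overflow() > that.overflow() ? 1 : -1;
  }
}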
6) Poll the buckets off the priority queue; for each one, the amount to free is computed as Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets), which releases the requested space as evenly as possible across the three buckets (a worked example follows the code below). The freeing itself is done by BlockBucket's free method, which pops the soon-to-be-evicted CachedBlock objects off its CachedBlockQueue:
public long free(long toFree) {
  CachedBlock cb;
  long freedBytes = 0;
  while ((cb = queue.pollLast()) != null) {
    freedBytes += evictBlock(cb);
    if (freedBytes >= toFree) {
      return freedBytes;
    }
  }
  return freedBytes;
}
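To see how the per-bucket share works out, here is a hypothetical walk-through of the loop in evict(), assuming bytesToFree is 90 and the three buckets are polled with ascending overflows of 10, 40 and 80 (all numbers made up for illustration):

public class EvictShareExample {
  public static void main(String[] args) {
    long bytesToFree = 90;
    long bytesFreed = 0;
    long[] overflows = {10, 40, 80};  // ascending, the order the queue polls them
    int remainingBuckets = 3;
    for (long overflow : overflows) {
      // A bucket never gives up more than its overflow:
      long share = Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets);
      System.out.println("free " + share + " from this bucket");
      bytesFreed += share;
      remainingBuckets--;
    }
    System.out.println("total freed = " + bytesFreed);  // 10 + 40 + 40 = 90
  }
}

A bucket that is barely over its share gives up only its overflow, and the shortfall is shifted onto the buckets that are further over, which keeps the three tiers near their configured proportions.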
7) free in turn calls LruBlockCache's evictBlock method, which removes the CachedBlock from the global ConcurrentHashMap and updates the related counters:
protected long evictBlock(CachedBlock block) {
  map.remove(block.getCacheKey());
  updateSizeMetrics(block, true);
  elements.decrementAndGet();
  stats.evicted();
  return block.heapSize();
}
8) Finally, release the lock and finish the bookkeeping in the finally block: record the eviction in the stats and reset evictionInProgress.
3. Summary
The core idea of this Block Cache implementation is to tier the cache. Tiering keeps the different classes of cached blocks from evicting one another, which matters especially in HBase, where a cache such as the META table's should be kept at high priority.