HBase Scan流程分析
HBase的讀流程目前看來比較復雜,主要由於:
- HBase的表數據分為多個層次,HRegion->HStore->[HFile,HFile,...,MemStore]
- RegionServer的LSM-Like存儲引擎,不斷flush產生新的HFile,同時產生新的MemStore用於后續數據寫入,並且為了防止由於HFile過多而導致Scan時需要掃描的文件過多而導致的性能下降,后台線程會適時的進行Compaction,Compaction的過程會產生新的HFile,並且會刪除Compact完成的HFile
- 具體實現中的各種優化,比如lazy seek優化,導致代碼比較復雜
讀流程中充斥着各種Scanner,如下圖:
+--------------+
| |
+-----------+ RegionScanner+----------+
| +------+-------+ |
| | |
| | |
+-----v+-------+ +------v-------+ +------v+------+
| | | | | |
| StoreScanner | | StoreScanner | | StoreScanner |
| | | | | |
+--------------+ +--+---+-----+-+ +--------------+
| | |
+-----------------------+ | +----------+
| | |
| | |
+-------v---------+ +-------------v----+ +---------v------+
| | | | | |
|StoreFileScanner | |StoreFileScanner | | MemStoreScanner|
| | | | | |
+-------+---------+ +--------+---------+ +-------+--------+
| | |
| | |
| | |
| | |
+-------v---------+ +--------v---------+ +-------v--------+
| | | | | |
| HFileScanner | | HFileScanner | | HFileScanner |
| | | | | |
+-----------------+ +------------------+ +----------------+
在HBase中,一張表可以有多個Column Family,在一次Scan的流程中,每個Column Family(后續叫Store)的數據讀取由一個StoreScanner對象負責。每個Store的數據由一個內存中的MemStore和磁盤上的HFile文件組成,相對應的,StoreScanner對象雇佣一個MemStoreScanner和N個StoreFileScanner來進行實際的數據讀取。
從邏輯上看,讀取一行的數據需要
- 按照順序讀取出每個Store
- 對於每個Store,合並Store下面的相關的HFile和內存中的MemStore
實現上,這兩步都是通過堆完成。RegionScanner的讀取通過下面的多個StoreScanner組成的堆
完成,使用RegionScanner的成員變量KeyValueHeap storeHeap表示
組成StoreScanner的多個Scanner在RegionScannerImpl構造函數中獲得:
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
// 實際是StoreScanner類型
KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
scanners.add(scanner);
} else {
joinedScanners.add(scanner);
}
}
store.getScanner(scan, entry.getValue(), this.readPt)內部就是new 一個StoreScanner,邏輯都在StoreScanner的構造函數中
構造函數內部其實就是找到相關的HFile和MemStore,然后建堆,注意,這個堆是StoreScanner級別的,一個StoreScanner一個堆,堆中的元素就是底下包含的HFile和MemStore對應的StoreFileScanner和MemStoreScanner
得到相關的HFile和MemStore邏輯在StoreScanner::getScannersNoCompaction()中,內部會根據請求指定的TimeRange,KeyRange過濾掉不需要的HFile,同時也會利用bloom filter過濾掉不需要的HFIle.接着,調用
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
isParallelSeekEnabled);
對這些StoreFileScanner和MemStoreScanner分別進行seek,seekKey是matcher.getStartKey(),
如下構造
return new KeyValue(row, family, null, HConstants.LATEST_TIMESTAMP,
Type.DeleteFamily);
Seek語義
seek是針對KeyValue的,seek的語義是seek到指定KeyValue,如果指定KeyValue不存在,則seek到指定KeyValue的下一
個。舉例來說,假設名為X的column family里有兩列a和b,文件中有兩行rowkey分別為aaa和
bbb,如下表所示.
Column Family X | ||
rowkey | column a | column b |
aaa | 1 | abc |
bbb | 2 | def |
HBase客戶端設置scan請求的start key為aaa,那么matcher.getStartKey()會被初始化為(rowkey, family, qualifier,timestamp,type)=(aaa,X,null,LATEST_TIMESTAMP,Type.DeleteFamily),根據KeyValue的比較原則,這個KeyValue比aaa行的第一個列a更
小(因為沒有qualifier),所以對這個StoreFileScanner seek時,會seek到aaa這行的第一列a
實際上
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
isParallelSeekEnabled);
有可能不會對StoreFileScanner進行實際的seek,而是進行lazy seek,seek的工作放到不得不做的時候。后續會專門說lazy seek
上面得到了請求scan涉及到的所有的column family對應的StoreScanner,隨后調用如下函數進行建堆:
protected void initializeKVHeap(List<KeyValueScanner> scanners,
List<KeyValueScanner> joinedScanners, HRegion region)
throws IOException {
this.storeHeap = new KeyValueHeap(scanners, region.comparator);
if (!joinedScanners.isEmpty()) {
this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
}
}
KeyValueScanner是一個接口,表示一個可以向外迭代出KeyValue
的Scanner,StoreFileScanner,MemStoreScanner和StoreScanner都實現了該接口。這里的comparator類型為KVScannerComparator,用於比較兩個KeyValueScanner,實際上內部使用了KVComparator,它是用來比較兩個KeyValue的。從后面可以看出,實際上,這個由KeyValueScanner組成的堆,堆頂KeyValueScanner滿足的特征是: 它的堆頂(KeyValue)最小
堆用類KeyValueHeap表示,看KeyValueHeap構造函數做了什么
KeyValueHeap(List<? extends KeyValueScanner> scanners,
KVScannerComparator comparator) throws IOException {
this.comparator = comparator;
if (!scanners.isEmpty()) {
// 根據傳入的KeyValueScanner構造出一個優先級隊列(內部實現就是堆)
this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
this.comparator);
for (KeyValueScanner scanner : scanners) {
if (scanner.peek() != null) {
this.heap.add(scanner);
} else {
scanner.close();
}
}
//以上將元素加入堆中
// 從堆頂pop出一個KeyValueScanner放入成員變量current,那么這個堆的堆頂
// 就是current這個KeyValueScanner的堆頂,KeyValueHeap的peek()取堆頂
// 操作直接返回current.peek()
this.current = pollRealKV();
}
}
在看pollRealKV()怎么做的之前需要先看看HBase 0.94引入的Lazy Seek
Lazy Seek優化
在這個優化之前,讀取一個column family(Store),需要seek其下的所有HFile和MemStore到指定的查詢KeyValue(seek的語義為如果KeyValue存在則seek到對應位置,如果不存在,則seek到這個KeyValue的后一個KeyValue,假設Store下有3個HFile和一個MemStore,按照時序遞增記為[HFile1, HFile2, HFile3, MemStore],在lazy seek優化之前,需要對所有的HFile和MemStore進行seek,對HFile文件的seek比較慢,往往需要將HFile相應的block加載到內存,然后定位。在有了lazy seek優化之后,如果需要的KeyValue在HFile3中就存在,那么HFIle1和HFile2都不需要進行seek,大大提高速度。大體來說,思路是請求seek某個KeyValue時實際上沒有對StoreFileScanner進行真正的seek,而是對於每個StoreFileScanner,設置它的peek為(rowkey,family,qualifier,lastTimestampInStoreFile)
KeyValueHeap有兩個重要的接口,peek()和next(),他們都是返回堆頂,區別在於next()會將堆頂出堆,然后重新調整堆,對外來說就是迭代器向前移動,而peek()不會將堆頂出堆,堆頂不變。實現中,
peek()操作非常簡單,只需要調用堆的成員變量current的peek()方法操作即可.拿StoreScanner堆舉例,current要么是StoreFileScanner類型要么是MemStore,那么到底current是如何選擇出來的以及Lazy Seek是如何實現的?
下面舉個例子說明。
前提:
HBase開啟了Lazy Seek優化(實際上默認開啟)
假設:
Store下有三個HFile和MemStore,按照時間順序記作[HFile1,HFile2,HFile3,MemStore],seek KeyValue為(rowkey,family,qualifier,timestamp),記作seekKV.
並且它只在HFile3中存在,不在其他HFile和MemStore中存在
Lazy Seek過程
seekScanner()的邏輯,如果是lazy seek,則對於每個Scanner都調
用requestSeek(seekKV)方法,方法內部首先進行rowcol類型的bloom filter過濾
- 如果結果判定seekKV在StoreFile中肯定不存在,則直接設置StoreFileScanner的peek(實際上StoreFileScanner不是一個
堆只是為了統一代碼)為 kv.createLastOnRowCol(),並且將realSeekDone設置true,表示實際的seek完成.
public KeyValue createLastOnRowCol() {
return new KeyValue(
bytes, getRowOffset(), getRowLength(),
bytes, getFamilyOffset(), getFamilyLength(),
bytes, getQualifierOffset(), getQualifierLength(),
HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
}
可以看出ts設置為最小,說明這個KeyValue排在所有的同rowkey同column family同qualifier的KeyValue最后。顯然,當上層StoreScanner取堆頂時,
如果其它StoreFileScanner/MemStoreScanner中存在同rowkey同column family同qualifier的真實的KeyValue則會優先彈出。
- 如果seekKV在StoreFile中,那么會執行如下邏輯:
realSeekDone = false;
long maxTimestampInFile = reader.getMaxTimestamp();
long seekTimestamp = kv.getTimestamp();
if (seekTimestamp > maxTimestampInFile) {
// Create a fake key that is not greater than the real next key.
// (Lower timestamps correspond to higher KVs.)
// To understand this better, consider that we are asked to seek
// to
// a higher timestamp than the max timestamp in this file. We
// know that
// the next point when we have to consider this file again is
// when we
// pass the max timestamp of this file (with the same
// row/column).
cur = kv.createFirstOnRowColTS(maxTimestampInFile);
} else {
enforceSeek();
}
顯然,當kv的ts比HFile中最大的ts都更大時,那么這個HFile中顯然不存在seekKV,但是可能存在
相同rowkey,family,qualifier的不同ts的KeyValue,那么這里設置堆頂時要注意,不能把堆頂設置為比當前HFile文件中的可能真實存在的相同rowkey,family,qualifier的KeyValue大,如下:
public KeyValue createFirstOnRowColTS(long ts) {
return new KeyValue(
bytes, getRowOffset(), getRowLength(),
bytes, getFamilyOffset(), getFamilyLength(),
bytes, getQualifierOffset(), getQualifierLength(),
ts, Type.Maximum, bytes, getValueOffset(), getValueLength());
}
Type的比較中,Type.Maximum最小,這樣產生的KeyValue保證了不會大於當前HFile文件中的可能存在的相同rowkey,family,qualifier的KeyValue,同時將seekKV保存到StoreFileScanner成員變量delayedSeekKV中,以便后續真正seek的時候獲取.
考慮一下如果seekKV的ts比當前HFile中的maxTimestamp更小怎么辦?可以設置一個ts為latest_timestamp
的KeyValue么?如果設置了,它會比其它HFile中存在實際的KeyValue先彈出,這樣順序就亂了,所以這種情況下,只能進行實際的seek,enforceSeek()函數中進行實際的seek后,將realSeekDone設置為
true.
取StoreScanner堆頂邏輯
因為HFile3的latestTimestampInStoreFile最大,所以會首先取到HFile3對應的StoreFileScanner的pee
k(KeyValue的比較原則是timestamp大的KeyValue更小),
這個時候會檢查這個KeyValueScanner是否進行了實際的seek(對於StoreFileScanner來說,通過布爾變量realSeekDone進行標記,對於MemStoreScanner來說,始終返回true),在這里,沒有進行real seek
,接着進行實際的seek操作,seek到HFile3中存在的seekKV,接着拿着seekKV去和HFile2的peek進行比較,顯然seekKV比HFile2的peek小(由於timestamp > lastTimestampInStoreFile2),故
StoreScanner的peek操作返回seekKV。
實現中,KeyValueHeap有兩個重要的接口,peek()和next(),他們都是返回堆頂,區別在於next()會將堆頂出堆,然后重新調整堆,對外來說就是迭代器向前移動,而peek()不會將堆頂出堆,堆頂不變。實現中,
peek()操作非常簡單,只需要調用堆的成員變量current的peek()方法操作即可.拿StoreScanner堆舉例,current要么是StoreFileScanner類型要么是MemStore,而current的選擇則是pollRealKV()
完成的,這個函數之所以內部有while循環就是因為考慮了Lazy Seek優化,實際上,pollRealKV()代碼的邏輯就是例子中"取StoreScanner堆頂邏輯"。pollRealKV()的返回值會賦給current
protected KeyValueScanner pollRealKV() throws IOException {
KeyValueScanner kvScanner = heap.poll();
if (kvScanner == null) {
return null;
}
while (kvScanner != null && !kvScanner.realSeekDone()) {
if (kvScanner.peek() != null) {
kvScanner.enforceSeek();
KeyValue curKV = kvScanner.peek();
if (curKV != null) {
KeyValueScanner nextEarliestScanner = heap.peek();
if (nextEarliestScanner == null) {
// The heap is empty. Return the only possible scanner.
return kvScanner;
}
// Compare the current scanner to the next scanner. We try to avoid
// putting the current one back into the heap if possible.
KeyValue nextKV = nextEarliestScanner.peek();
if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
// We already have the scanner with the earliest KV, so return it.
return kvScanner;
}
// Otherwise, put the scanner back into the heap and let it compete
// against all other scanners (both those that have done a "real
// seek" and a "lazy seek").
heap.add(kvScanner);
} else {
// Close the scanner because we did a real seek and found out there
// are no more KVs.
kvScanner.close();
}
} else {
// Close the scanner because it has already run out of KVs even before
// we had to do a real seek on it.
kvScanner.close();
}
kvScanner = heap.poll();
}
return kvScanner;
}
Store下HFile集合發生變化如何處理
內存中的Memstore被flush到文件系統或者compaction完成都會改變Store的HFile文件集合。
在每次做完一批mutate操作后,會通過HRegion::isFlushSize(newSize)檢查是否需要對當前HRegion內的memstore進行flush
其實就是判斷HRegion內的所有的memstore大小和是否大於hbase.hregion.memstore.flush.size,默認128MB,如果需要flush,會將請求放入后台flush線程(MemStoreFlusher)的隊列中,由后台flush線程處理,調用路徑HRegion::flushcache()->internalFlushcache(...)->StoreFlushContext.flushCache(...)->StoreFlushContext.commit(...)=>HStore::updateStorefiles(),這塊邏輯在HBase Snapshot原理和實現中有講到,這里不贅述。只說一下最后一步的updateStorefiles()操作,該函數主要工作是拿住HStore級別的寫鎖,然后將新產生的HFile文件插入到StoreEngine中,解寫鎖,然后釋放snapshot,最后調用
notifyChangedReadersObservers(),如下:
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
this.memstore.clearSnapshot(set);
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
// the lock.
this.lock.writeLock().unlock();
}
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
重點在於notifyChangedReadersObservers(),看看代碼:
private void notifyChangedReadersObservers() throws IOException {
for (ChangedReadersObserver o: this.changedReaderObservers) {
o.updateReaders();
}
}
實際上,每個observer類型都是StoreScanner,每次新開一個StoreScanner都會注冊在Store內部的這個observer集合中,當Store下面的HFile集合變化時,通知這些注冊上來的StoreScanner即可。
具體的通知方式就是首先拿住StoreScanner的鎖,將這個時候的堆頂保存在成員變量lastTop中,
然后將StoreScanner內部的堆置為null(this.heap=null)最后解鎖,而StoreScanner那邊next/seek/reseek時,都會首先通過函數checkReseek()函數來檢查是否this.heap為null,為null
,為null說明當前Store下的HFile集合改變了,那么調用resetScannerStack(lastTop),將當前
Store下的所有StoreFileScanner/MemStoreScanner都seek到lastTop,然后重新建StoreScanner對應的堆。checkReseek()代碼如下:
protected boolean checkReseek() throws IOException {
if (this.heap == null && this.lastTop != null) {
resetScannerStack(this.lastTop);
if (this.heap.peek() == null
|| store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
LOG.debug("Storescanner.peek() is changed where before = " + this.lastTop.toString()
+ ",and after = " + this.heap.peek());
this.lastTop = null;
return true;
}
this.lastTop = null; // gone!
}
// else dont need to reseek
return false;
}