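Copyright notice: This is an original article by the blogger, published under a copyright license. When reposting, please include a link to the original source and this notice.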
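Before walking through the HBase flush source code, let's first outline the logic at a high level so the code is easier to follow later. The overall flush process has three phases: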
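1. Phase 1: prepare. This phase takes a snapshot of the current memstore's in-memory structure. The data structure HBase writes into memory (both the memstore and the snapshot) is a skip list, implemented with the JDK's ConcurrentSkipListMap. The process is essentially assigning the memstore to the snapshot and constructing a new, empty memstore.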
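2. Phase 2: flushcache. This phase writes the snapshot produced in phase 1 to disk. Note that it is written to a temporary file: the resulting HFile is not yet moved into the store's actual column-family directory. The move happens in phase 3.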
3. Phase 3: commit. This phase moves the HFile generated in phase 2 to its final location.
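The three phases can be illustrated with a toy, single-threaded sketch. Everything in it is hypothetical: ToyFlushDemo is not an HBase class, string keys stand in for Cells, and a plain text file stands in for an HFile. It only shows the shape of the flow: swap the memstore into a snapshot, write the snapshot into a temporary directory, then move the finished file into the column-family directory.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical toy store illustrating prepare -> flushCache -> commit; not HBase code.
public class ToyFlushDemo {
  private ConcurrentSkipListMap<String, String> memstore = new ConcurrentSkipListMap<>();
  private ConcurrentSkipListMap<String, String> snapshot = new ConcurrentSkipListMap<>();
  private final Path tmpDir; // stands in for the region's temp directory
  private final Path cfDir;  // stands in for the column-family directory
  private Path tempFile;

  public ToyFlushDemo(Path tmpDir, Path cfDir) {
    this.tmpDir = tmpDir;
    this.cfDir = cfDir;
  }

  public void put(String key, String value) {
    memstore.put(key, value);
  }

  public Set<String> memstoreKeys() {
    return memstore.keySet();
  }

  // Phase 1 (prepare): the current memstore becomes the snapshot; new writes go to a fresh map.
  public void prepare() {
    snapshot = memstore;
    memstore = new ConcurrentSkipListMap<>();
  }

  // Phase 2 (flushCache): write the sorted snapshot to a file in the temp directory only.
  public void flushCache() throws IOException {
    tempFile = Files.createTempFile(tmpDir, "flush-", ".tmp");
    try (Writer w = Files.newBufferedWriter(tempFile, StandardCharsets.UTF_8)) {
      for (Map.Entry<String, String> e : snapshot.entrySet()) {
        w.write(e.getKey() + "=" + e.getValue() + "\n");
      }
    }
  }

  // Phase 3 (commit): move the finished file into the column-family directory, drop the snapshot.
  public Path commit() throws IOException {
    String name = tempFile.getFileName().toString().replace(".tmp", ".data");
    Path moved = Files.move(tempFile, cfDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
    snapshot = new ConcurrentSkipListMap<>();
    return moved;
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("toy-region");
    Path tmp = Files.createDirectories(root.resolve(".tmp"));
    Path cf = Files.createDirectories(root.resolve("cf"));
    ToyFlushDemo store = new ToyFlushDemo(tmp, cf);
    store.put("row2", "b");
    store.put("row1", "a");
    store.prepare();
    store.put("row3", "c"); // arrives after prepare(), so it is not part of this flush
    store.flushCache();
    Path file = store.commit();
    System.out.println(Files.readAllLines(file)); // [row1=a, row2=b]
    System.out.println(store.memstoreKeys());     // [row3]
  }
}
```

A write that arrives after prepare() (row3 in main) lands in the new memstore and is not part of this flush. Writing to a temporary location first means a half-written file never shows up in the store directory; only a complete file is moved in.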
That is the logical flow of an HBase flush. Flush operates at the region level and involves many classes. Let's now go through the flush-related operations.
Starting the flush threads
- When the RegionServer starts, it calls startServiceThreads to start a number of service threads, among them:
// Cache flushing
protected MemStoreFlusher cacheFlusher;
// ... omitted ...
private void startServiceThreads() throws IOException {
// ... other code omitted ...
this.cacheFlusher.start(uncaughtExceptionHandler);
}
- cacheFlusher is an instance of MemStoreFlusher. Before going through its logic, let's first introduce two MemStoreFlusher fields:
// flushQueue is a BlockingQueue<FlushQueueEntry> (implemented by a DelayQueue).
// It holds FlushRegionEntry flush requests, as well as WakeupFlushThread entries used to wake up the flush threads.
private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<FlushQueueEntry>();
// Every request added to flushQueue is also recorded in regionsInQueue.
private final Map<HRegion, FlushRegionEntry> regionsInQueue = new HashMap<HRegion, FlushRegionEntry>();
- The start method of MemStoreFlusher is as follows:
synchronized void start(UncaughtExceptionHandler eh) {
ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
server.getServerName().toShortString() + "-MemStoreFlusher", eh);
for (int i = 0; i < flushHandlers.length; i++) {
flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
flusherThreadFactory.newThread(flushHandlers[i]);
flushHandlers[i].start();
}
}
It creates as many FlushHandler threads as configured by hbase.hstore.flusher.count, then calls start on each of them. Let's look at FlushHandler next.
private class FlushHandler extends HasThread {
private FlushHandler(String name) {
super(name);
}
@Override
public void run() {
// Keep looping as long as the server has not been stopped
while (!server.isStopped()) {
FlushQueueEntry fqe = null;
try {
wakeupPending.set(false); // allow someone to wake us up again
// poll() on the blocking queue: blocks here (up to threadWakeFrequency ms) if no entry is available
fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (fqe == null || fqe instanceof WakeupFlushThread) {
// No flush request (poll timed out), or the entry is a wake-up token for a global flush.
if (isAboveLowWaterMark()) {
// Check whether total memstore usage exceeds max_heap * hbase.regionserver.global.memstore.lowerLimit (default 0.35).
// If it is above this low-water mark, flush the region with the largest memstore.
LOG.debug("Flush thread woke up because memory above low water="
+ TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
if (!flushOneForGlobalPressure()) {
// Wasn't able to flush any region, but we're above low water mark
// This is unlikely to happen, but might happen when closing the
// entire server - another thread is flushing regions. We'll just
// sleep a little bit to avoid spinning, and then pretend that
// we flushed one, so anyone blocked will check again
Thread.sleep(1000);
wakeUpIfBlocking();
}
// Enqueue another one of these tokens so we'll wake up again
wakeupFlushThread();
}
// Also continue after a poll timeout
continue;
}
// A regular flush request:
// a single region's memstore exceeded hbase.hregion.memstore.flush.size (default 128 MB), so flush it
FlushRegionEntry fre = (FlushRegionEntry) fqe;
if (!flushRegion(fre)) {
break;
}
} catch (InterruptedException ex) {
continue;
} catch (ConcurrentModificationException ex) {
continue;
} catch (Exception ex) {
LOG.error("Cache flusher failed for entry " + fqe, ex);
if (!server.checkFileSystem()) {
break;
}
}
}
// Outside the while loop: this flusher thread is exiting, typically because the RegionServer is stopping
synchronized (regionsInQueue) {
regionsInQueue.clear();
flushQueue.clear();
}
// Signal anyone waiting, so they see the close flag
wakeUpIfBlocking();
LOG.info(getName() + " exiting");
}
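Let's summarize the logic of FlushHandler.run:
- As long as the RegionServer is not stopped, it keeps looping and checking for flush requests.
- It blocks in flushQueue.poll: flushQueue is a blocking queue (a DelayQueue), so poll blocks while no entry is available, until the timeout expires.
- If poll returns a FlushRegionEntry, it calls MemStoreFlusher.flushRegion(fre). If it returns null or a WakeupFlushThread token, it flushes the region with the largest memstore when global memstore usage is above the low-water mark.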
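The handler loop summarized above can be sketched with a plain BlockingQueue. This is a simplified, hypothetical model, not HBase code: ToyFlushHandler, the WAKEUP string token (standing in for WakeupFlushThread), the LinkedBlockingQueue (instead of the DelayQueue) and the 20 ms timeout (standing in for threadWakeFrequency) are all assumptions, and the global memory-pressure check is reduced to a counter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the FlushHandler loop; not HBase code.
public class ToyFlushHandler implements Runnable {
  // Stands in for a WakeupFlushThread token.
  public static final String WAKEUP = "<wakeup>";

  private final BlockingQueue<String> queue;
  private final AtomicBoolean stopped = new AtomicBoolean(false);
  private final List<String> flushed = Collections.synchronizedList(new ArrayList<String>());
  private final AtomicInteger idleWakeups = new AtomicInteger();

  public ToyFlushHandler(BlockingQueue<String> queue) {
    this.queue = queue;
  }

  public void stop() { stopped.set(true); }
  public List<String> flushed() { return flushed; }
  public int idleWakeups() { return idleWakeups.get(); }

  @Override
  public void run() {
    while (!stopped.get()) {
      try {
        // Wait at most 20 ms for an entry (the real code uses threadWakeFrequency).
        String entry = queue.poll(20, TimeUnit.MILLISECONDS);
        if (entry == null || WAKEUP.equals(entry)) {
          // Timeout or wake-up token: the real code checks global memstore pressure here.
          idleWakeups.incrementAndGet();
          continue;
        }
        flushed.add(entry); // the real code calls flushRegion(fre) here
      } catch (InterruptedException e) {
        continue; // like the original loop, only the stop flag ends it
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> q = new LinkedBlockingQueue<>();
    ToyFlushHandler handler = new ToyFlushHandler(q);
    Thread t = new Thread(handler, "ToyMemStoreFlusher.0");
    t.start();
    q.add("region-A");
    q.add(WAKEUP);
    q.add("region-B");
    long deadline = System.currentTimeMillis() + 5000;
    while (handler.flushed().size() < 2 && System.currentTimeMillis() < deadline) {
      Thread.sleep(10);
    }
    handler.stop();
    t.join();
    System.out.println(handler.flushed()); // [region-A, region-B]
  }
}
```

The timeout is what keeps the loop responsive: even with an empty queue, the thread wakes up periodically, rechecks the stop flag and (in the real code) global memory pressure.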
The flush flow
So the flush itself is performed by MemStoreFlusher.flushRegion. Let's follow flushRegion to see what it does.
private boolean flushRegion(final FlushRegionEntry fqe) {
// Get the region from the FlushRegionEntry
HRegion region = fqe.region;
// If the region is not the meta region and has too many store files, delay the flush.
// The threshold is hbase.hstore.blockingStoreFiles; if it is not configured, the default is 7
if (!region.getRegionInfo().isMetaRegion() &&
isTooManyStoreFiles(region)) {
// Check whether we have already waited the configured maximum time (blockingWaitTime)
if (fqe.isMaximumWait(this.blockingWaitTime)) {
LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
"ms on a compaction to clean up 'too many store files'; waited " +
"long enough... proceeding with flush of " +
region.getRegionNameAsString());
} else {
// If this is first time we've been put off, then emit a log message.
// If this request has not been requeued before (the first time it is put off)
if (fqe.getRequeueCount() <= 0) {
// Note: We don't impose blockingStoreFiles constraint on meta regions
LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
// Before flushing, try to request a split; if no split is requested, request a compaction instead, since the region has too many store files
if (!this.server.compactSplitThread.requestSplit(region)) {
try {
this.server.compactSplitThread.requestSystemCompaction(
region, Thread.currentThread().getName());
} catch (IOException e) {
LOG.error(
"Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
RemoteExceptionHandler.checkIOException(e));
}
}
}
// Put back on the queue. Have it come back out of the queue
// after a delay of this.blockingWaitTime / 100 ms.
// Requeue the entry with a delay of blockingWaitTime / 100 ms; once the delay expires a handler takes it again and re-checks
this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
// Tell a lie, it's not flushed but it's ok
return true;
}
}
// Normal flush
return flushRegion(region, false, fqe.isForceFlushAllStores());
}
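This method checks whether the region to be flushed has too many HFiles. If it does and blockingWaitTime has not yet elapsed, it requests a split (or, if no split is requested, a compaction), but only the first time the request is put off. It then puts the request back on flushQueue with a delay of blockingWaitTime / 100 ms and returns; a handler picks it up again when the delay expires. Otherwise, including once blockingWaitTime has elapsed, it calls the overloaded MemStoreFlusher.flushRegion(region, false, fqe.isForceFlushAllStores()). Let's follow that overload.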
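Before following that overload: the delayed requeue relies on java.util.concurrent.DelayQueue, which only hands out an element once its getDelay() has reached zero. Here is a minimal sketch of such an entry, assuming a hypothetical ToyFlushEntry class modeled on the requeue, getRequeueCount and isMaximumWait calls in the code above (it is not HBase's FlushRegionEntry), and a made-up blockingWaitTime of 9000 ms so that the requeue delay is 90 ms.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical entry modeled on the calls shown above; not HBase's FlushRegionEntry.
public class ToyFlushEntry implements Delayed {
  final String region;
  final long createTime = System.currentTimeMillis();
  private long whenToExpire = createTime; // no delay on the first enqueue
  private int requeueCount = 0;

  public ToyFlushEntry(String region) {
    this.region = region;
  }

  // Push the expiry time into the future and count how often the flush was put off.
  public ToyFlushEntry requeue(long delayMs) {
    whenToExpire = System.currentTimeMillis() + delayMs;
    requeueCount++;
    return this;
  }

  public int getRequeueCount() {
    return requeueCount;
  }

  // True once the entry has been waiting longer than maxWaitMs since it was created.
  public boolean isMaximumWait(long maxWaitMs) {
    return System.currentTimeMillis() - createTime > maxWaitMs;
  }

  @Override
  public long getDelay(TimeUnit unit) {
    return unit.convert(whenToExpire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  }

  @Override
  public int compareTo(Delayed other) {
    return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
  }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<ToyFlushEntry> queue = new DelayQueue<>();
    long blockingWaitTime = 9000; // hypothetical value: blockingWaitTime / 100 = 90 ms
    ToyFlushEntry entry = new ToyFlushEntry("region-A");
    queue.add(entry.requeue(blockingWaitTime / 100));
    System.out.println(queue.poll()); // null: the delay has not expired yet
    ToyFlushEntry again = queue.poll(1, TimeUnit.SECONDS); // returned after roughly 90 ms
    System.out.println(again.region + " requeued " + again.getRequeueCount() + " time(s)"); // region-A requeued 1 time(s)
  }
}
```

Because the entry is not handed out until its delay expires, the handler threads stay free for other regions in the meantime, while the compaction requested on the first requeue gets a chance to reduce the store file count.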
private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
boolean forceFlushAllStores) {
long startTime = 0;
// Take the lock
synchronized (this.regionsInQueue) {
// Remove the region from regionsInQueue
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
// Use the start time of the FlushRegionEntry if available
if (fqe != null) {
startTime = fqe.createTime;
}
if (fqe != null && emergencyFlush) {
// Need to remove from region from delay queue. When NOT an
// emergencyFlush, then item was removed via a flushQueue.poll.
flushQueue.remove(fqe);
}
}
if (startTime == 0) {
// Avoid getting the system time unless we don't have a FlushRegionEntry;
// shame we can't capture the time also spent in the above synchronized
// block
startTime = EnvironmentEdgeManager.currentTime();
}
lock.readLock().lock();
try {
notifyFlushRequest(region, emergencyFlush);
// Ultimately calls HRegion.flushcache
HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores);
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
if (shouldSplit) {
this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
server.compactSplitThread.requestSystemCompaction(
region, Thread.currentThread().getName());
}
if (flushResult.isFlushSucceeded()) {
long endTime = EnvironmentEdgeManager.currentTime();
server.metricsRegionServer.updateFlushTime(endTime - startTime);
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of wal
// is required. Currently the only way to do this is a restart of
// the server. Abort because hdfs is probably bad (HBASE-644 is a case
// where hdfs was bad but passed the hdfs check).
server.abort("Replay of WAL required. Forcing server shutdown", ex);
return false;
} catch (IOException ex) {
LOG.error("Cache flush failed" +
(region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
return false;
}
} finally {
lock.readLock().unlock();
wakeUpIfBlocking();
}
return true;
}
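We won't go into the unrelated code here. The core logic is the call to region.flushcache(forceFlushAllStores), which is why flush is a region-level operation. After the flush completes, it checks whether the region should be split; if not, it checks whether a compaction should be requested. Let's follow flushcache to see what it does.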
// flushcache: the parameter says whether all stores must be flushed
public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
// fail-fast instead of waiting on the lock
// Check whether the region is closing
if (this.closing.get()) {
String msg = "Skipping flush on " + this + " because closing";
LOG.debug(msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
status.setStatus("Acquiring readlock on region");
// block waiting for the lock for flushing cache
// Take the region read lock here
lock.readLock().lock();
try {
if (this.closed.get()) {
String msg = "Skipping flush on " + this + " because closed";
LOG.debug(msg);
status.abort(msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-flush hooks");
coprocessorHost.preFlush();
}
// TODO: this should be managed within memstore with the snapshot, updated only after flush
// successful
if (numMutationsWithoutWAL.get() > 0) {
numMutationsWithoutWAL.set(0);
dataInMemoryWithoutWAL.set(0);
}
synchronized (writestate) {
// Proceed only if the region is not already flushing and writes are still enabled
if (!writestate.flushing && writestate.writesEnabled) {
this.writestate.flushing = true;
} else { // Otherwise the region is already flushing or not writable: abort this flush
if (LOG.isDebugEnabled()) {
LOG.debug("NOT flushing memstore for region " + this
+ ", flushing=" + writestate.flushing + ", writesEnabled="
+ writestate.writesEnabled);
}
String msg = "Not flushing since "
+ (writestate.flushing ? "already flushing"
: "writes not enabled");
status.abort(msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
}
try {
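// If forceFlushAllStores is true, flush every store; otherwise let the flush policy choose.
// Selection when not flushing all stores: a store is chosen if its memstore exceeds flushSizeLowerBound
// (hbase.hregion.percolumnfamilyflush.size.lower.bound, default 16 MB), or, if it is smaller, when its memstore is old enough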
Collection<Store> specificStoresToFlush =
forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
// Perform the flush via internalFlushcache
FlushResult fs = internalFlushcache(specificStoresToFlush, status);
if (coprocessorHost != null) {
status.setStatus("Running post-flush coprocessor hooks");
coprocessorHost.postFlush();
}
status.markComplete("Flush successful");
return fs;
} finally {
synchronized (writestate) {
writestate.flushing = false;
this.writestate.flushRequested = false;
writestate.notifyAll();
}
}
} finally {
lock.readLock().unlock();
status.cleanup();
}
}
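The core logic is in FlushResult fs = internalFlushcache(specificStoresToFlush, status);, which covers the three phases: phase 1 (prepare) is implemented by internalPrepareFlushCache(), and phase 2 (flush) and phase 3 (commit) by internalFlushCacheAndCommit(). Here is the internalFlushcache overload that does the work (it also takes the WAL and a sequence id):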
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
// internalPrepareFlushCache takes the snapshot (phase 1)
PrepareFlushResult result
= internalPrepareFlushCache(wal, myseqid, storesToFlush, status, false);
// On success result.result is null, so internalFlushCacheAndCommit runs phases 2 and 3
if (result.result == null) {
return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
} else {
return result.result; // early exit due to failure from prepare stage
}
}
Now let's look at phase 1, internalPrepareFlushCache. It takes a region-level lock, updatesLock. There is quite a lot of code here; the less important parts can be skipped for now.
// Performs the prepare phase of the flush
protected PrepareFlushResult internalPrepareFlushCache(
final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
MonitoredTask status, boolean isReplay)
throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
}
// Record the start time so the flush duration can be measured
final long startTime = EnvironmentEdgeManager.currentTime();
// If nothing to flush, return, but we need to safely update the region sequence id
// If the memstore is empty, skip the flush but still update the sequence id
if (this.memstoreSize.get() <= 0) {
// Take an update lock because am about to change the sequence id and we want the sequence id
// to be at the border of the empty memstore.
MultiVersionConsistencyControl.WriteEntry w = null;
this.updatesLock.writeLock().lock();
try {
if (this.memstoreSize.get() <= 0) {
// Presume that if there are still no edits in the memstore, then there are no edits for
// this region out in the WAL subsystem so no need to do any trickery clearing out
// edits in the WAL system. Up the sequence number so the resulting flush id is for
// sure just beyond the last appended region edit (useful as a marker when bulk loading,
// etc.)
// wal can be null replaying edits.
if (wal != null) {
w = mvcc.beginMemstoreInsert();
long flushSeqId = getNextSequenceId(wal);
FlushResult flushResult = new FlushResult(
FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushSeqId, "Nothing to flush");
w.setWriteNumber(flushSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
w = null;
return new PrepareFlushResult(flushResult, myseqid);
} else {
return new PrepareFlushResult(
new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush"),
myseqid);
}
}
} finally {
this.updatesLock.writeLock().unlock();
if (w != null) {
mvcc.advanceMemstore(w);
}
}
}
if (LOG.isInfoEnabled()) {
LOG.info("Started memstore flush for " + this + ", current region memstore size "
+ StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
+ stores.size() + " column families' memstores are being flushed."
+ ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
// only log when we are not flushing all stores.
if (this.stores.size() > storesToFlush.size()) {
for (Store store : storesToFlush) {
LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
+ " which was occupying "
+ StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
}
}
}
// Stop updates while we snapshot the memstore of all of these regions' stores. We only have
// to do this for a moment. It is quick. We also set the memstore size to zero here before we
// allow updates again so its value will represent the size of the updates received
// during flush
// Block writes until the memstore snapshot is done.
MultiVersionConsistencyControl.WriteEntry w = null;
// We have to take an update lock during snapshot, or else a write could end up in both snapshot
// and memstore (makes it difficult to do atomic rows then)
status.setStatus("Obtaining lock to block concurrent updates");
// block waiting for the lock for internal flush
// Acquire the updatesLock write lock
this.updatesLock.writeLock().lock();
status.setStatus("Preparing to flush by snapshotting stores in " +
getRegionInfo().getEncodedName());
// Sum of the memstore sizes of all stores being flushed
long totalFlushableSizeOfFlushableStores = 0;
// Column family names of all stores being flushed
Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
for (Store store : storesToFlush) {
flushedFamilyNames.add(store.getFamily().getName());
}
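// Of storeFlushCtxs, committedFiles and storeFlushableSize, the important ones are storeFlushCtxs and committedFiles.
// Both are TreeMaps keyed by column family: the first holds each store's flush executor (StoreFlusherImpl),
// the second the HFiles finally written. StoreFlusherImpl, the implementation of StoreFlushContext, contains
// the core flush operations: prepare, flushCache, commit, abort, etc. So this map holds one flush context
// per store, and the rest of the flush is driven through these StoreFlushContexts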
TreeMap<byte[], StoreFlushContext> storeFlushCtxs
= new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
// Maps each store (column family) to its committed HDFS paths
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
Bytes.BYTES_COMPARATOR);
// The sequence id of this flush operation which is used to log FlushMarker and pass to
// createFlushContext to use as the store file's sequence id.
long flushOpSeqId = HConstants.NO_SEQNUM;
long flushedSeqId = HConstants.NO_SEQNUM;
// The max flushed sequence id after this flush operation. Used as completeSequenceId which is
// passed to HMaster.
byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
long trxId = 0;
try {
try {
w = mvcc.beginMemstoreInsert();
if (wal != null) {
if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
// This should never happen.
String msg = "Flush will not be started for ["
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
status.setStatus(msg);
return new PrepareFlushResult(new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg),
myseqid);
}
flushOpSeqId = getNextSequenceId(wal);
long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
// no oldestUnflushedSeqId means we flushed all stores.
// or the unflushed stores are all empty.
flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId
: oldestUnflushedSeqId - 1;
} else {
// use the provided sequence Id as WAL is not being used for this flush.
flushedSeqId = flushOpSeqId = myseqid;
}
// Iterate over the stores to flush and create a StoreFlusherImpl for each one.
// The memstore snapshot is produced by calling prepare() on each StoreFlusherImpl;
// likewise, the flush and commit in internalFlushCacheAndCommit call each store's flushCache and commit.
for (Store s : storesToFlush) {
// Accumulate the flushable memstore size in bytes (not the snapshot's getCellsCount())
totalFlushableSizeOfFlushableStores += s.getFlushableSize();
// Create a StoreFlusherImpl for each store
storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
// No flushed HFile paths exist yet at this point
committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
}
// write the snapshot start to WAL
if (wal != null && !writestate.readOnly) {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
// no sync. Sync is below where we do not hold the updates lock
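// Only append the START_FLUSH marker to the WAL here; the sync happens later, because we hold the updatesLock write lock and all slow operations are kept out of this section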
trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
}
// Prepare flush (take a snapshot). Each StoreFlushContext here is a StoreFlusherImpl
for (StoreFlushContext flush : storeFlushCtxs.values()) {
// For each store: hand the memstore's kvset over to the memstore snapshot (by reference, not by copying)
// and start a new empty kvset, then keep that snapshot in the StoreFlusherImpl
flush.prepare(); // prepare() calls memstore.snapshot() to produce the snapshot
}
} catch (IOException ex) {
if (wal != null) {
if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
StringUtils.stringifyException(t));
// ignore this since we will be aborting the RS with DSE.
}
}
// we have called wal.startCacheFlush(), now we have to abort it
wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
throw ex; // let upper layers deal with it.
}
} finally {
// Snapshot done: release the lock so client writes are no longer blocked
this.updatesLock.writeLock().unlock();
}
String s = "Finished memstore snapshotting " + this +
", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
status.setStatus(s);
if (LOG.isTraceEnabled()) LOG.trace(s);
// sync unflushed WAL changes
// see HBASE-8208 for details
if (wal != null) {
try {
wal.sync(); // ensure that flush marker is sync'ed
} catch (IOException ioe) {
LOG.warn("Unexpected exception while wal.sync(), ignoring. Exception: "
+ StringUtils.stringifyException(ioe));
}
}
// wait for all in-progress transactions to commit to WAL before
// we can start the flush. This prevents
// uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that
// were removed via a rollbackMemstore could be written to Hfiles.
w.setWriteNumber(flushOpSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
// set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
w = null;
} finally {
if (w != null) {
// in case of failure just mark current w as complete
mvcc.advanceMemstore(w);
}
}
return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId,
flushedSeqId, totalFlushableSizeOfFlushableStores);
}
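Why hold the updatesLock write lock just around the snapshot? Here is a hypothetical sketch, assuming that writers take the read side of the same ReentrantReadWriteLock (which is what the "stop updates while we snapshot" comments above imply). Puts share the read lock and run concurrently, while the snapshot swap takes the exclusive write lock, so a write can neither land in both the snapshot and the new memstore nor get lost in between. ToyRegion and its methods are illustrative names, not HBase APIs.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the updatesLock pattern; ToyRegion is not an HBase class.
public class ToyRegion {
  private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
  private ConcurrentSkipListMap<String, String> memstore = new ConcurrentSkipListMap<>();

  // Writers share the read lock, so many puts can proceed concurrently.
  public void put(String key, String value) {
    updatesLock.readLock().lock();
    try {
      memstore.put(key, value);
    } finally {
      updatesLock.readLock().unlock();
    }
  }

  // The snapshot takes the exclusive write lock: no put is in flight while the maps are swapped,
  // so every write ends up either in the returned snapshot or in the new memstore, never both.
  public ConcurrentSkipListMap<String, String> snapshot() {
    updatesLock.writeLock().lock();
    try {
      ConcurrentSkipListMap<String, String> snap = memstore;
      memstore = new ConcurrentSkipListMap<>();
      return snap;
    } finally {
      updatesLock.writeLock().unlock();
    }
  }

  public int memstoreSize() {
    updatesLock.readLock().lock();
    try {
      return memstore.size();
    } finally {
      updatesLock.readLock().unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ToyRegion region = new ToyRegion();
    Thread[] writers = new Thread[4];
    for (int t = 0; t < writers.length; t++) {
      final int id = t;
      writers[t] = new Thread(() -> {
        for (int i = 0; i < 10_000; i++) {
          region.put(id + "-" + i, "v");
        }
      });
      writers[t].start();
    }
    long flushedCells = 0;
    for (int i = 0; i < 50; i++) {
      flushedCells += region.snapshot().size(); // "flush" while writers are still running
    }
    for (Thread w : writers) {
      w.join();
    }
    flushedCells += region.snapshot().size();
    System.out.println(flushedCells); // 40000: each write is counted exactly once
  }
}
```

The price of this design is that every put waits while the write lock is held, which is why anything slow inside the snapshot section (such as a full traversal of the skip list) shows up directly as write latency.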
Before looking at StoreFlushContext.prepare() in detail, let's look at the StoreFlushContext interface. As mentioned above, StoreFlusherImpl is an inner class of HStore that implements StoreFlushContext.
interface StoreFlushContext {
void prepare();
void flushCache(MonitoredTask status) throws IOException;
boolean commit(MonitoredTask status) throws IOException;
void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) throws IOException;
void abort() throws IOException;
List<Path> getCommittedFiles();
}
Now back to the flush.prepare() call in internalPrepareFlushCache:
public void prepare() {
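// As noted above, the region calls StoreFlusherImpl.prepare() while holding the updatesLock write lock, so anything slow here directly stalls client writes.
// memstore.snapshot() just assigns the memstore's skip list to the snapshot skip list, but when it builds the returned MemStoreSnapshot it calls snapshot.size()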
this.snapshot = memstore.snapshot();
// MemStoreSnapshot.getCellsCount() returns that snapshot.size() value; computing it is O(n) in the number of cells
this.cacheFlushCount = snapshot.getCellsCount();
this.cacheFlushSize = snapshot.getSize();
committedFiles = new ArrayList<Path>(1);
}
Here is the memstore's snapshot method:
public MemStoreSnapshot snapshot() {
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
if (!this.snapshot.isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
this.snapshotId = EnvironmentEdgeManager.currentTime();
// Memory used by the memstore
this.snapshotSize = keySize();
if (!this.cellSet.isEmpty()) {
// cellSet holds the memstore's in-memory data
this.snapshot = this.cellSet;
// Create a new cellSet for incoming writes
this.cellSet = new CellSkipListSet(this.comparator);
this.snapshotTimeRangeTracker = this.timeRangeTracker;
this.timeRangeTracker = new TimeRangeTracker();
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
this.snapshotAllocator = this.allocator;
// Reset allocator so we get a fresh buffer for the new memstore
if (allocator != null) {
String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
new Class[] { Configuration.class }, new Object[] { conf });
}
timeOfOldestEdit = Long.MAX_VALUE;
}
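}
// (Return statement omitted in this excerpt: it returns a new MemStoreSnapshot
// whose cell count is computed with snapshot.size(), discussed below.)
}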
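Let's focus on snapshot.getCellsCount() in prepare. HBase keeps written data in memory in a skip list, implemented with the JDK's ConcurrentSkipListMap. The MemStore implementation (DefaultMemStore by default) has two fields backed by a ConcurrentSkipListMap: cellSet and snapshot. cellSet holds the data written to the memstore; snapshot is what cellSet gets assigned to during phase 1 of the flush. The value returned by getCellsCount() therefore comes from ConcurrentSkipListMap.size(), called when memstore.snapshot() builds the MemStoreSnapshot. ConcurrentSkipListMap does not keep an atomic counter of its size (for the sake of concurrency, and because the operation is rarely needed), so size() traverses the whole skip list to count the entries.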
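To make that cost concrete, here is a hypothetical micro-sketch (not a rigorous benchmark: no warm-up, single run) that counts a ConcurrentSkipListSet by walking it, which is what ConcurrentSkipListMap.size() does on JDK 8. Newer JDKs may implement size() differently, which is why the sketch counts by explicit traversal rather than timing size() itself. Timings depend on the machine, so no numbers are claimed here; SnapshotCountCost and its methods are made-up names.

```java
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical illustration, not a rigorous benchmark.
public class SnapshotCountCost {
  // Counts entries by walking the whole skip list: O(n) in the number of cells.
  static int countByTraversal(ConcurrentSkipListSet<Integer> cells) {
    int count = 0;
    for (Integer ignored : cells) {
      count++;
    }
    return count;
  }

  static ConcurrentSkipListSet<Integer> buildSet(int n) {
    ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
    for (int i = 0; i < n; i++) {
      set.add(i);
    }
    return set;
  }

  public static void main(String[] args) {
    for (int n : new int[] {10_000, 100_000, 500_000}) {
      ConcurrentSkipListSet<Integer> snapshot = buildSet(n);
      long start = System.nanoTime();
      int count = countByTraversal(snapshot);
      long micros = (System.nanoTime() - start) / 1_000;
      // In the flush path described here, the equivalent walk runs under the updatesLock write lock.
      System.out.println(count + " cells counted in " + micros + " us");
    }
  }
}
```

The elapsed time grows roughly with the number of cells, and a larger memstore (for example a big hbase.hregion.memstore.flush.size) means a longer walk while writes are blocked.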
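Back in internalPrepareFlushCache: after prepare() has been called on every store, the updatesLock write lock is released, the WAL is synced, the method waits for in-flight MVCC transactions to complete, and it returns a PrepareFlushResult. Moving back up to internalFlushcache: after internalPrepareFlushCache, it calls internalFlushCacheAndCommit. Let's follow it: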
protected FlushResult internalFlushCacheAndCommit(
final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
final Collection<Store> storesToFlush)
throws IOException {
// prepare flush context is carried via PrepareFlushResult
// Map from column family to StoreFlusherImpl for the stores being flushed
TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
// Paths of the HFiles produced by the flush. It was initialized in internalPrepareFlushCache: the keys (column families) are present, but every List<Path> value is still null
TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
long startTime = prepareResult.startTime;
long flushOpSeqId = prepareResult.flushOpSeqId;
long flushedSeqId = prepareResult.flushedSeqId;
long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
String s = "Flushing stores of " + this;
status.setStatus(s);
if (LOG.isTraceEnabled()) LOG.trace(s);
// Any failure from here on out will be catastrophic requiring server
// restart so wal content can be replayed and put back into the memstore.
// Otherwise, the snapshot content while backed up in the wal, it will not
// be part of the current running servers state.
boolean compactionRequested = false;
try {
// A. Flush memstore to all the HStores.
// Keep running vector of all store files that includes both old and the
// just-made new flush store file. The new flushed file is still in the
// tmp directory.
// For each store, call StoreFlusherImpl.flushCache to write the store's snapshot to an HFile.
// The file is written to a temporary location; commit later moves it to the correct path.
for (StoreFlushContext flush : storeFlushCtxs.values()) {
flush.flushCache(status);
}
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
Iterator<Store> it = storesToFlush.iterator();
// stores.values() and storeFlushCtxs have same order
for (StoreFlushContext flush : storeFlushCtxs.values()) {
boolean needsCompaction = flush.commit(status);
if (needsCompaction) {
compactionRequested = true;
}
committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
}
storeFlushCtxs.clear();
// Set down the memstore size by amount of flush.
this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
if (wal != null) {
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, true);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
// The wal needs to be replayed so its content is restored to memstore.
// Currently, only a server restart will do this.
// We used to only catch IOEs but its possible that we'd get other
// exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
// all and sundry.
if (wal != null) {
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable ex) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
StringUtils.stringifyException(ex));
// ignore this since we will be aborting the RS with DSE.
}
wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
}
DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
Bytes.toStringBinary(getRegionName()));
dse.initCause(t);
status.abort("Flush failed: " + StringUtils.stringifyException(t));
throw dse;
}
// If we get to here, the HStores have been written.
if (wal != null) {
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
}
// Record latest flush time
for (Store store : storesToFlush) {
this.lastStoreFlushTimeMap.put(store, startTime);
}
// Update the oldest unflushed sequence id for region.
this.maxFlushedSeqId = flushedSeqId;
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
synchronized (this) {
notifyAll(); // FindBugs NN_NAKED_NOTIFY
}
long time = EnvironmentEdgeManager.currentTime() - startTime;
long memstoresize = this.memstoreSize.get();
String msg = "Finished memstore flush of ~"
+ StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
+ totalFlushableSizeOfFlushableStores + ", currentsize="
+ StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+ " for region " + this + " in " + time + "ms, sequenceid="
+ flushOpSeqId + ", compaction requested=" + compactionRequested
+ ((wal == null) ? "; wal=null" : "");
LOG.info(msg);
status.setStatus(msg);
return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
}
We'll only look at two of these methods, flush.flushCache and flush.commit, where flush is a StoreFlusherImpl. flushCache performs phase 2 and commit performs phase 3.
public void flushCache(MonitoredTask status) throws IOException {
// Returns the paths of the temporary files the snapshot was written to; commit() later moves them into the store directory
tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
}
On to HStore.flushCache:
protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
MonitoredTask status) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
// Retry after catching exception when flushing, otherwise server will abort
// itself
StoreFlusher flusher = storeEngine.getStoreFlusher();
IOException lastException = null;
for (int i = 0; i < flushRetriesNumber; i++) {
try {
// Call StoreFlusher.flushSnapshot to write the snapshot to temporary file(s)
List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
Path lastPathName = null;
try {
for (Path pathName : pathNames) {
lastPathName = pathName;
validateStoreFile(pathName);
}
return pathNames;
} catch (Exception e) {
LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
if (e instanceof IOException) {
lastException = (IOException) e;
} else {
lastException = new IOException(e);
}
}
} catch (IOException e) {
LOG.warn("Failed flushing store file, retrying num=" + i, e);
lastException = e;
}
if (lastException != null && i < (flushRetriesNumber - 1)) {
try {
Thread.sleep(pauseTime);
} catch (InterruptedException e) {
IOException iie = new InterruptedIOException();
iie.initCause(e);
throw iie;
}
}
}
throw lastException;
}
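Here is the main logic. First, storeEngine.getStoreFlusher() returns the StoreFlusher instance, which handles the writer that appends the data and syncs it to disk; we won't go into that here. Inside the retry loop (up to flushRetriesNumber attempts, sleeping pauseTime between failed attempts), the key call is flusher.flushSnapshot, which involves an important variable: cellsCount.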
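Before following flushSnapshot: the retry structure of HStore.flushCache (up to flushRetriesNumber attempts, a pause between failed attempts but not after the last one, and an interrupt during the pause turned into InterruptedIOException) can be isolated into a small generic helper. This is a hypothetical sketch: withRetries and the Attempt interface are made-up names, and the validation step (validateStoreFile) is assumed to be part of the attempt.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper mirroring the retry structure of HStore.flushCache; not HBase code.
public class FlushRetry {
  // One attempt, e.g. "write the snapshot to a temp file and validate it".
  public interface Attempt<T> {
    T run() throws IOException;
  }

  public static <T> T withRetries(int retries, long pauseMs, Attempt<T> attempt) throws IOException {
    if (retries < 1) {
      throw new IllegalArgumentException("retries must be >= 1");
    }
    IOException last = null;
    for (int i = 0; i < retries; i++) {
      try {
        return attempt.run();
      } catch (IOException e) {
        last = e; // remember the failure and retry
      }
      if (i < retries - 1) { // no pause after the final attempt
        try {
          Thread.sleep(pauseMs);
        } catch (InterruptedException e) {
          InterruptedIOException iie = new InterruptedIOException();
          iie.initCause(e);
          throw iie;
        }
      }
    }
    throw last; // every attempt failed: surface the last error
  }

  public static void main(String[] args) throws IOException {
    AtomicInteger attempts = new AtomicInteger();
    String path = withRetries(10, 5, () -> {
      if (attempts.incrementAndGet() < 3) {
        throw new IOException("simulated HDFS hiccup");
      }
      return "/tmp/flush-file"; // hypothetical temp path
    });
    System.out.println(path + " after " + attempts.get() + " attempts"); // /tmp/flush-file after 3 attempts
  }
}
```

Retrying here matters because, as the comment in flushCache says, an unrecovered failure further along in the flush forces the server to abort so that the WAL can be replayed.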
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status) throws IOException {
ArrayList<Path> result = new ArrayList<Path>();
// Calls snapshot.getCellsCount(). It is highlighted because computing this value (snapshot.size(), during prepare) is one of the expensive steps of the prepare phase.
int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
StoreFile.Writer writer;
try {
// TODO: We can fail in the below block before we complete adding this flush to
// list of store files. Add cleanup of anything put on filesystem if we fail.
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
// The cellsCount passed in here is effectively unused (perhaps reserved for future use?)
writer = store.createWriterInTmp(
cellsCount, store.getFamily().getCompression(), false, true, true);
writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
IOException e = null;
try {
// Actually write the snapshot to the temporary file
performFlush(scanner, writer, smallestReadPoint);
} catch (IOException ioe) {
e = ioe;
// throw the exception out
throw ioe;
} finally {
if (e != null) {
writer.close();
} else {
finalizeWriter(writer, cacheFlushId, status);
}
}
}
} finally {
scanner.close();
}
LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
+ StringUtils.humanReadableInt(snapshot.getSize()) +
", hasBloomFilter=" + writer.hasGeneralBloom() +
", into tmp file " + writer.getPath());
result.add(writer.getPath());
return result;
}
cellsCount is passed to store.createWriterInTmp; let's follow it:
public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
throws IOException {
// ... less important logic omitted ...
// The maxKeyCount passed in here ends up unused
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
this.getFileSystem())
.withFilePath(fs.createTempName())
.withComparator(comparator)
.withBloomType(family.getBloomFilterType())
.withMaxKeyCount(maxKeyCount)
.withFavoredNodes(favoredNodes)
.withFileContext(hFileContext)
.build();
return w;
}
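So cellsCount is handed to the writer as maxKeyCount. performFlush then iterates over the snapshot with the scanner and uses the HFile writer to persist the data to disk. Let's see what the Writer does with cellsCount. It is used in only these two places in the writer: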
generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
conf, cacheConf, bloomType,
(int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
this.deleteFamilyBloomFilterWriter = BloomFilterFactory
.createDeleteBloomAtWrite(conf, cacheConf,
(int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
Let's follow them, starting with createDeleteBloomAtWrite:
public static BloomFilterWriter createDeleteBloomAtWrite(Configuration conf,
CacheConfig cacheConf, int maxKeys, HFile.Writer writer) {
if (!isDeleteFamilyBloomEnabled(conf)) {
LOG.info("Delete Bloom filters are disabled by configuration for "
+ writer.getPath()
+ (conf == null ? " (configuration is null)" : ""));
return null;
}
float err = getErrorRate(conf);
int maxFold = getMaxFold(conf);
// In case of compound Bloom filters we ignore the maxKeys hint.
CompoundBloomFilterWriter bloomWriter = new CompoundBloomFilterWriter(getBloomBlockSize(conf),
err, Hash.getHashType(conf), maxFold, cacheConf.shouldCacheBloomsOnWrite(),
KeyValue.RAW_COMPARATOR);
writer.addInlineBlockWriter(bloomWriter);
return bloomWriter;
}
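So maxKeys is not used (as the comment says, compound Bloom filters ignore the maxKeys hint), and the same holds for the other method. Therefore the cellsCount variable is not used in phase 2 of the flush.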
Having established that cellsCount is not used in phase 2, let's continue to phase 3: flush.commit(status) in internalFlushCacheAndCommit.
public boolean commit(MonitoredTask status) throws IOException {
if (this.tempFiles == null || this.tempFiles.isEmpty()) {
return false;
}
List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
for (Path storeFilePath : tempFiles) {
try {
storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
} catch (IOException ex) {
LOG.error("Failed to commit store file " + storeFilePath, ex);
// Try to delete the files we have committed before.
for (StoreFile sf : storeFiles) {
Path pathToDelete = sf.getPath();
try {
sf.deleteReader();
} catch (IOException deleteEx) {
LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
Runtime.getRuntime().halt(1);
}
}
throw new IOException("Failed to commit the flush", ex);
}
}
for (StoreFile sf : storeFiles) {
if (HStore.this.getCoprocessorHost() != null) {
HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
}
committedFiles.add(sf.getPath());
}
HStore.this.flushedCellsCount += cacheFlushCount;
HStore.this.flushedCellsSize += cacheFlushSize;
// Add new file to store files. Clear snapshot too while we have the Store write lock.
return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
}
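Phase 3 is fairly simple: it moves the flushed files to their correct location on HDFS (via HStore.commitFile). This is also where cellsCount gets used: cacheFlushCount is added to the store's flushedCellsCount, which, like flushedCellsSize, mainly feeds metrics collection. In my experience this metric can be ignored; I have never used it.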
Summary
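The reason this article keeps coming back to cellsCount is that computing it calls ConcurrentSkipListMap.size(), the most time-consuming step in phase 1 of the flush, and it runs while the region-level updatesLock is held. Yet, as traced above, the value is of little use, so it could be removed. Otherwise it causes latency spikes and a high p99. A patch already exists, but it applies to 2.x versions.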
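One way to avoid the traversal, sketched purely as a hypothetical illustration (it is not the 2.x patch mentioned above, whose details this article does not cover): keep a counter that writers bump while inserting under the updatesLock read lock, and read it while swapping under the write lock, so obtaining the snapshot's cell count no longer depends on the number of cells. CountingMemStore is a made-up class; LongAdder comes from java.util.concurrent.atomic (Java 8+).

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical alternative: track the cell count on write instead of walking the map at snapshot time.
public class CountingMemStore {
  private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
  private ConcurrentSkipListMap<String, String> cellSet = new ConcurrentSkipListMap<>();
  private LongAdder cellCount = new LongAdder();
  private ConcurrentSkipListMap<String, String> snapshot = new ConcurrentSkipListMap<>();

  public void put(String key, String value) {
    updatesLock.readLock().lock();
    try {
      // put() returns null only for a new key, so overwrites are not double-counted.
      if (cellSet.put(key, value) == null) {
        cellCount.increment();
      }
    } finally {
      updatesLock.readLock().unlock();
    }
  }

  // Swap in a fresh memstore and return the snapshot's cell count without traversing it.
  public long snapshot() {
    updatesLock.writeLock().lock();
    try {
      long count = cellCount.sum(); // exact here: writers are excluded by the write lock
      snapshot = cellSet;
      cellSet = new ConcurrentSkipListMap<>();
      cellCount = new LongAdder();
      return count;
    } finally {
      updatesLock.writeLock().unlock();
    }
  }

  // Traversal-based size of the current snapshot, used here only to check the counter.
  public int snapshotSizeByTraversal() {
    updatesLock.readLock().lock();
    try {
      return snapshot.size();
    } finally {
      updatesLock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    CountingMemStore store = new CountingMemStore();
    store.put("row1", "a");
    store.put("row2", "b");
    store.put("row1", "c"); // overwrite: still 2 distinct cells
    System.out.println(store.snapshot()); // 2
    store.put("row3", "d");
    System.out.println(store.snapshot()); // 1
  }
}
```

The trade-off is a small extra cost on every write (one LongAdder increment, which is designed to scale under contention) in exchange for a snapshot step whose cost no longer grows with the memstore size.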
That concludes the whole flush process. If anything here is wrong, corrections are welcome. Feel free to add me on WeChat to discuss: 940184856
