對應 HBase 版本0.94.1,對照了開源的版本和工作使用的某發行版
問題:在 HBase shell 里面輸入 flush 'table_or_region_name'
之后,發生了什么?具體的實現是怎么樣的?對於現有的某個表,我如何在做操作之前估算 flush 執行的時間?
1. HBase shell 入口
HBase shell 使用 ruby 實現,在 putty 敲hbase shell
,調用的是${HBASE_HOME}/bin/hbase
這個 bash 腳本,根據shell
這個參數,觸發調用 ruby 代碼,相關的部分如下:
if [ "$COMMAND" = "shell" ] ; then if [ "$JRUBY_HOME" != "" ] ; then CLASSPATH="$JRUBY_HOME/lib/jruby.jar:$CLASSPATH" HBASE_OPTS="$HBASE_OPTS -Djruby.home=$JRUBY_HOME -Djruby.lib=$JRUBY_HOME/lib" fi CLASS="org.jruby.Main -X+O ${JRUBY_OPTS} ${HBASE_HOME}/bin/hirb.rb"
在 hirb.rb 里面,引入相關的包(${HBASE_HOME}/lib/ruby
目錄下),然后啟動一個運行的 CLI 環境。
進入正題了。
在 hbase shell 里面,所有執行的命令,都在${HBASE_HOME}/lib/ruby/shell/commands
目錄下,有對應的${COMMAND}.rb
的對應文件。
找到 flush.rb,核心代碼如下:
def command(table_or_region_name) format_simple_command do admin.flush(table_or_region_name) end end
這里調用了 admin.rb 這個文件里面的方法:
@admin = org.apache.hadoop.hbase.client.HBaseAdmin.new(configuration) def flush(table_or_region_name) @admin.flush(table_or_region_name) end
到這里,就找到了 Java 程序的入口,調用了 HBaseAdmin.flush(table_or_region_name)
這個方法。
后續幾部分的類圖如下:
2. HBaseAdmin 包裝
HBaseAdmin 類下面包含了三個 flush 方法:
public void flush(String tableNameOrRegionName) throws IOException, InterruptedException {} public void flush(byte[] tableNameOrRegionName) throws IOException, InterruptedException {} private void flush(ServerName sn, HRegionInfo hri) throws IOException {}
- 第一個,作為入口,將 String 參數轉化為 byte[],交給第二個
- 第二個,主要的工作方法,按輸入參數是 region 名、分區表、不分區表,分別進行處理
- 第三個,單獨針對 region 進行 flush
第一個略過。
第二個,邏輯清晰:
- 如果是參數為 Region,就調用第三個 flush 處理
- 如果不是分區表,就獲取該表包含的所有 Region,挨個調用第三個 flush 處理,
- 如果地分區表,處理方式與其他的不同,調用了一個分區表公共處理方法 execPartitionTableAction 訂制實現了匿名類 PartitionTableActionCallableFactory,進行單獨處理。
注意
- 對於沒有預分區的表,簡單地在一個 for 循環里面,串行處理
- 對於分區表,execPartitionTableAction中使用了並發數據結構 Future,對分區是並行處理
第三個,對每個 Region 進行 flush,實際上是第二個 flush 中所有 case 最終的歸宿。
在第三個 flush 中,實現代碼如下:
HRegionInterface rs = this.connection.getHRegionConnection(sn.getHostname(), sn.getPort()); rs.flushRegion(hri);
HRegionInterface 是一個抽象接口,flushRegion 是一個抽象方法。在0.94.1這個版本下,只有 HRegionServer 實現了 HRegionInterface 接口,所以要在 HRegionServer 里面找到具體的代碼實現。
3. HRegionServer 包裝
在 HRegionServer 類里面,包含了三個 flush 的實現:
public void flushRegion(byte[] regionName) throws IllegalArgumentException, IOException {} public void flushRegion(byte[] regionName, long ifOlderThanTS) throws IllegalArgumentException, IOException {} @QosPriority (priority=100) public void flushRegion(HRegionInfo regionInfo) throws NotServingRegionException, IOException {}
- 第一個,簡單地傳入 regionName,確定 Region 在線,然后調用
region.flushcache()
- 第二個,傳入 regionName 和 超時時間戳 ifOlderThanTS ,確定 Region 在線,且未超時的情況下,將數據 flush 出去
- 第三個,@QosPriority (priority=100)標記,使用了自定義聲明,給該方法賦值 rpc 調用的優先級;方法體
checkOpen()
檢查 RegionServer 在線后,調用region.flushcache()
接下來,查看看下 HRegion 類下面flushcache()
的實現。
4. HRegion 實現
flushcache只是個入口方法,會做一些 flush 之前的准備工作,包括:建立任務狀態監控、判斷 Coprocessor、處理未 WAL 的 put 、寫加鎖等。之后,調用內部方法internalFlushcache
開始flush。
在 internalFlushcache 方法實現中,做了 MVCC 的一些工作,最終,調用了StoreFlusher的flushCache方法實現。
internalFlushcache 為了保證數據一致性做了很多的檢查、校驗、加鎖,目前功力不夠,先標記下,進入下一層。
看下 StoreFlusher 的實現。
5. StoreFlusher 實現
StoreFlusher 是個接口,在0.94.1這個版本里面,只有 Store.StoreFlusherImpl 一個實現類。
在 StoreFlusher 接口里面可以看到,flush 操作執行的過程中包含3個部分:
- prepare,這是個短操作,創建 snapshot,這個過程中會暫停寫操作
- flushCache,flush 執行的過程中,是不會阻塞該 store 上的任何操作(讀寫)
- commit,將 flush 出的文件添加到 store 目錄下,清除 memstore 快照,短操作,會足暫停 scan
6. StoreFlusherImpl 實現
StoreFlusherImpl 是 Store 類的內部私有類,前面提到的 StoreFlusher 的3個方法,由 StoreFlusherImpl實現后,prepare 是自己實現,flushCache和 commit 都是調用外部 Store類的方法來完成。
6.1 prepare
public void prepare() { memstore.snapshot(); this.snapshot = memstore.getSnapshot(); this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker(); }
調用了 MemStore 的方法,做快照。
6.2 flushCache
從 StoreFlusherImpl 調用 Store 類的flushCache方法,包裝了internalFlushCache
方法來實現。
邏輯比較清晰:
- 啟動一個 StoreScanner,根據時間戳和ScanType 參數,找出需要被 Flush 的行
- 啟動一個StoreFile Writer,把讀出來的數據,寫入到一個 StoreFile 中,並將該 StoreFile 的路徑返回,供后續 commit 階段使用
6.3 commit
StoreFlusherImpl 類的 commit 方法首先調用外部 Store類的commitFile
方法,主要做的事情有兩件:
- 將 flushCache 生成的 StoreFile 移動到 Store所在目錄下
- 更新 Store 的相關統計參數
然后會調用外部 Store類的updateStorefiles
更新 Store 類的 storefile,更新文件后,需要調用needsCompaction()
,查看下是否因為本次 flush 執行造成的文件變化會觸發 Compaction。如果觸發 Compaction,會啟動 Compaction 相關的一套機制繼續執行,后續再單獨介紹。
至此,手動 flush 操作背后的實現,初步梳理完畢。前面只是一個調用路徑的梳理,后面繼續豐富和補充。