背景:
對於其他數據存儲系統來說,統計表的行數是再基本不過的操作了,一般實現都非常簡單;但對於HBase這種key-value存儲結構的列式數據庫,統計 RowCount 的方法卻有好幾種不同的花樣,並且執行效率差別巨大!下面來研究下吧~
測試集群:HBase1.2.0 - CDH5.13.0 四台服務器
注:以下4種方法效率依次提高
一、hbase-shell的count命令
這是最簡單直接的操作,但是執行效率非常低,適用於百萬級以下的小表RowCount統計!
hbase> count 'ns1:t1' hbase> count 't1' hbase> count 't1', INTERVAL => 100000 hbase> count 't1', CACHE => 1000 hbase> count 't1', INTERVAL => 10, CACHE => 1000
此操作可能需要很長時間,來運行計數MapReduce作業。默認情況下每1000行顯示當前計數,計數間隔可自行指定。
默認情況下在計數掃描上啟用緩存,默認緩存大小為10行。
行數為 3000W 的表測試結果:
hbase(main):001:0> count 'sda_crm_calls20180102'
INTERVAL為1000000行時花了130分鍾。
二、scan方式設置過濾器循環計數(JAVA實現)
這種方式是通過添加 FirstKeyOnlyFilter 過濾器的scan進行全表掃描,循環計數RowCount,速度較慢! 但快於第一種count方式!
基本代碼如下:
public void rowCountByScanFilter(String tablename){ long rowCount = 0; try { //計時 StopWatch stopWatch = new StopWatch(); stopWatch.start(); TableName name=TableName.valueOf(tablename); //connection為類靜態變量 Table table = connection.getTable(name); Scan scan = new Scan(); //FirstKeyOnlyFilter只會取得每行數據的第一個kv,提高count速度 scan.setFilter(new FirstKeyOnlyFilter()); ResultScanner rs = table.getScanner(scan); for (Result result : rs) { rowCount += result.size(); } stopWatch.stop(); System.out.println("RowCount: " + rowCount); System.out.println("統計耗時:" +stopWatch.getTotalTimeMillis()); } catch (Throwable e) { e.printStackTrace(); } }
耗時45分鍾!
三、利用hbase.RowCounter包執行MR任務
這種方式效率非常高!利用了hbase jar中自帶的統計行數的工具類!
通過 $HBASE_HOME/bin/hbase
命令執行:
[root@cdh1 ~]# hbase org.apache.hadoop.hbase.mapreduce.RowCounter 'sda_crm_calls20180102'
耗時1m40s,速度較上面兩種有了質的飛躍!
四、利用HBase協處理器Coprocessor(JAVA實現)
這是我目前發現效率最高的RowCount統計方式,利用了HBase高級特性:協處理器!
我們往往使用過濾器來減少服務器端通過網絡返回到客戶端的數據量。但HBase中還有一些特性讓用戶甚至可以把一部分計算也移動到數據的存放端,那就是協處理器 (coprocessor)。
協處理器簡介:
(節選自《HBase權威指南》)
使用客戶端API,配合篩選機制,例如,使用過濾器或限制列族的范圍,都可以控制被返回到客戶端的數據量。如果可以更進一步優化會更好,例如,數據的處理流程直接放到服務器端執行,然后僅返回一個小的處理結果集。這類似於一個小型的MapReduce框架,該框架將工作分發到整個集群。
協處理器 允許用戶在region服務器上運行自己的代碼,更准確地說是允許用戶執行region級的操作,並且可以使用與RDBMS中觸發器(trigger)類似的功能。在客戶端,用戶不用關心操作具體在哪里執行,HBase的分布式框架會幫助用戶把這些工作變得透明。
實現代碼:
public void rowCountByCoprocessor(String tablename){ try { //提前創建connection和conf Admin admin = connection.getAdmin(); TableName name=TableName.valueOf(tablename); //先disable表,添加協處理器后再enable表 admin.disableTable(name); HTableDescriptor descriptor = admin.getTableDescriptor(name); String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation"; if (! descriptor.hasCoprocessor(coprocessorClass)) { descriptor.addCoprocessor(coprocessorClass); } admin.modifyTable(name, descriptor); admin.enableTable(name); //計時 StopWatch stopWatch = new StopWatch(); stopWatch.start(); Scan scan = new Scan(); AggregationClient aggregationClient = new AggregationClient(conf); System.out.println("RowCount: " + aggregationClient.rowCount(name, new LongColumnInterpreter(), scan)); stopWatch.stop(); System.out.println("統計耗時:" +stopWatch.getTotalTimeMillis()); } catch (Throwable e) { e.printStackTrace(); } }
發現只花了 23秒 就統計完成!
為什么利用協處理器后速度會如此之快?
Table注冊了Coprocessor之后,在執行AggregationClient的時候,會將RowCount分散到Table的每一個Region上,Region內RowCount的計算,是通過RPC執行調用接口,由Region對應的RegionServer執行InternalScanner進行的。
因此,性能的提升有兩點原因:
1.分布式統計。將原來客戶端按照Rowkey的范圍單點進行掃描,然后統計的方式,換成了由所有Region所在RegionServer同時計算的過程。
2.使用了在RegionServer內部執行使用了InternalScanner。這是距離實際存儲最近的Scanner接口,存取更加快捷。