HBase Coprocessor 剖析與編程實踐


1.起因(Why HBase  Coprocessor)

HBase作為列族數據庫最經常被人詬病的特性包括:無法輕易建立“二級索引”,難以執行求和、計數、排序等操作。比如,在舊版本的(<0.92)Hbase中,統計數據表的總行數,需要使用Counter方法,執行一次MapReduce Job才能得到。雖然HBase在數據存儲層中集成了MapReduce,能夠有效用於數據表的分布式計算。然而在很多情況下,做一些簡單的相加或者聚合計算的時候,如果直接將計算過程放置在server端,能夠減少通訊開銷,從而獲得很好的性能提升。於是,HBase在0.92之后引入了協處理器(coprocessors),實現一些激動人心的新特性:能夠輕易建立二次索引、復雜過濾器(謂詞下推)以及訪問控制等。

2.靈感來源( Source of Inspration)

HBase協處理器的靈感來自於Jeff Dean 09年的演講( P66-67)。它根據該演講實現了類似於bigtable的協處理器,包括以下特性:

  • 每個表服務器的任意子表都可以運行代碼
  • 客戶端的高層調用接口(客戶端能夠直接訪問數據表的行地址,多行讀寫會自動分片成多個並行的RPC調用)
  • 提供一個非常靈活的、可用於建立分布式服務的數據模型
  • 能夠自動化擴展、負載均衡、應用請求路由
HBase的協處理器靈感來自bigtable,但是實現細節不盡相同。HBase建立了一個框架,它為用戶提供類庫和運行時環境,使得他們的代碼能夠在HBase region server和master上處理。

3.細節剖析(Implementation)

協處理器分兩種類型,系統協處理器可以全局導入region server上的所有數據表,表協處理器即是用戶可以指定一張表使用協處理器。協處理器框架為了更好支持其行為的靈活性,提供了兩個不同方面的插件。一個是觀察者(observer),類似於關系數據庫的觸發器。另一個是終端(endpoint),動態的終端有點像存儲過程。

 3.1觀察者(Observer)

觀察者的設計意圖是允許用戶通過插入代碼來重載協處理器框架的upcall方法,而具體的事件觸發的callback方法由HBase的核心代碼來執行。協處理器框架處理所有的callback調用細節,協處理器自身只需要插入添加或者改變的功能。

以HBase0.92版本為例,它提供了三種觀察者接口:

  • RegionObserver:提供客戶端的數據操縱事件鈎子:Get、Put、Delete、Scan等。
  • WALObserver:提供WAL相關操作鈎子。
  • MasterObserver:提供DDL-類型的操作鈎子。如創建、刪除、修改數據表等。

這些接口可以同時使用在同一個地方,按照不同優先級順序執行.用戶可以任意基於協處理器實現復雜的HBase功能層。HBase有很多種事件可以觸發觀察者方法,這些事件與方法從HBase0.92版本起,都會集成在HBase API中。不過這些API可能會由於各種原因有所改動,不同版本的接口改動比較大,具體參考Java Doc

RegionObserver工作原理,如圖1所示。更多關於Observer細節請參見HBaseBook的第9.6.3章節

regionobserver.png

圖1 RegionObserver工作原理

 

3.2終端(Endpoint)

終端是動態RPC插件的接口,它的實現代碼被安裝在服務器端,從而能夠通過HBase RPC喚醒。客戶端類庫提供了非常方便的方法來調用這些動態接口,它們可以在任意時候調用一個終端,它們的實現代碼會被目標region遠程執行,結果會返回到終端。用戶可以結合使用這些強大的插件接口,為HBase添加全新的特性。終端的使用,如下面流程所示:

  1. 定義一個新的protocol接口,必須繼承CoprocessorProtocol.
  2. 實現終端接口,該實現會被導入region環境執行。
  3. 繼承抽象類BaseEndpointCoprocessor.
  4. 在客戶端,終端可以被兩個新的HBase Client API調用 。單個region:HTableInterface.coprocessorProxy(Class<T> protocol, byte[] row) 。rigons區域:HTableInterface.coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable)

整體的終端調用過程范例,如圖2所示:

rpc.png

圖2 終端調用過程范例

4.編程實踐(Code Example)

在該實例中,我們通過計算HBase表中行數的一個實例,來真實感受協處理器 的方便和強大。在舊版的HBase我們需要編寫MapReduce代碼來匯總數據表中的行數,在0.92以上的版本HBase中,只需要編寫客戶端的代碼即可實現,非常適合用在WebService的封裝上。

4.1啟用協處理器 Aggregation(Enable Coprocessor Aggregation)

我們有兩個方法:1.啟動全局aggregation,能過操縱所有的表上的數據。通過修改hbase-site.xml這個文件來實現,只需要添加如下代碼:

<property>
   <name>hbase.coprocessor.user.region.classes</name>
   <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
 </property>

2.啟用表aggregation,只對特定的表生效。通過HBase Shell 來實現。

(1)disable指定表。hbase> disable 'mytable'

(2)添加aggregation hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'

(3)重啟指定表 hbase> enable 'mytable'

4.2統計行數代碼(Code Snippet)

public class MyAggregationClient { 

private static final byte[] TABLE_NAME = Bytes.toBytes("mytable");
private static final byte[] CF = Bytes.toBytes("vent");
public static void main(String[] args) throws Throwable {
Configuration customConf = new Configuration();
customConf.setStrings("hbase.zookeeper.quorum",
"node0,node1,node2");
//提高RPC通信時長
customConf.setLong("hbase.rpc.timeout", 600000);
//設置Scan緩存
customConf.setLong("hbase.client.scanner.caching", 1000);
Configuration configuration = HBaseConfiguration.create(customConf);
AggregationClient aggregationClient = new AggregationClient(
configuration);
Scan scan = new Scan();
//指定掃描列族,唯一值
scan.addFamily(CF);
long rowCount = aggregationClient.rowCount(TABLE_NAME, null, scan);
System.out.println("row count is " + rowCount);

}
}

 5.參考文獻(References)

[1]Lai, et al.,(2012-02-01),"Coprocessor Introduction : Apache HBase".Available:https://blogs.apache.org/hbase/entry/coprocessor_introduction

[2]Apache.(2012-08-10),"The Apache HBase  Reference Guide".Available:http://hbase.apache.org/book.html#coprocessors

 

知識共享許可協議
本作品由VentLam創作,采用知識共享署名-非商業性使用-相同方式共享 2.5 中國大陸許可協議進行許可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM