協處理器—Coprocessor
1、 起源
Hbase 作為列族數據庫最經常被人詬病的特性包括:無法輕易建立“二級索引”,難以執 行求和、計數、排序等操作。比如,在舊版本的(<0.92)Hbase 中,統計數據表的總行數,需 要使用 Counter 方法,執行一次 MapReduce Job 才能得到。雖然 HBase 在數據存儲層中集成 了 MapReduce,能夠有效用於數據表的分布式計算。然而在很多情況下,做一些簡單的相 加或者聚合計算的時候,如果直接將計算過程放置在 server 端,能夠減少通訊開銷,從而獲 得很好的性能提升。於是,HBase 在 0.92 之后引入了協處理器(coprocessors),實現一些激動 人心的新特性:能夠輕易建立二次索引、復雜過濾器(謂詞下推)以及訪問控制等。
2、介紹
協處理器有兩種:observer 和 endpoint
Observer 類似於傳統數據庫中的觸發器,當發生某些事件的時候這類協處理器會被 Server 端調用。Observer Coprocessor 就是一些散布在 HBase Server 端代碼中的 hook 鈎子, 在固定的事件發生時被調用。比如:put 操作之前有鈎子函數 prePut,該函數在 put 操作執 行前會被 Region Server 調用;在 put 操作之后則有 postPut 鈎子函數
以 HBase0.92 版本為例,它提供了三種觀察者接口:
RegionObserver:提供客戶端的數據操縱事件鈎子:Get、Put、Delete、Scan 等。
WALObserver:提供 WAL 相關操作鈎子。
MasterObserver:提供 DDL-類型的操作鈎子。如創建、刪除、修改數據表等。
到 0.96 版本又新增一個 RegionServerObserver
下圖是以 RegionObserver 為例子講解 Observer 這種協處理器的原理:
1、客戶端發出 put 請求
2、該請求被分派給合適的 RegionServer 和 region
3、coprocessorHost 攔截該請求,然后在該表上登記的每個 RegionObserver 上調用 prePut()
4、如果沒有被 prePut()攔截,該請求繼續送到 region,然后進行處理
5、region 產生的結果再次被 CoprocessorHost 攔截,調用 postPut()
6、假如沒有 postPut()攔截該響應,最終結果被返回給客戶端
Endpoint 協處理器類似傳統數據庫中的存儲過程,客戶端可以調用這些 Endpoint 協處 理器執行一段 Server 端代碼,並將 Server 端代碼的結果返回給客戶端進一步處理,最常見 的用法就是進行聚集操作。如果沒有協處理器,當用戶需要找出一張表中的最大數據,即 max 聚合操作,就必須進行全表掃描,在客戶端代碼內遍歷掃描結果,並執行求最大值的 操作。這樣的方法無法利用底層集群的並發能力,而將所有計算都集中到 Client 端統一執行, 勢必效率低下。利用 Coprocessor,用戶可以將求最大值的代碼部署到 HBase Server 端,HBase 將利用底層 cluster 的多個節點並發執行求最大值的操作。即在每個 Region 范圍內執行求最 大值的代碼,將每個 Region 的最大值在 Region Server 端計算出,僅僅將該 max 值返回給客 戶端。在客戶端進一步將多個 Region 的最大值進一步處理而找到其中的最大值。這樣整體 的執行效率就會提高很多
下圖是 EndPoint 的工作原理:
3、總結
Observer 允許集群在正常的客戶端操作過程中可以有不同的行為表現
Endpoint 允許擴展集群的能力,對客戶端應用開放新的運算命令
Observer 類似於 RDBMS 中的觸發器,主要在服務端工作
Endpoint 類似於 RDBMS 中的存儲過程,主要在服務端工作
Observer 可以實現權限管理、優先級設置、監控、ddl 控制、二級索引等功能
Endpoint 可以實現 min、max、avg、sum、distinct、group by 等功能
協處理加載方式
協處理器的加載方式有兩種,我們稱之為靜態加載方式(Static Load)和動態加載方式 (Dynamic Load)。靜態加載的協處理器稱之為 System Coprocessor,動態加載的協處理器稱 之為 Table Coprocessor。
1、 靜態加載
通過修改 hbase-site.xml 這個文件來實現,啟動全局 aggregation,能過操縱所有的表上 的數據。只需要添加如下代碼:
<property> <name>hbase.coprocessor.user.region.classes</name> <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value> </property>
為所有 table 加載了一個 cp class,可以用”,”分割加載多個 class
2、 動態加載
啟用表 aggregation,只對特定的表生效。通過 HBase Shell 來實現。
(1)停用表 disable 'guanzhu'
(2)添加協處理器 alter 'guanzhu', METHOD => 'table_att', 'coprocessor' => 'hdfs://myha01/hbase/guanzhu.jar|com.study.hbase.cp.HbaseCoprocessorTest|1001|'
(3)啟用表 enable 'guanzhu'
3、 協處理器卸載
同樣是3步
disable 'mytable'
alter 'mytable',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
enable 'mytable'
案例(二級索引)
說明:二狗子是王寶強的粉絲
關注表:二狗子關注了王寶強 rowKey='ergouzi' cell="star:wangbaoqiang"
put 'guanzhu', 'ergouzi', 'cf:star', 'wangbaoqiang'
粉絲表:二狗子是王寶強的粉絲 rowKey="wangbaoqiang" cell="fensi:ergouzi"
put 'fans', 'wangbaoqiang', 'cf:fensi', 'ergouzi'
java實現代碼
public class HbaseCoprocessorTest extends BaseRegionObserver{ static Configuration conf = HBaseConfiguration.create(); static Connection conn = null; static Table table = null; static { conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181"); try { conn = ConnectionFactory.createConnection(conf); table = conn.getTable(TableName.valueOf("fans")); } catch (IOException e) { e.printStackTrace(); } } /** * 此方法是在真正的put方法調用之前進行調用 * 參數put為table.put(put)里面的參數put對象,是要進行插入的那條數據 * * 例如:要向關注表里面插入一條數據 姓名:二狗子 關注的明星:王寶強 * shell語句:put 'guanzhu','ergouzi', 'cf:star', 'wangbaoqiang' * * */ @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { //獲取put對象里面的rowkey'ergouzi' byte[] row = put.getRow(); //獲取put對象里面的cell List<Cell> list = put.get("cf".getBytes(), "star".getBytes()); Cell cell = list.get(0); //創建一個新的put對象 Put new_put = new Put(cell.getValueArray()); new_put.addColumn("cf".getBytes(), "fensi".getBytes(), row); table.put(new_put); conn.close(); } }
打成jar包,命名為guanzhu.jar,將其上傳到HDFS目錄/hbase下面
[hadoop@hadoop1 ~]$ hadoop fs -put guanzhu.jar /hbase
打開hbase shell命令,按順序呢執行(提前已經創建好guanzhu和fans表)
hbase(main):001:0> disable 'guanzhu' 0 row(s) in 2.8850 seconds hbase(main):002:0> alter 'guanzhu', METHOD => 'table_att', 'coprocessor' => 'hdfs://myha01/hbase/guanzhu.jar|com.study.hbase.cp.HbaseCoprocessorTest|1001|' Updating all regions with the new schema... 1/1 regions updated. Done. 0 row(s) in 2.7570 seconds hbase(main):003:0> enable 'guanzhu' 0 row(s) in 2.3400 seconds hbase(main):004:0> desc 'guanzhu' Table guanzhu is ENABLED guanzhu, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://myha01/hbase/guanzhu.jar|com.study.hbase.cp.HbaseCoproce ssorTest|1001|'} COLUMN FAMILIES DESCRIPTION {NAME => 'cf', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_ BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BL OCKSIZE => '65536', REPLICATION_SCOPE => '0'} 1 row(s) in 0.0500 seconds hbase(main):005:0> put 'guanzhu', 'ergouzi', 'cf:star', 'wangbaoqiang' 0 row(s) in 0.3050 seconds hbase(main):006:0> scan 'guanzhu' ROW COLUMN+CELL ergouzi column=cf:star, timestamp=1522759023001, value=wangbaoqiang 1 row(s) in 0.0790 seconds hbase(main):007:0> scan 'fans' ROW COLUMN+CELL \x00\x00\x00\x19\x00\x00\x00 column=cf:fensi, timestamp=1522759022996, value=ergouzi \x0C\x00\x07ergouzi\x02cfsta r\x7F\xFF\xFF\xFF\xFF\xFF\xF F\xFF\x04wangbaoqiang 1 row(s) in 0.0330 seconds hbase(main):008:0>