目錄
簡單了解
官方幫助文檔
http://hbase.apache.org/book.html#cp
協處理器出現的原因
HBase作為列族數據庫經常被人詬病的就是無法輕易建立“二級索引”,難執行求和、計數、排序等操作。在0.92版本以前的HBase雖然在數據存儲層集成了MapReduce,能夠有效用於數據表的分布式計算,然而在很多情況下,做一些簡單的相加或者聚合計算的時候,如果直接將計算過程放置在server端,能夠減少通訊開銷,從而獲得很好的性能提升。所以HBase在0.92之后引入了協處理器(coprocessors),添加了一些新的特性:能夠輕易建立二次索引、復雜過了長期以及訪問控制等
協處理器的分類
Observer
對數據進行前置或者后置的攔截操作
Endpoint
主要實現數據的一些統計功能,例如 COUNT,SUM,GROUP BY 等等
Phoenix
其實就是使用了大量的協處理器來實現的
協處理器的使用
加載方式
協處理器的加載方式有兩種,靜態加載方式(Static Load) 和 動態加載方式(Dynamic Load)。 靜態加載的協處理器稱之為 System Coprocessor,動態加載的協處理器稱之為 Table Coprocessor
靜態加載
通過修改hbase-site.xml
,然后重啟hbase實現,這是對所有表都有效
cd /export/servers/hbase-1.2.0-cdh5.14.0/conf
vim hbase-site.xml
<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
動態加載
動態加載可以對指定表生效
首先在禁用指定表
disable 'mytable'
然后添加aggregation
alter 'mytable', METHOD => 'table_att','coprocessor'=>
'|org.apache.Hadoop.hbase.coprocessor.AggregateImplementation||'
重啟指定表
enable 'mytable'
協處理器的卸載
禁用表
disable 'test'
卸載協處理器
alter 'test',METHOD => 'table_att_unset',NAME => 'coprocessor$1'
啟用表
enable 'test'
協處理器Observer應用實戰
需求
通過協處理器Observer實現hbase當中一張表插入數據,然后通過協處理器,將數據復制一份保存到另外一張表當中去,但是只取當第一張表當中的部分列數據保存到第二張表當中去
步驟
一、HBase當中創建第一張表proc1和第二張表proc2
create 'proc1','info'
create 'proc2','info'
二、開發HBase的協處理器
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import java.io.IOException;
public class ObserverProcessor extends BaseRegionObserver {
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
//連接到HBase
Configuration configuration = HBaseConfiguration.create();
//設置連接配置
configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
//
Connection connection = ConnectionFactory.createConnection(configuration);
Cell nameCell = put.get("info".getBytes(), "name".getBytes()).get(0);
Put put1 = new Put(put.getRow());
put1.add(nameCell);
Table reverseuser = connection.getTable(TableName.valueOf("proc2"));
reverseuser.put(put1);
reverseuser.close();
}
}
三、將java打成Jar包,上傳到HDFS
先將jar包上傳到linux的/export/servers
路徑下,然后執行以下命令
mv original-day12_HBaseANDMapReduce-1.0-SNAPSHOT.jar processor.jar
hdfs dfs -mkdir -p /processor
hdfs dfs -put processor.jar /processor
四、將jar包掛載到proc1表
在hbase shell
執行以下命令
describe 'proc1'
alter 'proc1',METHOD => 'table_att','Coprocessor'=>'hdfs://node01:8020/processor/processor.jar|cn.itcast.mr.demo7.ObserverProcessor|1001|'
describe 'proc1'
五、用JavaAPI想proc1表中添加數據
/** * */
@Test
public void testPut() throws Exception{
//獲取連接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "node01,node02");
Connection connection = ConnectionFactory.createConnection(configuration);
Table user5 = connection.getTable(TableName.valueOf("proc1"));
Put put1 = new Put(Bytes.toBytes("hello_world"));
put1.addColumn(Bytes.toBytes("info"),"name".getBytes(),"helloworld".getBytes());
put1.addColumn(Bytes.toBytes("info"),"gender".getBytes(),"abc".getBytes());
put1.addColumn(Bytes.toBytes("info"),"nationality".getBytes(),"test".getBytes());
user5.put(put1);
byte[] row = put1.getRow();
System.out.println(Bytes.toString(row));
user5.close();
}
六、查看proc1和proc2表的數據
七、如果要卸載協處理器
disable 'proc1'
alter 'proc1',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
enable 'proc1'