1. Java代碼實現
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; import org.apache.hadoop.hbase.coprocessor.AggregateImplementation; /** * <p> * 協處理器統計HBase表數據量 * </p> * */ public class HBaseRecordsCounter { /** * HBase API添加協處理器 * */ public static void addCoprocessor(Configuration conf, String tableName) { try {
byte[] tableNameBytes = Bytes.toBytes(tableName);
HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
HTableDescriptor htd = hbaseAdmin.getTableDescriptor(tableNameBytes);
if (!htd.hasCoprocessor(AggregateImplementation.class.getName())) {
hbaseAdmin.disableTable(tableNameBytes);
htd.addCoprocessor(AggregateImplementation.class.getName());
hbaseAdmin.modifyTable(tableNameBytes, htd);
hbaseAdmin.enableTable(tableNameBytes);
}
hbaseAdmin.close();
} catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 統計表數量 * */ public static void exeCount(Configuration conf, String tableName, String family) { try { // 使用hbase提供的聚合coprocessor AggregationClient aggregationClient = new AggregationClient(conf); Scan scan = new Scan(); // 指定掃描列族,唯一值 scan.addFamily(Bytes.toBytes(family)); long start = System.currentTimeMillis(); long rowCount = aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan); System.out .println("Row count: " + rowCount + "; time cost: " + (System.currentTimeMillis() - start) + "ms"); } catch (Throwable e) { e.printStackTrace(); } } public static void main(String[] args) { String tableName = "test"; Configuration conf = new Configuration(); conf.set("hbase.zookeeper.quorum", "host1,host2,host3"); conf.set("hbase.rootdir", "hdfs://host:8020/hbase"); // 提高RPC通信時長 conf.setLong("hbase.rpc.timeout", 600000); // 設置Scan緩存 conf.setLong("hbase.client.scanner.caching", 1000); addCoprocessor(conf, tableName); exeCount(conf, tableName, "info"); } }
2. 啟用協處理器
啟用協處理器方法1.
啟動全局aggregation,能夠操作所有表上的數據。通過修改hbase-site.xml這個文件來實現,只需要添加如下配置:
<property> <name>hbase.coprocessor.user.region.classes</name> <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value> </property>
啟用協處理器方法2.
hbase shell添加coprocessor:
disable 'member'
alter 'member',METHOD => 'table_att','coprocessor' => 'hdfs://master24:9000/user/hadoop/jars/test.jar|mycoprocessor.SampleCoprocessor|1001|'
enable 'member'
hbase shell 刪除coprocessor:
disable 'member'
alter 'member',METHOD => 'table_att_unset',NAME =>'coprocessor$1'
enable 'member'