Hbase Java API包括協處理器統計行數


package com.zy;
import java.io.IOException;

import org.apache.commons.lang.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseTable {
    // 聲明靜態配置
    private static Configuration conf = HBaseConfiguration.create();
    // 創建表(tableName 表名; family 列族列表)
    public static void createTable(String tableName, String[] familys)
    throws IOException{
      HBaseAdmin admin = new HBaseAdmin(conf);
    if (admin.tableExists(tableName)){
    System.out.println(tableName+" already exists!");
    }
    else {
    HTableDescriptor descr = new HTableDescriptor(TableName.valueOf(tableName));
    for (String family:familys) {
    descr.addFamily(new HColumnDescriptor(family)); //添加列族
    }
    admin.createTable(descr); //建表
    System.out.println(tableName+" created successfully!");
    }
    }
    //插入數據(rowKey rowKey;tableName 表名;family 列族;qualifier 限定名;value 值)
    public static void addData(String tableName, String rowKey, String familyName, String
    columnName, String value)
    throws IOException {
    HTable table = new HTable(conf, Bytes.toBytes(tableName));//HTable負責跟記錄相關的操作如增刪改查等//
    Put put = new Put(Bytes.toBytes(rowKey));// 設置rowkey
    put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(value));
    table.put(put);
    System.out.println("Add data successfully!rowKey:"+rowKey+", column:"+familyName+":"+columnName+", cell:"+value);
    }
    //遍歷查詢hbase表(tableName 表名)
    public static void getResultScann(String tableName) throws IOException {
    Scan scan = new Scan();
    ResultScanner rs = null;
    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    try {
    rs = table.getScanner(scan);
    for (Result r : rs) {
    for (KeyValue kv : r.list()) {
    System.out.println("row:" + Bytes.toString(kv.getRow()));
    System.out.println("family:" + Bytes.toString(kv.getFamily()));
    System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
    System.out.println("value:" + Bytes.toString(kv.getValue()));
    System.out.println("timestamp:" + kv.getTimestamp());
    System.out.println("-------------------------------------------");
    }
    }
    } finally {
    rs.close();
    }
    }
    //查詢表中的某一列(
    public static void getResultByColumn(String tableName, String rowKey, String familyName, String
    columnName) throws IOException {
    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    Get get = new Get(Bytes.toBytes(rowKey));
    get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); //獲取指定列族和列修飾符對應的列
    Result result = table.get(get);
    for (KeyValue kv : result.list()) {
    System.out.println("family:" + Bytes.toString(kv.getFamily()));
    System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
    System.out.println("value:" + Bytes.toString(kv.getValue()));
    System.out.println("Timestamp:" + kv.getTimestamp());
    System.out.println("-------------------------------------------");
    }
    }
    //更新表中的某一列(tableName 表名;rowKey rowKey;familyName 列族名;columnName 列名;value 更新后的值)
    public static void updateTable(String tableName, String rowKey,
    String familyName, String columnName, String value)
    throws IOException {
    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    Put put = new Put(Bytes.toBytes(rowKey));
    put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName),
    Bytes.toBytes(value));
    table.put(put);
    System.out.println("update table Success!");
    }
    //刪除指定單元格
    public static void deleteColumn(String tableName, String rowKey,
    String familyName, String columnName) throws IOException {
    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    Delete deleteColumn = new Delete(Bytes.toBytes(rowKey));
    deleteColumn.deleteColumns(Bytes.toBytes(familyName),
    Bytes.toBytes(columnName));
    table.delete(deleteColumn);
    System.out.println("rowkey:"+rowKey+",column:"+familyName+":"+columnName+" deleted!");
    }
    //刪除指定的行
    public static void deleteAllColumn(String tableName, String rowKey)
    throws IOException {
    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    Delete deleteAll = new Delete(Bytes.toBytes(rowKey));
    table.delete(deleteAll);
    System.out.println("rowkey:"+rowKey+" are all deleted!");
    }
    //刪除表(tableName 表名)
    public static void deleteTable(String tableName) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf);
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
    System.out.println(tableName + " is deleted!");
    }


    //統計行數
    public void RowCount(String tablename) throws Exception,Throwable{
        //提前創建conf
        HBaseAdmin admin = new HBaseAdmin(conf);
        TableName name=TableName.valueOf(tablename);
        //先disable表,添加協處理器后再enable表
        admin.disableTable(name);
        HTableDescriptor descriptor = admin.getTableDescriptor(name);
        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);

        //計時
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        //提高RPC通信時長
        conf.setLong("hbase.rpc.timeout", 600000);
        //設置Scan緩存
        conf.setLong("hbase.client.scanner.caching", 1000);
        Configuration configuration = HBaseConfiguration.create(conf);
        AggregationClient aggregationClient = new AggregationClient(configuration);
        Scan scan = new Scan();
        long rowCount = aggregationClient.rowCount(name, new LongColumnInterpreter(), scan);
        System.out.println(" rowcount is " + rowCount);
        System.out.println("統計耗時:"+stopWatch.getTime());
        }

    public static void main(String[] args) throws Exception {
    // 創建表
    String tableName = "test";
    String[] family = { "f1", "f2" };
    createTable(tableName, family);
    // 為表插入數據
    String[] rowKey = {"r1", "r2"};
    String[] columnName = { "c1", "c2", "c3" };
    String[] value = {"value1", "value2", "value3", "value4", "value5", "value6",};
    addData(tableName,rowKey[0],family[0],columnName[0],value[0]);
    addData(tableName,rowKey[0],family[0],columnName[1],value[1]);
    addData(tableName,rowKey[0],family[1],columnName[2],value[2]);
    addData(tableName,rowKey[1],family[0],columnName[0],value[3]);
    addData(tableName,rowKey[1],family[0],columnName[1],value[4]);
    addData(tableName,rowKey[1],family[1],columnName[2],value[5]);
    // 掃描整張表
    getResultScann(tableName);
    // 更新指定單元格的值
    updateTable(tableName, rowKey[0], family[0], columnName[0], "update value");
    // 查詢剛更新的列的值
    getResultByColumn(tableName, rowKey[0], family[0], columnName[0]);
    // 刪除一列
    deleteColumn(tableName, rowKey[0], family[0], columnName[1]);
    // 再次掃描全表
    getResultScann(tableName);
    // 刪除整行數據
    deleteAllColumn(tableName, rowKey[0]);
    // 再次掃描全表
    getResultScann(tableName);
    // 刪除表
    deleteTable(tableName);
    }

}
如果想要在本地成功運行上述的API Demo,必須滿足如下幾個條件:
1. 新建項目
本小節使用Intellij IDEA作為HBase的開發環境。安裝好工具后需新建一個名為 hbase-test 的maven項
目,並在項目目錄下的 ~/src/main/java/ 目錄下將新建一個 HtableTest.java 文件,內容為上述
的API Demo。
2. 導入jar包
將上一章節中獲取的jar包下載到本地,並將上一步新建的項目 hbase-test 與其建立依賴,也就是設定
新建項目 hbase-test 的 classpath ,用於API運行時查找jar包和配置文件。
3. 導入配置文件
若您要在本地進行開發還需要 hbase-site.xml 文件,將配置文件移入resources目錄下。這個文件在集群中任意一台服務器上的
/etc/hbase/conf/ 目錄下。
12. HBase API運行教程
本地的 hbase-site.xml 文件應放在上一步中與項目 hbase-test 建立了依賴的路徑
下。
滿足上述條件后,你就可以運行上述的API Demo了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM