package com.zy;

import java.io.IOException;

import org.apache.commons.lang.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseTable {

    // Static configuration shared by all methods
    private static Configuration conf = HBaseConfiguration.create();

    // Create a table (tableName: table name; familys: list of column families)
    public static void createTable(String tableName, String[] familys) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tableName)) {
            System.out.println(tableName + " already exists!");
        } else {
            HTableDescriptor descr = new HTableDescriptor(TableName.valueOf(tableName));
            for (String family : familys) {
                descr.addFamily(new HColumnDescriptor(family)); // add column family
            }
            admin.createTable(descr); // create the table
            System.out.println(tableName + " created successfully!");
        }
    }

    // Insert data (tableName: table name; rowKey: row key; familyName: column family; columnName: qualifier; value: cell value)
    public static void addData(String tableName, String rowKey, String familyName, String columnName,
            String value) throws IOException {
        // HTable handles record-level operations: put, get, delete, scan, etc.
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        Put put = new Put(Bytes.toBytes(rowKey)); // set the row key
        put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(value));
        table.put(put);
        System.out.println("Add data successfully! rowKey:" + rowKey + ", column:" + familyName + ":"
                + columnName + ", cell:" + value);
    }

    // Scan the whole table (tableName: table name)
    public static void getResultScann(String tableName) throws IOException {
        Scan scan = new Scan();
        ResultScanner rs = null;
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        try {
            rs = table.getScanner(scan);
            for (Result r : rs) {
                for (KeyValue kv : r.list()) {
                    System.out.println("row:" + Bytes.toString(kv.getRow()));
                    System.out.println("family:" + Bytes.toString(kv.getFamily()));
                    System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
                    System.out.println("value:" + Bytes.toString(kv.getValue()));
                    System.out.println("timestamp:" + kv.getTimestamp());
                    System.out.println("-------------------------------------------");
                }
            }
        } finally {
            if (rs != null) {
                rs.close();
            }
        }
    }

    // Read a single column (tableName: table name; rowKey: row key; familyName: column family; columnName: qualifier)
    public static void getResultByColumn(String tableName, String rowKey, String familyName,
            String columnName) throws IOException {
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); // fetch only the given family:qualifier
        Result result = table.get(get);
        for (KeyValue kv : result.list()) {
            System.out.println("family:" + Bytes.toString(kv.getFamily()));
            System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
            System.out.println("value:" + Bytes.toString(kv.getValue()));
            System.out.println("Timestamp:" + kv.getTimestamp());
            System.out.println("-------------------------------------------");
        }
    }

    // Update a single column (tableName: table name; rowKey: row key; familyName: column family; columnName: qualifier; value: new value)
    public static void updateTable(String tableName, String rowKey, String familyName, String columnName,
            String value) throws IOException {
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(value));
        table.put(put);
        System.out.println("update table Success!");
    }

    // Delete a single cell
    public static void deleteColumn(String tableName, String rowKey, String familyName,
            String columnName) throws IOException {
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        Delete deleteColumn = new Delete(Bytes.toBytes(rowKey));
        deleteColumn.deleteColumns(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
        table.delete(deleteColumn);
        System.out.println("rowkey:" + rowKey + ",column:" + familyName + ":" + columnName + " deleted!");
    }

    // Delete an entire row
    public static void deleteAllColumn(String tableName, String rowKey) throws IOException {
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        Delete deleteAll = new Delete(Bytes.toBytes(rowKey));
        table.delete(deleteAll);
        System.out.println("rowkey:" + rowKey + " are all deleted!");
    }

    // Delete a table (tableName: table name)
    public static void deleteTable(String tableName) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        admin.disableTable(tableName);
        admin.deleteTable(tableName);
        System.out.println(tableName + " is deleted!");
    }

    // Count rows with the Aggregation coprocessor
    public void RowCount(String tablename) throws Throwable {
        // conf has already been created above
        HBaseAdmin admin = new HBaseAdmin(conf);
        TableName name = TableName.valueOf(tablename);
        // Disable the table, register the coprocessor, then enable it again
        admin.disableTable(name);
        HTableDescriptor descriptor = admin.getTableDescriptor(name);
        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (!descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);
        // Start timing
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        // Increase the RPC timeout
        conf.setLong("hbase.rpc.timeout", 600000);
        // Set the scanner caching
        conf.setLong("hbase.client.scanner.caching", 1000);
        Configuration configuration = HBaseConfiguration.create(conf);
        AggregationClient aggregationClient = new AggregationClient(configuration);
        Scan scan = new Scan();
        long rowCount = aggregationClient.rowCount(name, new LongColumnInterpreter(), scan);
        System.out.println(" rowcount is " + rowCount);
        System.out.println("Row count took: " + stopWatch.getTime() + " ms");
    }

    public static void main(String[] args) throws Exception {
        // Create the table
        String tableName = "test";
        String[] family = { "f1", "f2" };
        createTable(tableName, family);
        // Insert data into the table
        String[] rowKey = { "r1", "r2" };
        String[] columnName = { "c1", "c2", "c3" };
        String[] value = { "value1", "value2", "value3", "value4", "value5", "value6" };
        addData(tableName, rowKey[0], family[0], columnName[0], value[0]);
        addData(tableName, rowKey[0], family[0], columnName[1], value[1]);
        addData(tableName, rowKey[0], family[1], columnName[2], value[2]);
        addData(tableName, rowKey[1], family[0], columnName[0], value[3]);
        addData(tableName, rowKey[1], family[0], columnName[1], value[4]);
        addData(tableName, rowKey[1], family[1], columnName[2], value[5]);
        // Scan the whole table
        getResultScann(tableName);
        // Update the value of a specific cell
        updateTable(tableName, rowKey[0], family[0], columnName[0], "update value");
        // Read back the column that was just updated
        getResultByColumn(tableName, rowKey[0], family[0], columnName[0]);
        // Delete one column
        deleteColumn(tableName, rowKey[0], family[0], columnName[1]);
        // Scan the whole table again
        getResultScann(tableName);
        // Delete an entire row
        deleteAllColumn(tableName, rowKey[0]);
        // Scan the whole table again
        getResultScann(tableName);
        // Delete the table
        deleteTable(tableName);
    }
}
To run the API demo above successfully on your local machine, the following conditions must be met:
1. Create a new project
This section uses IntelliJ IDEA as the HBase development environment. After installing the IDE, create a Maven project named hbase-test, and under the project's src/main/java/ directory (in the com/zy/ subdirectory, to match the package declaration) create a file named HbaseTable.java, whose content is the API demo above.
2. Import the jar packages
Download the jar packages obtained in the previous chapter to your local machine and add them as dependencies of the hbase-test project created in the previous step; in other words, configure the classpath of the hbase-test project so that the API can find the jar packages and configuration files at runtime, as sketched below.
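If you prefer to let Maven resolve the jars instead of adding them to the classpath by hand, a minimal sketch of the dependency section of pom.xml might look like the following. The artifact list and the version string are assumptions and must match the HBase release running on your cluster; with manually downloaded jars this block is unnecessary.

<!-- Sketch only: the version is an assumption and must match the cluster's HBase release. -->
<dependencies>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.6-hadoop2</version>
  </dependency>
  <!-- Assumption: in this HBase line the AggregationClient coprocessor classes used by RowCount
       are not all in hbase-client, so hbase-server may also be needed on the classpath. -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>0.98.6-hadoop2</version>
  </dependency>
</dependencies>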
3. Import the configuration file
Developing locally also requires the hbase-site.xml file; copy this configuration file into the project's resources directory. The file can be found in the /etc/hbase/conf/ directory on any server in the cluster.
The local hbase-site.xml file should be placed on a path that was added to the hbase-test project's classpath in the previous step; for a Maven project, src/main/resources/ is on the classpath by default. A sketch of the file is shown below.
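For orientation only, a minimal hbase-site.xml might look like the sketch below. The ZooKeeper host names and port are placeholders (assumptions); in practice, use the real file copied from /etc/hbase/conf/ on the cluster rather than writing one by hand.

<?xml version="1.0"?>
<!-- Minimal sketch of hbase-site.xml; the values below are placeholders.
     Replace this file with the one copied from /etc/hbase/conf/ on the cluster. -->
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>zk-host-1,zk-host-2,zk-host-3</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>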
Once the conditions above are met, you can run the API demo.