1、開發環境
在進行Hbase開發前,需要安裝JDK、Hadoop和HBase
根據自己的安裝環境修改版本信息,使用Maven構建項目,在pom.xml中添加hbase的依賴
2、初始化配置
設置HBase的配置,如ZooKeeper的地址、端口號等等。可以通過org.apache.hadoop.conf.Configuration.set方法手工設置HBase的配置信息,也可以直接將HBase的hbase-site.xml配置文件引入項目即可。
3、常見API的使用
HBase的常用操作包括建表、插入表數據、刪除表數據、獲取一行數據、表掃描、刪除列族、刪除表等等,下面給出具體代碼。
3.1 創建數據庫表
- // 創建數據庫表
- public static void createTable(String tableName, String[] columnFamilys) throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 創建一個數據庫管理員
- HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
- if (hAdmin.tableExists(tableName)) {
- System.out.println(tableName + "表已存在");
- conn.close();
- System.exit(0);
- } else {
- // 新建一個表描述
- HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
- // 在表描述里添加列族
- for (String columnFamily : columnFamilys) {
- tableDesc.addFamily(new HColumnDescriptor(columnFamily));
- }
- // 根據配置好的表描述建表
- hAdmin.createTable(tableDesc);
- System.out.println("創建" + tableName + "表成功");
- }
- conn.close();
- }
3.2 添加一條數據
- // 添加一條數據
- public static void addRow(String tableName, String rowKey, String columnFamily, String column, String value)
- throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 獲取表
- HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
- // 通過rowkey創建一個put對象
- Put put = new Put(Bytes.toBytes(rowKey));
- // 在put對象中設置列族、列、值
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
- // 插入數據,可通過put(List<Put>)批量插入
- table.put(put);
- // 關閉資源
- table.close();
- conn.close();
- }
3.3 獲取一條數
- // 通過rowkey獲取一條數據
- public static void getRow(String tableName, String rowKey) throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 獲取表
- HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
- // 通過rowkey創建一個get對象
- Get get = new Get(Bytes.toBytes(rowKey));
- // 輸出結果
- Result result = table.get(get);
- for (Cell cell : result.rawCells()) {
- System.out.println(
- "行鍵:" + new String(CellUtil.cloneRow(cell)) + "\t" +
- "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +
- "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +
- "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
- "時間戳:" + cell.getTimestamp());
- }
- // 關閉資源
- table.close();
- conn.close();
- }
3.4 全表掃描
- // 全表掃描
- public static void scanTable(String tableName) throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 獲取表
- HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
- // 創建一個掃描對象
- Scan scan = new Scan();
- // 掃描全表輸出結果
- ResultScanner results = table.getScanner(scan);
- for (Result result : results) {
- for (Cell cell : result.rawCells()) {
- System.out.println(
- "行鍵:" + new String(CellUtil.cloneRow(cell)) + "\t" +
- "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +
- "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +
- "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
- "時間戳:" + cell.getTimestamp());
- }
- }
- // 關閉資源
- results.close();
- table.close();
- conn.close();
- }
3.5 刪除一條數據
- // 刪除一條數據
- public static void delRow(String tableName, String rowKey) throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 獲取表
- HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
- // 刪除數據
- Delete delete = new Delete(Bytes.toBytes(rowKey));
- table.delete(delete);
- // 關閉資源
- table.close();
- conn.close();
- }
-
3.6 刪除多條數據
- // 刪除多條數據
- public static void delRows(String tableName, String[] rows) throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 獲取表
- HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
- // 刪除多條數據
- List<Delete> list = new ArrayList<Delete>();
- for (String row : rows) {
- Delete delete = new Delete(Bytes.toBytes(row));
- list.add(delete);
- }
- table.delete(list);
- // 關閉資源
- table.close();
- conn.close();
- }
3.7 刪除列族
- // 刪除列族
- public static void delColumnFamily(String tableName, String columnFamily) throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 創建一個數據庫管理員
- HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
- // 刪除一個表的指定列族
- hAdmin.deleteColumn(tableName, columnFamily);
- // 關閉資源
- conn.close();
- }
3.8 刪除數據庫表
- // 刪除數據庫表
- public static void deleteTable(String tableName) throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 創建一個數據庫管理員
- HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
- if (hAdmin.tableExists(tableName)) {
- // 失效表
- hAdmin.disableTable(tableName);
- // 刪除表
- hAdmin.deleteTable(tableName);
- System.out.println("刪除" + tableName + "表成功");
- conn.close();
- } else {
- System.out.println("需要刪除的" + tableName + "表不存在");
- conn.close();
- System.exit(0);
- }
- }
3.9 追加插入
- // 追加插入(將原有value的后面追加新的value,如原有value=a追加value=bc則最后的value=abc)
- public static void appendData(String tableName, String rowKey, String columnFamily, String column, String value)
- throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 獲取表
- HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
- // 通過rowkey創建一個append對象
- Append append = new Append(Bytes.toBytes(rowKey));
- // 在append對象中設置列族、列、值
- append.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
- // 追加數據
- table.append(append);
- // 關閉資源
- table.close();
- conn.close();
- }
-
3.10 符合條件后添加數據
- // 符合條件后添加數據(只能針對某一個rowkey進行原子操作)
- public static boolean checkAndPut(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, String valueCheck, String columnFamily, String column, String value) throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 獲取表
- HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
- // 設置需要添加的數據
- Put put = new Put(Bytes.toBytes(rowKey));
- put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
- // 當判斷條件為真時添加數據
- boolean result = table.checkAndPut(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck),
- Bytes.toBytes(columnCheck), Bytes.toBytes(valueCheck), put);
- // 關閉資源
- table.close();
- conn.close();
- return result;
- }
-
3.11 符合條件后刪除數據
- // 符合條件后刪除數據(只能針對某一個rowkey進行原子操作)
- public static boolean checkAndDelete(String tableName, String rowKey, String columnFamilyCheck, String columnCheck,
- String valueCheck, String columnFamily, String column) throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 獲取表
- HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
- // 設置需要刪除的delete對象
- Delete delete = new Delete(Bytes.toBytes(rowKey));
- delete.addColumn(Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck));
- // 當判斷條件為真時添加數據
- boolean result = table.checkAndDelete(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck),
- Bytes.toBytes(valueCheck), delete);
- // 關閉資源
- table.close();
- conn.close();
- return result;
- }
3.12 計數器
- // 計數器(amount為正數則計數器加,為負數則計數器減,為0則獲取當前計數器的值)
- public static long incrementColumnValue(String tableName, String rowKey, String columnFamily, String column, long amount)
- throws IOException {
- // 建立一個數據庫的連接
- Connection conn = ConnectionFactory.createConnection(conf);
- // 獲取表
- HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
- // 計數器
- long result = table.incrementColumnValue(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamily), Bytes.toBytes(column), amount);
- // 關閉資源
- table.close();
- conn.close();
- return result;
- }
4、內置過濾器的使用
HBase為篩選數據提供了一組過濾器,通過這個過濾器可以在HBase中數據的多個維度(行、列、數據版本)上進行對數據的篩選操作,也就是說過濾器最終能夠篩選的數據能夠細化到具體的一個存儲單元格上(由行鍵、列名、時間戳定位)。通常來說,通過行鍵、值來篩選數據的應用場景較多。需要說明的是,過濾器會極大地影響查詢效率。所以,在數據量較大的數據表中,應盡量避免使用過濾器。
下面介紹一些常用的HBase內置過濾器的用法:
1、RowFilter:篩選出匹配的所有的行。使用BinaryComparator可以篩選出具有某個行鍵的行,或者通過改變比較運算符(下面的例子中是CompareFilter.CompareOp.EQUAL)來篩選出符合某一條件的多條數據,如下示例就是篩選出行鍵為row1的一行數據。
- // 篩選出匹配的所有的行
- Filter rf = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("row1")));
2、PrefixFilter:篩選出具有特定前綴的行鍵的數據。這個過濾器所實現的功能其實也可以由RowFilter結合RegexComparator來實現,不過這里提供了一種簡便的使用方法,如下示例就是篩選出行鍵以row為前綴的所有的行。
- // 篩選匹配行鍵的前綴成功的行
- Filter pf = new PrefixFilter(Bytes.toBytes("row"));
3、KeyOnlyFilter:這個過濾器唯一的功能就是只返回每行的行鍵,值全部為空,這對於只關注於行鍵的應用場景來說非常合適,這樣忽略掉其值就可以減少傳遞到客戶端的數據量,能起到一定的優化作用。
- // 返回所有的行鍵,但值全是空
- Filter kof = new KeyOnlyFilter();
4、RandomRowFilter:按照一定的幾率(<=0會過濾掉所有的行,>=1會包含所有的行)來返回隨機的結果集,對於同樣的數據集,多次使用同一個RandomRowFilter會返回不同的結果集,對於需要隨機抽取一部分數據的應用場景,可以使用此過濾器。
- // 隨機選出一部分的行
- Filter rrf = new RandomRowFilter((float) 0.8);
5、InclusiveStopFilter:掃描的時候,我們可以設置一個開始行鍵和一個終止行鍵,默認情況下,這個行鍵的返回是前閉后開區間,即包含起始行,但不包含終止行。如果我們想要同時包含起始行和終止行,那么可以使用此過濾器。
- // 包含了掃描的上限在結果之內
- Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1"));
6、FirstKeyOnlyFilter:如果想要返回的結果集中只包含第一列的數據,那么這個過濾器能夠滿足要求。它在找到每行的第一列之后會停止掃描,從而使掃描的性能也得到了一定的提升。
- // 篩選出每行的第一個單元格
- Filter fkof = new FirstKeyOnlyFilter();
7、ColumnPrefixFilter:它按照列名的前綴來篩選單元格,如果我們想要對返回的列的前綴加以限制的話,可以使用這個過濾器。
- // 篩選出前綴匹配的列
- Filter cpf = new ColumnPrefixFilter(Bytes.toBytes("qual1"));
8、ValueFilter:按照具體的值來篩選單元格的過濾器,這會把一行中值不能滿足的單元格過濾掉,如下面的構造器,對於每一行的一個列,如果其對應的值不包含ROW2_QUAL1,那么這個列就不會返回給客戶端。
- // 篩選某個(值的條件滿足的)特定的單元格
- Filter vf = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("ROW2_QUAL1"));
9、ColumnCountGetFilter:這個過濾器在遇到一行的列數超過我們所設置的限制值的時候,結束掃描操作。
- // 如果突然發現一行中的列數超過設定的最大值時,整個掃描操作會停止
- Filter ccf = new ColumnCountGetFilter(2);
10、SingleColumnValueFilter:用一列的值決定這一行的數據是否被過濾,可對它的對象調用setFilterIfMissing方法,默認的參數是false。其作用是,對於咱們要使用作為條件的列,如果參數為true,這樣的行將會被過濾掉,如果參數為false,這樣的行會包含在結果集中。
- // 將滿足條件的列所在的行過濾掉
- SingleColumnValueFilter scvf = new SingleColumnValueFilter(
- Bytes.toBytes("colfam1"),
- Bytes.toBytes("qual2"),
- CompareFilter.CompareOp.NOT_EQUAL,
- new SubstringComparator("BOGUS"));
- scvf.setFilterIfMissing(true);
-
11、SingleColumnValueExcludeFilter:這個過濾器與第10種過濾器唯一的區別就是,作為篩選條件的列,其行不會包含在返回的結果中。
12、SkipFilter:這是一種附加過濾器,其與ValueFilter結合使用,如果發現一行中的某一列不符合條件,那么整行就會被過濾掉。
- // 發現某一行中的一列需要過濾時,整個行就會被過濾掉
- Filter skf = new SkipFilter(vf);
13、WhileMatchFilter:使用這個過濾器,當遇到不符合設定條件的數據的時候,整個掃描結束。
- // 當遇到不符合過濾器rf設置的條件時,整個掃描結束
- Filter wmf = new WhileMatchFilter(rf);
14. FilterList:可以用於綜合使用多個過濾器。其有兩種關系: Operator.MUST_PASS_ONE表示關系AND,Operator.MUST_PASS_ALL表示關系OR,並且FilterList可以嵌套使用,使得我們能夠表達更多的需求。
- // 綜合使用多個過濾器,AND和OR兩種關系
- List<Filter> filters = new ArrayList<Filter>();
- filters.add(rf);
- filters.add(vf);
- FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);
5、HBase與MapReduce
我們知道,在偽分布式模式和完全分布式模式下的HBase是架構在HDFS之上的,因此完全可以將MapReduce編程框架和HBase結合起來使用。也就是說,將HBase作為底層存儲結構,MapReduce調用HBase進行特殊的處理,這樣能夠充分結合HBase分布式大型數據庫和MapReduce並行計算的優點。
HBase實現了TableInputFormatBase類,該類提供了對表數據的大部分操作,其子類TableInputFormat則提供了完整的實現,用於處理表數據並生成鍵值對。TableInputFormat類將數據表按照Region分割成split,即有多少個Regions就有多個splits,然后將Region按行鍵分成<key,value>對,key值對應與行鍵,value值為該行所包含的數據。
HBase實現了MapReduce計算框架對應的TableMapper類和TableReducer類。其中,TableMapper類並沒有具體的功能,只是將輸入的<key,value>對的類型分別限定為Result和ImmutableBytesWritable。IdentityTableMapper類和IdentityTableReducer類則是上述兩個類的具體實現,其和Mapper類和Reducer類一樣,只是簡單地將<key,value>對輸出到下一個階段。
HBase實現的TableOutputFormat將輸出的<key,value>對寫到指定的HBase表中,該類不會對WAL(Write-Ahead Log)進行操作,即如果服務器發生故障將面臨丟失數據的風險。可以使用MultipleTableOutputFormat類解決這個問題,該類可以對是否寫入WAL進行設置。
為了能使Hadoop集群上運行HBase程序,還需要把相關的類文件引入Hadoop集群上,不然會出現ClassNotFoundException錯誤。其具體方法是可在hadoop的環境配置文件hadoop-env.sh中引入HBASE_HOME和HBase的相關jar包,或者直接將HBase的jar包打包到應用程序文件中。
下例子是將MapReduce和HBase結合起來的WordCount程序,它首先從指定文件中搜集數據,進行統計計算,最后將結果存儲到HBase中。代碼省略。
6、HBase的Bulkload
HBase可以讓我們隨機的、實時的訪問大數據,但是怎樣有效的將數據導入到HBase呢?HBase有多種導入數據的方法,最直接的方法就是在MapReduce作業中使用TableOutputFormat作為輸出,或者使用標准的客戶端API,但是這些都不是非常有效的方法。
如果HDFS中有海量數據要導入HBase,可以先將這些數據生成HFile文件,然后批量導入HBase的數據表中,這樣可以極大地提升數據導入HBase的效率。這就是HBase的Bulkload,即利用MapReduce作業輸出HBase內部數據格式的表數據,然后將生成的StoreFiles直接導入到集群中。與使用HBase API相比,使用Bulkload導入數據占用更少的CPU和網絡資源。兩個表之間的數據遷移也可以使用這種方法。首先將HDFS中的數據文件通過MapReduce任務生成HFile文件,然后將HFile文件導入HBase數據表(該數據表已存在)。