一、將HBase的jar包及hbase-site.xml添加到IDE
1、到安裝HBase集群的任意一台機器上找到HBase的安裝目錄,到lib目錄下下載HBase需要的jar包,然后再到conf目錄下下載hbase-site.xml。
2、在ide中新建一個java項目,然后再右擊"項目名",新建2個文件夾,分別是"lib"和"conf"
3、將1步驟中下載的jar包放到2步驟中的lib目錄下,並且將hbase-site.xml放到conf目錄下,並將2個文件夾添加到classpath下。
二、使用HBase的基本API操作HBase
通過編碼(java)的形式對HBase進行一系列的管理涉及到對表的管理、數據的操作等。
下面這段是公共代碼片段:
private static Configuration conf = null;
static {
Configuration HBASE_CONFIG = new Configuration();
// 與hbase/conf/hbase-site.xml 中 hbase.zookeeper.quorum 配置的值相同
HBASE_CONFIG.set("hbase.zookeeper.quorum", "c7004,c7003,c7005");
// 與hbase/conf/hbase-site.xml 中 hbase.zookeeper.property.clientPort 配置的值相同
HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
conf = HBaseConfiguration.create(HBASE_CONFIG);
}
1、對表的創建、刪除、顯示以及修改等,可以用Admin, 一旦創建了表,那么可以通過HTable的 實例來訪問表,每次可以往表里增加數據。
/**
* 創建表
* @param tableName 表名稱
* @param familys 列族
* @param force 是否強制建表
* @throws Exception
*/
public static void creatTable(String tableName, String[] familys,boolean force) throws Exception {
//建立連接
Connection conn = ConnectionFactory.createConnection(conf);
//表管理類
Admin admin = conn.getAdmin();
if (admin.tableExists(TableName.valueOf(tableName))) {
if(force){
//禁用表
admin.disableTable(TableName.valueOf(tableName));
//刪除表
admin.deleteTable(TableName.valueOf(tableName));
System.out.println("開始創建表!");
}else{
System.out.println("表已存在!");
}
} else {
//定義表名
HTableDescriptor tblDesc = new HTableDescriptor(TableName.valueOf(tableName));
for (int i = 0; i < familys.length; i++) {
//定義列族
HColumnDescriptor clmDesc = new HColumnDescriptor(familys[i]);
//將列族添加到表中
tblDesc.addFamily(clmDesc);
}
//執行建表
admin.createTable(tblDesc);
System.out.println("創建表" + tableName + "成功!");
}
//關閉表管理
admin.close();
//關閉連接
conn.close();
}
2、插入數據
創建一個Put對象,在這個Put對象里可以指定要給哪個列增加數據,以及當前的時間戳等值,然后通過調用HTable.put(Put)來 提交操作,子猴在這里提請注意的是:在創建Put對象的時候,你必須指定一個行(Row)值,在構造Put對象的時候作為參數傳入。
/**
* 向表中插入數據
* @param tableName
* @param rowKey
* @param family
* @param qualifier
* @param value
* @throws IOException
*/
public static void insertData2Tbl(String tableName,String rowKey,String family,String qualifier,String value) throws IOException{
//建立連接
Connection conn = ConnectionFactory.createConnection(conf);
//建立表連接
Table tbl=conn.getTable(TableName.valueOf(tableName));
//用行鍵實例化Put
Put put=new Put(rowKey.getBytes());
//指定列族名、列名和值
put.addColumn(family.getBytes(), qualifier.getBytes(), value.getBytes());
//執行put
tbl.put(put);
//關閉表
tbl.close();
//關閉連接
conn.close();
}
3、獲取數據
要獲取數據,使用Get對 象,Get對象同Put對象一樣有好幾個構造函數,通常在構造的時候傳入行值,表示取第幾行的數據,通過HTable.get(Get)來 調用。
/**
* 從表中取值
* @param tableName 表名
* @param rowKey 行鍵
* @param family 列族
* @param qualifier 列
* @return
* @throws IOException
*/
public static String readData4Tbl(String tableName,String rowKey,String family,String qualifier) throws IOException{
//建立連接
Connection conn = ConnectionFactory.createConnection(conf);
//建立表連接
Table tbl=conn.getTable(TableName.valueOf(tableName));
//用行鍵實例化Get
Get get=new Get(rowKey.getBytes());
//增加列族名和列名條件
get.addColumn(family.getBytes(),qualifier.getBytes() );
//執行,返回結果
Result result=tbl.get(get);
//取出結果
String valStr=Bytes.toString(result.getValue(family.getBytes(), qualifier.getBytes()));
//關閉表
tbl.close();
//關閉連接
conn.close();
return valStr;
}
4、掃描
通過Scan可以對表中的行鍵范圍進行瀏覽,得到每一行的信息,比如列名,時間戳等,Scan 相當於一個游標,通過next()來瀏覽下一個,通過調用HTable.getScanner(Scan) 來返回一個ResultScanner對象。HTable.get(Get)和HTable.getScanner(Scan) 都是返回一個Result。 Result是一個KeyValue的鏈表。
/**
* 掃描行鍵范圍取值
* @param tableName 表名
* @param startRow 起始行鍵
* @param stopRow 結束行鍵
* @param family 列族
* @param qualifier 列
* @throws IOException
*/
public static void scanRowRange4Tbl(String tableName,String startRow,String stopRow,String family,String qualifier) throws IOException{
//建立連接
Connection conn = ConnectionFactory.createConnection(conf);
//建立表連接
Table tbl=conn.getTable(TableName.valueOf(tableName));
//初始化Scan實例
Scan scan=new Scan();
//指定開始行鍵
scan.setStartRow(startRow.getBytes());
//指定結束行鍵
scan.setStopRow(stopRow.getBytes());
//增加過濾條件
scan.addColumn(family.getBytes(), qualifier.getBytes());
//返回結果
ResultScanner rss=tbl.getScanner(scan);
//迭代並取出結果
for(Result rs:rss){
String valStr=Bytes.toString(rs.getValue(family.getBytes(), qualifier.getBytes()));
System.out.println(valStr);
}
//關閉表
tbl.close();
//關閉連接
conn.close();
}
5、刪除
使用Delete來 刪除記錄,通過調用HTable.delete(Delete)來 執行刪除操作。(注:刪除這里有些特別,也就是刪除並不是馬上將數據從表中刪除。)
/**
* 刪除行鍵
* @param tableName
* @param rowKey
* @throws IOException
*/
public static void delRowKey4Tbl(String tableName, String rowKey) throws IOException {
// 建立連接
Connection conn = ConnectionFactory.createConnection(conf);
// 建立表連接
Table tbl = conn.getTable(TableName.valueOf(tableName));
// 用行鍵來實例化Delete實例
Delete del = new Delete(rowKey.getBytes());
// 執行刪除
tbl.delete(del);
// 關閉表
tbl.close();
// 關閉連接
conn.close();
}
/**
* 刪除單元格(即列)
* @param tableName
* @param rowKey
* @param family
* @param qualifier
* @throws IOException
*/
public static void delCell4Tbl(String tableName, String rowKey, String family, String qualifier) throws IOException {
// 建立連接
Connection conn = ConnectionFactory.createConnection(conf);
// 建立表連接
Table tbl = conn.getTable(TableName.valueOf(tableName));
// 用行鍵來實例化Delete實例
Delete del = new Delete(rowKey.getBytes());
// 增加過濾條件
del.addColumn(family.getBytes(), qualifier.getBytes());
// 執行刪除
tbl.delete(del);
// 關閉表
tbl.close();
// 關閉連接
conn.close();
}
6、鎖
7、新增、獲取、刪除在操作過程中會對所操作的行加一個鎖,而瀏覽卻不會。
8、簇(cluster)的訪問
客戶端代碼通過ZooKeeper來訪問找到簇,也就是說ZooKeeper quorum將被使用,那么相關的類(包)應該在客戶端的類(classes)目錄下,即客戶端一定要找到文件hbase-site.xml。
三、Hbase的高級API
HBase高級API主要分為三類:過濾器、計數器和協處理器。
1、過濾器
在設置Scal、Get的時候有一個setFilter(filter),可以在查詢時添加更多的限制條件,如正則匹配、根據列值進行匹配等。HBase內置了一些常用過濾器,用戶也可以通過實現Filter接口,編寫自己的過濾器。
過濾器是服務器端的操作。它在客戶端被創建,通過RPC傳送到服務器端,然后在服務器端進行過濾操作。
HBase內置常用過濾器如下:
- 行過濾器(RowFilter):基於行鍵過濾數據。
Filter filter=new RowFilter(CompareOp.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes("row-100")));
- 前綴過濾器(PrefixFilter):所有與前綴匹配的行都會返回給客戶端。
Filter filter=new PrefixFilter(Bytes.toBytes("1990"));
- 首次行鍵過濾器(FirstKeyOnlyFilter):在找到所有行第一列的值時,就會返回數據。
Filter filter=new FirstKeyOnlyFilter();
- 單列值過濾器(SingleColumnValueFilter):根據某列的值進行過濾。
Filter filter=new SingleColumnValueFilter(Bytes.toBytes("cf"),Bytes.toBytes("qual"),CompareOp.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes("value-100")));
2、計數器
由於HBase沒有二級索引,故統計功能相對較弱。為應對此種情況,HBase使用計數器(counter)用於實時統計的業務場景下。
計數器實際時HBase表中某一列的值,當進行寫操作時,使用Table類的API對該列加1即可。計數器有兩種:單計數據器和多計數器。
- 單計數據器:用戶自己設定計數器的行、列族和列。
// 建立表連接
Table tbl = conn.getTable(TableName.valueOf(tableName));
// 計數器增加1
long a=tbl.incrementColumnValue(
Bytes.toBytes("row-count"),
Bytes.toBytes("info"),
Bytes.toBytes("q1"), 1);//比0大,則按給定值增加計數器的數值,若比0小,則按值減少計數器中的值
// 返回計數器當前值
long b=tbl.incrementColumnValue(
Bytes.toBytes("row-count"),
Bytes.toBytes("info"),
Bytes.toBytes("q1"), 0);//0表示返回當前計數器的值
- 多計數器:允許用戶同時更新多個計數器的值,但這些計數器的值都必須處於同一行。
//用行鍵初始化計數器
Increment increment=new Increment(Bytes.toBytes("row-count"));
//添加多個列
increment.addColumn(Bytes.toBytes("info"), Bytes.toBytes("q2"), 1);
increment.addColumn(Bytes.toBytes("info"), Bytes.toBytes("q3"), 1);
Result result=tbl.increment(increment);
// 打印計數器返回的結果
for(KeyValue kv:result.raw()){
System.out.println("KV:"+kv+" Value:"+Bytes.toLong(kv.getValue()));
}
3、協處理器
協處理器(coprocessor)允許用戶在RegionServer上運行自己的代碼。協處理器框架主要有Observer和Endpoint兩大類,用戶可以繼承這些類實現自己的邏輯。
Observer:類似於RDMS的觸發器,可以重寫一些在特定事件發生時執行的回調函數。RegionObserver、MasterObserver和WALObserver三種。RegionObserver可以被用來處理數據修改事件,它發生的地點是Region;MasterObserver可以被用來管理表,如新定義表;WALObserver提供了控制WAL的回調函數。
示例:Observer實現二級索引
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
public class TestCoprocessor extends BaseRegionObserver {
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)throws IOException {
Configuration conf=new Configuration();
Connection conn=ConnectionFactory.createConnection(conf);
//索引表
Table tbl=conn.getTable(TableName.valueOf("idx_tbl"));
//取出要插入的數據
List<Cell> cells=put.get("cf".getBytes(), "info".getBytes());
Iterator<Cell> kvIt=cells.iterator();
while(kvIt.hasNext()){
Cell tmp=kvIt.next();
//用值作為行鍵
Put idxPut=new Put(tmp.getValue());
idxPut.add("cf".getBytes(),tmp.getRow(),Bytes.toBytes(System.currentTimeMillis()));
//插入索引表
tbl.put(idxPut);
}
tbl.close();
conn.close();
}
}
Endpoint:類似於RDMS的存儲過程,允許用戶將自定義操作添加到服務端。
