本文內容:利用SpringBoot整合HBase,基於HBaseJavaAPI的二次封裝,可以直接引用jar包使用,目前測試已支持HBase1.1.2和HBase1.4.6兩個版本。下文內容為該項目的使用方式,同時也介紹了HBaseJavaAPI的基本使用。
項目地址:碼雲: https://gitee.com/Yao_Qi/HBaseComponent
HBase 組件接口文檔
基本概念
table: 表
columnFamily:列族,一個表下可以有多個列族,但是不建議設置多個列族,HBase建議設計長窄型的表而不是短寬型。
qualifier:列,一個列族下可以有多列,一個表中的列可以是不對齊的,但是這樣效率不高,同一張表中的列最好是相同的。
cell:一列數據下的一個單元格,一個列下可以有多個單元格,根據版本號區分,默認每次讀取最新版本的數據,cell下的存儲是數據本身。
row: 行,多列數據組成一行,一行中有多個qualifier。
rowKey: 行健,用於唯一標識一行數據,一行下有多列,行健的設計直接關系到查詢的效率。
HBase配置
以下配置為最基礎配置,缺一不可。
HBase:
conf:
quorum: 192.168.80.234:2181,192.168.80.235:2181,192.168.80.241:2181
znodeParent: /hbase-unsecure
#如果有更多配置,寫在config下,例如:
#config:
# key: value
# key: value
如果需要更多配置,需要在config中配置,以key-value的形式書寫。
參數說明
quorum是HBase中zookeeper的配置,znodeParent是HBase配置在zookeeper中的路徑。
簡單示例
引入組件jar包:
<dependency>
<groupId>com.semptian.hbase.component</groupId>
<artifactId>hbase-component</artifactId>
<version>1.0.1-SNAPSHOT</version>
</dependency>
在需要的地方注入HBaseOperations接口,該接口的實現類是HBaseTemplate,通過這個類來操作HBase。
@Autowired
private HBaseOperations hBaseDao;
查詢一條數據,通過rowKey查詢:
public void testQueryTable() {
Result result = hBaseDao.queryByTableNameAndRowKey(
"LBS", 9223372036854775803L);
System.out.println(result.isEmpty());
result.listCells().forEach(cell -> {
System.out.println(
"row:" + Bytes.toLong(CellUtil.cloneRow(cell)) +
",family:"+ Bytes.toString(CellUtil.cloneFamily(cell)) +
", qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
", value:" + Bytes.toString(CellUtil.cloneValue(cell)));
});
}
表的基本操作
新建表
創建表通過HBaseTemplate就可以實現,HBaseTemplate類中帶有這個方法。
操作示例:
hBaseDao.createTable("HBASE-COMPONENT_1", "CF1", "CF2");
上述代碼創建了一張表,HBASE-COMPONENT_1 是表名,CF1,CF2代表這個表有兩個列族。
如果有多個列族可以往后面加,列族不建議設置很多個。
刪除表
hBaseDao.dropTable("HBASE-COMPONENT_1");
參數是表名,通過表名刪除表。
判斷表是否存在
hBaseDao.tableExists("lbs");
這里的表名是區分大小寫的。返回值:boolean。
新增數據
新增一條數據
需要注意的是在HBase中的存儲的數據是不分格式的,都是以字節數組的形式存儲,因此在存儲一條數據時需要將數據都轉化成字節數組。
String格式的數據能直接轉換為字節數組getBytes(),但是其他格式的數據需要借助工具作轉換。
這里需要格外注意rowKey的格式,用什么格式存就決定了用什么格式取。
hBaseDao.put("HBase-component", "1534154424340", "CF1", "test_1", Bytes.toBytes("testData"));
參數說明:
(1) tableName 目標數據表
(2) rowName rowKey
(3) familyName 列族名
(4) qualifier 列名
(5) data 字節數組類型的數據
這里新增一條數據是填充數據到一個cell中去。
批量新增數據
String rowKey = String.valueOf(System.currentTimeMillis());
Put put = new Put(rowKey.getBytes());
String defaultColumn = "CF1";
String column1 = "col1";
String column2 = "col2";
String column3 = "col3";
String value = "test";
put.addColumn(defaultColumn.getBytes(), column1.getBytes(), value.getBytes());
put.addColumn(defaultColumn.getBytes(), column2.getBytes(), value.getBytes());
put.addColumn(defaultColumn.getBytes(), column3.getBytes(), value.getBytes());
List<Put> putList = new ArrayList<>();
putList.add(put);
putList.add(put);
putList.add(put);
putList.add(put);
putList.add(put);
hBaseDao.putBatch("HBase-component", putList);
批量插入數據就是使用多個Put對象,putBatch(...)方法的參數:表名,putList(多個put的集合)。
注意批量插入數據也都是插入字節數組格式的數據。
刪除數據
刪除一條數據
hBaseDao.delete("HBase-component", "1534210201115", "CF1", "col2");
參數說明:
(1) 表名
(2) rowKey
(3) 列族名
(4) 列名
這里刪除是刪除一個cell下的數據
批量刪除數據
String tableName = "HBase-component";
String rowKey1 = "1534164113922";
String rowKey2 = "1534168248328";
List<Delete> deleteList = new ArrayList<>();
Delete delete = new Delete(rowKey1.getBytes());
Delete delete1 = new Delete(rowKey2.getBytes());
deleteList.add(delete);
deleteList.add(delete1);
hBaseDao.deleteBatch(tableName, deleteList);
批量刪除需要借助Delete對象。
查詢
單條結果查詢
Result result = hBaseDao.queryByTableNameAndRowKey("LBS", 9223372036854775803L);
System.out.println(result.isEmpty());
result.listCells().forEach(cell -> {
System.out.println(
" row:" + Bytes.toLong(CellUtil.cloneRow(cell)) +
" family:"+ Bytes.toString(CellUtil.cloneFamily(cell)) +
" qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
" value:" + Bytes.toString(CellUtil.cloneValue(cell)));
});
queryByTableNameAndRowKey()該方法是通過表名和rowKey查詢數據,這里的rowKey支持多種類型,Long,double,Integer幾種類型。
至於這里傳什么類型的參數,取決於插入數據時rowKey的類型,雖然HBase里存儲的都是字節數組,但是對類型是敏感的,如果類型對不上可能會出錯。
批量掃描
// 構建scan
Scan scan = new Scan();
// 設置時間戳,計算時間差
Long timeDifference = 2L * 30L * 24L * 60L * 60L * 1000L;
Long endTime = System.currentTimeMillis();
Long fromTime = endTime - timeDifference;
// 設置時間過濾器
FilterList filterList = new FilterList();
Filter startTimeFilter = new SingleColumnValueFilter(
DEFAULT_COLUMN_FAMILY.getBytes(),
DATA_CREATE_TIME.getBytes(),
CompareFilter.CompareOp.GREATER,
Bytes.toBytes(fromTime)
);
Filter endTimeFilter = new SingleColumnValueFilter(
DEFAULT_COLUMN_FAMILY.getBytes(),
DATA_CREATE_TIME.getBytes(),
CompareFilter.CompareOp.LESS,
Bytes.toBytes(endTime)
);
filterList.addFilter(startTimeFilter);
filterList.addFilter(endTimeFilter);
scan.setFilter(filterList);
// 獲取結果集
ResultScanner resultScanner = hBaseTemplate.queryByScan(TABLE_NAME, scan);
// 遍歷結果集
try{
if (resultScanner != null) {
resultScanner.forEach(result -> {
List<Cell> cellList = result.listCells();
...
}
}
}finally{
if (resultScanner != null) {
resultScanner.close();
}
}
批量查詢可以通過queryByScan()方法實現,第一個參數是表名,第二個參數是scan,通過構建不同的scan來查詢,過濾器也是在構建scan對象是添加的,可以添加多個過濾器。
需要注意的是這里的ResultScanner類,在遍歷結果集時需要使用try-finally結構,在使用完resultScanner對象之后關閉該對象。HBase官方文檔上強調了這一點。因此在使用ResultScanner對象時需要格外注意。
常見過濾器:
行健過濾器:RowFilter
列族過濾器:FamilyFilter
值過濾器:ValueFilter
列過濾器:QualifierFilter
單列值過濾器:SingleColumnValueFilter(會返回滿足條件的行)
單列值排除過濾器:SingleColumnExcludeFilter(返回排除了該列的結果,與單列值過濾器相反)
前綴過濾器:PrefixFilter(這個過濾器是針對行健的,在構造方法中傳入字節數組形式的內容,過濾器會去匹配行健)
頁數過濾器:PageFilter(使用pageFilter過濾器的時候需要注意,並不是設置了頁數大小就能返回相應數目的結果)
String tableName = "RECOMMEND_ENGINE_DATA_MODEL";
Scan scan = new Scan();
PageFilter pageFilter = new PageFilter(1);
scan.setFilter(pageFilter);
ResultScanner resultScanner = hBaseDao.queryByScan(tableName, scan);
try{
resultScanner.forEach(result -> {
result.listCells().forEach(cell -> {
// process
});
}finally{
if (resultScanner != null) {
resultScanner.close();
}
}
上面這段代碼中設置了頁面大小為1,預期是返回一條數據,但是結果會返回兩條數據,這時返回的結果數會取決於regionServer的數量。
如果是FilterList,FilterList的順序會影響PageFilter的效果。
一般比較型過濾器,需要用CompareFilter.CompareOp中的比較運算符。所有的過濾器都是用Scan對象去設置。
多過濾器查詢
String tableName = "HBase-component";
Scan scan = new Scan();
PageFilter pageFilter = new PageFilter(1);
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
"CF1".getBytes(),
"col1".getBytes(),
CompareFilter.CompareOp.EQUAL,
new SubstringComparator("group"));
singleColumnValueFilter.setFilterIfMissing(true);
FilterList filterList = new FilterList();
filterList.addFilter(singleColumnValueFilter);
filterList.addFilter(pageFilter);
scan.setFilter(filterList);
ResultScanner resultScanner = hBaseDao.queryByScan(tableName, scan);
try {
resultScanner.forEach(result -> {
result.listCells().forEach(cell -> {
System.out.println(
" row:" + Bytes.toString(CellUtil.cloneRow(cell)) +
" family:"+ Bytes.toString(CellUtil.cloneFamily(cell)) +
" qualifier: " + Bytes.toString(CellUtil.cloneQualifier(cell))+
" value:" + Bytes.toString(CellUtil.cloneValue(cell)));
});
});
} finally {
if (resultScanner != null) {
resultScanner.close();
}
}
多過濾器需要用到FilterList,也是直接設置到Scan對象中。多過濾器的時候需要注意過濾器的順序問題,例如上面代碼中如果將兩個過濾器調換順序,查詢的結果也是不一樣的。
結果集的映射
在HBase中,默認所有的順序都是按照字母序排列,例如CF1列族下有多個列:col1、col2、col3,那么在遍歷結果集時,listCells()中的cell的順序總是按照列名的字母序來排列的。
所以cellList.get(0)就是對應col1中的數據,cellList.get(1)就是對應col2中的數據,cellList.get(2)就是對應col3中的數據。
如果列名為a、b、c那分別對應的下標為cellList.get(0)、cellList.get(1)、cellList.get(2)