一、簡介
在上一篇文章 HBase 基礎入門 中,我們已經介紹了 HBase 的一些基本概念,以及如何安裝使用的方法。
那么,作為一名 Javaer,自然是希望用 Java 的方式來與 HBase 進行對話了。
所幸的是,HBase 本身就是用 Java 編寫的,天生自帶了 Java 原生API。 我們可以通過 hbase-client 來實現 HBase 數據庫的操作。
所以,這次主要介紹該組件的基本用法。
在使用 hbase-client 之前,有幾個要點需要注意:
- 客戶端需要能訪問 Zoopkeeper,再獲得 HMaster、RegionServer 實例進行操作
- 客戶端需運行在HBase/Hadoop 集群內,HBase會使用 hostname 來定位節點,因此要求客戶端能訪問到對應的主機名(或子域名)
如果是遠程客戶端則需要配置本地的hosts文件。
下面這個圖,有助於理解 Client 與 HBase 集群的交互架構:

下面開始介紹 client 的使用。
二、hbase-client 引入
在 Maven 的 pom.xml 中添加依賴:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.5</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>2.1.5</version>
</dependency>
這里需要注意的是,客戶端版本和 HBase 版本需要保持一致,否則可能會遇到不兼容的問題。
三、連接操作
示例代碼:
/**
* 建立連接
*
* @return
*/
public static Connection getConnection() {
try {
//獲取配置
Configuration configuration = getConfiguration();
//檢查配置
HBaseAdmin.checkHBaseAvailable(configuration);
return ConnectionFactory.createConnection(configuration);
} catch (IOException | ServiceException e) {
throw new RuntimeException(e);
}
}
/**
* 獲取配置
*
* @return
*/
private static Configuration getConfiguration() {
try {
Properties props = PropertiesLoaderUtils.loadAllProperties("hbase.properties");
String clientPort = props.getProperty("hbase.zookeeper.property.clientPort");
String quorum = props.getProperty("hbase.zookeeper.quorum");
logger.info("connect to zookeeper {}:{}", quorum, clientPort);
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.property.clientPort", clientPort);
config.set("hbase.zookeeper.quorum", quorum);
return config;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
四、表操作
增刪改查方法封裝如下:
/**
* 創建表
* @param connection
* @param tableName
* @param columnFamilies
* @throws IOException
*/
public static void createTable(Connection connection, TableName tableName, String... columnFamilies) throws IOException {
Admin admin = null;
try {
admin = connection.getAdmin();
if (admin.tableExists(tableName)) {
logger.warn("table:{} exists!", tableName.getName());
} else {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
for (String columnFamily : columnFamilies) {
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(columnFamily));
}
admin.createTable(builder.build());
logger.info("create table:{} success!", tableName.getName());
}
} finally {
if (admin != null) {
admin.close();
}
}
}
/**
* 插入數據
*
* @param connection
* @param tableName
* @param rowKey
* @param columnFamily
* @param column
* @param data
* @throws IOException
*/
public static void put(Connection connection, TableName tableName,
String rowKey, String columnFamily, String column, String data) throws IOException {
Table table = null;
try {
table = connection.getTable(tableName);
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
table.put(put);
} finally {
if (table != null) {
table.close();
}
}
}
/**
* 根據row key、column 讀取
*
* @param connection
* @param tableName
* @param rowKey
* @param columnFamily
* @param column
* @throws IOException
*/
public static String getCell(Connection connection, TableName tableName, String rowKey, String columnFamily, String column) throws IOException {
Table table = null;
try {
table = connection.getTable(tableName);
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
Result result = table.get(get);
List<Cell> cells = result.listCells();
if (CollectionUtils.isEmpty(cells)) {
return null;
}
String value = new String(CellUtil.cloneValue(cells.get(0)), "UTF-8");
return value;
} finally {
if (table != null) {
table.close();
}
}
}
/**
* 根據rowkey 獲取一行
*
* @param connection
* @param tableName
* @param rowKey
* @return
* @throws IOException
*/
public static Map<String, String> getRow(Connection connection, TableName tableName, String rowKey) throws IOException {
Table table = null;
try {
table = connection.getTable(tableName);
Get get = new Get(Bytes.toBytes(rowKey));
Result result = table.get(get);
List<Cell> cells = result.listCells();
if (CollectionUtils.isEmpty(cells)) {
return Collections.emptyMap();
}
Map<String, String> objectMap = new HashMap<>();
for (Cell cell : cells) {
String qualifier = new String(CellUtil.cloneQualifier(cell));
String value = new String(CellUtil.cloneValue(cell), "UTF-8");
objectMap.put(qualifier, value);
}
return objectMap;
} finally {
if (table != null) {
table.close();
}
}
}
/**
* 掃描權標的內容
*
* @param connection
* @param tableName
* @param rowkeyStart
* @param rowkeyEnd
* @throws IOException
*/
public static List<Map<String, String>> scan(Connection connection, TableName tableName, String rowkeyStart, String rowkeyEnd) throws IOException {
Table table = null;
try {
table = connection.getTable(tableName);
ResultScanner rs = null;
try {
Scan scan = new Scan();
if (!StringUtils.isEmpty(rowkeyStart)) {
scan.withStartRow(Bytes.toBytes(rowkeyStart));
}
if (!StringUtils.isEmpty(rowkeyEnd)) {
scan.withStopRow(Bytes.toBytes(rowkeyEnd));
}
rs = table.getScanner(scan);
List<Map<String, String>> dataList = new ArrayList<>();
for (Result r : rs) {
Map<String, String> objectMap = new HashMap<>();
for (Cell cell : r.listCells()) {
String qualifier = new String(CellUtil.cloneQualifier(cell));
String value = new String(CellUtil.cloneValue(cell), "UTF-8");
objectMap.put(qualifier, value);
}
dataList.add(objectMap);
}
return dataList;
} finally {
if (rs != null) {
rs.close();
}
}
} finally {
if (table != null) {
table.close();
}
}
}
/**
* 刪除表
*
* @param connection
* @param tableName
* @throws IOException
*/
public static void deleteTable(Connection connection, TableName tableName) throws IOException {
Admin admin = null;
try {
admin = connection.getAdmin();
if (admin.tableExists(tableName)) {
//現執行disable
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
} finally {
if (admin != null) {
admin.close();
}
}
}
五、運行測試
最后,我們仍然以上一篇文章中的設備數據作為例子:
- 建立 DeviceState 表;
- 定義 name/state 兩個列簇;
- 寫入列數據;
- 讀取列、行,范圍讀取;
- 刪除操作
最終實現的代碼如下:
private static final Logger logger = LoggerFactory.getLogger(HBaseTest.class);
public static void main(String[] args) {
Connection connection = null;
try {
connection = getConnection();
TableName tableName = TableName.valueOf("DeviceState");
//創建DeviceState表
createTable(connection, tableName, "name", "state");
logger.info("創建表 {}", tableName.getNameAsString());
//寫入數據
put(connection, tableName, "row1", "name", "c1", "空調");
put(connection, tableName, "row1", "state", "c2", "打開");
put(connection, tableName, "row2", "name", "c1", "電視機");
put(connection, tableName, "row2", "state", "c2", "關閉");
logger.info("寫入數據.");
String value = getCell(connection, tableName, "row1", "state", "c2");
logger.info("讀取單元格-row1.state:{}", value);
Map<String, String> row = getRow(connection, tableName, "row2");
logger.info("讀取單元格-row2:{}", JsonUtil.toJson(row));
List<Map<String, String>> dataList = scan(connection, tableName, null, null);
logger.info("掃描表結果-:\n{}", JsonUtil.toPrettyJson(dataList));
//刪除DeviceState表
deleteTable(connection, tableName);
logger.info("刪除表 {}", tableName.getNameAsString());
logger.info("操作完成.");
} catch (Exception e) {
logger.error("操作出錯", e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
logger.error("error occurs", e);
}
}
}
}
執行代碼,控制台輸出如下:
INFO -createTable(HBaseTest.java:89) - create table:[68, 101, 118, 105, 99, 101, 83, 116, 97, 116, 101] success!
INFO -main(HBaseTest.java:32) - 創建表 DeviceState
INFO -main(HBaseTest.java:40) - 寫入數據.
INFO -main(HBaseTest.java:43) - 讀取單元格-row1.state:打開
INFO -main(HBaseTest.java:46) - 讀取單元格-row2:{"c1":"電視機","c2":"關閉"}
INFO -main(HBaseTest.java:49) - 掃描表結果-:
[ {
"c1" : "空調",
"c2" : "打開"
}, {
"c1" : "電視機",
"c2" : "關閉"
} ]
INFO -HBaseAdmin$9.call(HBaseAdmin.java:1380) - Started disable of DeviceState
INFO -HBaseAdmin$DisableTableFuture.postOperationResult(HBaseAdmin.java:1409) - Disabled DeviceState
INFO -HBaseAdmin$DeleteTableFuture.postOperationResult(HBaseAdmin.java:965) - Deleted DeviceState
INFO -main(HBaseTest.java:53) - 刪除表 DeviceState
INFO -main(HBaseTest.java:55) - 操作完成.
此時Java Client已經完成制作。
FAQ
- 提示報錯 找不到winutils程序
Failed to locate the winutils binary in the hadoop binary path
原因是在Windows下依賴一個winutils.exe程序,該程序通過${HADOOP_HOME}/bin 來查找。
該報錯不影響程序執行,但如果要規避問題,需要下載hadoop-commons-master,再配置變量HADOOP_HOME
可參考地址:https://blog.csdn.net/ycf921244819/article/details/81706119
- 提示報錯,UnknownHostException,無法找到節點..
原因是客戶端無法解析HMaster實例節點的主機名
需要編輯 C:\Windows\System32\drivers\etc\hosts 文件,添加對應的映射,如下:
47.xx.8x.xx izwz925kr63w5jitjys6dtt
參考文檔
官方文檔
https://hbase.apache.org/book.html#quickstart
Java HBase客戶端API
https://www.baeldung.com/hbase
