增刪改查工具類
class HbaseUtils { /** * 獲取管理員對象 * * @param conf 對hbase client配置一些參數 * @return 返回hbase的HBaseAdmin管理員對象 */ def getAdmin(conf: Configuration): HBaseAdmin = { val conn = ConnectionFactory.createConnection(conf) conn.getAdmin().asInstanceOf[HBaseAdmin] } /** * 根據指定的管理員,表名,列族名稱創建表 * * @param admin 創建HBaseAdmin對象 * @param tName 需要創建的表名 * @param columnFamilys 列族名稱的集合 */ def createTable(admin: HBaseAdmin, tName: String, columnFamilys: List[String]): Unit = { if (admin.tableExists(TableName.valueOf(tName))) { println("table already exists!") admin.disableTable(tName) admin.deleteTable(tName) } try { val tableDesc = new HTableDescriptor(TableName.valueOf(tName)) columnFamilys.foreach(columnFamilysName => tableDesc.addFamily(new HColumnDescriptor(columnFamilysName))) admin.createTable(tableDesc) println("create table success!") } catch { case e: Exception => e.printStackTrace() } } /** * 單條數據插入 根據表名、rowkey、列族名、列名、值、增加數據 * * @param conf 當前對象的配置信息 * @param tableName 表名 * @param rowKey 行鍵 * @param columnFamily 列族名稱 * @param column 列 * @param value 值 */ def insertData(conf: Configuration, tableName: String, rowKey: String, columnFamily: String, column: String, value: String): Unit = { val con = ConnectionFactory.createConnection(conf) val table = con.getTable(TableName.valueOf(tableName)) val put = new Put(Bytes.toBytes(rowKey)) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)) table.put(put) close(table, con) print("數據插入成功") } /** * 批量插入數據 * * @param conf 當前對象的配置信息 * @param tableName 表名 * @param rowKey 行鍵 * @param columnFamily 列族 * @param column 列 * @param value 值 */ def batchInsertData(conf: Configuration, tableName: String, rowKey: String, columnFamily: String, column: String, value: String): Unit = { val con = ConnectionFactory.createConnection(conf) val table: BufferedMutator = con.getBufferedMutator(TableName.valueOf(tableName)) val p = new Put(Bytes.toBytes(rowKey)) p.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value)) val mutations = new util.ArrayList[Mutation]() mutations.add(p) table.mutate(mutations) table.flush() if (con != null) con.close() if (table != null) table.close() print("數據插入成功") } /** * 刪除數據 * * @param conf 當前對象的配置信息 * @param tableName 表名 */ def deleteData(conf: Configuration, tableName: String): Unit = { val admin = getAdmin(conf) try { if (admin.tableExists(tableName)) { admin.disableTable(tableName) admin.deleteTable(tableName) } } catch { case e: Exception => e.printStackTrace() } print("刪除數據成功") } /** * 根據指定的配置信息全表掃描指定的表 * * @param conf 配置信息 * @param tableName 表名 * @return Cell單元格數組 */ def getByScan(conf: Configuration, tableName: String): ArrayBuffer[Array[Cell]] = { var arrayBuffer = ArrayBuffer[Array[Cell]]() val scanner = new Scan() val conn = ConnectionFactory.createConnection(conf) val table = conn.getTable(TableName.valueOf(tableName)) val results = table.getScanner(scanner) var res: Result = results.next() while (res != null) { arrayBuffer += res.rawCells() res = results.next() } arrayBuffer } /** * 根據行鍵獲取具體的某一個行 * * @param conf 配置信息 * @param tableName 表名 * @param row 行鍵 * @return Array[Cell] */ def getRow(conf: Configuration, tableName: String, row: String): Array[Cell] = { val con = ConnectionFactory.createConnection(conf) val table = con.getTable(TableName.valueOf(tableName)) val get = new Get(Bytes.toBytes(row)) val res = table.get(get) res.rawCells() } /** * 刪除指定表的指定row數據 * * @param conf 配置信息 * @param tableName 表名 * @param row 行鍵 */ def delRow(conf: Configuration, tableName: String, row: String): Unit = { val con = ConnectionFactory.createConnection(conf) val table = con.getTable(TableName.valueOf(tableName)) table.delete(new Delete(Bytes.toBytes(row))) println("刪除數據成功") } def close(table: Table, con: Connection): Unit = { if (table != null) table.close() if (con != null) con.close() } }
測試用例
class HbaseTest { val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "master66") conf.set("hbase.zookeeper.property.clientPort", "2181") val hBaseUtils = new HbaseUtils() val admin = hBaseUtils.getAdmin(conf) /** * 創建表 */ @Test def createTable(): Unit = { // val list = List("family1", "family2")
val list = List("Stat2") hBaseUtils.createTable(admin, "PageViewStream2", list) } /** * 插入數據 */ @Test def insertData(): Unit = { hBaseUtils.insertData(conf, "test2", "rowkey1", "family1", "李四", "lisi2") } /** * 批量插入數據 */ @Test def batchInsertData: Unit = { hBaseUtils.batchInsertData(conf, "test2", "rowkey2", "family2", "name", "lisi") } /** * 獲取指定的一行 */ @Test def getRow: Unit = { val row: Array[Cell] = hBaseUtils.getRow(conf, "test2", "rowkey2") row.foreach(a => { println(new String(a.getRow()) + " " + a.getTimestamp + " " + new String(a.getFamily()) + " " + new String(a.getValue)) }) } /** * 刪除指定的一行 */ @Test def delRow: Unit = { hBaseUtils.delRow(conf, "test2", "rowkey1") } /** * 掃描全表 */ @Test def getByScan: Unit = { val all: ArrayBuffer[Array[Cell]] = hBaseUtils.getByScan(conf, "PageViewStream2") all.foreach(arrBuffer => arrBuffer.foreach(cell => { println(new String(cell.getRowArray, cell.getRowOffset, cell.getRowLength) + "-->Row") println(cell.getTimestamp + "-->timpsstamp ") println(new String(cell.getFamilyArray, cell.getFamilyOffset, cell.getFamilyLength) + "-->family ") println(new String(cell.getValueArray, cell.getValueOffset, cell.getValueLength) + "-->value ") println(new String(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength) + " -->Tags") })) } }