正文前先來一波福利推薦:
福利一:
百萬年薪架構師視頻,該視頻可以學到很多東西,是本人花錢買的VIP課程,學習消化了一年,為了支持一下女朋友公眾號也方便大家學習,共享給大家。
福利二:
畢業答辯以及工作上各種答辯,平時積累了不少精品PPT,現在共享給大家,大大小小加起來有幾千套,總有適合你的一款,很多是網上下載不到的。
獲取方式:
微信關注 精品3分鍾 ,id為 jingpin3mins,關注后回復 百萬年薪架構師 ,精品收藏PPT 獲取雲盤鏈接,謝謝大家支持!
------------------------正文開始---------------------------
之前項目中對於數據詳情的查詢使用的是ddb技術,由於成本過高,現考慮使用開源的hbase框架,借此機會進行hbase的代碼案例記錄。之前已經對
hbase的原理進行介紹,介紹了hbase中的rowkey、列、列族以及存儲原理等,可以參考之前的博客,現只針對hbase的Java API進行分析。
一、連接配置,拿到 connection
/** HBase client configuration this service was constructed with. */
private Configuration conf = null;

/** Connection opened in the constructor; stays {@code null} if opening it failed. */
private Connection connection = null;

/**
 * Creates the service and opens an HBase connection from the given configuration.
 *
 * <p>A failure to connect is logged (with its cause) rather than rethrown, so the
 * instance is still constructed but {@code connection} is left {@code null}.
 *
 * @param conf HBase client configuration used to create the connection
 */
public HBaseService(Configuration conf) {
    this.conf = conf;
    try {
        connection = ConnectionFactory.createConnection(conf);
    } catch (IOException e) {
        // Pass the exception along so the root cause of the failure is not lost.
        log.error("獲取HBase連接失敗", e);
    }
}
/** Injected from the {@code HBase.nodes} property; used as the ZooKeeper quorum. */
@Value("${HBase.nodes}")
private String nodes;

/** Injected from the {@code HBase.maxsize} property; used as {@code hbase.client.keyvalue.maxsize}. */
@Value("${HBase.maxsize}")
private String maxsize;

/**
 * Builds the {@link HBaseService} bean from a freshly created HBase configuration.
 *
 * @return a service constructed with the configured quorum, size limit, timeouts and retry settings
 */
@Bean
public HBaseService getHbaseService() {
    org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", nodes);
    conf.set("hbase.client.keyvalue.maxsize", maxsize);
    conf.setInt("hbase.rpc.timeout", 20000);
    conf.setInt("hbase.client.operation.timeout", 30000);
    // The scanner timeout used to be set twice (60000 through the HConstants key, then
    // 20000 through the same key spelled as a literal). Only the later value ever took
    // effect, so it is now set exactly once with that effective value.
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 20000);
    conf.setInt("hbase.client.pause", 50);
    conf.setInt("hbase.client.retries.number", 15);
    return new HBaseService(conf);
}
創建表:
/** * 創建表 * @author gxy * @date 2018/7/3 17:50 * @since 1.0.0 * @param tableName 表名 * @param columnFamily 列族名 list表 * @return void */ public boolean creatTable(String tableName, List<String> columnFamily) { Admin admin = null; try { admin = connection.getAdmin(); //一個表中可以存在多個列族 List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamily.size()); for(String cf : columnFamily) //遍歷所有列族 { familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build()); //添加列族的描述 } TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) //添加表的描述 .setColumnFamilies(familyDescriptors) .build(); if (admin.tableExists(TableName.valueOf(tableName))) { log.debug("table Exists!"); } else { admin.createTable(tableDescriptor); log.debug("create table Success!"); } } catch (IOException e) { log.error(MessageFormat.format("創建表{0}失敗",tableName),e); return false; }finally { close(admin,null,null); } return true; } /** * 預分區創建表 * @param tableName 表名 * @param columnFamily 列族名的集合 * @param splitKeys 預分期region * @return 是否創建成功 */ public boolean createTableBySplitKeys(String tableName, List<String> columnFamily, byte[][] splitKeys) { Admin admin = null; try { if (StringUtils.isBlank(tableName) || columnFamily == null || columnFamily.size() == 0) { log.error("===Parameters tableName|columnFamily should not be null,Please check!==="); return false; } admin = connection.getAdmin(); if (admin.tableExists(TableName.valueOf(tableName))) { return true; } else { List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamily.size()); for(String cf : columnFamily) { familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build()); } TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamilies(familyDescriptors).build(); //指定splitkeys admin.createTable(tableDescriptor, splitKeys); log.info("===Create Table " + tableName + " 
Success!columnFamily:" + columnFamily.toString() + "==="); } } catch (IOException e) { log.error("",e); return false; }finally { close(admin,null,null); } return true; } /** * 自定義獲取分區splitKeys */ public static byte[][] getSplitKeys(String[] keys){ if(keys==null){ //默認為10個分區 keys = new String[] { "1|", "2|", "3|", "4|", "5|", "6|", "7|", "8|", "9|" }; } byte[][] splitKeys = new byte[keys.length][]; //升序排序 TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); for(String key : keys){ rows.add(Bytes.toBytes(key)); } Iterator<byte[]> rowKeyIter = rows.iterator(); int i=0; while (rowKeyIter.hasNext()) { byte[] tempRow = rowKeyIter.next(); rowKeyIter.remove(); splitKeys[i] = tempRow; i++; } return splitKeys; }
增加、更新數據:
/** * 為表添加 or 更新數據 * @author gxy * @date 2018/7/3 17:26 * @since 1.0.0 * @param tableName 表名 * @param rowKey rowKey * @param familyName 列族名 * @param columns 列名數組 * @param values 列值得數組 */ public void putData(String tableName,String rowKey, String familyName, String[] columns, String[] values) { // 獲取表 Table table= null; try { table=getTable(tableName); putDataIntoHbase(table,rowKey,tableName,familyName,columns,values); } catch (Exception e) { log.error(MessageFormat.format("為表添加 or 更新數據失敗,tableName:{0},rowKey:{1},familyName:{2}" ,tableName,rowKey,familyName),e); }finally { close(null,null,table); } } /** * 為表添加 or 更新數據 -- 多列 * @author gxy * @date 2018/7/3 17:26 * @since 1.0.0 * @param table Table * @param rowKey rowKey * @param tableName 表名 * @param familyName 列族名 * @param columns 列名數組 * @param values 列值得數組 */ private void putDataIntoHbase(Table table, String rowKey, String tableName, String familyName, String[] columns, String[] values) { try { //設置rowkey Put put = new Put(Bytes.toBytes(rowKey)); if(columns != null && values != null && columns.length == values.length){ for(int i=0;i<columns.length;i++){ if(columns[i] != null && values[i] != null){ put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i])); }else{ throw new NullPointerException(MessageFormat.format("列名和列數據都不能為空,column:{0},value:{1}" ,columns[i],values[i])); } } } table.put(put); log.debug("putData add or update data Success,rowKey:" + rowKey); table.close(); } catch (Exception e) { log.error(MessageFormat.format("為表添加 or 更新數據失敗,tableName:{0},rowKey:{1},familyName:{2}" ,tableName,rowKey,familyName),e); } } /** * 為表的某個單元格賦值 -- 單列 * @author gxy * @date 2018/7/4 10:20 * @since 1.0.0 * @param tableName 表名 * @param rowKey rowKey * @param familyName 列族名 * @param column1 列名 * @param value1 列值 */ public void setColumnValue(String tableName, String rowKey, String familyName, String column1, String value1){ Table table=null; try { // 獲取表 table=getTable(tableName); // 設置rowKey 
Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column1), Bytes.toBytes(value1)); table.put(put); log.debug("add data Success!"); }catch (IOException e) { log.error(MessageFormat.format("為表的某個單元格賦值失敗,tableName:{0},rowKey:{1},familyName:{2},column:{3}" ,tableName,rowKey,familyName,column1),e); }finally { close(null,null,table); } }
刪除數據:
/** * 刪除指定的單元格 * @author gxy * @date 2018/7/4 11:41 * @since 1.0.0 * @param tableName 表名 * @param rowKey rowKey * @param familyName 列族名 * @param columnName 列名 * @return boolean */ public boolean deleteColumn(String tableName, String rowKey, String familyName, String columnName) { Table table=null; Admin admin = null; try { admin = connection.getAdmin(); if(admin.tableExists(TableName.valueOf(tableName))) //判斷表是存在的 { //獲取表 table=getTable(tableName); Delete delete = new Delete(Bytes.toBytes(rowKey)); // 設置待刪除的列 delete.addColumns(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); //精確到列名 table.delete(delete); log.debug(MessageFormat.format("familyName({0}):columnName({1})is deleted!",familyName,columnName)); } }catch (IOException e) { log.error(MessageFormat.format("刪除指定的列失敗,tableName:{0},rowKey:{1},familyName:{2},column:{3}" ,tableName,rowKey,familyName,columnName),e); return false; }finally { close(admin,null,table); } return true; } /** * 根據rowKey刪除指定的行 * @author gxy * @date 2018/7/4 13:26 * @since 1.0.0 * @param tableName 表名 * @param rowKey rowKey * @return boolean */ public boolean deleteRow(String tableName, String rowKey) { Table table=null; Admin admin = null; try { admin = connection.getAdmin(); if(admin.tableExists(TableName.valueOf(tableName))){ // 獲取表 table=getTable(tableName); Delete delete = new Delete(Bytes.toBytes(rowKey)); //精確到rowKey table.delete(delete); log.debug(MessageFormat.format("row({0}) is deleted!",rowKey)); } }catch (IOException e) { log.error(MessageFormat.format("刪除指定的行失敗,tableName:{0},rowKey:{1}" ,tableName,rowKey),e); return false; }finally { close(admin,null,table); } return true; } /** * 根據columnFamily刪除指定的列族 * @author gxy * @date 2018/7/4 13:26 * @since 1.0.0 * @param tableName 表名 * @param columnFamily 列族 * @return boolean */ public boolean deleteColumnFamily(String tableName, String columnFamily) { Admin admin = null; try { admin = connection.getAdmin(); if(admin.tableExists(TableName.valueOf(tableName))) { 
admin.deleteColumnFamily(TableName.valueOf(tableName), Bytes.toBytes(columnFamily)); //精確到表明中的列族名 log.debug(MessageFormat.format("familyName({0}) is deleted!",columnFamily)); } }catch (IOException e) { log.error(MessageFormat.format("刪除指定的列族失敗,tableName:{0},columnFamily:{1}",tableName,columnFamily),e); return false; }finally { close(admin,null,null); } return true; } /** * 刪除表 * @author gxy * @date 2018/7/3 18:02 * @since 1.0.0 * @param tableName 表名 */ public boolean deleteTable(String tableName){ Admin admin = null; try { admin = connection.getAdmin(); if(admin.tableExists(TableName.valueOf(tableName))) { admin.disableTable(TableName.valueOf(tableName)); //首先disable掉table admin.deleteTable(TableName.valueOf(tableName)); log.debug(tableName + "is deleted!"); } }catch (IOException e) { log.error(MessageFormat.format("刪除指定的表失敗,tableName:{0}" ,tableName),e); return false; }finally { close(admin,null,null); } return true; }
查詢數據:
/** * 獲取table * @param tableName 表名 * @return Table * @throws IOException IOException */ private Table getTable(String tableName) throws IOException { return connection.getTable(TableName.valueOf(tableName)); } /** * 查詢庫中所有表的表名 */ public List<String> getAllTableNames() { List<String> result = new ArrayList<>(); Admin admin = null; try { admin = connection.getAdmin(); TableName[] tableNames = admin.listTableNames(); for(TableName tableName : tableNames) { result.add(tableName.getNameAsString()); } }catch (IOException e) { log.error("獲取所有表的表名失敗",e); }finally { close(admin,null,null); } return result; } /** * 遍歷查詢指定表中的所有數據 * @author gxy * @date 2018/7/3 18:21 * @since 1.0.0 * @param tableName 表名 * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> */ public Map<String,Map<String,String>> getResultScanner(String tableName){ Scan scan = new Scan(); return this.queryData(tableName,scan); } /** * 根據startRowKey和stopRowKey遍歷查詢指定表中的所有數據 * @author gxy * @date 2018/7/4 18:21 * @since 1.0.0 * @param tableName 表名 * @param startRowKey 起始rowKey * @param stopRowKey 結束rowKey * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> */ public Map<String,Map<String,String>> getResultScanner(String tableName, String startRowKey, String stopRowKey){ Scan scan = new Scan(); if(StringUtils.isNoneBlank(startRowKey) && StringUtils.isNoneBlank(stopRowKey)){ scan.withStartRow(Bytes.toBytes(startRowKey)); scan.withStopRow(Bytes.toBytes(stopRowKey)); } return this.queryData(tableName,scan); } /** * 通過行前綴過濾器查詢數據 * @author gxy * @date 2018/7/4 18:21 * @since 1.0.0 * @param tableName 表名 * @param prefix 以prefix開始的行鍵 * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> */ public Map<String,Map<String,String>> getResultScannerPrefixFilter(String tableName, String prefix){ Scan scan = new Scan(); if(StringUtils.isNoneBlank(prefix)){ Filter filter = new PrefixFilter(Bytes.toBytes(prefix)); 
scan.setFilter(filter); } return this.queryData(tableName,scan); } /** * 通過列前綴過濾器查詢數據 * @author gxy * @date 2018/7/4 18:21 * @since 1.0.0 * @param tableName 表名 * @param prefix 以prefix開始的列名 * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> */ public Map<String,Map<String,String>> getResultScannerColumnPrefixFilter(String tableName, String prefix){ Scan scan = new Scan(); if(StringUtils.isNoneBlank(prefix)){ Filter filter = new ColumnPrefixFilter(Bytes.toBytes(prefix)); scan.setFilter(filter); } return this.queryData(tableName,scan); } /** * 查詢行鍵中包含特定字符的數據 * @author gxy * @date 2018/7/4 18:21 * @since 1.0.0 * @param tableName 表名 * @param keyword 包含指定關鍵詞的行鍵 * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> */ public Map<String,Map<String,String>> getResultScannerRowFilter(String tableName, String keyword){ Scan scan = new Scan(); if(StringUtils.isNoneBlank(keyword)){ Filter filter = new RowFilter(CompareOperator.GREATER_OR_EQUAL,new SubstringComparator(keyword)); scan.setFilter(filter); } return this.queryData(tableName,scan); } /** * 查詢列名中包含特定字符的數據 * @author gxy * @date 2018/7/4 18:21 * @since 1.0.0 * @param tableName 表名 * @param keyword 包含指定關鍵詞的列名 * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> */ public Map<String,Map<String,String>> getResultScannerQualifierFilter(String tableName, String keyword){ Scan scan = new Scan(); if(StringUtils.isNoneBlank(keyword)){ Filter filter = new QualifierFilter(CompareOperator.GREATER_OR_EQUAL,new SubstringComparator(keyword)); scan.setFilter(filter); } return this.queryData(tableName,scan); } /** * 通過表名以及過濾條件查詢數據 * @author gxy * @date 2018/7/4 16:13 * @since 1.0.0 * @param tableName 表名 * @param scan 過濾條件 * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> */ private Map<String,Map<String,String>> queryData(String tableName,Scan scan){ //<rowKey,對應的行數據> 
Map<String,Map<String,String>> result = new HashMap<>(); ResultScanner rs = null; // 獲取表 Table table= null; try { table = getTable(tableName); rs = table.getScanner(scan); for (Result r : rs) { //每一行數據 Map<String,String> columnMap = new HashMap<>(); String rowKey = null; for (Cell cell : r.listCells()) { if(rowKey == null){ rowKey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()); } columnMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } if(rowKey != null){ result.put(rowKey,columnMap); } } }catch (IOException e) { log.error(MessageFormat.format("遍歷查詢指定表中的所有數據失敗,tableName:{0}" ,tableName),e); }finally { close(null,rs,table); } return result; } /** * 根據tableName和rowKey精確查詢一行的數據 * @author gxy * @date 2018/7/3 16:07 * @since 1.0.0 * @param tableName 表名 * @param rowKey 行鍵 * @return java.util.Map<java.lang.String,java.lang.String> 返回一行的數據 */ public Map<String,String> getRowData(String tableName, String rowKey){ //返回的鍵值對 Map<String,String> result = new HashMap<>(); Get get = new Get(Bytes.toBytes(rowKey)); // 獲取表 Table table= null; try { table = getTable(tableName); Result hTableResult = table.get(get); //table通過封裝rowKey的Get 獲得具體的行 拿到 Cell if (hTableResult != null && !hTableResult.isEmpty()) { for (Cell cell : hTableResult.listCells()) { // System.out.println("family:" + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())); // System.out.println("qualifier:" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); // System.out.println("value:" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); // System.out.println("Timestamp:" + cell.getTimestamp()); // System.out.println("-------------------------------------------"); result.put(Bytes.toString(cell.getQualifierArray(), 
cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } } }catch (IOException e) { log.error(MessageFormat.format("查詢一行的數據失敗,tableName:{0},rowKey:{1}" ,tableName,rowKey),e); }finally { close(null,null,table); } return result; } /** * 根據tableName、rowKey、familyName、column查詢指定單元格的數據 * @author gxy * @date 2018/7/4 10:58 * @since 1.0.0 * @param tableName 表名 * @param rowKey rowKey * @param familyName 列族名 * @param columnName 列名 * @return java.lang.String */ public String getColumnValue(String tableName, String rowKey, String familyName, String columnName){ String str = null; Get get = new Get(Bytes.toBytes(rowKey)); // 獲取表 Table table= null; try { table = getTable(tableName); Result result = table.get(get); if (result != null && !result.isEmpty()) { Cell cell = result.getColumnLatestCell(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); if(cell != null){ str = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); } } } catch (IOException e) { log.error(MessageFormat.format("查詢指定單元格的數據失敗,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}" ,tableName,rowKey,familyName,columnName),e); }finally { close(null,null,table); } return str; } /** * 根據tableName、rowKey、familyName、column查詢指定單元格多個版本的數據 * @author gxy * @date 2018/7/4 11:16 * @since 1.0.0 * @param tableName 表名 * @param rowKey rowKey * @param familyName 列族名 * @param columnName 列名 * @param versions 需要查詢的版本數 * @return java.util.List<java.lang.String> */ public List<String> getColumnValuesByVersion(String tableName, String rowKey, String familyName, String columnName,int versions) { //返回數據 List<String> result = new ArrayList<>(versions); // 獲取表 Table table= null; try { table = getTable(tableName); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); //讀取多少個版本 get.readVersions(versions); Result hTableResult = table.get(get); if (hTableResult != 
null && !hTableResult.isEmpty()) { for (Cell cell : hTableResult.listCells()) { result.add(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } } } catch (IOException e) { log.error(MessageFormat.format("查詢指定單元格多個版本的數據失敗,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}" ,tableName,rowKey,familyName,columnName),e); }finally { close(null,null,table); } return result; }
關閉流:
/** * 關閉流 */ private void close(Admin admin, ResultScanner rs, Table table) { if(admin != null){ try { admin.close(); //關閉 admin } catch (IOException e) { log.error("關閉Admin失敗",e); } } if(rs != null){ //關閉 ResultScanner rs.close(); } if(table != null){ try { table.close(); //關閉 table } catch (IOException e) { log.error("關閉Table失敗",e); } } }