HBase讀寫的幾種方式(一)java篇


1.HBase讀寫的方式概況

主要分為:

  1. 純Java API讀寫HBase的方式;
  2. Spark讀寫HBase的方式;
  3. Flink讀寫HBase的方式;
  4. HBase通過Phoenix讀寫的方式;

第一種方式是HBase自身提供的比較原始的高效操作方式,而第二、第三則分別是Spark、Flink集成HBase的方式,最后一種是第三方插件Phoenix集成的JDBC方式,Phoenix集成的JDBC操作方式也能在Spark、Flink中調用。

注意:

這里我們使用HBase2.1.2版本,以下代碼都是基於該版本開發的。

2. 純Java API讀寫HBase

2.1 連接HBase

這里我們采用靜態方式連接HBase,不同於2.1.2之前的版本,無需創建HBase線程池,HBase2.1.2提供的代碼已經封裝好,只需創建調用即可:

/**
  * 聲明靜態配置
  */
static Configuration conf = null;
static Connection conn = null;
static {
       conf = HBaseConfiguration.create();
       conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");
       conf.set("hbase.zookeeper.property.client", "2181");
       try{
           conn = ConnectionFactory.createConnection(conf);
       }catch (Exception e){
           e.printStackTrace();
       }
}

2.2 創建HBase的表

創建HBase表,是通過Admin來執行的,表和列簇則是分別通過TableDescriptorBuilder和ColumnFamilyDescriptorBuilder來構建。

/**
 * 創建只有一個列簇的表
 * @throws Exception
 */
public static void createTable() throws Exception{
    Admin admin = conn.getAdmin();
    if (!admin.tableExists(TableName.valueOf("test"))){
        TableName tableName = TableName.valueOf("test");
        //表描述器構造器
        TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder(tableName);
        //列族描述器構造器
        ColumnFamilyDescriptorBuilder cdb = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("user"));
        //獲得列描述器
        ColumnFamilyDescriptor cfd = cdb.build();
        //添加列族
        tdb.setColumnFamily(cfd);
        //獲得表描述器
        TableDescriptor td = tdb.build();
        //創建表
        admin.createTable(td);
    }else {
        System.out.println("表已存在");
    }
    //關閉連接
}

2.3 HBase表添加數據

通過put api來添加數據

/**
 * 添加數據(多個rowKey,多個列族)
 * @throws Exception
 */
public static void insertMany() throws Exception{
    Table table = conn.getTable(TableName.valueOf("test"));
    List<Put> puts = new ArrayList<Put>();
    Put put1 = new Put(Bytes.toBytes("rowKey1"));
    put1.addColumn(Bytes.toBytes("user"), Bytes.toBytes("name"), Bytes.toBytes("wd"));

    Put put2 = new Put(Bytes.toBytes("rowKey2"));
    put2.addColumn(Bytes.toBytes("user"), Bytes.toBytes("age"), Bytes.toBytes("25"));

    Put put3 = new Put(Bytes.toBytes("rowKey3"));
    put3.addColumn(Bytes.toBytes("user"), Bytes.toBytes("weight"), Bytes.toBytes("60kg"));

    Put put4 = new Put(Bytes.toBytes("rowKey4"));
    put4.addColumn(Bytes.toBytes("user"), Bytes.toBytes("sex"), Bytes.toBytes("男"));

    puts.add(put1);
    puts.add(put2);
    puts.add(put3);
    puts.add(put4);
    table.put(puts);
    table.close();
}

2.4 刪除HBase的列簇或列

/**
 * 根據rowKey刪除一行數據、或者刪除某一行的某個列簇,或者某一行某個列簇某列
 * @param tableName
 * @param rowKey
 * @throws Exception
 */
public static void deleteData(TableName tableName, String rowKey, String rowKey, String columnFamily, String columnName) throws Exception{
    Table table = conn.getTable(tableName);
    Delete delete = new Delete(Bytes.toBytes(rowKey));
    //①根據rowKey刪除一行數據
    table.delete(delete);
    
    //②刪除某一行的某一個列簇內容
    delete.addFamily(Bytes.toBytes(columnFamily));
    
    //③刪除某一行某個列簇某列的值
    delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
    table.close();
}

2.5 更新HBase表的列

使用Put api直接替換掉即可

/**
 * 根據RowKey , 列簇, 列名修改值
 * @param tableName
 * @param rowKey
 * @param columnFamily
 * @param columnName
 * @param columnValue
 * @throws Exception
 */
public static void updateData(TableName tableName, String rowKey, String columnFamily, String columnName, String columnValue) throws Exception{
    Table table = conn.getTable(tableName);
    Put put1 = new Put(Bytes.toBytes(rowKey));
    put1.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));
    table.put(put1);
    table.close();
}

2.6 HBase查詢

HBase查詢分為get、scan、scan和filter結合。filter過濾器又分為RowFilter(rowKey過濾器)、SingleColumnValueFilter(列值過濾器)、ColumnPrefixFilter(列名前綴過濾器)。

/**
 * 根據rowKey查詢數據
 * @param tableName
 * @param rowKey
 * @throws Exception
 */
public static void getResult(TableName tableName, String rowKey) throws Exception{
    Table table = conn.getTable(tableName);
    //獲得一行
    Get get = new Get(Bytes.toBytes(rowKey));
    Result set = table.get(get);
    Cell[] cells = set.rawCells();
    for (Cell cell: cells){
        System.out.println(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::" +
        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
    }
    table.close();
}

//過濾器 LESS <  LESS_OR_EQUAL <=   EQUAL =   NOT_EQUAL <>   GREATER_OR_EQUAL >=   GREATER >   NO_OP 排除所有

/**
 * @param tableName
 * @throws Exception
 */
public static void scanTable(TableName tableName) throws Exception{
    Table table = conn.getTable(tableName);
    
    //①全表掃描
    Scan scan1 = new Scan();
    ResultScanner rscan1 = table.getScanner(scan1);
    
    //②rowKey過濾器
    Scan scan2 = new Scan();
    //str$ 末尾匹配,相當於sql中的 %str  ^str開頭匹配,相當於sql中的str%
    RowFilter filter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("Key1$"));
    scan2.setFilter(filter);
    ResultScanner rscan2 = table.getScanner(scan2);
    
    //③列值過濾器
    Scan scan3 = new Scan();
    //下列參數分別為列族,列名,比較符號,值
    SingleColumnValueFilter filter3 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"),
               CompareOperator.EQUAL, Bytes.toBytes("spark"));
    scan3.setFilter(filter3);
    ResultScanner rscan3 = table.getScanner(scan3);
    
    //列名前綴過濾器
    Scan scan4 = new Scan();
    ColumnPrefixFilter filter4 = new ColumnPrefixFilter(Bytes.toBytes("name"));
    scan4.setFilter(filter4);
    ResultScanner rscan4 = table.getScanner(scan4);
    
    //過濾器集合
    Scan scan5 = new Scan();
    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    SingleColumnValueFilter filter51 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"),
              CompareOperator.EQUAL, Bytes.toBytes("spark"));
    ColumnPrefixFilter filter52 = new ColumnPrefixFilter(Bytes.toBytes("name"));
    list.addFilter(filter51);
    list.addFilter(filter52);
    scan5.setFilter(list);
    ResultScanner rscan5 = table.getScanner(scan5);
    
    for (Result rs : rscan){
        String rowKey = Bytes.toString(rs.getRow());
        System.out.println("row key :" + rowKey);
        Cell[] cells = rs.rawCells();
        for (Cell cell: cells){
            System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "::"
                    + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::"
                    + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
        }
        System.out.println("-------------------------------------------");
    }
}

3.總結

HBase連接的幾種方式(二)spark篇 查看Spark上讀寫HBase

HBase讀寫的幾種方式(三)flink篇  查看flink上讀寫HBase

github地址:

https://github.com/SwordfallYeung/HBaseDemo

參考資料:

https://hbase.apache.org/book.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM