HBase操作工具類


聲明

本文轉自:https://www.cnblogs.com/jonban/p/10805971.html

正文

添加依賴

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.5</version>
        </dependency>

工具類

package javax.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Hbase 操作工具類
 * 
 * @author Logan
 * @version 1.0.0
 * @createDate 2019-05-03
 *
 */
public class HbaseUtils {

    // ===============Common=====================================

    /**
     * 根據表名獲取Table對象
     * 
     * @param name 表名,必要時可指定命名空間,比如:“default:user”
     * @return Hbase Table 對象
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static Table getTable(String name) throws IOException {
        TableName tableName = TableName.valueOf(name);
        Connection connection = ConnectionFactory.createConnection();
        return connection.getTable(tableName);
    }

    // =============== Put =====================================

    /**
     * 根據rowKey生成一個Put對象
     * 
     * @param rowKey rowKey
     * @return Put對象
     */
    public static Put createPut(String rowKey) {
        return new Put(Bytes.toBytes(rowKey));
    }

    /**
     * 在Put對象上增加Cell
     * 
     * @param put Put對象
     * @param cell cell對象
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static void addCellOnPut(Put put, Cell cell) throws IOException {
        put.add(cell);
    }

    /**
     * 在Put對象上增加值
     * 
     * @param put Put對象
     * @param family 列簇
     * @param qualifier 列
     * @param value 字符串類型的值
     */
    public static void addValueOnPut(Put put, String family, String qualifier, String value) {
        addValueOnPut(put, family, qualifier, Bytes.toBytes(value));
    }

    /**
     * 在Put對象上增加值
     * 
     * @param put Put對象
     * @param family 列簇
     * @param qualifier 列
     * @param value 字節數組類型的值,可以是任意對象序列化而成
     */
    public static void addValueOnPut(Put put, String family, String qualifier, byte[] value) {
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
    }

    /**
     * 在Put對象上增加值
     * 
     * @param put Put對象
     * @param family 列簇
     * @param qualifier 列
     * @param ts Timestamp時間戳
     * @param value 字符串類型的值
     */
    public static void addValueOnPut(Put put, String family, String qualifier, long ts, String value) {
        addValueOnPut(put, family, qualifier, ts, Bytes.toBytes(value));
    }

    /**
     * 在Put對象上增加值
     * 
     * @param put Put對象
     * @param family 列簇
     * @param qualifier 列
     * @param ts Timestamp時間戳
     * @param value 字節數組類型的值,可以是任意對象序列化而成
     */
    public static void addValueOnPut(Put put, String family, String qualifier, long ts, byte[] value) {
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), ts, value);
    }

    /**
     * 按表名插入一個Put對象包含的數據
     * 
     * @param tableName 表名,必要時可指定命名空間,比如:“default:user”
     * @param put 要插入的數據對象
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static void put(String tableName, Put put) throws IOException {
        try (
                Table table = getTable(tableName);
        ) {

            table.put(put);
        }
    }

    /**
     * 按表名批量插入Put對象包含的數據
     * 
     * @param tableName 表名,必要時可指定命名空間,比如:“default:user”
     * @param puts 要插入的數據對象集合
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static void put(String tableName, List<Put> puts) throws IOException {
        try (
                Table table = getTable(tableName);
        ) {

            table.put(puts);
        }
    }

    // =============== Get =====================================

    /**
     * 根據rowKey生成一個查詢的Get對象
     * 
     * @param rowKey rowKey
     * @return Get 對象
     */
    public static Get createGet(String rowKey) {
        return new Get(Bytes.toBytes(rowKey));
    }

    /**
     * 對查詢的Get對象增加指定列簇
     * 
     * @param get
     * @param family
     */
    public static void addFamilyOnGet(Get get, String family) {
        get.addFamily(Bytes.toBytes(family));
    }

    /**
     * 對查詢的Get對象增加指定列簇和列
     * 
     * @param get
     * @param family
     * @param qualifier
     */
    public static void addColumnOnGet(Get get, String family, String qualifier) {
        get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
    }

    /**
     * 根據表名和rowKey查詢結果(包含全部列簇和列)
     * 
     * @param tableName 表名,必要時可指定命名空間,比如:“default:user”
     * @param rowKey 查詢rowKey
     * @return 查詢結果Result
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static Result get(String tableName, String rowKey) throws IOException {
        Get get = createGet(rowKey);
        return get(tableName, get);
    }

    /**
     * 根據表名和rowKey數組批量查詢結果(包含全部列簇和列)
     * 
     * @param tableName 表名,必要時可指定命名空間,比如:“default:user”
     * @param rowKeys 查詢rowKey數組
     * @return 查詢結果Result數組
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static Result[] get(String tableName, String[] rowKeys) throws IOException {
        List<Get> gets = new ArrayList<Get>();
        for (String rowKey : rowKeys) {
            gets.add(createGet(rowKey));
        }
        return get(tableName, gets);
    }

    /**
     * 根據表名和Get對象查詢結果
     * 
     * @param tableName 表名,必要時可指定命名空間,比如:“default:user”
     * @param get Hbase查詢對象
     * @return 查詢結果Result
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static Result get(String tableName, Get get) throws IOException {
        try (
                Table table = getTable(tableName);
        ) {

            return table.get(get);
        }
    }

    /**
     * 根據表名和Get對象數組查詢結果
     * 
     * @param tableName 表名,必要時可指定命名空間,比如:“default:user”
     * @param gets 多個Hbase查詢對象組成的數組
     * @return 查詢結果Result數組
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static Result[] get(String tableName, List<Get> gets) throws IOException {
        try (
                Table table = getTable(tableName);
        ) {
            return table.get(gets);
        }
    }

    // =============== Scan =====================================

    /**
     * 根據startRow和stopRow創建掃描對象
     * 
     * @param startRow 掃描開始行,結果包含該行
     * @param stopRow 掃描結束行,結果不包含該行
     * @return Scan對象
     */
    public static Scan createScan(String startRow, String stopRow) {
        Scan scan = new Scan();
        scan.withStartRow(Bytes.toBytes(startRow));
        scan.withStopRow(Bytes.toBytes(stopRow));
        return scan;
    }

    /**
     * 對掃描對象設置列簇
     * 
     * @param scan 掃描對象
     * @param family 列簇
     */
    public static void addFamilyOnScan(Scan scan, String family) {
        scan.addFamily(Bytes.toBytes(family));
    }

    /**
     * 對掃描對象設置列
     * 
     * @param scan 掃描對象
     * @param family 列簇
     * @param qualifier 列簇下對應的列
     */
    public static void addColumnOnScan(Scan scan, String family, String qualifier) {
        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
    }

    /**
     * 根據表名和掃描對象掃描數據
     * 
     * @param tableName 表名,必要時可指定命名空間,比如:“default:user”
     * @param scan 掃描對象
     * @return 掃描結果集對象ResultScanner
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static ResultScanner scan(String tableName, Scan scan) throws IOException {
        try (
                Table table = getTable(tableName);
        ) {
            return table.getScanner(scan);
        }
    }

    /**
     * 根據表名、開始行和結束行掃描數據(結果包含開始行,不包含結束行,半開半閉區間[startRow, stopRow))
     * 
     * @param tableName 表名,必要時可指定命名空間,比如:“default:user”
     * @param startRow 掃描開始行
     * @param stopRow 掃描結束行
     * @return 掃描結果集對象ResultScanner
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static ResultScanner scan(String tableName, String startRow, String stopRow) throws IOException {
        return scan(tableName, createScan(startRow, stopRow));
    }

    // =============== Delete =====================================

    /**
     * 根據rowKey生成一個查詢的Delete對象
     * 
     * @param rowKey rowKey
     * @return Delete對象
     */
    public static Delete createDelete(String rowKey) {
        return new Delete(Bytes.toBytes(rowKey));
    }

    /**
     * 在Delete對象上增加Cell
     * 
     * @param delete Delete對象
     * @param cell cell對象
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static void addCellOnDelete(Delete delete, Cell cell) throws IOException {
        delete.add(cell);
    }

    /**
     * 對刪除對象增加指定列簇
     * 
     * @param delete Delete對象
     * @param family 列簇
     */
    public static void addFamilyOnDelete(Delete delete, String family) {
        delete.addFamily(Bytes.toBytes(family));
    }

    /**
     * 對刪除對象增加指定列簇和列
     * 
     * @param delete Delete對象
     * @param family 列簇
     * @param qualifier 列
     */
    public static void addColumnOnDelete(Delete delete, String family, String qualifier) {
        delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
    }

    /**
     * 按表名刪除一個Delete對象指定的數據
     * 
     * @param tableName 表名,必要時可指定命名空間,比如:“default:user”
     * @param delete Delete對象
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static void delete(String tableName, Delete delete) throws IOException {
        try (
                Table table = getTable(tableName);
        ) {
            table.delete(delete);
        }
    }

    /**
     * 按表名批量刪除Delete對象集合包含的指定數據
     * 
     * @param tableName 表名,必要時可指定命名空間,比如:“default:user”
     * @param deletes Delete對象集合
     * @throws IOException 有異常拋出,由調用者捕獲處理
     */
    public static void delete(String tableName, List<Delete> deletes) throws IOException {
        try (
                Table table = getTable(tableName);
        ) {
            table.delete(deletes);
        }
    }

}

測試類

package com.java.demo;

import java.util.ArrayList;
import java.util.List;

import javax.utils.HbaseUtils;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

/**
 * Hbase 客戶端測試
 * 
 * @author Logan
 * @version 1.0.0
 * @createDate 2019-05-03
 *
 */
public class HbaseClientDemo {

    /**
     * 向user表中插入數據
     */
    @Test
    public void put() {

        String tableName = "default:user";
        try {

            List<Put> puts = new ArrayList<Put>();
            Put put = HbaseUtils.createPut("key1005");
            HbaseUtils.addValueOnPut(put, "info", "name", "孫悟空");
            HbaseUtils.addValueOnPut(put, "info", "age", "500");
            HbaseUtils.addValueOnPut(put, "info", "address", "花果山");
            // HbaseUtils.put(tableName, put);
            puts.add(put);

            put = HbaseUtils.createPut("key1006");
            HbaseUtils.addValueOnPut(put, "info", "name", "沙悟凈");
            HbaseUtils.addValueOnPut(put, "info", "age", "1000");
            HbaseUtils.addValueOnPut(put, "info", "address", "流沙河");
            puts.add(put);

            HbaseUtils.put(tableName, puts);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 按rowKey批量查詢user表中全部列簇全部列的值
     */
    @Test
    public void getAllFamily() {
        try {
            String tableName = "default:user";
            String[] rowKeys = { "key1001", "key1002", "key1003", "key1005", "key1006" };

            // 按表名和rowKey查詢所有列
            Result[] results = HbaseUtils.get(tableName, rowKeys);
            for (Result result : results) {

                // 打印查詢結果
                printResult(result);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 按rowKey查詢user表中指定列簇指定列的值
     */
    @Test
    public void get() {
        try {
            String tableName = "default:user";
            String rowKey = "key1002";

            Get get = HbaseUtils.createGet(rowKey);

            HbaseUtils.addColumnOnGet(get, "info", "name");
            HbaseUtils.addColumnOnGet(get, "info", "age");

            // 不存在的列,查詢結果不顯示
            HbaseUtils.addColumnOnGet(get, "info", "address");

            // 如果在增加列后增加已有的列簇,會返回該列簇的全部列數據,覆蓋前邊的增加列
            // HbaseUtils.addFamilyOnGet(get, "info");

            Result result = HbaseUtils.get(tableName, get);
            printResult(result);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void scan() {
        try {
            String tableName = "default:user";
            String startRow = "key1001";
            String stopRow = "key1006";
            ResultScanner resultScanner = HbaseUtils.scan(tableName, startRow, stopRow);
            for (Result result : resultScanner) {
                printResult(result);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 打印查詢結果
     * 
     * @param result 查詢結果對象
     */
    private void printResult(Result result) {
        Cell[] cells = result.rawCells();

        // 從Result中讀取 rowKey
        System.out.println(Bytes.toString(result.getRow()));

        String print = "%s\t %s:%s \t %s";
        for (Cell cell : cells) {

            // 從Cell中取rowKey
            String row = Bytes.toString(CellUtil.cloneRow(cell));
            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toString(CellUtil.cloneValue(cell));

            System.out.println(String.format(print, row, family, qualifier, value));

        }
    }

    /**
     * 刪除指定列
     */
    @Test
    public void deleteColumn() {
        try {
            String tableName = "default:user";
            List<Delete> deletes = new ArrayList<Delete>();
            Delete delete = HbaseUtils.createDelete("key1005");
            HbaseUtils.addColumnOnDelete(delete, "info", "age");
            HbaseUtils.addColumnOnDelete(delete, "info", "address");
            // HbaseUtils.delete(tableName, delete);
            deletes.add(delete);

            delete = HbaseUtils.createDelete("key1006");
            HbaseUtils.addColumnOnDelete(delete, "info", "address");
            deletes.add(delete);

            HbaseUtils.delete(tableName, deletes);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 刪除指定列簇
     */
    @Test
    public void deleteFamily() {
        try {
            String tableName = "default:user";
            List<Delete> deletes = new ArrayList<Delete>();
            Delete delete = HbaseUtils.createDelete("key1005");
            HbaseUtils.addFamilyOnDelete(delete, "info");
            // HbaseUtils.delete(tableName, delete);
            deletes.add(delete);

            delete = HbaseUtils.createDelete("key1006");
            HbaseUtils.addFamilyOnDelete(delete, "info");
            deletes.add(delete);

            HbaseUtils.delete(tableName, deletes);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM