流式大數據計算實踐(5)----HBase使用&SpringBoot集成


一、前言

1、上文中我們搭建好了一套HBase集群環境,這一文我們學習一下HBase的基本操作和客戶端API的使用

二、shell操作

先通過命令進入HBase的命令行操作

/work/soft/hbase-1.2.2/bin/hbase shell

1、建表

create 'test', 'cf'

(1)以上命令是建立一個test表,里面有一個列族cf

(2)與RDS不同,HBase的列不是必須的,當向列族中插入一個單元格數據時,才有了列

2、查看所有表

list

3、查看表屬性

describe 'test'

4、增加列族

alter 'test', 'cf2'

5、插入數據

put 'test', 'row1', 'cf:name', 'jack'

(1)命令解釋:向test表中的row1行插入列族cf,列名name的數據jack

6、查詢行數據

scan 'test', {STARTROW => 'row3'}
scan 'test', {ENDROW => 'row4'}

(1)命令解釋:查找test表中rowkey大於等於row3的數據

(2)命令解釋:查找test表中rowkey小於row4的數據(不包括row4)

7、查詢單元格數據

get 'test', 'row7', 'cf:name'

8、刪除數據

delete 'test', 'row4', 'cf:name'

(1)命令解釋:刪除test表中row4行的cf:name列的單元格數據

三、客戶端API

正常開發中操作HBase多數情況下通過客戶端API操作,我們這里使用Java來操作,jdk要求至少1.7以上,編譯器我這里用的是IntelliJ IDEA

(1)新建一個maven工程

(2)打開pom文件,引入HBase的依賴

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.1</version>
</dependency>

(3)將HBase的相關配置文件引入到我們的maven項目中,拷貝HBase目錄下的hbase-site.xml和Hadoop目錄下的core-site.xml,將兩個文件復制到src/main/resources目錄下

(4)記得將前文中虛擬機的IP和hostname映射配置到寫代碼這台機器的hosts文件中(比如win7的hosts目錄為C:\Windows\System32\drivers\etc)

(5)新建一個類,編寫CRUD的示例代碼,下面代碼用了jdk1.7的一個語法糖:try-with-resources,在try()里面聲明的對象,會自動幫你調用對象的close方法來關閉對象,不用手動調用close(),非常方便

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.net.URISyntaxException;

public class HelloHBase
{
    public static void main(String[] args) throws URISyntaxException
    {
        // 加載HBase的配置
        Configuration configuration = HBaseConfiguration.create();

        // 讀取配置文件
        configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
        configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));

        try (// 創建一個HBase連接
             Connection connection = ConnectionFactory.createConnection(configuration);
             // 獲得執行操作的管理接口
             Admin admin = connection.getAdmin();)
        {
            // 新建一個表名為mytable的表
            TableName tableName = TableName.valueOf("mytable");
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);

            // 新建一個列族名為mycf的列族
            HColumnDescriptor mycf = new HColumnDescriptor("mycf");
            // 將列族添加到表中
            tableDescriptor.addFamily(mycf);
            // 執行建表操作
            createOrOverwrite(admin, tableDescriptor);

            // 設置列族的壓縮方式為GZ
            mycf.setCompactionCompressionType(Compression.Algorithm.GZ);
            // 設置最大版本數量(ALL_VERSIONS實際上就是Integer.MAX_VALUE)
            mycf.setMaxVersions(HConstants.ALL_VERSIONS);
            // 列族更新到表中
            tableDescriptor.modifyFamily(mycf);
            // 執行更新操作
            admin.modifyTable(tableName, tableDescriptor);

            // 新增一個列族
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("newcf");
            hColumnDescriptor.setCompactionCompressionType(Compression.Algorithm.GZ);
            hColumnDescriptor.setMaxVersions(HConstants.ALL_VERSIONS);
            // 執行新增操作
            admin.addColumnFamily(tableName, hColumnDescriptor);

            // 獲取表對象
            Table table = connection.getTable(tableName);

            // 創建一個put請求,用於添加數據或者更新數據
            Put put = new Put(Bytes.toBytes("row1"));
            put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("name"), Bytes.toBytes("jack"));
            table.put(put);

            // 創建一個append請求,用於在數據后面添加內容
            Append append = new Append(Bytes.toBytes("row1"));
            append.add(Bytes.toBytes("mycf"), Bytes.toBytes("name"), Bytes.toBytes("son"));
            table.append(append);

            // 創建一個long類型的列
            Put put1 = new Put(Bytes.toBytes("row2"));
            put1.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), Bytes.toBytes(6L));
            // 創建一個增值請求,將值增加10L
            Increment increment = new Increment(Bytes.toBytes("row2"));
            increment.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("age"), 10L);
            table.increment(increment);

            // 創建一個查詢請求,查詢一行數據
            Get get = new Get(Bytes.toBytes("row1"));
            // 由於HBase的一行可能非常大,所以限定要取出的列族
            get.addFamily(Bytes.toBytes("mycf"));
            // 創建一個結果請求
            Result result = table.get(get);
            // 從查詢結果中取出name列,然后打印(這里默認取最新版本的值,如果要取其他版本要使用Cell對象)
            byte[] name = result.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name"));
            System.out.println(Bytes.toString(name));

            // 創建一個掃描請求,查詢多行數據
            Scan scan = new Scan(Bytes.toBytes("row1"));
            // 設置掃描器的緩存數量,遍歷數據時不用發多次請求,默認100,適當的緩存可以提高性能
            scan.setCaching(150);
            // 創建掃描結果,這個時候不會真正從HBase查詢數據,下面的遍歷才是去查詢
            ResultScanner resultScanner = table.getScanner(scan);
            for (Result r : resultScanner)
            {
                String data = Bytes.toString(r.getValue(Bytes.toBytes("mycf"), Bytes.toBytes("name")));
                System.out.println(data);
            }
            // 使用完畢關閉
            resultScanner.close();

            // 創建一個刪除請求
            Delete delete = new Delete(Bytes.toBytes("row2"));
            // 可以自定義一些篩選條件
            delete.addFamily(Bytes.toBytes("age"));
            table.delete(delete);

            // 停用表
            admin.disableTable(tableName);
            // 刪除列族
            admin.deleteColumnFamily(tableName, "mycf".getBytes());
            // 刪除表
            admin.deleteTable(tableName);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("ok");
    }

    public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException
    {
        // 獲取table名
        TableName tableName = table.getTableName();
        // 判斷table是否存在,如果存在則先停用並刪除
        if (admin.tableExists(tableName))
        {
            // 停用表
            admin.disableTable(tableName);
            // 刪除表
            admin.deleteTable(tableName);
        }
        // 創建表
        admin.createTable(table);
    }
}

 四、API的高級用法

上一章介紹了API的基本使用方法,這一章總結一些高級用法

1、過濾器:通過get或者scan查找數據時,經常需要加入一些條件來查找

(1)值過濾器:相當於傳統sql的where column like '%jack%',但是會對所有的列都做過濾,如果需要對單個列過濾,可以使用SingleColumnValueFilter,如果需要查詢值相等的過濾器,可以使用BinaryComparator

CompareFilter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("jack"));
scan.setFilter(filter);

(2)分頁過濾器:相當於傳統sql的limit,但是不能指定起始頁碼,所以需要自己記錄最后一個row key,並通過scan.setStartRow()設置,在做分頁時有個小技巧,如果你通過scan.setStartRow()設置最后一個row key時,下一頁的數據依然會包含上一頁的最后一個數據,所以你可以將最后一個row key的末尾加一個0,就可以不包含最后一個數據了,因為row key是按照字典順序排序的

Filter filter1 = new PageFilter(10L);
scan.setFilter(filter1);

(3)過濾器列表:用於組合多個過濾器,實現復雜一些的查詢場景,注意這個過濾器列表是有順序的,FilterList的第一個參數用來指定多個條件的連接方式(and、or),MUST_PASS_ALL相當於and連接,MUST_PASS_ONE相當於or連接

FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(filter);
filterList.addFilter(filter1);
scan.setFilter(filterList);

(4)還有一些其他的過濾器,使用方法大同小異,比如行鍵過濾器、列過濾器、單元格過濾器,甚至可以自定義過濾器,其他高級用法可以等用到再看

五、SpringBoot集成

1、后台我們用SpringCloud的微服務搭建,本章是用SpringBoot集成HBase環境,SpringBoot項目的搭建非常簡單

2、首先將HBase的相關配置文件引入到我們已經搭建好的的SpringBoot項目中,拷貝HBase目錄下的hbase-site.xml和Hadoop目錄下的core-site.xml、hdfs-site.xml,將三個文件復制到src/main/resources目錄下

3、編寫一個Java配置文件來集成HBase環境

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.hadoop.hbase.HbaseTemplate;

import java.net.URISyntaxException;

@Configuration
public class HBaseConfig
{
    @Bean
    public HbaseTemplate hbaseTemplate()
    {
        HbaseTemplate hbaseTemplate = new HbaseTemplate();
        hbaseTemplate.setConfiguration(getConfiguration());
        hbaseTemplate.setAutoFlush(true);
        return hbaseTemplate;
    }

    private org.apache.hadoop.conf.Configuration getConfiguration()
    {
        try
        {
            org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
            configuration.addResource(new Path(ClassLoader.getSystemResource("hdfs-site.xml").toURI()));
            configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI()));
            configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI()));
            return configuration;
        }
        catch (URISyntaxException e)
        {
            e.printStackTrace();
        }
        return null;
    }
}

4、上面的配置中將HBase操作的對象注入到Spring中,所以當我們需要操作HBase時,直接使用HbaseTemplate即可,下例中是將一條數據插入到HBase中,可以看出HbaseTemplate高度封裝了CRUD,使用起來更加簡單方便

@Service
public class HBaseService implements IHBaseService
{
    @Autowired
    private HbaseTemplate hbaseTemplate;

    @Override
    public void saveDeviceHeartbeat(String uuid, JSONObject heartObject)
    {
        hbaseTemplate.put("mytable", "row1", "mycf", "uuid", Bytes.toBytes(uuid));
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM