HBase and Phoenix Usage Examples


HBase Operations

Basic Operations

Create a table

Examples:
 
  hbase> create 't1', {NAME => 'f1', VERSIONS => 5}
  hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}
  hbase> # The above in shorthand would be the following:
  hbase> create 't1', 'f1', 'f2', 'f3'
  hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true}
  hbase> create 't1', 'f1', {SPLITS => ['10', '20', '30', '40']}
  hbase> create 't1', 'f1', {SPLITS_FILE => 'splits.txt'}
  hbase> # Optionally pre-split the table into NUMREGIONS, using
  hbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)
  hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

create 'dbname:newsinfo_anticheat_user_tag_data', 'user', 'device'

Describe a table

hbase(main):006:0> describe "dbname:newsinfo_anticheat_user_tag_data"  

Insert a few records

put 'dbname:newsinfo_anticheat_user_tag_data', '5a483b8769e9560001f9d1b9_20181224', 'user:phone', '190eb638-185d-3e58-a009-fcd69f67b8ac'
put 'dbname:newsinfo_anticheat_user_tag_data', '596f51ba4e94d9000170e1ff_20181224', 'user:phone', '17086385281'

Scan all data with scan

hbase(main):014:0> scan 'dbname:newsinfo_anticheat_user_tag_data'
ROW                                              COLUMN+CELL        
596f51ba4e94d9000170e1ff_20181223               column=user:phone, timestamp=1545646705395, value=17086385281
596f51ba4e94d9000170e1ff_20181224               column=user:phone, timestamp=1545646716425, value=17086385281

Get data with get

Get all data for a row

get 'dbname:newsinfo_anticheat_user_tag_data', '5a483b8769e9560001f9d1b9_20181224'

Get all data for a row in a given column family

get 'dbname:newsinfo_anticheat_user_tag_data', '5a483b8769e9560001f9d1b9_20181224','user'

Get the data for a specific row, column family and column

get 'dbname:newsinfo_anticheat_user_tag_data', '5a483b8769e9560001f9d1b9_20181224','user:dt'

Pre-splitting regions

By default, HBase creates a single region when a table is created. All clients then write into that one region until it grows large enough to be split. One way to speed up bulk writes is to pre-create a number of empty regions, so that when data is written to HBase, the writes are load-balanced across the cluster according to the region boundaries.
Shell command:

create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
You can also use the RegionSplitter utility:
bin/hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f info
Parameters:
test_table is the table name
HexStringSplit is the split algorithm
-c is the number of regions (10 here)
-f is the column family

This pre-splits the table into 15 regions and avoids the cost of automatic splits that would otherwise happen once a store file reaches the split threshold. It has another advantage: with a well-designed row key, concurrent requests are spread (roughly evenly) across the regions, which maximizes I/O efficiency. Pre-splitting does, however, usually require a large region file size. The relevant parameter is hbase.hregion.max.filesize, which defaults to 10 GB, i.e. a single region may grow to 10 GB by default (across versions 0.90, 0.92 and 0.94.3 the default changed from 256 MB to 1 GB to 10 GB).
Be careful when HBase is used as MapReduce input through TableInputFormat: each region becomes one map task, so if the table holds less than 10 GB only a single map task is launched, which wastes a lot of resources. In that case consider lowering this parameter, or keep pre-splitting regions and split them manually once they approach the threshold.
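Pre-splitting can also be done from application code. Below is a minimal sketch using the HBase 1.x Admin API; the table name dbname:pre_split_demo and the split points are made up for illustration and mirror the SPLITS example above.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.util.Bytes

object PreSplitTable {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    try {
      val admin = conn.getAdmin
      // "dbname:pre_split_demo" is a hypothetical table name used only for this sketch
      val desc = new HTableDescriptor(TableName.valueOf("dbname:pre_split_demo"))
      desc.addFamily(new HColumnDescriptor("user"))
      // same effect as SPLITS => ['10', '20', '30', '40'] in the shell
      val splitKeys = Array("10", "20", "30", "40").map(s => Bytes.toBytes(s))
      admin.createTable(desc, splitKeys)
    } finally {
      conn.close()
    }
  }
}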

Mapping an existing HBase table into Phoenix

Mapping with a Phoenix view

Initial creation

Look at the existing HBase table dbname:newsinfo_anticheat_user_tag_data

hbase(main):003:0> scan 'dbname:newsinfo_anticheat_user_tag_data'
ROW                                              COLUMN+CELL                                                         
 596f51ba4e94d9000170e1ff_20181223               column=user:dt, timestamp=1545647095916, value=20181223                                                                                       
 596f51ba4e94d9000170e1ff_20181223               column=user:phone, timestamp=1545646705395, value=17086385281                                                                                 
 596f51ba4e94d9000170e1ff_20181224               column=user:dt,

Since Phoenix 4.10, column mapping has been reworked: a new column-encoding mechanism is used and columns are no longer mapped to HBase purely by column name. If you only need to query the data, it is strongly recommended to map the table with a Phoenix view; dropping the view does not affect the underlying HBase data. The syntax is as follows:

0: jdbc:phoenix:dsrv2.heracles.sohuno.com,dme>
use "dbname";
create view "newsinfo_anticheat_user_tag_data"("ROW" varchar primary key, "user"."dt" varchar , "user"."phone" varchar) ;

The HBase row key is used as the primary key (the "ROW" column here).
The table name, column family and column names must be wrapped in double quotes, because HBase is case-sensitive; without the quotes Phoenix converts lowercase identifiers to uppercase when it creates the view.
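Once the view exists, it can be queried from sqlline or over JDBC like any other Phoenix table. A minimal Scala/JDBC sketch, assuming the Phoenix client jar is on the classpath (the ZooKeeper quorum zk1,zk2,zk3 is a placeholder):

import java.sql.DriverManager

object QueryView {
  def main(args: Array[String]): Unit = {
    // "zk1,zk2,zk3" is a placeholder quorum; use your own ZooKeeper hosts
    val conn = DriverManager.getConnection("jdbc:phoenix:zk1,zk2,zk3")
    try {
      val rs = conn.createStatement().executeQuery(
        """select "ROW", "dt", "phone" from "dbname"."newsinfo_anticheat_user_tag_data" limit 10""")
      while (rs.next()) {
        println(s"${rs.getString(1)}  ${rs.getString(2)}  ${rs.getString(3)}")
      }
    } finally {
      conn.close()
    }
  }
}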

Re-mapping after adding a column in HBase

HBase shell

Add a new column, user:did_count:
put 'dbname:test2', '5a483b8769e9560001f9d1b9_20181224', 'user:did_count', '100'

Phoenix SQL

# Drop the view
drop view "newsinfo_anticheat_user_tag_data";

# Recreate the view, adding the new column
use "dbname";
create view "newsinfo_anticheat_user_tag_data"("ROW" varchar primary key, "user"."dt" varchar , "user"."phone" varchar, "user"."did_count" varchar) ;

Query again and the new column's data is now visible.

Mapping with a Phoenix table

Create the mapping table

If a table mapping is required, the column-encoding rules must be disabled (which lowers query performance), as follows:

use "dbname";
create table "newsinfo_anticheat_user_tag_data"("ROW" varchar primary key, "user"."dt" varchar , "user"."phone" varchar)  column_encoded_bytes=0;

Note: dropping the mapping table also deletes the corresponding data in HBase. Use DROP TABLE with great care!
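Unlike the view mapping, which is read-only, a table mapping can also be written through Phoenix. A minimal Scala/JDBC sketch (the ZooKeeper quorum is again a placeholder, and the values are the sample records used earlier):

import java.sql.DriverManager

object UpsertThroughPhoenix {
  def main(args: Array[String]): Unit = {
    // "zk1,zk2,zk3" is a placeholder quorum; use your own ZooKeeper hosts
    val conn = DriverManager.getConnection("jdbc:phoenix:zk1,zk2,zk3")
    try {
      // the upsert lands in the underlying HBase table
      val ps = conn.prepareStatement(
        """upsert into "dbname"."newsinfo_anticheat_user_tag_data"("ROW", "dt", "phone") values (?, ?, ?)""")
      ps.setString(1, "5a483b8769e9560001f9d1b9_20181224")
      ps.setString(2, "20181224")
      ps.setString(3, "190eb638-185d-3e58-a009-fcd69f67b8ac")
      ps.executeUpdate()
      conn.commit() // Phoenix connections do not auto-commit by default
    } finally {
      conn.close()
    }
  }
}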

Phoenix secondary indexes

Create a secondary index

create index my_index2 on MY_TABLE(V1) include(v2);

Add a column in HBase

put 'dbname:test2', '5a483b8769e9560001f9d1b9_20181224', 'user:did_count', '100'

Exporting a CSV file from Phoenix

[@dudbname103113.heracles.sohuno.com ~]$ /opt/work/phoenix-4.13.1-HBase-1.3/bin/sqlline.py --help
usage: sqlline.py [-h] [-v VERBOSE] [-c COLOR] [-fc FASTCONNECT]
                  [zookeepers] [sqlfile]

Launches the Apache Phoenix Client.

positional arguments:
  zookeepers            The ZooKeeper quorum string
  sqlfile               A file of SQL commands to execute

optional arguments:
  -h, --help            show this help message and exit
  -v VERBOSE, --verbose VERBOSE
                        Verbosity on sqlline.
  -c COLOR, --color COLOR
                        Color setting for sqlline.
  -fc FASTCONNECT, --fastconnect FASTCONNECT
                        Fetch all schemas on initial connection

Create a SQL file containing the queries to run, for example:

[@dudbname103113.heracles.sohuno.com ~]$ cat /home/dbname/data/dev_xdf/phoneix/sqlfile.txt
use "dbname";

select * from "newsinfo_anticheat_blacklist_data" limit 5;

Run the command to export the data to a CSV file:

/opt/work/phoenix-4.13.1-HBase-1.3/bin/sqlline.py dsrv2.heracles.sohuno.com,dmeta2.heracles.sohuno.com,drm2.heracles.sohuno.com,dmeta1.heracles.sohuno.com /home/dbname/data/dev_xdf/phoneix/sqlfile.txt >> /home/dbname/data/dev_xdf/phoneix/sqlout.csv

View the contents of the exported CSV file:

[@dudbname103113.heracles.sohuno.com ~]$ cat /home/dbname/data/dev_xdf/phoneix/sqlout.csv
997/997 (100%) Done
+---------------------------+-----+---------------------------------------+-----------+
|          rowkey           | dt  |                  did                  |  did_dt   |
+---------------------------+-----+---------------------------------------+-----------+
| 51bd61df  |     | 76b46d96-a986-36b6-864c-4558d250e6ad  | 20190114  |
| 58a272ec427ad50001849b2b  |     | dd4768f4-5928-37bc-93d9-d700595434a1  | 20190129  |
| 58c246850c0e580001f4104a  |     | 9a79728e-3d0c-3f39-bad4-40a093ec27ea  | 20190122  |
| 596628720c0e5800018c8d2c  |     | f92979768431f3bc6dcf352ac67e5e5d      | 20190129  |
| 5969a1a5c6e6dd0001a27f35  |     | bb60dc61-69ec-3eaa-9ade-772122c8ac88  | 20190129  |
+---------------------------+-----+---------------------------------------+-----------+

Phoenix queries

0: jdbc:phoenix:dsrv2.heracles.sohuno.com,dme> select * from "device_tag_data" where "dt"='20190210' and TO_NUMBER("user_count")>10 LIMIT 2;
+------------------------------------------------+-----------+-------------+----------+------------+--------------+-----------+--------------+-----+
|                      ROW                       |    dt     | user_count  | fenshen  | gyroscope  | android_ver  | hotcloud  | ad_exposure  | ad_ |
+------------------------------------------------+-----------+-------------+----------+------------+--------------+-----------+--------------+-----+
| 03215fc6-5520-3e49-b1e5-f6d72024fa62_20190210  | 20190210  | 11          | 1        |            |              | 1         |              |     |
| 0a340e16-f0dd-3be0-8632-8ac1895d2e6a_20190210  | 20190210  | 12          | 1        |            | 8.1.0        | 1         |              |     |
+------------------------------------------------+-----------+-------------+----------+------------+--------------+-----------+--------------+-----+

Writing to HBase

Spark bulk load
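The snippets below are method-level excerpts from a Spark job. Roughly, they assume imports along the following lines (HBase 1.x client plus Spark; exact packages may vary with your versions, and SparkSessionSingleton is a project-specific helper not shown here):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{CellUtil, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.util.Try // used in the HBase API example further below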

def main(args: Array[String]): Unit = {

  //job arguments: date, HBase table name, HDFS temp path
  val today = args(0)
  val hbaseTabName=args(1)
  val hdfsTmpPath = args(2)

  //create the SparkSession
  val sparkconf = new SparkConf().setAppName("UserMetrics")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(sparkconf)
  val spark = SparkSessionSingleton.getInstance(sc.getConf)

  //HBase connection parameters
  val conf = new Configuration()
  conf.set("hbase.zookeeper.quorum", "zklist")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set("zookeeper.znode.parent", "/hbase ")
  conf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTabName)

  //build the RDD of Puts
  val device_rdd = get_device_rdd(spark, today)

  //bulk-load the metric data into HBase with Spark
  spark_bulkload_to_hbase(device_rdd, conf, hdfsTmpPath, hbaseTabName)

  sc.stop()
}

def get_device_rdd(spark: SparkSession,today: String) :RDD[(ImmutableBytesWritable, Put)] = {
  val sql = ""
  val data = spark.sql(sql)

  val ad_exposure_click_rdd = data.rdd.map(record => {
    val did = record.getString(0)
    val rowkey = did+"_"+today
    val put = new Put(Bytes.toBytes(rowkey))

    try{
      put.addColumn(Bytes.toBytes("device"), Bytes.toBytes("dt"), Bytes.toBytes(today))
      val user_count = record.get(1)
      put.addColumn(Bytes.toBytes("device"), Bytes.toBytes("user_count"), Bytes.toBytes(user_count.toString))
    }catch {
      case e: Exception => e.printStackTrace()
    }
    (new ImmutableBytesWritable, put)
  })
  ad_exposure_click_rdd
}

/**
  * Write the RDD into HBase directly through TableOutputFormat
  * (each Put is sent to the table; this variant does not generate HFiles).
  */
def spark_direct_bulkload_to_hbase(rdd: RDD[(ImmutableBytesWritable, Put)], conf: Configuration,path: String): Unit ={
  val job = Job.getInstance(conf)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Put])
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

  //write the Puts directly to the HBase table
  job.getConfiguration.set("mapred.output.dir", path)
  rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}

/**
  * spark: generate HFiles first, then call bulk load to move them into HBase.
  * HFileOutputFormat2 requires (rowkey, KeyValue) pairs written in row-key order.
  */
def spark_bulkload_to_hbase(rdd: RDD[(ImmutableBytesWritable, Put)], conf: Configuration, path: String, hbaseTabName: String): Unit = {
  import scala.collection.JavaConverters._
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(TableName.valueOf(hbaseTabName))
  val regionLocator = conn.getRegionLocator(TableName.valueOf(hbaseTabName))
  val job = Job.getInstance(conf)
  HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
  //flatten each Put into KeyValues and sort by row key
  implicit val rowOrder: Ordering[ImmutableBytesWritable] =
    new Ordering[ImmutableBytesWritable] { def compare(a: ImmutableBytesWritable, b: ImmutableBytesWritable): Int = a.compareTo(b) }
  val kvRdd = rdd.flatMap { case (_, put) =>
    put.getFamilyCellMap.asScala.values.flatMap(_.asScala).map { cell =>
      (new ImmutableBytesWritable(CellUtil.cloneRow(cell)),
        new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell)))
    }
  }.sortByKey()
  // Save Hfiles on HDFS
  kvRdd.saveAsNewAPIHadoopFile(path, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
  //Bulk load Hfiles to Hbase
  new LoadIncrementalHFiles(conf).doBulkLoad(new Path(path), conn.getAdmin, table, regionLocator)
  conn.close()
}

HBase API approach

data.rdd.foreachPartition(
  partitionRecords => {
    val conn = getHBaseConn(hbaseTabName) // get an HBase connection (see the helper sketch after this block)
    val tName = TableName.valueOf(hbaseTabName)
    val table = conn.getTable(tName)
    partitionRecords.foreach(record => {

      val userid = record.getString(0)
      val rowkey = userid + "_" + today
      val put = new Put(Bytes.toBytes(rowkey))

      try {
        put.addColumn(Bytes.toBytes("user"), Bytes.toBytes("dt"), Bytes.toBytes(today))

        val ad_exposure = record.get(1)
        if (ad_exposure != None && ad_exposure != null) {
          put.addColumn(Bytes.toBytes("user"), Bytes.toBytes("ad_exposure"), Bytes.toBytes(ad_exposure.toString))
        }
        val ad_click_rate = record.get(2)
        if (ad_click_rate != None && ad_click_rate != null) {
          put.addColumn(Bytes.toBytes("user"), Bytes.toBytes("ad_click_rate"), Bytes.toBytes(ad_click_rate.toString))
        }
        Try(table.put(put)).getOrElse(table.close()) //write the row to HBase; close the table if the put fails
      } catch {
        case e: Exception => e.printStackTrace()
      }
    })
    table.close() //close the table once the partition has been written
    conn.close()
  })
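getHBaseConn above is a project-specific helper that is not shown in the original code; a minimal sketch of what it might look like (the quorum is a placeholder, and the table-name parameter is kept only to match the call site):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// assumed implementation of the getHBaseConn helper used above
def getHBaseConn(hbaseTabName: String): Connection = {
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "zklist") // placeholder, as in the bulk-load example
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  ConnectionFactory.createConnection(conf)
}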

Exporting HBase data

HBase shell

Export with a filtered scan

hbase(main):006:0> scan 'dbname:newsinfo_anticheat_blacklist_data',{COLUMNS => 'user:dt',LIMIT=>1}
ROW                                        COLUMN+CELL
59c925dbcb8e580001ddd21c                   column=user:dt, timestamp=1547805110793, value=20190114

Export to a file:

echo " scan 'dbname:newsinfo_anticheat_blacklist_data',{COLUMNS => 'user:dt',LIMIT=>1}" | hbase shell > ./hbase.csv

Export through Hive

Sometimes we want to query user-profile data that already lives in HBase from Hive, without shuttling it back and forth with tools such as Sqoop or DataX. In that case we can create a Hive table that maps onto the HBase table and query the HBase data through it.
With the table created in HBase and an external table created in Hive on top of it, data written through Hive also shows up in HBase.

Accessing HBase data through a Hive mapping table

After a table is created in HBase, we can only query it with scan in the HBase shell, which is awkward for anyone used to SQL. Instead, we can create a Hive table mapped to the HBase table and access the HBase data from Hive, as in the following example:

1. The HBase table dbname:device_tag_data already exists

hbase(main):067:0> scan "dbname:device_tag_data",LIMIT=>1
ROW                                COLUMN+CELL          
0000039f-2d6e-3140-bb97-0d7294cfa4fe_2019 column=device:dt, timestamp=1547637050603, value=20190114                              
0000039f-2d6e-3140-bb97-0d7294cfa4fe_2019 column=device:gyroscope, timestamp=1547631713244, value=103                                   
0000039f-2d6e-3140-bb97-0d7294cfa4fe_2019 column=device:user_count, timestamp=1547631631653, value=1
0000039f-2d6e-3140-bb97-0d7294cfa4fe_2019 column=device:android_ver, timestamp=1547638332436, value=6.0

2. Create the Hive mapping table associated with HBase

CREATE EXTERNAL TABLE dbname.device_tag_data(
key string, 
dt string,
user_count string,
fenshen string,
gyroscope string,
android_ver string,
hotcloud string,
ad_exposure string,
ad_click_rate string
) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES
("hbase.columns.mapping" = 
":key,device:dt,device:user_count,device:fenshen,device:gyroscope,device:android_ver,device:hotcloud,device:ad_exposure,device:ad_click_rate")
TBLPROPERTIES("hbase.table.name" = "dbname:device_tag_data");

The key settings are hbase.table.name and hbase.columns.mapping: the former is the HBase table name, the latter the one-to-one mapping between HBase columns and Hive columns. Once they are in place, the HBase data can be read and written from Hive.
Note that the field list in hbase.columns.mapping must not contain spaces or line breaks.

3. Query the data through Hive

0: jdbc:hive2://10.31.103.113:10003/dbname> select * from dbname.device_tag_data where key='0000039f-2d6e-3140-bb97-0d7294cfa4fe_20190114';

Note:
The dbname.device_tag_data table we query here is a virtual table; the data itself is stored in HBase. With the Hive-HBase integration, queries go straight to HBase. Although no formal benchmark was done, querying HBase directly for heavy analysis does not feel very reliable, and if we run many kinds of analysis frequently the load on HBase will multiply. To avoid this, we can create a new, empty native Hive table, load the queried data into it, and run all subsequent analysis against that table.

4. Create the Hive table

CREATE TABLE dbname.device_tag_data2(
key string, 
dt string,
user_count string,
fenshen string,
gyroscope string,
android_ver string,
hotcloud string,
ad_exposure string,
ad_click_rate string
) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE 
LOCATION 'hdfs://ns/user/dbname/hive/online/device_tag_data2';

5. Load the data from the HBase-backed table into the native table

INSERT OVERWRITE TABLE dbname.device_tag_data2 select * from dbname.device_tag_data;

And that's it!
From now on, all complex queries and analysis can run against the new Hive table.

