Hive-05 Hive和HBase的集成


 

1. 與Hive的集成

Hive和Hbase在大數據架構中處在不同位置,Hive是一個構建在Hadoop基礎之上的數據倉庫,主要解決分布式存儲的大數據處理和計算問題,Hive提供了類SQL語句,叫HiveQL,

通過它可以使用SQL查詢存放在HDFS上的數據,sql語句最終被轉化為Map/Reduce任務運行,但是Hive不能夠進行交互查詢——它只能夠在Haoop上批量的執行Map/Reduce任務。

Hive適合用來對一段時間內的數據進行分析查詢,例如,用來計算趨勢或者網站的日志。Hive不應該用來進行實時的查詢。因為它需要很長時間才可以返回結果。

Hbase是Hadoop database 的簡稱,是一種NoSQL數據庫,非常適用於海量明細數據(十億、百億)的隨機實時查詢,如交易清單、軌跡行為等。

在大數據架構中,Hive和HBase是協作關系,Hive方便地提供了Hive QL的接口來簡化MapReduce的使用, 而HBase提供了低延遲的數據庫訪問。如果兩者結合,可以利用MapReduce的優勢針對HBase存儲的大量內容進行離線的計算和分析。

Hive和HBase的通信原理

Hive與HBase整合的實現是利用兩者本身對外的API接口互相通信來完成的,

這種相互通信是通過$HIVE_HOME/lib/hive-hbase-handler-*.jar工具類實現的。通過HBaseStorageHandler,Hive可以獲取到Hive表所對應的HBase表名,列簇和列,InputFormat、OutputFormat類,創建和刪除HBase表等。基本通信原理如下:

             

訪問

Hive訪問HBase中HTable的數據,實質上是通過MR讀取HBase的數據,而MR是使用HiveHBaseTableInputFormat完成對表的切分,獲取RecordReader對象來讀取數據的。

對HBase表的切分原則是一個Region切分成一個Split,即表中有多少個Regions,MR中就有多少個Map;

讀取HBase表數據都是通過構建Scanner,對表進行全表掃描,如果有過濾條件,則轉化為Filter。當過濾條件為rowkey時,則轉化為對rowkey的過濾;Scanner通過RPC調用RegionServer的next()來獲取數據。

簡單來說,Hive和Hbase的集成就是,打通了Hive和Hbase,使得Hive中的表創建之后,可以同時是一個Hbase的表,並且在Hive端和Hbase端都可以做任何的操作。

使用場景:

(一)將ETL操作的數據通過Hive加載到HBase中,數據源可以是文件也可以是Hive中的表。

             

(二)Hbae作為Hive的數據源,通過整合,讓HBase支持JOIN、GROUP等SQL查詢語法。

                 

(三)構建低延時的數據倉庫

                   

以上原理來自知乎:https://zhuanlan.zhihu.com/p/74041611

HBase與Hive的集成在最新的兩個版本中無法兼容。所以,我們只能重新編譯:hive-hbase-handler-1.2.2.jar!

將所需要的lib的jar包導入進行編譯; apache-hive-1.2.1-src\hbase-handler\src\java

 

操作Hive的同時對HBase也會產生影響,所以Hive需要持有操作HBase的Jar,那么接下來拷貝Hive所依賴的Jar包(或者使用軟連接的形式)。

 對hive添加的jar添加到這個參數,需重啟;相當於hbase的客戶端-hive,hive和hbase集成所依賴的類

export HIVE_AUX_JARS_PATH=/opt/module/hbase-1.3.1/lib/*

同時在hive-site.xml中修改zookeeper的屬性,如下:

<property>
  <name>hive.zookeeper.quorum</name>
  <value>hadoop101,hadoop102,hadoop103</value>
  <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
  <name>hive.zookeeper.client.port</name>
  <value>2181</value>
  <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>

  建立Hive表,關聯HBase表,插入數據到Hive表的同時能夠影響HBase表。

hive表字段映射到hbase中;用stored by指定下格式類型hbase;指定映射關系;serd是序列化和反序列化;列順序和字段一一對應;:key即rowkey

hive中表刪,hbase中就沒有了; 這樣建立出來的表插入的源數據都是在Hbase里面存放着,當從Hbase中刪除記錄的同時也會刪除在Hive中的數據

Hive建內部表與Hbase進行關聯  -- 文件存儲在Hbase中 -- 

  內部表,這種情況是hbase本來沒有這張表,創建后會在hbase中同樣創建一張表,將來數據也是存放在hbase中的;hdfs的hive表目錄有hive文件夾,但是里面沒有數據。

  這時,如果使用load data 命令導入數據會報錯,因為該表是non-native table

  當hive使用overwrite關鍵字進行插入數據時原本數據不會被刪除,有同樣的行健會被更新覆蓋。因為數據是存在hbase中的,遵守hbase插入數據的規則。

 

  • 當hive刪除hive表時,hbase表也會刪除。
  • 當先刪除hbase的時候,先disabled table,然后drop table;這時hbase表就被刪除了,zookeeper里面也就刪除了。但是hive里面還在,用show tables還能查出來。mysql中TBLS里面還有hive表的信息。但是用select * from hive 查詢的時候報錯,表不存在(TableNotFoundException)然后刪除hive里面的表的時候會報錯TableNotFoundException)。繼續show tables時,發現表已經不在了。TBLS里面也沒有hive表了。

 

0: jdbc:hive2://hadoop101:10000> create table hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping"=":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") tblproperties ("hbase.table.name"="hbase_emp_table","hbase.mapred.output.outputtable"
= "hbase_emp_table");

說明:

a、 在tblproperties 語句中

  "hbase.table.name"定義的是在hbase中的表名 ,這個屬性是可選的,僅當你想在Hive和Hbase中使用不同名字的表名時才需要填寫,如果使用相同的名字則可以省略;

  "hbase.mapred.output.outputtable"定義的hbase_emp_table是存儲數據表的名稱,指定插入數據時寫入的表,如果以后需要往該表插入數據就需要指定該值,這個可以不要,表數據就存儲在第二個表中了 。

b、stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' :是指定處理的存儲器,就是hive-hbase-handler-*.jar包,要做hive和hbase的集成必須要加上這一句;

c、“hbase.columns.mapping” 是定義在hive表中的字段怎么與hbase的列族進行映射。

  例如:info就是列族,ename就是列。它們之間通過“:”連接。

在hive中創建的hive_hbase_emp_table表,映射為hbase中的表hbase_emp_table,其中:key對應hbase的rowkey,value對應hbase的列。

完成之后,可以分別進入Hive和HBase查看,都生成了對應的表 在Hive中創建臨時中間表,用於load文件中的數據; 不能將數據直接load進Hive所關聯HBase的那張表中;要經過mapreduce;
向hive_hbase_emp_table表中插入數據: hive
> insert into table hive_hbase_emp_table select * from emp; 查看Hive以及關聯的HBase表中是否已經成功的同步插入了數據 Hive: hive> select * from hive_hbase_emp_table; HBase: hbase> scan ‘hbase_emp_table’ 或者 describe ‘hbase_emp_table’

對於在hbase已經存在的表(可提前設計rowkey),在hive中使用CREATE EXTERNAL TABLE來建立聯系

  在HBase中建立預分區表;然后再在HIVE中用外表映射到已建立好的HBase表中

  在HBase中已經存儲了某一張表hbase_emp_table,然后在Hive中創建一個外部表來關聯HBase中的hbase_emp_table這張表,使之可以借助Hive-sql來分析HBase這張表中的數據。建立這樣的外表,數據也是在hbase中存放,hive會在hdfs中創建目錄,但沒有數據文件。

  一般的情況Hbase的數據可能先存在要怎么關聯到Hive中( 這類是外鏈表刪除Hive中的表不影響Hbase中的數據 )

注意:在創建hive的外表時,可以引用hbase中不存在列

同樣,不能使用load data命令給外表加載數據,因為該表使用HBaseStorageHandler創建的:

 

  1. 刪除hive表對hbase沒有影響;
  2. 但是先刪除hbase表hive就會報TableNotFoundException;

 

 

創建外部表: create external table relevance_hbase_emp(
empno
int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties ("hbase.columns.mapping"=":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") tblproperties ("hbase.table.name"="hbase_emp_table"); 關聯后就可以使用Hive函數進行一些分析操作了 hive (default)> select * from relevance_hbase_emp;

hbase中已有表,去關聯它,創建外部表即可;hive中刪表,數據在hbase中還是存在的;

多列和多列族

例如: hbase中id,name,age可拆分成多個Hive表:

CREATE EXTERNAL TABLE test_hbase_1 (   
  id STRING,
  name STRING) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.table.name'='test:table1', 'hbase.mapred.output.outputtable'='test:table1',
'hbase.columns.mapping'=':key,f:name')

CREATE EXTERNAL TABLE test_hbase_2 (   
  id STRING,   
  age STRING) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
'hbase.table.name'='test:table1', 
'hbase.mapred.output.outputtable'='test:table1',
'hbase.columns.mapping'=':key,
f:age');

drop table test_hbase_3;

CREATE EXTERNAL TABLE test_hbase_3 (   
  id STRING,
  name STRING,   
  age STRING) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
'hbase.table.name'='test:table1', 
'hbase.mapred.output.outputtable'='test:table1',
'hbase.columns.mapping'=':key,f:name,f:age')

  可以看到,同一張HBase表可以映射多張 Hive 外部表,並且查詢列互不影響。
  可見向不同 Hive 外部表中插入數據是不會影響HBase其他列的。
  insert into 與 insert overwrite 操作HBase-Hive映射外部表結果是一樣的,且均是基於Hive表所屬列進行更新,不會影響其他列的值。
注意點
1、HBase 中的空 cell 在 Hive 中會補 null
2、Hive 和 HBase 中不匹配的字段會補 null
3、Bytes 類型的數據,建 Hive 表示加#b, 因為 HBase 中 double,int , bigint 類型以byte方式存儲時,用字符串取出來必然是亂碼。
4、其中:key代表的是 HBase 中的 rowkey, Hive 中也要有一個 key 與之對應, 不然會報錯, cf 指的是 HBase 的列族, 創建完后,會自動把 HBase 表里的數據同步到 Hive 中

使用Hive集成HBase表的需注意

  1. 對HBase表進行預分區,增大其MapReduce作業的並行度
  2. 合理的設計rowkey使其盡可能的分布在預先分區好的Region上
  3. 通過set hbase.client.scanner.caching設置合理的掃描緩存

 

2. MapReduce

用MapReduce將數據從本地文件系統導入到HBase的表中,

比如從HBase中讀取一些原始數據后使用MapReduce做數據分析。

結合計算型框架進行計算統計
查看HBase的MapReduce任務的執行,把jar打印出來的就是需要添加到hadoop的CLASSPATH下的jar包

$ bin/hbase mapredcp

環境變量的導入
(1)執行環境變量的導入(臨時生效,在命令行執行下述操作)

$ export HBASE_HOME=/opt/module/hbase
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

(2)永久生效

在etc/hadoop下hadoop-env.sh中配置:(注意:在for循環之后配),要重啟下才生效
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

 

運行官方的MapReduce任務
-- 案例一:統計Student表中有多少行數據; 要在hbase中先有student表
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student

-- 案例二:使用MapReduce將hdfs數據導入到HBase
1)在本地創建一個tsv格式的文件:fruit.tsv
   Apple    Red
   Pear        Yellow
   Pineapple    Yellow

2)創建HBase表
hbase(main):001:0> create 'fruit','info'

3)在HDFS中創建input_fruit文件夾並上傳fruit.tsv文件
[kris@hadoop101 hadoop-2.7.2]$ hdfs dfs -mkdir /hbase/input_fruit  
[kris@hadoop101 datas]$ hadoop fs -put fruit.tsv /hbase/input_fruit

//把表名寫死了;不寫死可以通過命令行run(String[] args)傳參
[kris@hadoop101 hadoop-2.7.2]$ bin/yarn jar HbaseTest-1.0-SNAPSHOT.jar com.atguigu.mr.ReadFromTableDriver
bin/yarn jar HbaseTest-1.0-SNAPSHOT.jar com.atguigu.mr2.ReadFromHdfsDriver /fruit.tsv fruit_mr //給它傳參數

4)執行MapReduce到HBase的fruit表中
[kris@hadoop101 hadoop-2.7.2]$ bin/yarn jar /opt/module/hbase-1.3.1/lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit /hbase/input_fruit  //hdfs://hadoop101:9000/這個可省略


5)使用scan命令查看導入后的結果
hbase(main):001:0> scan ‘fruit’

① 將fruit表中的一部分數據,通過MR遷入到fruit_mr表中。

  從hbase中讀取數據;
  輸入的k,v類型已經寫好(ImmutableBytesWritable key,Result value),要自定義輸出類型k,v

ReadFromTableMapper.java
//把hbase中的fruit表中的一部分數據,通過MR遷入到fruit_mr表, fruit_mr要先在hbase中創建好。
public class ReadFromTableMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        //1.獲取rowkey//不可變的字節數組,按rowkey來讀;scan掃描的封裝到了result(1行里邊的所有數據)
        byte[] rowkey = key.get();
        //2.創建put對象
        Put put = new Put(rowkey);  //創建put對象時要指定rowkey
        //3.添加列的信息
        for (Cell cell : value.rawCells()) { //獲取這1列的所有cell
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                    put.add(cell);
                }else if ("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                    put.add(cell);
                }
            }
        }
        context.write(key, put);
    }
}


ReadFromTableReducer.java
// TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation>
//Mutation,Delete和Put都是它的子類;Hbase中每次操作可看做mutation這個類; Put已經封裝了rowkey等所有數據
reducer初始化沒有用到nullwritable
public class ReadFromTableReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}


ReadFromTableDriver.jva
//實現Tool接口,執行任務時用ToolRunner調
public class ReadFromTableDriver implements Tool {
    private Configuration configuration = null;

    public int run(String[] strings) throws Exception { //獲取任務並設置
        //1.獲取job對象
        Job job = Job.getInstance(configuration);
        //2.設置job對象
        job.setJarByClass(ReadFromTableDriver.class);
        Scan scan = new Scan();//在這里傳參數StartRow StopRow
        scan.setCacheBlocks(true); //設置緩存
        scan.setCaching(100); //掃100條數據再跟Hbase的server端交互,減少次數
        TableMapReduceUtil.initTableMapperJob("fruit", scan, //mapper去讀數據;
                ReadFromTableMapper.class, ImmutableBytesWritable.class,//k v; fruit可通過args[0]傳參
                Put.class, job);
        TableMapReduceUtil.initTableReducerJob("fruit_mr", //插入數據的表   //工具類初始化map,reduce
                ReadFromTableReducer.class, job);
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1; //1表false
    }
    public void setConf(Configuration conf) { //hadoop的8個配置文件封裝到Configuration中
        this.configuration = conf;
    }
    public Configuration getConf() {
        return configuration;
    }
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ReadFromTableDriver(), args); //最終調用的是上邊的run方法;tool對象即ReadFromTableDriver的對象
        System.exit(run);//run為0或1
    }
}
打包運行任務
$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.atguigu.mr.ReadFromTableDriver

②實現將HDFS中的數據寫入到HBase表中。  

  寫入到Hbase;泛型,輸入k,value;value已定義好immutaion;需自定義k

public class ReadFromHdfsTableMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private ImmutableBytesWritable k = new ImmutableBytesWritable();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1.獲取1行
        String line = value.toString();
        //2.切割
        String[] fields = line.split("\t");
        //3.獲取put對象
        String rowkey = fields[0];
        Put put = new Put(Bytes.toBytes(rowkey));
        //添加列信息| 列族/列名
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"),
                Bytes.toBytes(fields[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"),
                Bytes.toBytes(fields[2]));
        context.write(k, put);
    }
}


public class ReadFromHdfsTableReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}


public class ReadFromHdfsDriver implements Tool {
    private Configuration configuration;
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(ReadFromTableDriver.class);
        job.setMapperClass(ReadFromTableMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        //設置文件路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        TableMapReduceUtil.initTableReducerJob(args[1], ReadFromTableReducer.class, job);
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
    public void setConf(Configuration conf) {
        configuration =conf;
    }
    public Configuration getConf() {
        return configuration;
    }
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ReadFromTableDriver(), args); //傳Tool對象
        System.exit(run);
    }
}
打包運行
$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.atguigu.mr2.ReadFromHdfsDriver
提示:運行任務前,如果待數據導入的表不存在,則需要提前創建之。

hadoop jar
yarn jar
8032rm和yarn的通信端口號


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM