Hive基本語法操練


Hive 操作

(一)表操作

        Hive 和 Mysql 的表操作語句類似,如果熟悉 Mysql,學習Hive 的表操作就非常容易了,下面對 Hive 的表操作進行深入講解。

(1)先來創建一個表名為student1的內部表

hive> CREATE TABLE IF NOT EXISTS student1
    > (sno INT,sname STRING,age INT,sex STRING)
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY '\t'
    > STORED AS TEXTFILE;

建表規則如下:

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
  [(col_name data_type [COMMENT col_comment], ...)] 
  [COMMENT table_comment] 
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 
  [CLUSTERED BY (col_name, col_name, ...) 
  [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 
  [ROW FORMAT row_format] 
  [STORED AS file_format] 
  [LOCATION hdfs_path]

•CREATE TABLE 創建一個指定名字的表。如果相同名字的表已經存在,則拋出異常;用戶可以用 IF NOT EXIST 選項來忽略這個異常

•EXTERNAL 關鍵字可以讓用戶創建一個外部表,在建表的同時指定一個指向實際數據的路徑(LOCATION)

•LIKE 允許用戶復制現有的表結構,但是不復制數據

•COMMENT可以為表與字段增加描述

•ROW FORMAT DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char]

[MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]

| SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]

用戶在建表的時候可以自定義 SerDe 或者使用自帶的 SerDe。如果沒有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,將會使用自帶的 SerDe。在建表的時候,用戶還需要為表指定列,用戶在指定表的列的同時也會指定自定義的 SerDe,Hive 通過 SerDe 確定表的具體的列的數據。

•STORED AS

SEQUENCEFILE

| TEXTFILE

| RCFILE

| INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname

如果文件數據是純文本,可以使用 STORED AS TEXTFILE。如果數據需要壓縮,使用 STORED AS SEQUENCE 。

(2)創建外部表

hive> CREATE EXTERNAL TABLE IF NOT EXISTS student2
    > (sno INT,sname STRING,age INT,sex STRING)   
    > ROW FORMAT DELIMITED                        
    > FIELDS TERMINATED BY '\t'                   
    > STORED AS TEXTFILE                          
    > LOCATION '/user/external';                  
OK
Time taken: 0.331 seconds
hive> show tables;                             
OK
student1
student2
Time taken: 0.06 seconds, Fetched: 12 row(s)

(3)刪除表

首先創建一個表名為test1的表

hive> CREATE TABLE IF NOT EXISTS test1 
    > (id INT,name STRING);                 
OK
Time taken: 0.07 seconds

然后查看一下是否有test1表

hive> SHOW TABLES;
OK
student1
student2
test1
Time taken: 0.042 seconds, Fetched: 12 row(s)

用命令刪test1表

hive> DROP TABLE test1;
OK
Time taken: 0.191 seconds

查看test1表是否刪除

hive> SHOW TABLES;
OK
student1
student2
Time taken: 0.027 seconds, Fetched: 11 row(s)

(4)修改表的結構,比如為表增加字段

首先看一下student1表的結構

hive> DESC student1;
OK
sno                 	int                 	                    
sname               	string              	                    
age                 	int                 	                    
sex                 	string              	                    
Time taken: 0.886 seconds, Fetched: 4 row(s)

為表student1增加兩個字段

hive> ALTER TABLE student1 ADD COLUMNS  
    > (address STRING,grade STRING);
OK
Time taken: 0.241 seconds

再查看一下表的結構,看是否增加

hive> DESC student1;
OK
sno                 	int                 	                    
sname               	string              	                    
age                 	int                 	                    
sex                 	string              	                    
address             	string              	                    
grade               	string              	                    
Time taken: 0.154 seconds, Fetched: 6 row(s)

(5)修改表名student1為student3

hive> ALTER TABLE student1 RENAME TO student3;
OK
Time taken: 0.172 seconds

查看一下

hive> SHOW TABLES;
OK
student2
student3
Time taken: 0.088 seconds, Fetched: 11 row(s)

下面我們再改回來

hive> ALTER TABLE student3 RENAME TO student1;
OK
Time taken: 0.153 seconds

查看一下

hive> SHOW TABLES;
OK
student1
student2
Time taken: 0.064 seconds, Fetched: 11 row(s)

(6)創建和已知表相同結構的表

hive> CREATE TABLE copy_student1 LIKE student1;
OK
Time taken: 1.109 seconds

查看一下

hive> SHOW TABLES;
OK
copy_student1
student1
student2
Time taken: 0.083 seconds, Fetched: 12 row(s)

2、加入導入數據的方法,(數據里可以包含重復記錄),只有導入了數據,才能供后邊的查詢使用

(1)加載本地數據load

首先看一下表的結構

hive> DESC student1;
OK
sno                 	int                 	                    
sname               	string              	                    
age                 	int                 	                    
sex                 	string              	                    
address             	string              	                    
grade               	string              	                    
Time taken: 1.018 seconds, Fetched: 6 row(s)

創建/home/hadoop/data目錄,並在該目錄下創建student1.txt文件,添加如下內容

201501001       張三    22      男      北京    大三
201501003       李四    23      男      上海    大二
201501004       王娟    22      女      廣州    大三
201501010       周王    24      男      深圳    大四
201501011       李紅    23      女      北京    大三

加載數據到student1表中

hive> LOAD DATA LOCAL INPATH '/home/hadoop/data/student1.txt' INTO TABLE student1;
Loading data to table default.student1
Table default.student1 stats: [numFiles=1, numRows=0, totalSize=191, rawDataSize=0]
OK
Time taken: 0.766 seconds

查看是否加載成功

hive> SELECT * FROM student1;
OK
201501001	張三	22	男	北京	大三
201501003	李四	23	男	上海	大二
201501004	王娟	22	女	廣州	大三
201501010	周王	24	男	深圳	大四
201501011	李紅	23	女	北京	大三
Time taken: 0.512 seconds, Fetched: 5 row(s)

(2)加載hdfs中的文件

首先將文件student1.txt上傳到hdfs文件系統對應目錄上

[hadoop@djt01 hadoop]$ hadoop fs -put /home/hadoop/data/student1.txt /user/hive
16/05/16 17:15:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[hadoop@djt01 hadoop]$ hadoop fs -ls /user/hive
16/05/16 17:16:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   3 hadoop supergroup        191 2016-05-19 03:27 /user/hive/student1.txt
drwxr-xr-x   - hadoop supergroup          0 2016-05-19 02:46 /user/hive/warehouse

加載hdfs中的文件數據到copy_student1表中

hive> LOAD DATA INPATH '/user/hive/student1.txt' INTO TABLE copy_student1;
Loading data to table default.copy_student1
Table default.copy_student1 stats: [numFiles=1, totalSize=191]
OK
Time taken: 1.354 seconds

查看是否加載成功

hive> SELECT * FROM copy_student1;
OK
201501001	張三	22	男	北京	大三
201501003	李四	23	男	上海	大二
201501004	王娟	22	女	廣州	大三
201501010	周王	24	男	深圳	大四
201501011	李紅	23	女	北京	大三
Time taken: 0.44 seconds, Fetched: 5 row(s)

(3)表插入數據(單表插入、多表插入)

1)單表插入

首先創建一個表copy_student2,表結構和student1相同

hive> CREATE TABLE copy_student2 LIKE student1;
OK
Time taken: 0.586 seconds

查看一下是否創建成功

hive> SHOW TABLES;
OK
copy_student1
copy_student2
student1
student2
Time taken: 0.073 seconds, Fetched: 13 row(s)

看一下copy_student2表的表結構

hive> DESC copy_student2;
OK
sno                 	int                 	                    
sname               	string              	                    
age                 	int                 	                    
sex                 	string              	                    
address             	string              	                    
grade               	string              	                    
Time taken: 0.121 seconds, Fetched: 6 row(s)

把表student1中的數據插入到copy_student2表中

hive> INSERT OVERWRITE TABLE copy_student2 SELECT * FROM student1;

查看數據是否插入

hive> SELECT * FROM copy_student2;
OK
201501001	張三	22	男	北京	大三
201501003	李四	23	男	上海	大二
201501004	王娟	22	女	廣州	大三
201501010	周王	24	男	深圳	大四
201501011	李紅	23	女	北京	大三
Time taken: 0.107 seconds, Fetched: 5 row(s)

2)多表插入

先創建兩個表

hive> CREATE TABLE copy_student3 LIKE student1;        OK    
Time taken: 0.622 seconds
hive> CREATE TABLE copy_student4 LIKE student1;
OK
Time taken: 0.162 seconds

向多表插入數據

hive> FROM student1                                       
    > INSERT OVERWRITE TABLE copy_student3
    > SELECT *                            
    > INSERT OVERWRITE TABLE copy_student4
    > SELECT *;

查看結果

hive> SELECT * FROM copy_student3;
OK
201501001	張三	22	男	北京	大三
201501003	李四	23	男	上海	大二
201501004	王娟	22	女	廣州	大三
201501010	周王	24	男	深圳	大四
201501011	李紅	23	女	北京	大三
Time taken: 0.103 seconds, Fetched: 5 row(s)
hive> SELECT * FROM copy_student4;
OK
201501001	張三	22	男	北京	大三
201501003	李四	23	男	上海	大二
201501004	王娟	22	女	廣州	大三
201501010	周王	24	男	深圳	大四
201501011	李紅	23	女	北京	大三
Time taken: 0.071 seconds, Fetched: 5 row(s)

3、有關表的內容的查詢

(1)查表的所有內容

hive> SELECT * FROM student1;
OK
201501001	張三	22	男	北京	大三
201501003	李四	23	男	上海	大二
201501004	王娟	22	女	廣州	大三
201501010	周王	24	男	深圳	大四
201501011	李紅	23	女	北京	大三
Time taken: 1.201 seconds, Fetched: 5 row(s)

(2)查表的某個字段的屬性

hive> SELECT sname FROM student1;
OK
張三
李四
王娟
周王
李紅
Time taken: 1.22 seconds, Fetched: 5 row(s)

(3)where條件查詢

hive> SELECT * FROM student1 WHERE sno>201501004 AND address="北京";
OK
201501011	李紅	23	女	北京	大三
Time taken: 0.873 seconds, Fetched: 1 row(s)

(4)all和distinct的區別(這就要求表中要有重復的記錄,或者某個字段要有重復的數據)

hive> SELECT ALL age,grade FROM student1;
OK
22	大三
23	大二
22	大三
24	大四
23	大三
Time taken: 0.448 seconds, Fetched: 5 row(s)
hive> SELECT age,grade FROM student1;    
OK
22	大三
23	大二
22	大三
24	大四
23	大三
Time taken: 0.072 seconds, Fetched: 5 row(s)
hive> SELECT DISTINCT age,grade FROM student1;
OK
22	大三
23	大三
23	大二
24	大四
Time taken: 127.397 seconds, Fetched: 4 row(s)
hive> SELECT DISTINCT age FROM student1;
OK
22
23
24
Time taken: 106.21 seconds, Fetched: 3 row(s)

(5)limit限制查詢

pre class="html">hive> SELECT * FROM student1 LIMIT 4; OK 201501001 張三 22 男 北京 大三 201501003 李四 23 男 上海 大二 201501004 王娟 22 女 廣州 大三 201501010 周王 24 男 深圳 大四 Time taken: 0.253 seconds, Fetched: 4 row(s)

(6) GROUP BY 分組查詢

group by 分組查詢在數據統計時比較常用,接下來講解 group by 的使用。

1) 創建一個表 group_test,表的內容如下。

hive> create table group_test(uid STRING,gender STRING,ip STRING) row format delimited fields terminated by '\t'  STORED AS TEXTFILE;

向 group_test 表中導入數據。

hive> LOAD DATA LOCAL INPATH '/home/hadoop/djt/user.txt'  INTO TABLE group_test;

2) 計算表的行數命令如下。

hive> select count(*) from group_test;

3) 根據性別計算去重用戶數。

首先創建一個表 group_gender_sum

hive> create table group_gender_sum(gender STRING,sum INT);

將表 group_test 去重后的數據導入表 group_gender_sum。

hive> insert overwrite table group_gender_sum select group_test.gender,count(distinct group_test.uid) from group_test group by group_test.gender;

同時可以做多個聚合操作,但是不能有兩個聚合操作有不同的 distinct 列。下面正確合法的聚合操作語句。

首先創建一個表 group_gender_agg

hive> create table group_gender_agg(gender STRING,sum1 INT,sum2 INT,sum3 INT);

將表 group_test 聚合后的數據插入表 group_gender_agg。

hive> insert overwrite table group_gender_agg select group_test.gender,count(distinct group_test.uid),count(*),sum(distinct group_test.uid) from group_test group by group_test.gender;

但是,不允許在同一個查詢內有多個 distinct 表達式。下面的查詢是不允許的。

hive> insert overwrite table group_gender_agg select group_test.gender,count(distinct group_test.uid),count(distinct group_test.ip) from group_test group by group_test.gender;

這條查詢語句是不合法的,因為 distinct group_test.uid 和 distinct group_test.ip 操作了uid 和 ip 兩個不同的列。

(7) ORDER BY 排序查詢

ORDER BY 會對輸入做全局排序,因此只有一個 Reduce(多個 Reduce 無法保證全局有序)會導致當輸入規模較大時,需要較長的計算時間。使用 ORDER BY 查詢的時候,為了優化查詢的速度,使用 hive.mapred.mode 屬性。

hive.mapred.mode = nonstrict;(default value/默認值)
hive.mapred.mode=strict;

與數據庫中 ORDER BY 的區別在於,在 hive.mapred.mode=strict 模式下必須指定limit ,否則執行會報錯。

hive> set hive.mapred.mode=strict;
hive> select * from group_test order by uid limit 5;
Total jobs = 1
..............
Total MapReduce CPU Time Spent: 4 seconds 340 msec
OK
01      male    192.168.1.2
01      male    192.168.1.32
01      male    192.168.1.26
01      male    192.168.1.22
02      female  192.168.1.3
Time taken: 58.04 seconds, Fetched: 5 row(s)

(8) SORT BY 查詢

sort by 不受 hive.mapred.mode 的值是否為 strict 和 nostrict 的影響。sort by 的數據只能保證在同一個 Reduce 中的數據可以按指定字段排序。

使用 sort by 可以指定執行的 Reduce 個數(set mapred.reduce.tasks=< number>)這樣可以輸出更多的數據。對輸出的數據再執行歸並排序,即可以得到全部結果。

hive> set hive.mapred.mode=strict;
hive> select * from group_test sort by uid ;
Total MapReduce CPU Time Spent: 4 seconds 450 msec
OK
01      male    192.168.1.2
01      male    192.168.1.32
01      male    192.168.1.26
01      male    192.168.1.22
02      female  192.168.1.3
03      male    192.168.1.23
03      male    192.168.1.5
04      male    192.168.1.9
05      male    192.168.1.8
05      male    192.168.1.29
06      female  192.168.1.201
06      female  192.168.1.52
06      female  192.168.1.7
07      female  192.168.1.11
08      female  192.168.1.21
08      female  192.168.1.62
08      female  192.168.1.88
08      female  192.168.1.42
Time taken: 77.875 seconds, Fetched: 18 row(s)

(9) DISTRIBUTE BY 排序查詢

按照指定的字段對數據划分到不同的輸出 Reduce 文件中,操作如下。

hive> insert overwrite local directory '/home/hadoop/djt/test' select * from group_test distribute by length(gender);

此方法根據 gender 的長度划分到不同的 Reduce 中,最終輸出到不同的文件中。length 是內建函數,也可以指定其它的函數或者使用自定義函數。

hive> insert overwrite local directory '/home/hadoop/djt/test' select * from group_test order by gender  distribute by length(gender);
order by gender 與 distribute by length(gender) 不能共用。

(10) CLUSTER BY 查詢

cluster by 除了具有 distribute by 的功能外還兼具 sort by 的功能。

(二)視圖操作

1) 創建一個測試表。

hive> create table test(id int,name string);
OK
Time taken: 0.385 seconds
hive> desc test;
OK
id                      int                                         
name                    string                                      
Time taken: 0.261 seconds, Fetched: 2 row(s)

2) 基於表 test 創建一個 test_view 視圖。

hive> create view test_view(id,name_length) as select id,length(name) from test;

3) 查看 test_view 視圖屬性。

hive> desc test_view;

4) 查看視圖結果。

hive> select * from test_view;

(三)索引操作

1) Hive 創建索引。

hive> create index user_index on table user(id) as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild IN TABLE user_index_table;

2) 更新數據。

hive> alter index user_index on user rebuild;

3) 刪除索引

hive> drop index user_index on user;

4) 查看索引

hive> show index on user;

5) 創建表和索引案例

hive> create table index_test(id INT,name STRING) PARTITIONED BY (dt STRING) ROW FORMAT DELIMITED FILEDS TERMINATED BY ',';

創建一個索引測試表 index_test,dt作為分區屬性,“ROW FORMAT DELIMITED FILEDS TERMINATED BY ','” 表示用逗號分割字符串,默認為‘\001’。

6) 創建一個臨時索引表 index_tmp。

hive> create table index_tmp(id INT,name STRING,dt STRING) ROW FORMAT DELIMITED FILEDS TERMINATED BY ',';

7) 加載本地數據到 index_tmp 表中。

hive> load data local inpath '/home/hadoop/djt/test.txt' into table index_tmp;

設置 Hive 的索引屬性來優化索引查詢,命令如下。

hive> set hive.exec.dynamic.partition.mode=nonstrict;----設置所有列為 dynamic partition
hive> set hive.exec.dynamic.partition=true;----使用動態分區

8) 查詢index_tmp 表中的數據,插入 table_test 表中。

hive> insert overwrite table index_test partition(dt) select id,name,dt from index_tmp;

9) 使用 index_test 表,在屬性 id 上創建一個索引 index1_index_test 。

hive> create index index1_index_test on table index_test(id) as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' WITH DEFERERD REBUILD;

10) 填充索引數據。

hive> alter index index1_index_test on index_test rebuild;

11) 查看創建的索引。

hive> show index on index_test

12) 查看分區信息。

hive> show partitions index_test;

13) 查看索引數據。

$ hadoop fs -ls /usr/hive/warehouse/default_index_test_index1_index_test_

14) 刪除索引。

hive> drop index index1_index_test on index_test;
show index on index_test;

15) 索引數據也被刪除。

$ hadoop fs -ls /usr/hive/warehouse/default_index_test_index1_index_test_
no such file or directory

&16) 修改配置文件信息。

< property> < name>hive.optimize.index.filter< /name> < value>true< /value> < /property> < property> < name>hive.optimize.index.groupby< /name> < value>true< /value> < /property> < property> < name>hive.optimize.index.filter.compact.minsize< /name> < value>5120< /value> < /property>

hive.optimize.index.filter 和 hive.optimize.index.groupby 參數默認是 false。使用索引的時候必須把這兩個參數開啟,才能起到作用。

hive.optimize.index.filter.compact.minsize 參數為輸入一個緊湊的索引將被自動采用最小尺寸、默認5368709120(以字節為單位)。

(四)分區操作

Hive 的分區通過在創建表時啟動 PARTITION BY 實現,用來分區的維度並不是實際數據的某一列,具體分區的標志是由插入內容時給定的。當要查詢某一分區的內容時可以采用 WHERE 語句, 例如使用 “WHERE tablename.partition_key>a” 創建含分區的表。創建分區語法如下。

CREATE TABLE table_name(
...
)
PARTITION BY (dt STRING,country STRING)

1、 創建分區

Hive 中創建分區表沒有什么復雜的分區類型(范圍分區、列表分區、hash 分區,混合分區等)。分區列也不是表中的一個實際的字段,而是一個或者多個偽列。意思是說,在表的數據文件中實際並不保存分區列的信息與數據。

創建一個簡單的分區表。

hive> create table partition_test(member_id string,name string) partitioned by (stat_date string,province string) row format delimited fields terminated by ',';

這個例子中創建了 stat_date 和 province 兩個字段作為分區列。通常情況下需要預先創建好分區,然后才能使用該分區。例如:

hive> alter table partition_test add partition (stat_date='2015-01-18',province='beijing');

這樣就創建了一個分區。這時會看到 Hive 在HDFS 存儲中創建了一個相應的文件夾。

$ hadoop fs -ls /user/hive/warehouse/partition_test/stat_date=2015-01-18
/user/hive/warehouse/partition_test/stat_date=2015-01-18/province=beijing----顯示剛剛創建的分區

每一個分區都會有一個獨立的文件夾,在上面例子中,stat_date 是主層次,province 是 副層次。

2、 插入數據

使用一個輔助的非分區表 partition_test_input 准備向 partition_test 中插入數據,實現步驟如下。

1) 查看 partition_test_input 表的結構,命令如下。

hive> desc partition_test_input;

2) 查看 partition_test_input 的數據,命令如下。

hive> select * from partition_test_input;

3) 向 partition_test 的分區中插入數據,命令如下。

insert overwrite table partition_test partition(stat_date='2015-01-18',province='jiangsu') select member_id,name from partition_test_input where stat_date='2015-01-18' and province='jiangsu';

向多個分區插入數據,命令如下。

hive> from partition_test_input
insert overwrite table partition_test partition(stat_date='2015-01-18',province='jiangsu') select member_id,name from partition_test_input where stat_date='2015-01-18' and province='jiangsu'
insert overwrite table partition_test partition(stat_date='2015-01-28',province='sichuan') select member_id,name from partition_test_input where stat_date='2015-01-28' and province='sichuan'
insert overwrite table partition_test partition(stat_date='2015-01-28',province='beijing') select member_id,name from partition_test_input where stat_date='2015-01-28' and province='beijing';

3、 動態分區

按照上面的方法向分區表中插入數據,如果數據源很大,針對一個分區就要寫一個 insert ,非常麻煩。使用動態分區可以很好地解決上述問題。動態分區可以根據查詢得到的數據自動匹配到相應的分區中去。

動態分區可以通過下面的設置來打開:

 set hive.exec.dynamic.partition=true;  
set hive.exec.dynamic.partition.mode=nonstrict; 

動態分區的使用方法很簡單,假設向 stat_date='2015-01-18' 這個分區下插入數據,至於 province 插到哪個子分區下讓數據庫自己來判斷。stat_date 叫做靜態分區列,province 叫做動態分區列。

hive> insert overwrite table partition_test partition(stat_date='2015-01-18',province)
select member_id,name province from partition_test_input where stat_date='2015-01-18';
注意,動態分區不允許主分區采用動態列而副分區采用靜態列,這樣將導致所有的主分區都要創建副分區靜態列所定義的分區。

hive.exec.max.dynamic.partitions.pernode:每一個 MapReduce Job 允許創建的分區的最大數量,如果超過這個數量就會報錯(默認值100)。

hive.exec.max.dynamic.partitions:一個 dml 語句允許創建的所有分區的最大數量(默認值100)。

hive.exec.max.created.files:所有 MapReduce Job 允許創建的文件的最大數量(默認值10000)。

盡量讓分區列的值相同的數據在同一個 MapReduce 中,這樣每一個 MapReduce 可以盡量少地產生新的文件夾,可以通過 DISTRIBUTE BY 將分區列值相同的數據放到一起,命令如下。

hive> insert overwrite table partition_test partition(stat_date,province)
select memeber_id,name,stat_date,province from partition_test_input distribute by stat_date,province;

(五)桶操作

Hive 中 table 可以拆分成 Partition table 和 桶(BUCKET),桶操作是通過 Partition 的 CLUSTERED BY 實現的,BUCKET 中的數據可以通過 SORT BY 排序。

BUCKET 主要作用如下。

1)數據 sampling;

2)提升某些查詢操作效率,例如 Map-Side Join。

需要特別主要的是,CLUSTERED BY 和 SORT BY 不會影響數據的導入,這意味着,用戶必須自己負責數據的導入,包括數據額分桶和排序。 'set hive.enforce.bucketing=true' 可以自動控制上一輪 Reduce 的數量從而適配 BUCKET 的個數,當然,用戶也可以自主設置 mapred.reduce.tasks 去適配 BUCKET 個數,推薦使用:

hive> set hive.enforce.bucketing=true;

操作示例如下。

1) 創建臨時表 student_tmp,並導入數據。

hive> desc student_tmp;
hive> select * from student_tmp;

2) 創建 student 表。

hive> create table student(id int,age int,name string)
partitioned by (stat_date string)
clustered by (id) sorted by(age) into 2 bucket
row format delimited fields terminated by ',';

3) 設置環境變量。

hive> set hive.enforce.bucketing=true;

4) 插入數據。

hive> from student_tmp
insert overwrite table student partition(stat_date='2015-01-19')
select id,age,name where stat_date='2015-01-18' sort by age;

5) 查看文件目錄。

$ hadoop fs -ls /usr/hive/warehouse/student/stat_date=2015-01-19/

6) 查看 sampling 數據。

hive> select * from student tablesample(bucket 1 out of 2 on id);

tablesample 是抽樣語句,語法如下。

tablesample(bucket x out of y)

y 必須是 table 中 BUCKET 總數的倍數或者因子。

Hive 復合類型

 hive提供了復合數據類型:

 1)Structs: structs內部的數據可以通過DOT(.)來存取。例如,表中一列c的類型為STRUCT{a INT; b INT},我們可以通過c.a來訪問域a。

 2)Map(K-V對):訪問指定域可以通過["指定域名稱"]進行。例如,一個Map M包含了一個group-》gid的kv對,gid的值可以通過M['group']來獲取。

 3)Array:array中的數據為相同類型。例如,假如array A中元素['a','b','c'],則A[1]的值為'b'

1、Struct使用

 1) 建表

hive> create table student_test(id INT, info struct< name:STRING, age:INT>) > ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' > COLLECTION ITEMS TERMINATED BY ':';

 'FIELDS TERMINATED BY' :字段與字段之間的分隔符。'COLLECTION ITEMS TERMINATED BY' :一個字段各個item的分隔符。

 2) 導入數據

$ cat test5.txt   
1,zhou:30  
2,yan:30  
3,chen:20  
4,li:80  
hive> LOAD DATA LOCAL INPATH '/home/hadoop/djt/test5.txt' INTO TABLE student_test;

 3) 查詢數據

hive> select info.age from student_test;  

2、Array使用

 1) 建表

hive> create table class_test(name string, student_id_list array< INT>) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY ',' > COLLECTION ITEMS TERMINATED BY ':';

 2) 導入數據

$ cat test6.txt   
034,1:2:3:4  
035,5:6  
036,7:8:9:10  
hive>  LOAD DATA LOCAL INPATH '/home/work/data/test6.txt' INTO TABLE class_test ; 

 3) 查詢

hive> select student_id_list[3] from class_test; 

3、Map使用

 1) 建表

hive> create table employee(id string, perf map< string, int>) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY '\t' > COLLECTION ITEMS TERMINATED BY ',' > MAP KEYS TERMINATED BY ':'; 

 ‘MAP KEYS TERMINATED BY’ :key value分隔符

 2) 導入數據

$ cat test7.txt   
1       job:80,team:60,person:70  
2       job:60,team:80  
3       job:90,team:70,person:100  
hive>  LOAD DATA LOCAL INPATH '/home/work/data/test7.txt' INTO TABLE employee;  

 3) 查詢

hive> select perf['person'] from employee;

Hive 的 JOIN 用法

 hive只支持等連接,外連接,左半連接。hive不支持非相等的join條件(通過其他方式實現,如left outer join),因為它很難在map/reduce job實現這樣的條件。而且,hive可以join兩個以上的表。

1、等連接

 只有等連接才允許

hive> SELECT a.* FROM a JOIN b ON (a.id = b.id)  
hive> SELECT a.* FROM a JOIN b ON (a.id = b.id AND a.department = b.department) 

2、多表連接

 同個查詢,可以join兩個以上的表

hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2) 

3、join的緩存和任務轉換

 hive轉換多表join時,如果每個表在join字句中,使用的都是同一個列,只會轉換為一個單獨的map/reduce。

hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)

 被轉換為兩個map/reduce任務,因為b的key1列在第一個join條件使用,而b表的key2列在第二個join條件使用。第一個map/reduce任務join a和b。第二個任務是第一個任務的結果join c。

hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2) 

 在join的每個map/reduce階段,序列中的最后一個表,當其他被緩存時,它會流到reducers。所以,reducers需要緩存join關鍵字的特定值組成的行,通過組織最大的表出現在序列的最后,有助於減少reducers的內存。

hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)

 三個表,在同一個獨立的map/reduce任務做join。a和b的key對應的特定值組成的行,會緩存在reducers的內存。然后reducers接受c的每一行,和緩存的每一行做join計算。

hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2) 

 這里有兩個map/reduce任務在join計算被調用。第一個是a和b做join,然后reducers緩存a的值,另一邊,從流接收b的值。第二個階段,reducers緩存第一個join的結果,另一邊從流接收c的值。

 在join的每個map/reduce階段,通過關鍵字,可以指定哪個表從流接收。

hive> SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1) 

 三個表的連接,會轉換為一個map/reduce任務,reducer會把b和c的key的特定值緩存在內存里,然后從流接收a的每一行,和緩存的行做join。

4、join的結果

 LEFT,RIGHT,FULL OUTER連接存在是為了提供ON語句在沒有匹配時的更多控制。例如,這個查詢:

hive> SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key)  

 將會返回a的每一行。如果b.key等於a.key,輸出將是a.val,b.val,如果a沒有和b.key匹配,輸出的行將是 a.val,NULL。如果b的行沒有和a.key匹配上,將被拋棄。語法"FROM a LEFT OUTER JOIN b"必須寫在一行,為了理解它如何工作——這個查詢,a是b的左邊,a的所有行會被保持;RIGHT OUTER JOIN將保持b的所有行, FULL OUTER JOIN將會保存a和b的所有行。OUTER JOIN語義應該符合標准的SQL規范。

5、join的過濾

 Joins發生在where字句前,所以,如果要限制join的輸出,需要寫在where字句,否則寫在JOIN字句。現在討論的一個混亂的大點,就是分區表

hive> SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key)  WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'  

 將會連接a和b,產生a.val和b.val的列表。WHERE字句,也可以引用join的輸出列,然后過濾他們。 但是,無論何時JOIN的行找到a的key,但是找不到b的key時,b的所有列會置成NULL,包括ds列。這就是說,將過濾join輸出的所有行,包括沒有合法的b.key的行。然后你會在LEFT OUTER的要求撲空。 也就是說,如果你在WHERE字句引用b的任何列,LEFT OUTER的部分join結果是不相關的。所以,當外連接時,使用這個語句

hive> SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07';

 join的輸出會預先過濾,然后你不用對有a.key而沒有b.key的行做過濾。RIGHT和FULL join也是一樣的邏輯。

6、join的順序

 join是不可替換的,連接是從左到右,不管是LEFT或RIGHT join。

hive>  SELECT a.val1, a.val2, b.val, c.val  FROM a  JOIN b ON (a.key = b.key)  LEFT OUTER JOIN c ON (a.key = c.key)

 首先,連接a和b,扔掉a和b中沒有匹配的key的行。結果表再連接c。這提供了直觀的結果,如果有一個鍵都存在於A和C,但不是B:完整行(包括 a.val1,a.val2,a.key)會在"a jOIN b"步驟,被丟棄,因為它不在b中。結果沒有a.key,所以當它和c做LEFT OUTER JOIN,c.val也無法做到,因為沒有c.key匹配a.key(因為a的行都被移除了)。類似的,RIGHT OUTER JOIN(替換為LEFT),我們最終會更怪的效果,NULL, NULL, NULL, c.val。因為盡管指定了join key是a.key=c.key,我們已經在第一個JOIN丟棄了不匹配的a的所有行。

 為了達到更直觀的效果,相反,我們應該從

hive> FROM c LEFT OUTER JOIN a ON (c.key = a.key) LEFT OUTER JOIN b ON (c.key = b.key). 

 LEFT SEMI JOIN實現了相關的IN / EXISTS的子查詢語義的有效途徑。由於Hive目前不支持IN / EXISTS的子查詢,所以你可以用 LEFT SEMI JOIN 重寫你的子查詢語句。LEFT SEMI JOIN 的限制是, JOIN 子句中右邊的表只能在 ON 子句中設置過濾條件,在 WHERE 子句、SELECT 子句或其他地方過濾都不行。

hive> SELECT a.key, a.value FROM a WHERE a.key in (SELECT b.key FROM B); 

 可以重寫為

hive> SELECT a.key, a.val FROM a LEFT SEMI JOIN b on (a.key = b.key)  

7、map 端 join

 但如果所有被連接的表是小表,join可以被轉換為只有一個map任務。查詢是

hive> SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key

 不需要reducer。對於每一個mapper,A和B已經被完全讀出。限制是a FULL/RIGHT OUTER JOIN b不能使用。

 如果表在join的列已經分桶了,其中一張表的桶的數量,是另一個表的桶的數量的整倍,那么兩者可以做桶的連接。如果A有4個桶,表B有4個桶,下面的連接:

hive> SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key 

 只能在mapper工作。為了為A的每個mapper完整抽取B。對於上面的查詢,mapper處理A的桶1,只會抽取B的桶1,這不是默認行為,要使用以下參數:

hive> set hive.optimize.bucketmapjoin = true; 

 如果表在join的列經過排序,分桶,而且他們有相同數量的桶,可以使用排序-合並 join。每個mapper,相關的桶會做連接。如果A和B有4個桶

hive> SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM A a join B b on a.key = b.key 

 只能在mapper使用。使用A的桶的mapper,也會遍歷B相關的桶。這個不是默認行為,需要配置以下參數:

hive> set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;  
hive> set hive.optimize.bucketmapjoin = true;  
hive> set hive.optimize.bucketmapjoin.sortedmerge = true;  

Hive 內置操作符與函數

字符串函數

 1)字符串長度函數:length

語法: length(string A)  
返回值: int  
說明:返回字符串A的長度  
舉例:  
hive> select length(‘abcedfg’) from dual;  
7 

 2)字符串反轉函數:reverse

語法: reverse(string A)  
返回值: string  
說明:返回字符串A的反轉結果  
舉例:  
hive> select reverse(‘abcedfg’) from dual;  
gfdecba

 3)字符串連接函數:concat

語法: concat(string A, string B…)  
返回值: string  
說明:返回輸入字符串連接后的結果,支持任意個輸入字符串  
舉例:  
hive> select concat(‘abc’,'def’,'gh’) from dual;  
abcdefgh  

 4)帶分隔符字符串連接函數:concat_ws

語法: concat_ws(string SEP, string A, string B…)  
返回值: string  
說明:返回輸入字符串連接后的結果,SEP表示各個字符串間的分隔符  
舉例:  
hive> select concat_ws(‘,’,'abc’,'def’,'gh’) from dual;  
abc,def,gh

 5)字符串截取函數:substr,substring

語法: substr(string A, int start),substring(string A, int start)  
返回值: string  
說明:返回字符串A從start位置到結尾的字符串  
舉例:  
hive> select substr(‘abcde’,3) from dual;  
cde  
hive> select substring(‘abcde’,3) from dual;  
cde  
hive>  select substr(‘abcde’,-1) from dual;  (和ORACLE相同)  
e  

 6)字符串截取函數:substr,substring

語法: substr(string A, int start, int len),substring(string A, int start, int len)  
返回值: string  
說明:返回字符串A從start位置開始,長度為len的字符串  
舉例:  
hive> select substr(‘abcde’,3,2) from dual;  
cd  
hive> select substring(‘abcde’,3,2) from dual;  
cd  
hive>select substring(‘abcde’,-2,2) from dual;  
de

 7)字符串轉大寫函數:upper,ucase

語法: upper(string A) ucase(string A)  
返回值: string  
說明:返回字符串A的大寫格式  
舉例:  
hive> select upper(‘abSEd’) from dual;  
ABSED  
hive> select ucase(‘abSEd’) from dual;  
ABSED 

 8)字符串轉小寫函數:lower,lcase

語法: lower(string A) lcase(string A)  
返回值: string  
說明:返回字符串A的小寫格式  
舉例:  
hive> select lower(‘abSEd’) from dual;  
absed  
hive> select lcase(‘abSEd’) from dual;  
absed

 9)去空格函數:trim

語法: trim(string A)  
返回值: string  
說明:去除字符串兩邊的空格  
舉例:  
hive> select trim(‘ abc ‘) from dual;  
abc 

 10)左邊去空格函數:ltrim

語法: ltrim(string A)  
返回值: string  
說明:去除字符串左邊的空格  
舉例:  
hive> select ltrim(‘ abc ‘) from dual;  
abc

 11)右邊去空格函數:rtrim

語法: rtrim(string A)  
返回值: string  
說明:去除字符串右邊的空格  
舉例:  
hive> select rtrim(‘ abc ‘) from dual;  
abc 

集合統計函數

 1) 個數統計函數 count。

語法:count(*),count(expr),count(distinct expr)
返回值 int。
count(*)統計檢索出行的個數,包括 NULL 值的行;
count(expr)返回指定字段的非空值的個數;
count(distinct expr)返回指定字段的不同的非空值的個數。
舉例:
hive> select count(*) from user;
hive> select count(distinct age);

 2) 總和統計函數 sum。

語法:sum(col),sum(distinct col)
返回值 double。
sum(col) 統計結果集中 col 的相加的結果;
sum(distinct col) 統計結果中 col 不同值相加的結果。
舉例:
hive> select sum(age) from user;
hive> select sum(distinct age) from user;

 3) 平均值統計函數avg。

語法:avg(col),avg(distinct col)
返回值 double。
avg(col) 統計結果集中的平均值;
avg(distinct col) 統計結果中 col 不同值相加的平均值。
舉例:
hive> select avg(mark) from user;
select avg(distinct mark) from user;

 4) 最小值統計函數 min。統計結果集中 col 字段的最小值。

語法:min(col)
返回值double。
舉例:
hive>select min(mark) from user;

 5) 最大值統計函數 max。統計結果集中 col 字段的最大值。

語法:max(col)
返回值 double。
舉例:
hive> select max(mark) from user;

復合類型操作

 1) Map 類型構建。根據輸入的 Key-Value 對構建 Map 類型。

語法:map(key1, value1, key2, value2,...)
舉例:
hive>create table map_test as select map('100','jay','200','liu') from student;
hive>describe map_test;
hive>select map_test from student;

 2) Struct 類型構建。根據輸入的參數構建結構體 Struct 類型。

語法:struct(val1, val2, val3, ...)
舉例:
hive>create table struct_test as select struct('jay','liu','gang') from student;
hive>describe struct_test;
hive>select struct_test from student;

 3) Array 類型構建。根據輸入的參數構建數組 Array 類型。

語法:array(val1,val2, ...)
舉例:
hive> create table array_test as select array('jay','liu','gang') from student;
hive> describe array_test;
hive> select array_test from array_test;

用戶自定義函數 UDF

 UDF(User Defined Function,用戶自定義函數) 對數據進行處理。UDF 函數可以直接應用於 select 語句,對查詢結構做格式化處理后,再輸出內容。

 Hive可以允許用戶編寫自己定義的函數UDF,來在查詢中使用。Hive中有3種UDF:

 1)UDF:操作單個數據行,產生單個數據行。

 2)UDAF:操作多個數據行,產生一個數據行。

 3)UDTF:操作一個數據行,產生多個數據行一個表作為輸出。

 用戶構建的UDF使用過程如下:

 第一步:繼承UDF或者UDAF或者UDTF,實現特定的方法。

 第二步:將寫好的類打包為jar。如hivefirst.jar。

 第三步:進入到Hive外殼環境中,利用add jar /home/hadoop/hivefirst.jar 注冊該jar文件。

 第四步:為該類起一個別名,create temporary function mylength as 'com.whut.StringLength';這里注意UDF只是為這個Hive會話臨時定義的。

 第五步:在select中使用mylength()。

 自定義UDF

package whut;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
//UDF是作用於單個數據行,產生一個數據行
//用戶必須要繼承UDF,且必須至少實現一個evalute方法,該方法並不在UDF中
//但是Hive會檢查用戶的UDF是否擁有一個evalute方法
public class Strip extends UDF{
	private Text result=new Text();
	//自定義方法
	public Text evaluate(Text str)
	{
		if(str==null)
		return null;
		result.set(StringUtils.strip(str.toString()));
		return result;
	}
	public Text evaluate(Text str,String stripChars)
	{
		if(str==null)
		return null;
		result.set(StringUtils.strip(str.toString(),stripChars));
		return result;
	}
}

 注意事項:

 1、一個用戶UDF必須繼承org.apache.hadoop.hive.ql.exec.UDF;

 2、一個UDF必須要包含有evaluate()方法,但是該方法並不存在於UDF中。evaluate的參數個數以及類型都是用戶自己定義的。在使用的時候,Hive會調用UDF的evaluate()方法。

 自定義UDAF找到最大值

package whut;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
//UDAF是輸入多個數據行,產生一個數據行
//用戶自定義的UDAF必須是繼承了UDAF,且內部包含多個實現了exec的靜態類
public class MaxiNumber extends UDAF{
	public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator{
	//最終結果
	private IntWritable result;
	//負責初始化計算函數並設置它的內部狀態,result是存放最終結果的
	@Override
	public void init() {
		result=null;
	}
	//每次對一個新值進行聚集計算都會調用iterate方法
	public boolean iterate(IntWritable value)
	{
		if(value==null)
			return false;
		if(result==null)
			result=new IntWritable(value.get());
		else
			result.set(Math.max(result.get(), value.get()));
		return true;
	}
	 
	//Hive需要部分聚集結果的時候會調用該方法
	//會返回一個封裝了聚集計算當前狀態的對象
	public IntWritable terminatePartial()
	{
		return result;
	}
	//合並兩個部分聚集值會調用這個方法
	public boolean merge(IntWritable other)
	{
		return iterate(other);
	}
	//Hive需要最終聚集結果時候會調用該方法
	public IntWritable terminate()
	{
		return result;
	}
	}
}
   

 注意事項:

 1、用戶的UDAF必須繼承了org.apache.hadoop.hive.ql.exec.UDAF。 

 2、用戶的UDAF必須包含至少一個實現了org.apache.hadoop.hive.ql.exec的靜態類,諸如常見的實現了 UDAFEvaluator。 

 3、一個計算函數必須實現的5個方法的具體含義如下: 

  init():主要是負責初始化計算函數並且重設其內部狀態,一般就是重設其內部字段。一般在靜態類中定義一個內部字段來存放最終的結果。 

  iterate():每一次對一個新值進行聚集計算時候都會調用該方法,計算函數會根據聚集計算結果更新內部狀態。當輸入值合法或者正確計算了,則就返回true。 

  terminatePartial():Hive需要部分聚集結果的時候會調用該方法,必須要返回一個封裝了聚集計算當前狀態的對象。 

  merge():Hive進行合並一個部分聚集和另一個部分聚集的時候會調用該方法。 

  terminate():Hive最終聚集結果的時候就會調用該方法。計算函數需要把狀態作為一個值返回給用戶。 

 4、部分聚集結果的數據類型和最終結果的數據類型可以不同。

Hive 的權限控制

 Hive從0.10可以通過元數據控制權限。但是Hive的權限控制並不是完全安全的。基本的授權方案的目的是防止用戶不小心做了不合適的事情。

 為了使用Hive的授權機制,有兩個參數必須在hive-site.xml中設置:

< property> < name>hive.security.authorization.enabled< /name> < value>true< /value> < description>enable or disable the hive client authorization< /description> < /property> < property> < name>hive.security.authorization.createtable.owner.grants< /name> < value>ALL< /value> < description>the privileges automatically granted to the owner whenever a table gets created. An example like "select,drop" will grant select and drop privilege to the owner of the table< /description> < /property>

 hive.security.authorization.enabled //參數是開啟權限驗證,默認為 false。

 hive.security.authorization.createtable.owner.grants //參數是指表的創建者對表擁有所有權限。

角色的創建和刪除

 Hive 中的角色定義與關系型數據庫中角色的定義類似,它是一種機制,給予那些沒有適當權限的用戶分配一定的權限。

 1) 創建角色。

語法:hive> create role role_name;
示例:hive> create role role_tes1;

 2) 刪除角色。

語法:drop role role_name
示例:drop role role_test1;

角色的授權和撤銷

 1) 把 role_test1 角色授權給 xiaojiang 用戶,命令如下。

hive> grant role role_test1 to user xiaojiang;

 2) 查看 xiaojiang 用戶被授權的角色,命令如下。

show role grant user xiaojiang;

 3) 取消 xiaojiang 用戶的 role_test1 角色,命令如下。

hive> revoke role role_test1 from user xiaojiang;

Hive 支持的權限控制。

 1) 把 select 權限授權給 xiaojiang 用戶,命令如下。

hive> grant select on database default to user xiaojiang;

 2) 查看 xiaojiang 被授予那些操作權限,命令如下。

hive> show grant user xiaojiang on database default;

 3) 收回 xiaojiang 的 select 權限,操作如下。

hive> revoke select on database default from user xiaojiang;

 4) 查看 xiaojiang 用戶擁有哪些權限,命令如下。

hive> show grant user xiaojiang on database default;

超級管理權限

 HIVE本身有權限管理功能,需要通過配置開啟。

< property> < name>hive.metastore.authorization.storage.checks< /name> < value>true< /value> < /property> < property> < name>hive.metastore.execute.setugi< /name> < value>false< /value> < /property> < property> < name>hive.security.authorization.enabled< /name> < value>true< /value> < /property> < property> < name>hive.security.authorization.createtable.owner.grants< /name> < value>ALL< /value> < /property>

 其中hive.security.authorization.createtable.owner.grants設置成ALL表示用戶對自己創建的表是有所有權限的(這樣是比較合理地)。

 開啟權限控制有Hive的權限功能還有一個需要完善的地方,那就是“超級管理員”。 Hive中沒有超級管理員,任何用戶都可以進行Grant/Revoke操作,為了完善“超級管理員”,必須添加hive.semantic.analyzer.hook配置,並實現自己的權限控制類。

 編寫權限控制類,代碼如下所示。

package com.xxx.hive;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;
/**   
	* 設置Hive超級管理員   *   
	* @author   
	* @version $Id: AuthHook.java,v 0.1 2013-6-13 下午3:32:12 yinxiu Exp $  
	*/  
public class AuthHook extends AbstractSemanticAnalyzerHook { 
	private static String admin = "admin";
	@Override 27 public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, 28 ASTNode ast) throws SemanticException {
	switch (ast.getToken().getType()) { 
		case HiveParser.TOK_CREATEDATABASE: 
		case HiveParser.TOK_DROPDATABASE: 
		case HiveParser.TOK_CREATEROLE: 
		case HiveParser.TOK_DROPROLE:
		case HiveParser.TOK_GRANT: 
		case HiveParser.TOK_REVOKE: 
		case HiveParser.TOK_GRANT_ROLE: 
		case HiveParser.TOK_REVOKE_ROLE: 
		String userName = null;
		if (SessionState.get() != null  && SessionState.get().getAuthenticator() != null) {
			userName = SessionState.get().getAuthenticator().getUserName();
		} 
		if (!admin.equalsIgnoreCase(userName)) { 
			throw new SemanticException(userName + " can't use ADMIN options, except " + admin + ".");
		 } 
		break;
		default: 
		break;
		}
		return ast;
	} 
}

 添加了控制類之后還必須添加下面的配置:

< property> < name>hive.semantic.analyzer.hook< /name> < value>com.xxx.AuthHook< /value> < /property>

 若有使用hiveserver,hiveserver必須重啟。

 至此,只有admin用戶可以進行Grant/Revoke操作。
 權限操作示例:

grant select on database default to user xiaojiang;
revoke all on database default from user xiaojiang;
show grant user xiaojiang on database default;

Hive與JDBC示例

 在使用 JDBC 開發 Hive 程序時, 必須首先開啟 Hive 的遠程服務接口。使用下面命令進行開啟:

hive -service hiveserver &  //Hive低版本提供的服務是:hiveserver
hive --service hiveserver2 &	//Hive0.11.0以上版本提供了的服務是:hiveserver2

本課程我們使用的hive1.0版本,故我們使用hiveserver2服務,下面我使用 Java 代碼通過JDBC連接Hiveserver。

 1) 測試數據

 本地目錄/home/hadoop/下的djt.txt文件內容(每行數據之間用tab鍵隔開)如下所示:

1	dajiangtai
2	hadoop
3	hive
4	hbase
5	spark

 2) 程序代碼

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class Hive {
	private static String driverName = "org.apache.hive.jdbc.HiveDriver";//hive驅動名稱
	private static String url = "jdbc:hive2://djt11:10000/default";//連接hive2服務的連接地址,Hive0.11.0以上版本提供了一個全新的服務:HiveServer2
	private static String user = "hadoop";//對HDFS有操作權限的用戶
	private static String password = "";//在非安全模式下,指定一個用戶運行查詢,忽略密碼
	private static String sql = "";
	private static ResultSet res;
	public static void main(String[] args) {
	    try {
	        Class.forName(driverName);//加載HiveServer2驅動程序
	        Connection conn = DriverManager.getConnection(url, user, password);//根據URL連接指定的數據庫
	        Statement stmt = conn.createStatement();
	        
	        //創建的表名
	        String tableName = "testHiveDriverTable";
	        
	        /** 第一步:表存在就先刪除 **/
	        sql = "drop table " + tableName;
	        stmt.execute(sql);
	        
	        /** 第二步:表不存在就創建 **/
	        sql = "create table " + tableName + " (key int, value string)  row format delimited fields terminated by '\t' STORED AS TEXTFILE";
	        stmt.execute(sql);
	        
	        // 執行“show tables”操作
	        sql = "show tables '" + tableName + "'";
	        res = stmt.executeQuery(sql);
	        if (res.next()) {
	            System.out.println(res.getString(1));
	        }
	        
	        // 執行“describe table”操作
	        sql = "describe " + tableName;
	        res = stmt.executeQuery(sql);
	        while (res.next()) {  
	            System.out.println(res.getString(1) + "\t" + res.getString(2));
	        }
	        
	        // 執行“load data into table”操作
	        String filepath = "/home/hadoop/djt.txt";//hive服務所在節點的本地文件路徑
	        sql = "load data local inpath '" + filepath + "' into table " + tableName;
	        stmt.execute(sql);
	        
	        // 執行“select * query”操作
	        sql = "select * from " + tableName;
	        res = stmt.executeQuery(sql);
	        while (res.next()) {
	            System.out.println(res.getInt(1) + "\t" + res.getString(2));
	        }
	        
	        // 執行“regular hive query”操作,此查詢會轉換為MapReduce程序來處理
	        sql = "select count(*) from " + tableName;
	        res = stmt.executeQuery(sql);
	        while (res.next()) {
	            System.out.println(res.getString(1));
	        }        
	        conn.close();
	        conn = null;
	    } catch (ClassNotFoundException e) {
	        e.printStackTrace();
	        System.exit(1);
	    } catch (SQLException e) {
	        e.printStackTrace();
	        System.exit(1);
	    }
	}
}

 3) 運行結果(右擊-->Run as-->Run on Hadoop)

 執行“show tables”運行結果:

testhivedrivertable

 執行“describe table”運行結果:

key    int
value    string

 執行“select * query”運行結果:

1	dajiangtai
2	hadoop
3	hive
4	hbase
5	spark

 執行“regular hive query”運行結果:

5

hive性能調優

(一)Hadoop 計算框架的特性

什么是數據傾斜

由於數據的不均衡原因,導致數據分布不均勻,造成數據大量的集中到一點,造成數據熱點

Hadoop框架的特性

不怕數據大,怕數據傾斜

jobs數比較多的作業運行效率相對比較低,比如即使有幾百行的表,如果多次關聯多次匯總,產生十幾個jobs,耗時很長。原因是map reduce作業初始化的時間是比較長的

sum,count,max,min等UDAF,不怕數據傾斜問題,hadoop在map端的匯總合並優化,使數據傾斜不成問題

count(distinct ),在數據量大的情況下,效率較低,因為count(distinct)是按group by 字段分組,按distinct字段排序,一般這種分布方式是很傾斜的

(二)優化的常用手段

解決數據傾斜問題

減少job數

設置合理的map reduce的task數,能有效提升性能。

了解數據分布,自己動手解決數據傾斜問題是個不錯的選擇

數據量較大的情況下,慎用count(distinct)。

對小文件進行合並,是行至有效的提高調度效率的方法。

優化時把握整體,單個作業最優不如整體最優。

(三)Hive的數據類型方面的優化

優化原則

按照一定規則分區(例如根據日期)。通過分區,查詢的時候指定分區,會大大減少在無用數據上的掃描, 同時也非常方便數據清理。

合理的設置Buckets。在一些大數據join的情況下,map join有時候會內存不夠。如果使用Bucket Map Join的話,可以只把其中的一個bucket放到內存中,內存中原來放不下的內存表就變得可以放下。這需要使用buckets的鍵進行join的條件連結,並且需要如下設置

set hive.optimize.bucketmapjoin = true

(四)Hive的操作方面的優化

(1)全排序

Hive的排序關鍵字是SORT BY,它有意區別於傳統數據庫的ORDER BY也是為了強調兩者的區別–SORT BY只能在單機范圍內排序

(2)怎樣做笛卡爾積

當Hive設定為嚴格模式(hive.mapred.mode=strict)時,不允許在HQL語句中出現笛卡爾積

MapJoin是的解決辦法

MapJoin,顧名思義,會在Map端完成Join操作。這需要將Join操作的一個或多個表完全讀入內存

MapJoin的用法是在查詢/子查詢的SELECT關鍵字后面添加/*+ MAPJOIN(tablelist) */提示優化器轉化為MapJoin(目前Hive的優化器不能自動優化MapJoin)

其中tablelist可以是一個表,或以逗號連接的表的列表。tablelist中的表將會讀入內存,應該將小表寫在這里

在大表和小表做笛卡爾積時,規避笛卡爾積的方法是,給Join添加一個Join key,原理很簡單:將小表擴充一列join key,並將小表的條目復制數倍,join key各不相同;將大表擴充一列join key為隨機數

(3)控制Hive的Map數

通常情況下,作業會通過input的目錄產生一個或者多個map任務

主要的決定因素有: input的文件總個數,input的文件大小,集群設置的文件塊大小(目前為128M, 可在hive中通過set dfs.block.size;命令查看到,該參數不能自定義修改)

是不是map數越多越好

答案是否定的。如果一個任務有很多小文件(遠遠小於塊大小128m),則每個小文件也會被當做一個塊,用一個map任務來完成,而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。

是不是保證每個map處理接近128m的文件塊,就高枕無憂了?

答案也是不一定。比如有一個127m的文件,正常會用一個map去完成,但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄,如果map處理的邏輯比較復雜,用一個map任務去做,肯定也比較耗時。

針對上面的問題3和4,我們需要采取兩種方式來解決:即減少map數和增加map數;

舉例

a) 假設input目錄下有1個文件a,大小為780M,那么hadoop會將該文件a分隔成7個塊(6個128m的塊和1個12m的塊),從而產生7個map數

b)假設input目錄下有3個文件a,b,c,大小分別為10m,20m,130m,那么hadoop會分隔成4個塊(10m,20m,128m,2m),從而產生4個map數

即如果文件大於塊大小(128m),那么會拆分,如果小於塊大小,則把該文件當成一個塊

(4)怎樣決定reducer個數

Hadoop MapReduce程序中,reducer個數的設定極大影響執行效率

不指定reducer個數的情況下,Hive會猜測確定一個reducer個數,基於以下兩個設定:

參數1:hive.exec.reducers.bytes.per.reducer(默認為1G)

參數2 :hive.exec.reducers.max(默認為999)

計算reducer數的公式

N=min(參數2,總輸入數據量/參數1)

依據Hadoop的經驗,可以將參數2設定為0.95*(集群中TaskTracker個數)

reduce個數並不是越多越好

同map一樣,啟動和初始化reduce也會消耗時間和資源;

另外,有多少個reduce,就會有多少個輸出文件,如果生成了很多個小文件,那么如果這些小文件作為下一個任務的輸入,則也會出現小文件過多的問題

什么情況下只有一個reduce

很多時候你會發現任務中不管數據量多大,不管你有沒有設置調整reduce個數的參數,任務中一直都只有一個reduce任務;

其實只有一個reduce任務的情況,除了數據量小於

hive.exec.reducers.bytes.per.reducer參數值的情況外,還有以下原因:

a)沒有group by的匯總

b)用了Order by

(5)合並 MapReduce 操作

Multi-group by

Multi-group by是Hive的一個非常好的特性,它使得Hive中利用中間結果變得非常方便

FROM log

insert overwrite table test1 select log.id group by log.id

insert overwrite table test2 select log.name group by log.name

上述查詢語句使用了Multi-group by特性連續group by了2次數據,使用不同的group by key。這一特性可以減少一次MapReduce操作。

Bucket 與 Sampling

Bucket是指將數據以指定列的值為key進行hash,hash到指定數目的桶中。這樣就可以支持高效采樣了

Sampling可以在全體數據上進行采樣,這樣效率自然就低,它還是要去訪問所有數據。而如果一個表已經對某一列制作了bucket,就可以采樣所有桶中指定序號的某個桶,這就減少了訪問量。

如下例所示就是采樣了test中32個桶中的第三個桶。

SELECT * FROM test 、、、TABLESAMPLE(BUCKET 3 OUT OF 32);

(6)JOIN 原則

在使用寫有 Join 操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在 Join 操作符的左邊

原因是在 Join 操作的 Reduce 階段,位於 Join 操作符左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生 OOM 錯誤的幾率

Map Join

Join 操作在 Map 階段完成,不再需要Reduce,前提條件是需要的數據在 Map 的過程中可以訪問到

例如:

INSERT OVERWRITE TABLE phone_traffic

SELECT /*+ MAPJOIN(phone_location) */ l.phone,p.location,l.traffic from phone_location p join log l on (p.phone=l.phone)

相關的參數為:

hive.join.emit.interval = 1000 How many rows in the right-most join operand Hive should buffer before emitting the join result.

hive.mapjoin.size.key = 10000

hive.mapjoin.cache.numrows = 10000

(7)Group By

Map 端部分聚合

並不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端進行部分聚合,最后在 Reduce 端得出最終結果

基於 Hash

參數包括:

hive.map.aggr = true 是否在 Map 端進行聚合,默認為 True

hive.groupby.mapaggr.checkinterval = 100000 在 Map 端進行聚合操作的條目數目

有數據傾斜的時候進行負載均衡

hive.groupby.skewindata = false

當選項設定為 true,生成的查詢計划會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。

(8)合並小文件

文件數目過多,會給 HDFS 帶來壓力,並且會影響處理效率,可以通過合並 Map 和 Reduce 的結果文件來消除這樣的影響:

hive.merge.mapfiles = true 是否和並 Map 輸出文件,默認為 True

hive.merge.mapredfiles = false 是否合並 Reduce 輸出文件,默認為 False

hive.merge.size.per.task = 256*1000*1000 合並文件的大小

Hive 案例分析

這里我們以機頂盒產生的用戶收視數據為例,來具體分析如何使用Hive。

1、機頂盒產生的用戶原始數據都有一定的格式,包含機頂盒號、收看的頻道、收看的節目、收看的時間等信息。

2、用戶的原始數據通常不直接交給hive處理,而是需要經過一個清洗和轉化的過程。這個過程一般是通過Hadoop 作業來實現,轉化成與hive表對應的格式。

這個案例的具體步驟如下:

步驟1:用戶數據預處理

通過MapReduce作業將日志轉化為固定的格式。

用戶的原始數據如下所示。

< GHApp>< WIC cardNum="1370695139" stbNum="03111108020232488" date="2012-09-21" pageWidgetVersion="1.0">< A e="13:55:11" s="13:50:10" n="104" t="1" pi="789" p="%E5%86%8D%E5%9B%9E%E9%A6%96(21)" sn="BTV影視" />< /WIC>< /GHApp>

轉化之后的數據如下所示,每個字段我們使用"@"分割符號。

1370695139@03111108020232488@2012-09-21@BTV影視@再回首@13:50:10@13:55:11@301

上面的字段分別代表:機頂盒號、用戶編號、收看日期、頻道、欄目、起始時間、結束時間、收視時長。

步驟2:創建hive表

我們根據對應字段,使用hive創建表。

create table tvdata(cardnum string,stbnum string,date string,sn string,p string ,s string,e string,duration int) row format delimited fields terminated by '@' stored as textfile; 

步驟3:將hdfs中的數據導入表中

我們使用以下命令,將hdfs中的數據導入表中。

 load data inpath '/media/tvdata/part-r-00000' into table tvdata;

步驟4:編寫HQL,分析數據

使用HQL語句,統計每個的頻道的人均收視時長。

select sn,sum(duration)/count(*) from tvdata group by sn;

這里我們使用HQL只是從一個角度分析數據,大家可以嘗試從多個角度來分析數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM