Hive 基本語法


Hive 語法和SQL的類似,但不完全一樣,這里給出一個官方文檔地址和轉載一個優秀的教程(要問我為什么不寫?要寫完實在是太多了。。。)

官方DDL地址:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

下面是轉載的,轉載至:https://www.cnblogs.com/qiaoyihang/p/6181630.html

建表規則如下:

復制代碼
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
  [(col_name data_type [COMMENT col_comment], ...)] 
  [COMMENT table_comment] 
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 
  [CLUSTERED BY (col_name, col_name, ...) 
  [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 
  [ROW FORMAT row_format] 
  [STORED AS file_format] 
  [LOCATION hdfs_path]
復制代碼

•CREATE TABLE 創建一個指定名字的表。如果相同名字的表已經存在,則拋出異常;用戶可以用 IF NOT EXIST 選項來忽略這個異常

•EXTERNAL 關鍵字可以讓用戶創建一個外部表,在建表的同時指定一個指向實際數據的路徑(LOCATION)

•LIKE 允許用戶復制現有的表結構,但是不復制數據

•COMMENT可以為表與字段增加描述

•ROW FORMAT DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char]

[MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]

| SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]

用戶在建表的時候可以自定義 SerDe 或者使用自帶的 SerDe。如果沒有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,將會使用自帶的 SerDe。在建表的時候,用戶還需要為表指定列,用戶在指定表的列的同時也會指定自定義的 SerDe,Hive 通過 SerDe 確定表的具體的列的數據。

•STORED AS

SEQUENCEFILE

| TEXTFILE

| RCFILE

| INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname

如果文件數據是純文本,可以使用 STORED AS TEXTFILE。如果數據需要壓縮,使用 STORED AS SEQUENCE 。

例:創建外部表

hive> CREATE EXTERNAL TABLE IF NOT EXISTS student2
    > (sno INT,sname STRING,age INT,sex STRING)   
    > ROW FORMAT DELIMITED                        
    > FIELDS TERMINATED BY '\t'                   
    > STORED AS TEXTFILE                          
    > LOCATION '/user/external';      

 

一些基本操作:

復制代碼
刪除:
 hive> DROP TABLE test1;

修改表結構:
 DESC student1;
hive> ALTER TABLE student1 ADD COLUMNS  
    > (address STRING,grade STRING);

修改表名:
hive> ALTER TABLE student1 RENAME TO student3;

創建和已知表相同結構的表:
hive> CREATE TABLE copy_student1 LIKE student1;

導入外部文件數據:
加載數據到student1表中
LOAD DATA LOCAL INPATH '/home/hadoop/data/student1.txt' INTO TABLE student1;

加載hdfs中的文件:
LOAD DATA INPATH '/user/hive/student1.txt' INTO TABLE copy_student1;

復制表數據:
 INSERT OVERWRITE TABLE copy_student2 SELECT * FROM student1;

多表同時復制:
hive> FROM student1                                       
    > INSERT OVERWRITE TABLE copy_student3
    > SELECT *                            
    > INSERT OVERWRITE TABLE copy_student4
    > SELECT *;
復制代碼

 

ORDER BY 會對輸入做全局排序,因此只有一個 Reduce(多個 Reduce 無法保證全局有序)會導致當輸入規模較大時,需要較長的計算時間。使用 ORDER BY 查詢的時候,為了優化查詢的速度,使用 hive.mapred.mode 屬性。

hive.mapred.mode = nonstrict;(default value/默認值)
hive.mapred.mode=strict;

與數據庫中 ORDER BY 的區別在於,在 hive.mapred.mode=strict 模式下必須指定limit ,否則執行會報錯。

hive> set hive.mapred.mode=strict;
hive> select * from group_test order by uid limit 5;

 

sort by 不受 hive.mapred.mode 的值是否為 strict 和 nostrict 的影響。sort by 的數據只能保證在同一個 Reduce 中的數據可以按指定字段排序。

使用 sort by 可以指定執行的 Reduce 個數(set mapred.reduce.tasks=< number>)這樣可以輸出更多的數據。對輸出的數據再執行歸並排序,即可以得到全部結果。

hive> set hive.mapred.mode=strict;
hive> select * from group_test sort by uid ;

 

DISTRIBUTE BY 排序查詢

-- 按照指定的字段對數據划分到不同的輸出 Reduce 文件中,操作如下。
hive> insert overwrite local directory '/home/hadoop/djt/test' select * from group_test distribute by length(gender);

--此方法根據 gender 的長度划分到不同的 Reduce 中,最終輸出到不同的文件中。length 是內建函數,也可以指定其它的函數或者使用自定義函數。
hive> insert overwrite local directory '/home/hadoop/djt/test' select * from group_test order by gender  distribute by length(gender);
order by gender 與 distribute by length(gender) 不能共用。

 

 

索引操作

創建一個索引

復制代碼
hive> create index user_index on table user(id) 
    > as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' 
    > with deferred rebuild
    > IN TABLE user_index_table;
hive> alter index user_index on user rebuild;
hive> select * from user_index_table limit 5; 
0       hdfs://mycluster/user/hive/warehouse/table02/000000_0   [0]
1       hdfs://mycluster/user/hive/warehouse/table02/000000_0   [352]
2       hdfs://mycluster/user/hive/warehouse/table02/000000_0   [704]
3       hdfs://mycluster/user/hive/warehouse/table02/000000_0   [1056]
4       hdfs://mycluster/user/hive/warehouse/table02/000000_0   [1408]
Time taken: 0.244 seconds, Fetched: 5 row(s)
復制代碼

 

 

索引案例

創建一個索引測試表 index_test,dt作為分區屬性,“ROW FORMAT DELIMITED FILEDS TERMINATED BY ','” 表示用逗號分割字符串,默認為‘\001’。

 create table index_test(id INT,name STRING) PARTITIONED BY (dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

創建一個臨時索引表 index_tmp。

hive> create table index_tmp(id INT,name STRING,dt STRING) ROW FORMAT DELIMITED FILEDS TERMINATED BY ',';

加載本地數據到 index_tmp 表中。

hive> load data local inpath '/home/hadoop/djt/test.txt' into table index_tmp

設置 Hive 的索引屬性來優化索引查詢,命令如下。

hive> set hive.exec.dynamic.partition.mode=nonstrict;----設置所有列為 dynamic partition
hive> set hive.exec.dynamic.partition=true;----使用動態分區

查詢index_tmp 表中的數據,插入 table_test 表中。

hive> insert overwrite table index_test partition(dt) select id,name,dt from index_tmp;
復制代碼
--使用 index_test 表,在屬性 id 上創建一個索引 index1_index_test 。
hive> create index index1_index_test on table index_test(id) as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' WITH DEFERERD REBUILD;

--填充索引數據。
hive> alter index index1_index_test on index_test rebuild;

--查看創建的索引。
hive> show index on index_test

-- 查看分區信息。
hive> show partitions index_test;
復制代碼

 

修改配置文件信息:

復制代碼
< property>
    < name>hive.optimize.index.filter< /name>
    < value>true< /value>
< /property>
< property>
    < name>hive.optimize.index.groupby< /name>
    < value>true< /value>
< /property>
< property>
    < name>hive.optimize.index.filter.compact.minsize< /name>
    < value>5120< /value>
< /property>
復制代碼

hive.optimize.index.filter 和 hive.optimize.index.groupby 參數默認是 false。使用索引的時候必須把這兩個參數開啟,才能起到作用。

hive.optimize.index.filter.compact.minsize 參數為輸入一個緊湊的索引將被自動采用最小尺寸、默認5368709120(以字節為單位)。

 

分區操作

Hive 的分區通過在創建表時啟動 PARTITION BY 實現,用來分區的維度並不是實際數據的某一列,具體分區的標志是由插入內容時給定的。當要查詢某一分區的內容時可以采用 WHERE 語句, 例如使用 “WHERE tablename.partition_key>a” 創建含分區的表。創建分區語法如下。

CREATE TABLE table_name(
...
)
PARTITION BY (dt STRING,country STRING)

1、 創建分區

Hive 中創建分區表沒有什么復雜的分區類型(范圍分區、列表分區、hash 分區,混合分區等)。分區列也不是表中的一個實際的字段,而是一個或者多個偽列。意思是說,在表的數據文件中實際並不保存分區列的信息與數據。

創建一個簡單的分區表。

hive> create table partition_test(member_id string,name string) partitioned by (stat_date string,province string) row format delimited fields terminated by ',';

 

復制代碼
--這個例子中創建了 stat_date 和 province 兩個字段作為分區列。通常情況下需要預先創建好分區,然后才能使用該分區。例如:
hive> alter table partition_test add partition (stat_date='2015-01-18',province='beijing');

--這樣就創建了一個分區。這時會看到 Hive 在HDFS 存儲中創建了一個相應的文件夾。
$ hadoop fs -ls /user/hive/warehouse/partition_test/stat_date=2015-01-18
/user/hive/warehouse/partition_test/stat_date=2015-01-18/province=beijing----顯示剛剛創建的分區

每一個分區都會有一個獨立的文件夾,在上面例子中,stat_date 是主層次,province 是 副層次。
復制代碼

 

復制代碼
--向分區表中插入數據
--使用一個輔助的非分區表 partition_test_input 准備向 partition_test 中插入數據,實現步驟如下。

insert overwrite table partition_test partition(stat_date='2015-01-18',province='jiangsu') select member_id,name from partition_test_input where stat_date='2015-01-18' and province='jiangsu';

向多個分區插入數據,命令如下。
hive> from partition_test_input
insert overwrite table partition_test partition(stat_date='2015-01-18',province='jiangsu') select member_id,name from partition_test_input where stat_date='2015-01-18' and province='jiangsu'
insert overwrite table partition_test partition(stat_date='2015-01-28',province='sichuan') select member_id,name from partition_test_input where stat_date='2015-01-28' and province='sichuan'
insert overwrite table partition_test partition(stat_date='2015-01-28',province='beijing') select member_id,name from partition_test_input where stat_date='2015-01-28' and province='beijing';
復制代碼

 

 動態分區的產生

按照上面的方法向分區表中插入數據,如果數據源很大,針對一個分區就要寫一個 insert ,非常麻煩。使用動態分區可以很好地解決上述問題。動態分區可以根據查詢得到的數據自動匹配到相應的分區中去。動態分區可以通過下面的設置來打開:

set hive.exec.dynamic.partition=true;  
set hive.exec.dynamic.partition.mode=nonstrict; 

動態分區的使用方法很簡單,假設向 stat_date='2015-01-18' 這個分區下插入數據,至於 province 插到哪個子分區下讓數據庫自己來判斷。stat_date 叫做靜態分區列,province 叫做動態分區列。

hive> insert overwrite table partition_test partition(stat_date='2015-01-18',province)
select member_id,name province from partition_test_input where stat_date='2015-01-18';

注意,動態分區不允許主分區采用動態列而副分區采用靜態列,這樣將導致所有的主分區都要創建副分區靜態列所定義的分區。

 

幾個常用參數

hive.exec.max.dynamic.partitions.pernode:每一個 MapReduce Job 允許創建的分區的最大數量,如果超過這個數量就會報錯(默認值100)。

hive.exec.max.dynamic.partitions:一個 dml 語句允許創建的所有分區的最大數量(默認值100)。

hive.exec.max.created.files:所有 MapReduce Job 允許創建的文件的最大數量(默認值10000)。

盡量讓分區列的值相同的數據在同一個 MapReduce 中,這樣每一個 MapReduce 可以盡量少地產生新的文件夾,可以通過 DISTRIBUTE BY 將分區列值相同的數據放到一起,命令如下。

hive> insert overwrite table partition_test partition(stat_date,province)
select memeber_id,name,stat_date,province from partition_test_input distribute by stat_date,province;

 

桶操作

Hive 中 table 可以拆分成 Partition table 和 桶(BUCKET),table和partition可以通過‘CLUSTERED BY ’進一步分bucket,BUCKET 中的數據可以通過 SORT BY 排序。

BUCKET 主要作用如下。

 

1)數據 sampling;

 

2)提升某些查詢操作效率,例如 Map-Side Join。

 

需要特別主要的是,CLUSTERED BY 和 SORT BY 不會影響數據的導入,這意味着,用戶必須自己負責數據的導入,包括數據額分桶和排序。 'set hive.enforce.bucketing=true' 可以自動控制上一輪 Reduce 的數量從而適配 BUCKET 的個數,當然,用戶也可以自主設置 mapred.reduce.tasks 去適配 BUCKET 個數,推薦使用:

hive> set hive.enforce.bucketing=true;
復制代碼
1) 創建臨時表 student_tmp,並導入數據。
hive> desc student_tmp;
hive> select * from student_tmp;

2) 創建 student 表。
hive> create table student(id int,age int,name string)
partitioned by (stat_date string)
clustered by (id) sorted by(age) into 2 bucket
row format delimited fields terminated by ',';

3) 設置環境變量。
hive> set hive.enforce.bucketing=true;

4) 插入數據。
hive> from student_tmp
insert overwrite table student partition(stat_date='2015-01-19')
select id,age,name where stat_date='2015-01-18' sort by age;

5) 查看文件目錄。
$ hadoop fs -ls /usr/hive/warehouse/student/stat_date=2015-01-19/

6) 查看 sampling 數據。
hive> select * from student tablesample(bucket 1 out of 2 on id);
tablesample 是抽樣語句,語法如下。

tablesample(bucket x out of y)

y 必須是 table 中 BUCKET 總數的倍數或者因子。
復制代碼

 

Hive 復合類型

 

hive提供了復合數據類型:

 

 1)Structs: structs內部的數據可以通過DOT(.)來存取。例如,表中一列c的類型為STRUCT{a INT; b INT},我們可以通過c.a來訪問域a。

 

 2)Map(K-V對):訪問指定域可以通過["指定域名稱"]進行。例如,一個Map M包含了一個group-》gid的kv對,gid的值可以通過M['group']來獲取。

 

 3)Array:array中的數據為相同類型。例如,假如array A中元素['a','b','c'],則A[1]的值為'b'

復制代碼
1、Struct使用

 1) 建表
hive> create table student_test(id INT, info struct< name:STRING, age:INT>)  
> ROW FORMAT DELIMITED FIELDS TERMINATED BY ','                         
> COLLECTION ITEMS TERMINATED BY ':';
 'FIELDS TERMINATED BY' :字段與字段之間的分隔符。'COLLECTION ITEMS TERMINATED BY' :一個字段各個item的分隔符。

 2) 導入數據
$ cat test5.txt   
1,zhou:30  
2,yan:30  
3,chen:20  
4,li:80  
hive> LOAD DATA LOCAL INPATH '/home/hadoop/djt/test5.txt' INTO TABLE student_test;

 3) 查詢數據
hive> select info.age from student_test;  

2、Array使用
 1) 建表
hive> create table class_test(name string, student_id_list array< INT>)  
> ROW FORMAT DELIMITED                                              
> FIELDS TERMINATED BY ','                                          
> COLLECTION ITEMS TERMINATED BY ':';

 2) 導入數據
$ cat test6.txt   
034,1:2:3:4  
035,5:6  
036,7:8:9:10  
hive>  LOAD DATA LOCAL INPATH '/home/work/data/test6.txt' INTO TABLE class_test ; 

 3) 查詢
hive> select student_id_list[3] from class_test; 

3、Map使用
 1) 建表
hive> create table employee(id string, perf map< string, int>)       
> ROW FORMAT DELIMITED                                          
> FIELDS TERMINATED BY '\t'                                
> COLLECTION ITEMS TERMINATED BY ','                       
> MAP KEYS TERMINATED BY ':';  
 ‘MAP KEYS TERMINATED BY’ :key value分隔符

 2) 導入數據
$ cat test7.txt   
1       job:80,team:60,person:70  
2       job:60,team:80  
3       job:90,team:70,person:100  
hive>  LOAD DATA LOCAL INPATH '/home/work/data/test7.txt' INTO TABLE employee;  

 3) 查詢
hive> select perf['person'] from employee;
復制代碼

 

 

Hive 的 JOIN 用法

 hive只支持等連接,外連接,左半連接。hive不支持非相等的join條件(通過其他方式實現,如left outer join),因為它很難在map/reduce job實現這樣的條件。而且,hive可以join兩個以上的表。

復制代碼
1、等連接

只有等連接才允許
hive> SELECT a.* FROM a JOIN b ON (a.id = b.id)  
hive> SELECT a.* FROM a JOIN b ON (a.id = b.id AND a.department = b.department) 

2、多表連接
 同個查詢,可以join兩個以上的表
hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2) 

3、join的緩存和任務轉換
 hive轉換多表join時,如果每個表在join字句中,使用的都是同一個列,只會轉換為一個單獨的map/reduce。
hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
被轉換為兩個map/reduce任務,因為b的key1列在第一個join條件使用,而b表的key2列在第二個join條件使用。第一個map/reduce任務join a和b。第二個任務是第一個任務的結果join c。
hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2) 在join的每個map/reduce階段,序列中的最后一個表,當其他被緩存時,它會流到reducers。所以,reducers需要緩存join關鍵字的特定值組成的行,通過組織最大的表出現在序列的最后,有助於減少reducers的內存。
hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
三個表,在同一個獨立的map/reduce任務做join。a和b的key對應的特定值組成的行,會緩存在reducers的內存。然后reducers接受c的每一行,和緩存的每一行做join計算。
hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
這里有兩個map/reduce任務在join計算被調用。第一個是a和b做join,然后reducers緩存a的值,另一邊,從流接收b的值。第二個階段,reducers緩存第一個join的結果,另一邊從流接收c的值。 在join的每個map/reduce階段,通過關鍵字,可以指定哪個表從流接收。
hive> SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
三個表的連接,會轉換為一個map/reduce任務,reducer會把b和c的key的特定值緩存在內存里,然后從流接收a的每一行,和緩存的行做join。 4、join的結果 LEFT,RIGHT,FULL OUTER連接存在是為了提供ON語句在沒有匹配時的更多控制。例如,這個查詢: hive> SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key) 將會返回a的每一行。如果b.key等於a.key,輸出將是a.val,b.val,如果a沒有和b.key匹配,輸出的行將是 a.val,NULL。如果b的行沒有和a.key匹配上,將被拋棄。語法"FROM a LEFT OUTER JOIN b"必須寫在一行,為了理解它如何工作——這個查詢,a是b的左邊,a的所有行會被保持;RIGHT OUTER JOIN將保持b的所有行, FULL OUTER JOIN將會保存a和b的所有行。OUTER JOIN語義應該符合標准的SQL規范。 5、join的過濾 Joins發生在where字句前,所以,如果要限制join的輸出,需要寫在where字句,否則寫在JOIN字句。現在討論的一個混亂的大點,就是分區表 hive> SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key) WHERE a.ds='2009-07-07' AND b.ds='2009-07-07' 將會連接a和b,產生a.val和b.val的列表。WHERE字句,也可以引用join的輸出列,然后過濾他們。 但是,無論何時JOIN的行找到a的key,但是找不到b的key時,b的所有列會置成NULL,包括ds列。這就是說,將過濾join輸出的所有行,包括沒有合法的b.key的行。然后你會在LEFT OUTER的要求撲空。 也就是說,如果你在WHERE字句引用b的任何列,LEFT OUTER的部分join結果是不相關的。所以,當外連接時,使用這個語句 hive> SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07'; join的輸出會預先過濾,然后你不用對有a.key而沒有b.key的行做過濾。RIGHT和FULL join也是一樣的邏輯。 6、join的順序 join是不可替換的,連接是從左到右,不管是LEFT或RIGHT join。 hive> SELECT a.val1, a.val2, b.val, c.val FROM a JOIN b ON (a.key = b.key) LEFT OUTER JOIN c ON (a.key = c.key) 首先,連接a和b,扔掉a和b中沒有匹配的key的行。結果表再連接c。這提供了直觀的結果,如果有一個鍵都存在於A和C,但不是B:完整行(包括 a.val1,a.val2,a.key)會在"a jOIN b"步驟,被丟棄,因為它不在b中。結果沒有a.key,所以當它和c做LEFT OUTER JOIN,c.val也無法做到,因為沒有c.key匹配a.key(因為a的行都被移除了)。類似的,RIGHT OUTER JOIN(替換為LEFT),我們最終會更怪的效果,NULL, NULL, NULL, c.val。因為盡管指定了join key是a.key=c.key,我們已經在第一個JOIN丟棄了不匹配的a的所有行。 為了達到更直觀的效果,相反,我們應該從 hive> FROM c LEFT OUTER JOIN a ON (c.key = a.key) LEFT OUTER JOIN b ON (c.key = b.key). LEFT SEMI JOIN實現了相關的IN / EXISTS的子查詢語義的有效途徑。由於Hive目前不支持IN / EXISTS的子查詢,所以你可以用 LEFT SEMI JOIN 重寫你的子查詢語句。LEFT SEMI JOIN 的限制是, JOIN 子句中右邊的表只能在 ON 子句中設置過濾條件,在 WHERE 子句、SELECT 子句或其他地方過濾都不行。 hive> SELECT a.key, a.value FROM a WHERE a.key in (SELECT b.key FROM B); 可以重寫為 hive> SELECT a.key, a.val FROM a LEFT SEMI JOIN b on (a.key = b.key) 7、map 端 join 但如果所有被連接的表是小表,join可以被轉換為只有一個map任務。查詢是 hive> SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key 不需要reducer。對於每一個mapper,A和B已經被完全讀出。限制是a FULL/RIGHT OUTER JOIN b不能使用。 如果表在join的列已經分桶了,其中一張表的桶的數量,是另一個表的桶的數量的整倍,那么兩者可以做桶的連接。如果A有4個桶,表B有4個桶,下面的連接: hive> SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key 只能在mapper工作。為了為A的每個mapper完整抽取B。對於上面的查詢,mapper處理A的桶1,只會抽取B的桶1,這不是默認行為,要使用以下參數: hive> set hive.optimize.bucketmapjoin = true; 如果表在join的列經過排序,分桶,而且他們有相同數量的桶,可以使用排序-合並 join。每個mapper,相關的桶會做連接。如果A和B有4個桶 hive> SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM A a join B b on a.key = b.key 只能在mapper使用。使用A的桶的mapper,也會遍歷B相關的桶。這個不是默認行為,需要配置以下參數: hive> set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; hive> set hive.optimize.bucketmapjoin = true; hive> set hive.optimize.bucketmapjoin.sortedmerge = true;
復制代碼

 

Hive 內置操作符與函數

 1)字符串長度函數:length

2)字符串反轉函數:reverse

 3)字符串連接函數:concat

 4)帶分隔符字符串連接函數:concat_ws

 5)字符串截取函數:substr,substring

  7)字符串轉大寫函數:upper,ucase

 8)字符串轉小寫函數:lower,lcase

9)去空格函數:trim

 10)左邊去空格函數:ltrim

11)右邊去空格函數:rtrim

集合統計函數

 1) 個數統計函數 count。

 2) 總和統計函數 sum。

 3) 平均值統計函數avg。

4) 最小值統計函數 min。統計結果集中 col 字段的最小值。

 5) 最大值統計函數 max。統計結果集中 col 字段的最大值。

 

復合類型操作

 1) Map 類型構建。根據輸入的 Key-Value 對構建 Map 類型。

語法:map(key1, value1, key2, value2,...)
舉例:
hive>create table map_test as select map('100','jay','200','liu') from student;
hive>describe map_test;
hive>select map_test from student;

 2) Struct 類型構建。根據輸入的參數構建結構體 Struct 類型。

語法:struct(val1, val2, val3, ...)
舉例:
hive>create table struct_test as select struct('jay','liu','gang') from student;
hive>describe struct_test;
hive>select struct_test from student;

3) Array 類型構建。根據輸入的參數構建數組 Array 類型。

語法:array(val1,val2, ...)
舉例:
hive> create table array_test as select array('jay','liu','gang') from student;
hive> describe array_test;
hive> select array_test from array_test;

用戶自定義函數 UDF

 UDF(User Defined Function,用戶自定義函數) 對數據進行處理。UDF 函數可以直接應用於 select 語句,對查詢結構做格式化處理后,再輸出內容。

 Hive可以允許用戶編寫自己定義的函數UDF,來在查詢中使用。Hive中有3種UDF:

 1)UDF:操作單個數據行,產生單個數據行。

 2)UDAF:操作多個數據行,產生一個數據行。

 3)UDTF:操作一個數據行,產生多個數據行一個表作為輸出。

 用戶構建的UDF使用過程如下:

 第一步:繼承UDF或者UDAF或者UDTF,實現特定的方法。

 第二步:將寫好的類打包為jar。如hivefirst.jar。

 第三步:進入到Hive外殼環境中,利用add jar /home/hadoop/hivefirst.jar 注冊該jar文件。

 第四步:為該類起一個別名,create temporary function mylength as 'com.whut.StringLength';這里注意UDF只是為這個Hive會話臨時定義的。

 第五步:在select中使用mylength()。

自定義UDF

復制代碼
package whut;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
//UDF是作用於單個數據行,產生一個數據行
//用戶必須要繼承UDF,且必須至少實現一個evalute方法,該方法並不在UDF中
//但是Hive會檢查用戶的UDF是否擁有一個evalute方法
public class Strip extends UDF{
    private Text result=new Text();
    //自定義方法
    public Text evaluate(Text str)
    {
        if(str==null)
        return null;
        result.set(StringUtils.strip(str.toString()));
        return result;
    }
    public Text evaluate(Text str,String stripChars)
    {
        if(str==null)
        return null;
        result.set(StringUtils.strip(str.toString(),stripChars));
        return result;
    }
}
復制代碼

注意事項:

 1、一個用戶UDF必須繼承org.apache.hadoop.hive.ql.exec.UDF;

 2、一個UDF必須要包含有evaluate()方法,但是該方法並不存在於UDF中。evaluate的參數個數以及類型都是用戶自己定義的。在使用的時候,Hive會調用UDF的evaluate()方法。

 

自定義UDAF找到最大值

復制代碼
package whut;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
//UDAF是輸入多個數據行,產生一個數據行
//用戶自定義的UDAF必須是繼承了UDAF,且內部包含多個實現了exec的靜態類
public class MaxiNumber extends UDAF{
    public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator{
    //最終結果
    private IntWritable result;
    //負責初始化計算函數並設置它的內部狀態,result是存放最終結果的
    @Override
    public void init() {
        result=null;
    }
    //每次對一個新值進行聚集計算都會調用iterate方法
    public boolean iterate(IntWritable value)
    {
        if(value==null)
            return false;
        if(result==null)
            result=new IntWritable(value.get());
        else
            result.set(Math.max(result.get(), value.get()));
        return true;
    }
     
    //Hive需要部分聚集結果的時候會調用該方法
    //會返回一個封裝了聚集計算當前狀態的對象
    public IntWritable terminatePartial()
    {
        return result;
    }
    //合並兩個部分聚集值會調用這個方法
    public boolean merge(IntWritable other)
    {
        return iterate(other);
    }
    //Hive需要最終聚集結果時候會調用該方法
    public IntWritable terminate()
    {
        return result;
    }
    }
}
復制代碼

 注意事項:

 1、用戶的UDAF必須繼承了org.apache.hadoop.hive.ql.exec.UDAF。 

 2、用戶的UDAF必須包含至少一個實現了org.apache.hadoop.hive.ql.exec的靜態類,諸如常見的實現了 UDAFEvaluator。 

 3、一個計算函數必須實現的5個方法的具體含義如下: 

  init():主要是負責初始化計算函數並且重設其內部狀態,一般就是重設其內部字段。一般在靜態類中定義一個內部字段來存放最終的結果。 

  iterate():每一次對一個新值進行聚集計算時候都會調用該方法,計算函數會根據聚集計算結果更新內部狀態。當輸入值合法或者正確計算了,則就返回true。 

  terminatePartial():Hive需要部分聚集結果的時候會調用該方法,必須要返回一個封裝了聚集計算當前狀態的對象。 

  merge():Hive進行合並一個部分聚集和另一個部分聚集的時候會調用該方法。 

  terminate():Hive最終聚集結果的時候就會調用該方法。計算函數需要把狀態作為一個值返回給用戶。 

 4、部分聚集結果的數據類型和最終結果的數據類型可以不同。

 

Hive 的權限控制

 Hive從0.10可以通過元數據控制權限。但是Hive的權限控制並不是完全安全的。基本的授權方案的目的是防止用戶不小心做了不合適的事情。

 為了使用Hive的授權機制,有兩個參數必須在hive-site.xml中設置:

復制代碼
< property> 
    < name>hive.security.authorization.enabled< /name> 
    < value>true< /value> 
     < description>enable or disable the hive client authorization< /description> 
 < /property> 
 
< property> 
     < name>hive.security.authorization.createtable.owner.grants< /name> 
     < value>ALL< /value> 
     < description>the privileges automatically granted to the owner whenever a table gets created. An example like "select,drop" will grant select and drop privilege to the owner of the table< /description>
< /property>
復制代碼

 hive.security.authorization.enabled //參數是開啟權限驗證,默認為 false。

 hive.security.authorization.createtable.owner.grants //參數是指表的創建者對表擁有所有權限。

角色的創建和刪除

 Hive 中的角色定義與關系型數據庫中角色的定義類似,它是一種機制,給予那些沒有適當權限的用戶分配一定的權限。

復制代碼
 1) 創建角色。
語法:hive> create role role_name;
示例:hive> create role role_tes1;

 2) 刪除角色。
語法:drop role role_name
示例:drop role role_test1;

角色的授權和撤銷
 1) 把 role_test1 角色授權給 xiaojiang 用戶,命令如下。
hive> grant role role_test1 to user xiaojiang;

 2) 查看 xiaojiang 用戶被授權的角色,命令如下。
show role grant user xiaojiang;

 3) 取消 xiaojiang 用戶的 role_test1 角色,命令如下。
hive> revoke role role_test1 from user xiaojiang;

Hive 支持的權限控制。
 1) 把 select 權限授權給 xiaojiang 用戶,命令如下。
hive> grant select on database default to user xiaojiang;

 2) 查看 xiaojiang 被授予那些操作權限,命令如下。
hive> show grant user xiaojiang on database default;

 3) 收回 xiaojiang 的 select 權限,操作如下。
hive> revoke select on database default from user xiaojiang;

 4) 查看 xiaojiang 用戶擁有哪些權限,命令如下。
hive> show grant user xiaojiang on database default;
復制代碼

超級管理權限

 HIVE本身有權限管理功能,需要通過配置開啟。

復制代碼
< property> 
    < name>hive.metastore.authorization.storage.checks< /name>
    < value>true< /value>
< /property>

< property>
    < name>hive.metastore.execute.setugi< /name>
    < value>false< /value>
< /property>

< property>
    < name>hive.security.authorization.enabled< /name>
    < value>true< /value>
< /property>

< property>
    < name>hive.security.authorization.createtable.owner.grants< /name>
    < value>ALL< /value>
< /property>
復制代碼

 

 其中hive.security.authorization.createtable.owner.grants設置成ALL表示用戶對自己創建的表是有所有權限的(這樣是比較合理地)。

 開啟權限控制有Hive的權限功能還有一個需要完善的地方,那就是“超級管理員”。 Hive中沒有超級管理員,任何用戶都可以進行Grant/Revoke操作,為了完善“超級管理員”,必須添加hive.semantic.analyzer.hook配置,並實現自己的權限控制類。

 編寫權限控制類,代碼如下所示。

復制代碼
package com.xxx.hive;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;
/**   
    * 設置Hive超級管理員   *   
    * @author   
    * @version $Id: AuthHook.java,v 0.1 2013-6-13 下午3:32:12 yinxiu Exp $  
    */  
public class AuthHook extends AbstractSemanticAnalyzerHook { 
    private static String admin = "admin";
    @Override 27 public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, 28 ASTNode ast) throws SemanticException {
    switch (ast.getToken().getType()) { 
        case HiveParser.TOK_CREATEDATABASE: 
        case HiveParser.TOK_DROPDATABASE: 
        case HiveParser.TOK_CREATEROLE: 
        case HiveParser.TOK_DROPROLE:
        case HiveParser.TOK_GRANT: 
        case HiveParser.TOK_REVOKE: 
        case HiveParser.TOK_GRANT_ROLE: 
        case HiveParser.TOK_REVOKE_ROLE: 
        String userName = null;
        if (SessionState.get() != null  && SessionState.get().getAuthenticator() != null) {
            userName = SessionState.get().getAuthenticator().getUserName();
        } 
        if (!admin.equalsIgnoreCase(userName)) { 
            throw new SemanticException(userName + " can't use ADMIN options, except " + admin + ".");
         } 
        break;
        default: 
        break;
        }
        return ast;
    } 
}
復制代碼

 添加了控制類之后還必須添加下面的配置:

< property> 
    < name>hive.semantic.analyzer.hook< /name> 
    < value>com.xxx.AuthHook< /value>  
< /property>

若有使用hiveserver,hiveserver必須重啟。

 至此,只有admin用戶可以進行Grant/Revoke操作。
 權限操作示例:

grant select on database default to user xiaojiang;
revoke all on database default from user xiaojiang;
show grant user xiaojiang on database default;

Hive與JDBC示例

 在使用 JDBC 開發 Hive 程序時, 必須首先開啟 Hive 的遠程服務接口。使用下面命令進行開啟:

hive -service hiveserver &  //Hive低版本提供的服務是:hiveserver
hive --service hiveserver2 &    //Hive0.11.0以上版本提供了的服務是:hiveserver2

本課程我們使用的hive1.0版本,故我們使用hiveserver2服務,下面我使用 Java 代碼通過JDBC連接Hiveserver。

1) 測試數據

 本地目錄/home/hadoop/下的djt.txt文件內容(每行數據之間用tab鍵隔開)如下所示:

1    dajiangtai
2    hadoop
3    hive
4    hbase
5    spark

 2) 程序代碼

復制代碼
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class Hive {
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";//hive驅動名稱
    private static String url = "jdbc:hive2://djt11:10000/default";//連接hive2服務的連接地址,Hive0.11.0以上版本提供了一個全新的服務:HiveServer2
    private static String user = "hadoop";//對HDFS有操作權限的用戶
    private static String password = "";//在非安全模式下,指定一個用戶運行查詢,忽略密碼
    private static String sql = "";
    private static ResultSet res;
    public static void main(String[] args) {
        try {
            Class.forName(driverName);//加載HiveServer2驅動程序
            Connection conn = DriverManager.getConnection(url, user, password);//根據URL連接指定的數據庫
            Statement stmt = conn.createStatement();
            
            //創建的表名
            String tableName = "testHiveDriverTable";
            
            /** 第一步:表存在就先刪除 **/
            sql = "drop table " + tableName;
            stmt.execute(sql);
            
            /** 第二步:表不存在就創建 **/
            sql = "create table " + tableName + " (key int, value string)  row format delimited fields terminated by '\t' STORED AS TEXTFILE";
            stmt.execute(sql);
            
            // 執行“show tables”操作
            sql = "show tables '" + tableName + "'";
            res = stmt.executeQuery(sql);
            if (res.next()) {
                System.out.println(res.getString(1));
            }
            
            // 執行“describe table”操作
            sql = "describe " + tableName;
            res = stmt.executeQuery(sql);
            while (res.next()) {  
                System.out.println(res.getString(1) + "\t" + res.getString(2));
            }
            
            // 執行“load data into table”操作
            String filepath = "/home/hadoop/djt.txt";//hive服務所在節點的本地文件路徑
            sql = "load data local inpath '" + filepath + "' into table " + tableName;
            stmt.execute(sql);
            
            // 執行“select * query”操作
            sql = "select * from " + tableName;
            res = stmt.executeQuery(sql);
            while (res.next()) {
                System.out.println(res.getInt(1) + "\t" + res.getString(2));
            }
            
            // 執行“regular hive query”操作,此查詢會轉換為MapReduce程序來處理
            sql = "select count(*) from " + tableName;
            res = stmt.executeQuery(sql);
            while (res.next()) {
                System.out.println(res.getString(1));
            }        
            conn.close();
            conn = null;
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        } catch (SQLException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}
復制代碼
復制代碼
3) 運行結果(右擊-->Run as-->Run on Hadoop)

 執行“show tables”運行結果:

testhivedrivertable
 執行“describe table”運行結果:

key    int
value    string
 執行“select * query”運行結果:

1    dajiangtai
2    hadoop
3    hive
4    hbase
5    spark
 執行“regular hive query”運行結果:

5
復制代碼

hive性能調優

hive性能調優

(一)Hadoop 計算框架的特性

什么是數據傾斜

由於數據的不均衡原因,導致數據分布不均勻,造成數據大量的集中到一點,造成數據熱點

Hadoop框架的特性

不怕數據大,怕數據傾斜

jobs數比較多的作業運行效率相對比較低,比如即使有幾百行的表,如果多次關聯多次匯總,產生十幾個jobs,耗時很長。原因是map reduce作業初始化的時間是比較長的

sum,count,max,min等UDAF,不怕數據傾斜問題,hadoop在map端的匯總合並優化,使數據傾斜不成問題

count(distinct ),在數據量大的情況下,效率較低,因為count(distinct)是按group by 字段分組,按distinct字段排序,一般這種分布方式是很傾斜的

(二)優化的常用手段

解決數據傾斜問題

減少job數

設置合理的map reduce的task數,能有效提升性能。

了解數據分布,自己動手解決數據傾斜問題是個不錯的選擇

數據量較大的情況下,慎用count(distinct)。

對小文件進行合並,是行至有效的提高調度效率的方法。

優化時把握整體,單個作業最優不如整體最優。

 

(三)Hive的數據類型方面的優化

優化原則

按照一定規則分區(例如根據日期)。通過分區,查詢的時候指定分區,會大大減少在無用數據上的掃描, 同時也非常方便數據清理。

合理的設置Buckets。在一些大數據join的情況下,map join有時候會內存不夠。如果使用Bucket Map Join的話,可以只把其中的一個bucket放到內存中,內存中原來放不下的內存表就變得可以放下。這需要使用buckets的鍵進行join的條件連結,並且需要如下設置

set hive.optimize.bucketmapjoin = true

(四)Hive的操作方面的優化

(1)全排序

Hive的排序關鍵字是SORT BY,它有意區別於傳統數據庫的ORDER BY也是為了強調兩者的區別–SORT BY只能在單機范圍內排序

(2)怎樣做笛卡爾積

當Hive設定為嚴格模式(hive.mapred.mode=strict)時,不允許在HQL語句中出現笛卡爾積

MapJoin是的解決辦法

MapJoin,顧名思義,會在Map端完成Join操作。這需要將Join操作的一個或多個表完全讀入內存

MapJoin的用法是在查詢/子查詢的SELECT關鍵字后面添加/*+ MAPJOIN(tablelist) */提示優化器轉化為MapJoin(目前Hive的優化器不能自動優化MapJoin)

其中tablelist可以是一個表,或以逗號連接的表的列表。tablelist中的表將會讀入內存,應該將小表寫在這里

在大表和小表做笛卡爾積時,規避笛卡爾積的方法是,給Join添加一個Join key,原理很簡單:將小表擴充一列join key,並將小表的條目復制數倍,join key各不相同;將大表擴充一列join key為隨機數

(3)控制Hive的Map數

通常情況下,作業會通過input的目錄產生一個或者多個map任務

主要的決定因素有: input的文件總個數,input的文件大小,集群設置的文件塊大小(目前為128M, 可在hive中通過set dfs.block.size;命令查看到,該參數不能自定義修改)

是不是map數越多越好

答案是否定的。如果一個任務有很多小文件(遠遠小於塊大小128m),則每個小文件也會被當做一個塊,用一個map任務來完成,而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。

是不是保證每個map處理接近128m的文件塊,就高枕無憂了?

答案也是不一定。比如有一個127m的文件,正常會用一個map去完成,但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄,如果map處理的邏輯比較復雜,用一個map任務去做,肯定也比較耗時。

針對上面的問題3和4,我們需要采取兩種方式來解決:即減少map數和增加map數;

舉例

a) 假設input目錄下有1個文件a,大小為780M,那么hadoop會將該文件a分隔成7個塊(6個128m的塊和1個12m的塊),從而產生7個map數

b)假設input目錄下有3個文件a,b,c,大小分別為10m,20m,130m,那么hadoop會分隔成4個塊(10m,20m,128m,2m),從而產生4個map數

即如果文件大於塊大小(128m),那么會拆分,如果小於塊大小,則把該文件當成一個塊

(4)怎樣決定reducer個數

Hadoop MapReduce程序中,reducer個數的設定極大影響執行效率

不指定reducer個數的情況下,Hive會猜測確定一個reducer個數,基於以下兩個設定:

參數1:hive.exec.reducers.bytes.per.reducer(默認為1G)

參數2 :hive.exec.reducers.max(默認為999)

計算reducer數的公式

N=min(參數2,總輸入數據量/參數1)

依據Hadoop的經驗,可以將參數2設定為0.95*(集群中TaskTracker個數)

reduce個數並不是越多越好

同map一樣,啟動和初始化reduce也會消耗時間和資源;

另外,有多少個reduce,就會有多少個輸出文件,如果生成了很多個小文件,那么如果這些小文件作為下一個任務的輸入,則也會出現小文件過多的問題

什么情況下只有一個reduce

很多時候你會發現任務中不管數據量多大,不管你有沒有設置調整reduce個數的參數,任務中一直都只有一個reduce任務;

其實只有一個reduce任務的情況,除了數據量小於

hive.exec.reducers.bytes.per.reducer參數值的情況外,還有以下原因:

a)沒有group by的匯總

b)用了Order by

(5)合並 MapReduce 操作

Multi-group by

Multi-group by是Hive的一個非常好的特性,它使得Hive中利用中間結果變得非常方便

FROM log

insert overwrite table test1 select log.id group by log.id

insert overwrite table test2 select log.name group by log.name

上述查詢語句使用了Multi-group by特性連續group by了2次數據,使用不同的group by key。這一特性可以減少一次MapReduce操作。

Bucket 與 Sampling

Bucket是指將數據以指定列的值為key進行hash,hash到指定數目的桶中。這樣就可以支持高效采樣了

Sampling可以在全體數據上進行采樣,這樣效率自然就低,它還是要去訪問所有數據。而如果一個表已經對某一列制作了bucket,就可以采樣所有桶中指定序號的某個桶,這就減少了訪問量。

如下例所示就是采樣了test中32個桶中的第三個桶。

SELECT * FROM test 、、、TABLESAMPLE(BUCKET 3 OUT OF 32);

(6)JOIN 原則

在使用寫有 Join 操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在 Join 操作符的左邊

原因是在 Join 操作的 Reduce 階段,位於 Join 操作符左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生 OOM 錯誤的幾率

Map Join

Join 操作在 Map 階段完成,不再需要Reduce,前提條件是需要的數據在 Map 的過程中可以訪問到

例如:

INSERT OVERWRITE TABLE phone_traffic

SELECT /*+ MAPJOIN(phone_location) */ l.phone,p.location,l.traffic from phone_location p join log l on (p.phone=l.phone)

相關的參數為:

hive.join.emit.interval = 1000 How many rows in the right-most join operand Hive should buffer before emitting the join result.

hive.mapjoin.size.key = 10000

hive.mapjoin.cache.numrows = 10000

(7)Group By

Map 端部分聚合

並不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端進行部分聚合,最后在 Reduce 端得出最終結果

基於 Hash

參數包括:

hive.map.aggr = true 是否在 Map 端進行聚合,默認為 True

hive.groupby.mapaggr.checkinterval = 100000 在 Map 端進行聚合操作的條目數目

有數據傾斜的時候進行負載均衡

hive.groupby.skewindata = false

當選項設定為 true,生成的查詢計划會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。

(8)合並小文件

文件數目過多,會給 HDFS 帶來壓力,並且會影響處理效率,可以通過合並 Map 和 Reduce 的結果文件來消除這樣的影響:

hive.merge.mapfiles = true 是否和並 Map 輸出文件,默認為 True

hive.merge.mapredfiles = false 是否合並 Reduce 輸出文件,默認為 False

hive.merge.size.per.task = 256*1000*1000 合並文件的大小


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM