Hive 語法和SQL的類似,但不完全一樣,這里給出一個官方文檔地址和轉載一個優秀的教程(要問我為什么不寫?要寫完實在是太多了。。。)
官方DDL地址:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
下面是轉載的,轉載至:https://www.cnblogs.com/qiaoyihang/p/6181630.html
建表規則如下:
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
•CREATE TABLE 創建一個指定名字的表。如果相同名字的表已經存在,則拋出異常;用戶可以用 IF NOT EXIST 選項來忽略這個異常
•EXTERNAL 關鍵字可以讓用戶創建一個外部表,在建表的同時指定一個指向實際數據的路徑(LOCATION)
•LIKE 允許用戶復制現有的表結構,但是不復制數據
•COMMENT可以為表與字段增加描述
•ROW FORMAT DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char]
[MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
| SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]
用戶在建表的時候可以自定義 SerDe 或者使用自帶的 SerDe。如果沒有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,將會使用自帶的 SerDe。在建表的時候,用戶還需要為表指定列,用戶在指定表的列的同時也會指定自定義的 SerDe,Hive 通過 SerDe 確定表的具體的列的數據。
•STORED AS
SEQUENCEFILE
| TEXTFILE
| RCFILE
| INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname
如果文件數據是純文本,可以使用 STORED AS TEXTFILE。如果數據需要壓縮,使用 STORED AS SEQUENCE 。
例:創建外部表
hive> CREATE EXTERNAL TABLE IF NOT EXISTS student2
> (sno INT,sname STRING,age INT,sex STRING)
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY '\t'
> STORED AS TEXTFILE
> LOCATION '/user/external';
一些基本操作:
刪除:
hive> DROP TABLE test1;
修改表結構:
DESC student1;
hive> ALTER TABLE student1 ADD COLUMNS
> (address STRING,grade STRING);
修改表名:
hive> ALTER TABLE student1 RENAME TO student3;
創建和已知表相同結構的表:
hive> CREATE TABLE copy_student1 LIKE student1;
導入外部文件數據:
加載數據到student1表中
LOAD DATA LOCAL INPATH '/home/hadoop/data/student1.txt' INTO TABLE student1;
加載hdfs中的文件:
LOAD DATA INPATH '/user/hive/student1.txt' INTO TABLE copy_student1;
復制表數據:
INSERT OVERWRITE TABLE copy_student2 SELECT * FROM student1;
多表同時復制:
hive> FROM student1
> INSERT OVERWRITE TABLE copy_student3
> SELECT *
> INSERT OVERWRITE TABLE copy_student4
> SELECT *;
ORDER BY 會對輸入做全局排序,因此只有一個 Reduce(多個 Reduce 無法保證全局有序)會導致當輸入規模較大時,需要較長的計算時間。使用 ORDER BY 查詢的時候,為了優化查詢的速度,使用 hive.mapred.mode 屬性。
hive.mapred.mode = nonstrict;(default value/默認值)
hive.mapred.mode=strict;
與數據庫中 ORDER BY 的區別在於,在 hive.mapred.mode=strict 模式下必須指定limit ,否則執行會報錯。
hive> set hive.mapred.mode=strict;
hive> select * from group_test order by uid limit 5;
sort by 不受 hive.mapred.mode 的值是否為 strict 和 nostrict 的影響。sort by 的數據只能保證在同一個 Reduce 中的數據可以按指定字段排序。
使用 sort by 可以指定執行的 Reduce 個數(set mapred.reduce.tasks=< number>)這樣可以輸出更多的數據。對輸出的數據再執行歸並排序,即可以得到全部結果。
hive> set hive.mapred.mode=strict;
hive> select * from group_test sort by uid ;
DISTRIBUTE BY 排序查詢
-- 按照指定的字段對數據划分到不同的輸出 Reduce 文件中,操作如下。 hive> insert overwrite local directory '/home/hadoop/djt/test' select * from group_test distribute by length(gender); --此方法根據 gender 的長度划分到不同的 Reduce 中,最終輸出到不同的文件中。length 是內建函數,也可以指定其它的函數或者使用自定義函數。 hive> insert overwrite local directory '/home/hadoop/djt/test' select * from group_test order by gender distribute by length(gender);
order by gender 與 distribute by length(gender) 不能共用。
索引操作
創建一個索引
hive> create index user_index on table user(id)
> as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
> with deferred rebuild
> IN TABLE user_index_table;
hive> alter index user_index on user rebuild;
hive> select * from user_index_table limit 5;
0 hdfs://mycluster/user/hive/warehouse/table02/000000_0 [0]
1 hdfs://mycluster/user/hive/warehouse/table02/000000_0 [352]
2 hdfs://mycluster/user/hive/warehouse/table02/000000_0 [704]
3 hdfs://mycluster/user/hive/warehouse/table02/000000_0 [1056]
4 hdfs://mycluster/user/hive/warehouse/table02/000000_0 [1408]
Time taken: 0.244 seconds, Fetched: 5 row(s)
索引案例
創建一個索引測試表 index_test,dt作為分區屬性,“ROW FORMAT DELIMITED FILEDS TERMINATED BY ','” 表示用逗號分割字符串,默認為‘\001’。
create table index_test(id INT,name STRING) PARTITIONED BY (dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
創建一個臨時索引表 index_tmp。
hive> create table index_tmp(id INT,name STRING,dt STRING) ROW FORMAT DELIMITED FILEDS TERMINATED BY ',';
加載本地數據到 index_tmp 表中。
hive> load data local inpath '/home/hadoop/djt/test.txt' into table index_tmp
設置 Hive 的索引屬性來優化索引查詢,命令如下。
hive> set hive.exec.dynamic.partition.mode=nonstrict;----設置所有列為 dynamic partition
hive> set hive.exec.dynamic.partition=true;----使用動態分區
查詢index_tmp 表中的數據,插入 table_test 表中。
hive> insert overwrite table index_test partition(dt) select id,name,dt from index_tmp;
--使用 index_test 表,在屬性 id 上創建一個索引 index1_index_test 。
hive> create index index1_index_test on table index_test(id) as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' WITH DEFERERD REBUILD;
--填充索引數據。
hive> alter index index1_index_test on index_test rebuild;
--查看創建的索引。
hive> show index on index_test
-- 查看分區信息。
hive> show partitions index_test;
修改配置文件信息:
< property>
< name>hive.optimize.index.filter< /name>
< value>true< /value>
< /property>
< property>
< name>hive.optimize.index.groupby< /name>
< value>true< /value>
< /property>
< property>
< name>hive.optimize.index.filter.compact.minsize< /name>
< value>5120< /value>
< /property>
hive.optimize.index.filter 和 hive.optimize.index.groupby 參數默認是 false。使用索引的時候必須把這兩個參數開啟,才能起到作用。
hive.optimize.index.filter.compact.minsize 參數為輸入一個緊湊的索引將被自動采用最小尺寸、默認5368709120(以字節為單位)。
分區操作
Hive 的分區通過在創建表時啟動 PARTITION BY 實現,用來分區的維度並不是實際數據的某一列,具體分區的標志是由插入內容時給定的。當要查詢某一分區的內容時可以采用 WHERE 語句, 例如使用 “WHERE tablename.partition_key>a” 創建含分區的表。創建分區語法如下。
CREATE TABLE table_name(
...
)
PARTITION BY (dt STRING,country STRING)
1、 創建分區
Hive 中創建分區表沒有什么復雜的分區類型(范圍分區、列表分區、hash 分區,混合分區等)。分區列也不是表中的一個實際的字段,而是一個或者多個偽列。意思是說,在表的數據文件中實際並不保存分區列的信息與數據。
創建一個簡單的分區表。
hive> create table partition_test(member_id string,name string) partitioned by (stat_date string,province string) row format delimited fields terminated by ',';
--這個例子中創建了 stat_date 和 province 兩個字段作為分區列。通常情況下需要預先創建好分區,然后才能使用該分區。例如:
hive> alter table partition_test add partition (stat_date='2015-01-18',province='beijing');
--這樣就創建了一個分區。這時會看到 Hive 在HDFS 存儲中創建了一個相應的文件夾。
$ hadoop fs -ls /user/hive/warehouse/partition_test/stat_date=2015-01-18
/user/hive/warehouse/partition_test/stat_date=2015-01-18/province=beijing----顯示剛剛創建的分區
每一個分區都會有一個獨立的文件夾,在上面例子中,stat_date 是主層次,province 是 副層次。
--向分區表中插入數據
--使用一個輔助的非分區表 partition_test_input 准備向 partition_test 中插入數據,實現步驟如下。
insert overwrite table partition_test partition(stat_date='2015-01-18',province='jiangsu') select member_id,name from partition_test_input where stat_date='2015-01-18' and province='jiangsu';
向多個分區插入數據,命令如下。
hive> from partition_test_input
insert overwrite table partition_test partition(stat_date='2015-01-18',province='jiangsu') select member_id,name from partition_test_input where stat_date='2015-01-18' and province='jiangsu'
insert overwrite table partition_test partition(stat_date='2015-01-28',province='sichuan') select member_id,name from partition_test_input where stat_date='2015-01-28' and province='sichuan'
insert overwrite table partition_test partition(stat_date='2015-01-28',province='beijing') select member_id,name from partition_test_input where stat_date='2015-01-28' and province='beijing';
動態分區的產生
按照上面的方法向分區表中插入數據,如果數據源很大,針對一個分區就要寫一個 insert ,非常麻煩。使用動態分區可以很好地解決上述問題。動態分區可以根據查詢得到的數據自動匹配到相應的分區中去。動態分區可以通過下面的設置來打開:
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
動態分區的使用方法很簡單,假設向 stat_date='2015-01-18' 這個分區下插入數據,至於 province 插到哪個子分區下讓數據庫自己來判斷。stat_date 叫做靜態分區列,province 叫做動態分區列。
hive> insert overwrite table partition_test partition(stat_date='2015-01-18',province)
select member_id,name province from partition_test_input where stat_date='2015-01-18';
注意,動態分區不允許主分區采用動態列而副分區采用靜態列,這樣將導致所有的主分區都要創建副分區靜態列所定義的分區。
幾個常用參數
hive.exec.max.dynamic.partitions.pernode:每一個 MapReduce Job 允許創建的分區的最大數量,如果超過這個數量就會報錯(默認值100)。
hive.exec.max.dynamic.partitions:一個 dml 語句允許創建的所有分區的最大數量(默認值100)。
hive.exec.max.created.files:所有 MapReduce Job 允許創建的文件的最大數量(默認值10000)。
盡量讓分區列的值相同的數據在同一個 MapReduce 中,這樣每一個 MapReduce 可以盡量少地產生新的文件夾,可以通過 DISTRIBUTE BY 將分區列值相同的數據放到一起,命令如下。
hive> insert overwrite table partition_test partition(stat_date,province)
select memeber_id,name,stat_date,province from partition_test_input distribute by stat_date,province;
桶操作
Hive 中 table 可以拆分成 Partition table 和 桶(BUCKET),table和partition可以通過‘CLUSTERED BY ’進一步分bucket,BUCKET 中的數據可以通過 SORT BY 排序。
BUCKET 主要作用如下。
1)數據 sampling;
2)提升某些查詢操作效率,例如 Map-Side Join。
需要特別主要的是,CLUSTERED BY 和 SORT BY 不會影響數據的導入,這意味着,用戶必須自己負責數據的導入,包括數據額分桶和排序。 'set hive.enforce.bucketing=true' 可以自動控制上一輪 Reduce 的數量從而適配 BUCKET 的個數,當然,用戶也可以自主設置 mapred.reduce.tasks 去適配 BUCKET 個數,推薦使用:
hive> set hive.enforce.bucketing=true;
1) 創建臨時表 student_tmp,並導入數據。
hive> desc student_tmp;
hive> select * from student_tmp;
2) 創建 student 表。
hive> create table student(id int,age int,name string)
partitioned by (stat_date string)
clustered by (id) sorted by(age) into 2 bucket
row format delimited fields terminated by ',';
3) 設置環境變量。
hive> set hive.enforce.bucketing=true;
4) 插入數據。
hive> from student_tmp
insert overwrite table student partition(stat_date='2015-01-19')
select id,age,name where stat_date='2015-01-18' sort by age;
5) 查看文件目錄。
$ hadoop fs -ls /usr/hive/warehouse/student/stat_date=2015-01-19/
6) 查看 sampling 數據。
hive> select * from student tablesample(bucket 1 out of 2 on id);
tablesample 是抽樣語句,語法如下。
tablesample(bucket x out of y)
y 必須是 table 中 BUCKET 總數的倍數或者因子。
Hive 復合類型
hive提供了復合數據類型:
1)Structs: structs內部的數據可以通過DOT(.)來存取。例如,表中一列c的類型為STRUCT{a INT; b INT},我們可以通過c.a來訪問域a。
2)Map(K-V對):訪問指定域可以通過["指定域名稱"]進行。例如,一個Map M包含了一個group-》gid的kv對,gid的值可以通過M['group']來獲取。
3)Array:array中的數據為相同類型。例如,假如array A中元素['a','b','c'],則A[1]的值為'b'
1、Struct使用
1) 建表
hive> create table student_test(id INT, info struct< name:STRING, age:INT>)
> ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
> COLLECTION ITEMS TERMINATED BY ':';
'FIELDS TERMINATED BY' :字段與字段之間的分隔符。'COLLECTION ITEMS TERMINATED BY' :一個字段各個item的分隔符。
2) 導入數據
$ cat test5.txt
1,zhou:30
2,yan:30
3,chen:20
4,li:80
hive> LOAD DATA LOCAL INPATH '/home/hadoop/djt/test5.txt' INTO TABLE student_test;
3) 查詢數據
hive> select info.age from student_test;
2、Array使用
1) 建表
hive> create table class_test(name string, student_id_list array< INT>)
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY ','
> COLLECTION ITEMS TERMINATED BY ':';
2) 導入數據
$ cat test6.txt
034,1:2:3:4
035,5:6
036,7:8:9:10
hive> LOAD DATA LOCAL INPATH '/home/work/data/test6.txt' INTO TABLE class_test ;
3) 查詢
hive> select student_id_list[3] from class_test;
3、Map使用
1) 建表
hive> create table employee(id string, perf map< string, int>)
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY '\t'
> COLLECTION ITEMS TERMINATED BY ','
> MAP KEYS TERMINATED BY ':';
‘MAP KEYS TERMINATED BY’ :key value分隔符
2) 導入數據
$ cat test7.txt
1 job:80,team:60,person:70
2 job:60,team:80
3 job:90,team:70,person:100
hive> LOAD DATA LOCAL INPATH '/home/work/data/test7.txt' INTO TABLE employee;
3) 查詢
hive> select perf['person'] from employee;
Hive 的 JOIN 用法
hive只支持等連接,外連接,左半連接。hive不支持非相等的join條件(通過其他方式實現,如left outer join),因為它很難在map/reduce job實現這樣的條件。而且,hive可以join兩個以上的表。
1、等連接 只有等連接才允許 hive> SELECT a.* FROM a JOIN b ON (a.id = b.id) hive> SELECT a.* FROM a JOIN b ON (a.id = b.id AND a.department = b.department) 2、多表連接 同個查詢,可以join兩個以上的表 hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2) 3、join的緩存和任務轉換 hive轉換多表join時,如果每個表在join字句中,使用的都是同一個列,只會轉換為一個單獨的map/reduce。
hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
被轉換為兩個map/reduce任務,因為b的key1列在第一個join條件使用,而b表的key2列在第二個join條件使用。第一個map/reduce任務join a和b。第二個任務是第一個任務的結果join c。
hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2) 在join的每個map/reduce階段,序列中的最后一個表,當其他被緩存時,它會流到reducers。所以,reducers需要緩存join關鍵字的特定值組成的行,通過組織最大的表出現在序列的最后,有助於減少reducers的內存。
hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
三個表,在同一個獨立的map/reduce任務做join。a和b的key對應的特定值組成的行,會緩存在reducers的內存。然后reducers接受c的每一行,和緩存的每一行做join計算。
hive> SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
這里有兩個map/reduce任務在join計算被調用。第一個是a和b做join,然后reducers緩存a的值,另一邊,從流接收b的值。第二個階段,reducers緩存第一個join的結果,另一邊從流接收c的值。 在join的每個map/reduce階段,通過關鍵字,可以指定哪個表從流接收。
hive> SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
三個表的連接,會轉換為一個map/reduce任務,reducer會把b和c的key的特定值緩存在內存里,然后從流接收a的每一行,和緩存的行做join。 4、join的結果 LEFT,RIGHT,FULL OUTER連接存在是為了提供ON語句在沒有匹配時的更多控制。例如,這個查詢: hive> SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key) 將會返回a的每一行。如果b.key等於a.key,輸出將是a.val,b.val,如果a沒有和b.key匹配,輸出的行將是 a.val,NULL。如果b的行沒有和a.key匹配上,將被拋棄。語法"FROM a LEFT OUTER JOIN b"必須寫在一行,為了理解它如何工作——這個查詢,a是b的左邊,a的所有行會被保持;RIGHT OUTER JOIN將保持b的所有行, FULL OUTER JOIN將會保存a和b的所有行。OUTER JOIN語義應該符合標准的SQL規范。 5、join的過濾 Joins發生在where字句前,所以,如果要限制join的輸出,需要寫在where字句,否則寫在JOIN字句。現在討論的一個混亂的大點,就是分區表 hive> SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key) WHERE a.ds='2009-07-07' AND b.ds='2009-07-07' 將會連接a和b,產生a.val和b.val的列表。WHERE字句,也可以引用join的輸出列,然后過濾他們。 但是,無論何時JOIN的行找到a的key,但是找不到b的key時,b的所有列會置成NULL,包括ds列。這就是說,將過濾join輸出的所有行,包括沒有合法的b.key的行。然后你會在LEFT OUTER的要求撲空。 也就是說,如果你在WHERE字句引用b的任何列,LEFT OUTER的部分join結果是不相關的。所以,當外連接時,使用這個語句 hive> SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key AND b.ds='2009-07-07' AND a.ds='2009-07-07'; join的輸出會預先過濾,然后你不用對有a.key而沒有b.key的行做過濾。RIGHT和FULL join也是一樣的邏輯。 6、join的順序 join是不可替換的,連接是從左到右,不管是LEFT或RIGHT join。 hive> SELECT a.val1, a.val2, b.val, c.val FROM a JOIN b ON (a.key = b.key) LEFT OUTER JOIN c ON (a.key = c.key) 首先,連接a和b,扔掉a和b中沒有匹配的key的行。結果表再連接c。這提供了直觀的結果,如果有一個鍵都存在於A和C,但不是B:完整行(包括 a.val1,a.val2,a.key)會在"a jOIN b"步驟,被丟棄,因為它不在b中。結果沒有a.key,所以當它和c做LEFT OUTER JOIN,c.val也無法做到,因為沒有c.key匹配a.key(因為a的行都被移除了)。類似的,RIGHT OUTER JOIN(替換為LEFT),我們最終會更怪的效果,NULL, NULL, NULL, c.val。因為盡管指定了join key是a.key=c.key,我們已經在第一個JOIN丟棄了不匹配的a的所有行。 為了達到更直觀的效果,相反,我們應該從 hive> FROM c LEFT OUTER JOIN a ON (c.key = a.key) LEFT OUTER JOIN b ON (c.key = b.key). LEFT SEMI JOIN實現了相關的IN / EXISTS的子查詢語義的有效途徑。由於Hive目前不支持IN / EXISTS的子查詢,所以你可以用 LEFT SEMI JOIN 重寫你的子查詢語句。LEFT SEMI JOIN 的限制是, JOIN 子句中右邊的表只能在 ON 子句中設置過濾條件,在 WHERE 子句、SELECT 子句或其他地方過濾都不行。 hive> SELECT a.key, a.value FROM a WHERE a.key in (SELECT b.key FROM B); 可以重寫為 hive> SELECT a.key, a.val FROM a LEFT SEMI JOIN b on (a.key = b.key) 7、map 端 join 但如果所有被連接的表是小表,join可以被轉換為只有一個map任務。查詢是 hive> SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key 不需要reducer。對於每一個mapper,A和B已經被完全讀出。限制是a FULL/RIGHT OUTER JOIN b不能使用。 如果表在join的列已經分桶了,其中一張表的桶的數量,是另一個表的桶的數量的整倍,那么兩者可以做桶的連接。如果A有4個桶,表B有4個桶,下面的連接: hive> SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key 只能在mapper工作。為了為A的每個mapper完整抽取B。對於上面的查詢,mapper處理A的桶1,只會抽取B的桶1,這不是默認行為,要使用以下參數: hive> set hive.optimize.bucketmapjoin = true; 如果表在join的列經過排序,分桶,而且他們有相同數量的桶,可以使用排序-合並 join。每個mapper,相關的桶會做連接。如果A和B有4個桶 hive> SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM A a join B b on a.key = b.key 只能在mapper使用。使用A的桶的mapper,也會遍歷B相關的桶。這個不是默認行為,需要配置以下參數: hive> set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; hive> set hive.optimize.bucketmapjoin = true; hive> set hive.optimize.bucketmapjoin.sortedmerge = true;
Hive 內置操作符與函數
1)字符串長度函數:length
2)字符串反轉函數:reverse
3)字符串連接函數:concat
4)帶分隔符字符串連接函數:concat_ws
5)字符串截取函數:substr,substring
7)字符串轉大寫函數:upper,ucase
8)字符串轉小寫函數:lower,lcase
9)去空格函數:trim
10)左邊去空格函數:ltrim
11)右邊去空格函數:rtrim
集合統計函數
1) 個數統計函數 count。
2) 總和統計函數 sum。
3) 平均值統計函數avg。
4) 最小值統計函數 min。統計結果集中 col 字段的最小值。
5) 最大值統計函數 max。統計結果集中 col 字段的最大值。
復合類型操作
1) Map 類型構建。根據輸入的 Key-Value 對構建 Map 類型。
語法:map(key1, value1, key2, value2,...)
舉例:
hive>create table map_test as select map('100','jay','200','liu') from student;
hive>describe map_test;
hive>select map_test from student;
2) Struct 類型構建。根據輸入的參數構建結構體 Struct 類型。
語法:struct(val1, val2, val3, ...)
舉例:
hive>create table struct_test as select struct('jay','liu','gang') from student;
hive>describe struct_test;
hive>select struct_test from student;
3) Array 類型構建。根據輸入的參數構建數組 Array 類型。
語法:array(val1,val2, ...)
舉例:
hive> create table array_test as select array('jay','liu','gang') from student;
hive> describe array_test;
hive> select array_test from array_test;
用戶自定義函數 UDF
UDF(User Defined Function,用戶自定義函數) 對數據進行處理。UDF 函數可以直接應用於 select 語句,對查詢結構做格式化處理后,再輸出內容。
Hive可以允許用戶編寫自己定義的函數UDF,來在查詢中使用。Hive中有3種UDF:
1)UDF:操作單個數據行,產生單個數據行。
2)UDAF:操作多個數據行,產生一個數據行。
3)UDTF:操作一個數據行,產生多個數據行一個表作為輸出。
用戶構建的UDF使用過程如下:
第一步:繼承UDF或者UDAF或者UDTF,實現特定的方法。
第二步:將寫好的類打包為jar。如hivefirst.jar。
第三步:進入到Hive外殼環境中,利用add jar /home/hadoop/hivefirst.jar 注冊該jar文件。
第四步:為該類起一個別名,create temporary function mylength as 'com.whut.StringLength';這里注意UDF只是為這個Hive會話臨時定義的。
第五步:在select中使用mylength()。
自定義UDF
package whut;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
//UDF是作用於單個數據行,產生一個數據行
//用戶必須要繼承UDF,且必須至少實現一個evalute方法,該方法並不在UDF中
//但是Hive會檢查用戶的UDF是否擁有一個evalute方法
public class Strip extends UDF{
private Text result=new Text();
//自定義方法
public Text evaluate(Text str)
{
if(str==null)
return null;
result.set(StringUtils.strip(str.toString()));
return result;
}
public Text evaluate(Text str,String stripChars)
{
if(str==null)
return null;
result.set(StringUtils.strip(str.toString(),stripChars));
return result;
}
}
注意事項:
1、一個用戶UDF必須繼承org.apache.hadoop.hive.ql.exec.UDF;
2、一個UDF必須要包含有evaluate()方法,但是該方法並不存在於UDF中。evaluate的參數個數以及類型都是用戶自己定義的。在使用的時候,Hive會調用UDF的evaluate()方法。
自定義UDAF找到最大值
package whut;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
//UDAF是輸入多個數據行,產生一個數據行
//用戶自定義的UDAF必須是繼承了UDAF,且內部包含多個實現了exec的靜態類
public class MaxiNumber extends UDAF{
public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator{
//最終結果
private IntWritable result;
//負責初始化計算函數並設置它的內部狀態,result是存放最終結果的
@Override
public void init() {
result=null;
}
//每次對一個新值進行聚集計算都會調用iterate方法
public boolean iterate(IntWritable value)
{
if(value==null)
return false;
if(result==null)
result=new IntWritable(value.get());
else
result.set(Math.max(result.get(), value.get()));
return true;
}
//Hive需要部分聚集結果的時候會調用該方法
//會返回一個封裝了聚集計算當前狀態的對象
public IntWritable terminatePartial()
{
return result;
}
//合並兩個部分聚集值會調用這個方法
public boolean merge(IntWritable other)
{
return iterate(other);
}
//Hive需要最終聚集結果時候會調用該方法
public IntWritable terminate()
{
return result;
}
}
}
注意事項:
1、用戶的UDAF必須繼承了org.apache.hadoop.hive.ql.exec.UDAF。
2、用戶的UDAF必須包含至少一個實現了org.apache.hadoop.hive.ql.exec的靜態類,諸如常見的實現了 UDAFEvaluator。
3、一個計算函數必須實現的5個方法的具體含義如下:
init():主要是負責初始化計算函數並且重設其內部狀態,一般就是重設其內部字段。一般在靜態類中定義一個內部字段來存放最終的結果。
iterate():每一次對一個新值進行聚集計算時候都會調用該方法,計算函數會根據聚集計算結果更新內部狀態。當輸入值合法或者正確計算了,則就返回true。
terminatePartial():Hive需要部分聚集結果的時候會調用該方法,必須要返回一個封裝了聚集計算當前狀態的對象。
merge():Hive進行合並一個部分聚集和另一個部分聚集的時候會調用該方法。
terminate():Hive最終聚集結果的時候就會調用該方法。計算函數需要把狀態作為一個值返回給用戶。
4、部分聚集結果的數據類型和最終結果的數據類型可以不同。
Hive 的權限控制
Hive從0.10可以通過元數據控制權限。但是Hive的權限控制並不是完全安全的。基本的授權方案的目的是防止用戶不小心做了不合適的事情。
為了使用Hive的授權機制,有兩個參數必須在hive-site.xml中設置:
< property>
< name>hive.security.authorization.enabled< /name>
< value>true< /value>
< description>enable or disable the hive client authorization< /description>
< /property>
< property>
< name>hive.security.authorization.createtable.owner.grants< /name>
< value>ALL< /value>
< description>the privileges automatically granted to the owner whenever a table gets created. An example like "select,drop" will grant select and drop privilege to the owner of the table< /description>
< /property>
hive.security.authorization.enabled //參數是開啟權限驗證,默認為 false。
hive.security.authorization.createtable.owner.grants //參數是指表的創建者對表擁有所有權限。
角色的創建和刪除
Hive 中的角色定義與關系型數據庫中角色的定義類似,它是一種機制,給予那些沒有適當權限的用戶分配一定的權限。
1) 創建角色。
語法:hive> create role role_name;
示例:hive> create role role_tes1;
2) 刪除角色。
語法:drop role role_name
示例:drop role role_test1;
角色的授權和撤銷
1) 把 role_test1 角色授權給 xiaojiang 用戶,命令如下。
hive> grant role role_test1 to user xiaojiang;
2) 查看 xiaojiang 用戶被授權的角色,命令如下。
show role grant user xiaojiang;
3) 取消 xiaojiang 用戶的 role_test1 角色,命令如下。
hive> revoke role role_test1 from user xiaojiang;
Hive 支持的權限控制。
1) 把 select 權限授權給 xiaojiang 用戶,命令如下。
hive> grant select on database default to user xiaojiang;
2) 查看 xiaojiang 被授予那些操作權限,命令如下。
hive> show grant user xiaojiang on database default;
3) 收回 xiaojiang 的 select 權限,操作如下。
hive> revoke select on database default from user xiaojiang;
4) 查看 xiaojiang 用戶擁有哪些權限,命令如下。
hive> show grant user xiaojiang on database default;
超級管理權限
HIVE本身有權限管理功能,需要通過配置開啟。
< property>
< name>hive.metastore.authorization.storage.checks< /name>
< value>true< /value>
< /property>
< property>
< name>hive.metastore.execute.setugi< /name>
< value>false< /value>
< /property>
< property>
< name>hive.security.authorization.enabled< /name>
< value>true< /value>
< /property>
< property>
< name>hive.security.authorization.createtable.owner.grants< /name>
< value>ALL< /value>
< /property>
其中hive.security.authorization.createtable.owner.grants設置成ALL表示用戶對自己創建的表是有所有權限的(這樣是比較合理地)。
開啟權限控制有Hive的權限功能還有一個需要完善的地方,那就是“超級管理員”。 Hive中沒有超級管理員,任何用戶都可以進行Grant/Revoke操作,為了完善“超級管理員”,必須添加hive.semantic.analyzer.hook配置,並實現自己的權限控制類。
編寫權限控制類,代碼如下所示。
package com.xxx.hive;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;
/**
* 設置Hive超級管理員 *
* @author
* @version $Id: AuthHook.java,v 0.1 2013-6-13 下午3:32:12 yinxiu Exp $
*/
public class AuthHook extends AbstractSemanticAnalyzerHook {
private static String admin = "admin";
@Override 27 public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, 28 ASTNode ast) throws SemanticException {
switch (ast.getToken().getType()) {
case HiveParser.TOK_CREATEDATABASE:
case HiveParser.TOK_DROPDATABASE:
case HiveParser.TOK_CREATEROLE:
case HiveParser.TOK_DROPROLE:
case HiveParser.TOK_GRANT:
case HiveParser.TOK_REVOKE:
case HiveParser.TOK_GRANT_ROLE:
case HiveParser.TOK_REVOKE_ROLE:
String userName = null;
if (SessionState.get() != null && SessionState.get().getAuthenticator() != null) {
userName = SessionState.get().getAuthenticator().getUserName();
}
if (!admin.equalsIgnoreCase(userName)) {
throw new SemanticException(userName + " can't use ADMIN options, except " + admin + ".");
}
break;
default:
break;
}
return ast;
}
}
添加了控制類之后還必須添加下面的配置:
< property>
< name>hive.semantic.analyzer.hook< /name>
< value>com.xxx.AuthHook< /value>
< /property>
若有使用hiveserver,hiveserver必須重啟。
至此,只有admin用戶可以進行Grant/Revoke操作。
權限操作示例:
grant select on database default to user xiaojiang;
revoke all on database default from user xiaojiang;
show grant user xiaojiang on database default;
Hive與JDBC示例
在使用 JDBC 開發 Hive 程序時, 必須首先開啟 Hive 的遠程服務接口。使用下面命令進行開啟:
hive -service hiveserver & //Hive低版本提供的服務是:hiveserver
hive --service hiveserver2 & //Hive0.11.0以上版本提供了的服務是:hiveserver2
本課程我們使用的hive1.0版本,故我們使用hiveserver2服務,下面我使用 Java 代碼通過JDBC連接Hiveserver。
1) 測試數據
本地目錄/home/hadoop/下的djt.txt文件內容(每行數據之間用tab鍵隔開)如下所示:
1 dajiangtai
2 hadoop
3 hive
4 hbase
5 spark
2) 程序代碼
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
public class Hive {
private static String driverName = "org.apache.hive.jdbc.HiveDriver";//hive驅動名稱
private static String url = "jdbc:hive2://djt11:10000/default";//連接hive2服務的連接地址,Hive0.11.0以上版本提供了一個全新的服務:HiveServer2
private static String user = "hadoop";//對HDFS有操作權限的用戶
private static String password = "";//在非安全模式下,指定一個用戶運行查詢,忽略密碼
private static String sql = "";
private static ResultSet res;
public static void main(String[] args) {
try {
Class.forName(driverName);//加載HiveServer2驅動程序
Connection conn = DriverManager.getConnection(url, user, password);//根據URL連接指定的數據庫
Statement stmt = conn.createStatement();
//創建的表名
String tableName = "testHiveDriverTable";
/** 第一步:表存在就先刪除 **/
sql = "drop table " + tableName;
stmt.execute(sql);
/** 第二步:表不存在就創建 **/
sql = "create table " + tableName + " (key int, value string) row format delimited fields terminated by '\t' STORED AS TEXTFILE";
stmt.execute(sql);
// 執行“show tables”操作
sql = "show tables '" + tableName + "'";
res = stmt.executeQuery(sql);
if (res.next()) {
System.out.println(res.getString(1));
}
// 執行“describe table”操作
sql = "describe " + tableName;
res = stmt.executeQuery(sql);
while (res.next()) {
System.out.println(res.getString(1) + "\t" + res.getString(2));
}
// 執行“load data into table”操作
String filepath = "/home/hadoop/djt.txt";//hive服務所在節點的本地文件路徑
sql = "load data local inpath '" + filepath + "' into table " + tableName;
stmt.execute(sql);
// 執行“select * query”操作
sql = "select * from " + tableName;
res = stmt.executeQuery(sql);
while (res.next()) {
System.out.println(res.getInt(1) + "\t" + res.getString(2));
}
// 執行“regular hive query”操作,此查詢會轉換為MapReduce程序來處理
sql = "select count(*) from " + tableName;
res = stmt.executeQuery(sql);
while (res.next()) {
System.out.println(res.getString(1));
}
conn.close();
conn = null;
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.exit(1);
} catch (SQLException e) {
e.printStackTrace();
System.exit(1);
}
}
}
3) 運行結果(右擊-->Run as-->Run on Hadoop)
執行“show tables”運行結果:
testhivedrivertable
執行“describe table”運行結果:
key int
value string
執行“select * query”運行結果:
1 dajiangtai
2 hadoop
3 hive
4 hbase
5 spark
執行“regular hive query”運行結果:
5
hive性能調優
hive性能調優
(一)Hadoop 計算框架的特性
什么是數據傾斜
由於數據的不均衡原因,導致數據分布不均勻,造成數據大量的集中到一點,造成數據熱點
Hadoop框架的特性
不怕數據大,怕數據傾斜
jobs數比較多的作業運行效率相對比較低,比如即使有幾百行的表,如果多次關聯多次匯總,產生十幾個jobs,耗時很長。原因是map reduce作業初始化的時間是比較長的
sum,count,max,min等UDAF,不怕數據傾斜問題,hadoop在map端的匯總合並優化,使數據傾斜不成問題
count(distinct ),在數據量大的情況下,效率較低,因為count(distinct)是按group by 字段分組,按distinct字段排序,一般這種分布方式是很傾斜的
(二)優化的常用手段
解決數據傾斜問題
減少job數
設置合理的map reduce的task數,能有效提升性能。
了解數據分布,自己動手解決數據傾斜問題是個不錯的選擇
數據量較大的情況下,慎用count(distinct)。
對小文件進行合並,是行至有效的提高調度效率的方法。
優化時把握整體,單個作業最優不如整體最優。
(三)Hive的數據類型方面的優化
優化原則
按照一定規則分區(例如根據日期)。通過分區,查詢的時候指定分區,會大大減少在無用數據上的掃描, 同時也非常方便數據清理。
合理的設置Buckets。在一些大數據join的情況下,map join有時候會內存不夠。如果使用Bucket Map Join的話,可以只把其中的一個bucket放到內存中,內存中原來放不下的內存表就變得可以放下。這需要使用buckets的鍵進行join的條件連結,並且需要如下設置
set hive.optimize.bucketmapjoin = true
(四)Hive的操作方面的優化
(1)全排序
Hive的排序關鍵字是SORT BY,它有意區別於傳統數據庫的ORDER BY也是為了強調兩者的區別–SORT BY只能在單機范圍內排序
(2)怎樣做笛卡爾積
當Hive設定為嚴格模式(hive.mapred.mode=strict)時,不允許在HQL語句中出現笛卡爾積
MapJoin是的解決辦法
MapJoin,顧名思義,會在Map端完成Join操作。這需要將Join操作的一個或多個表完全讀入內存
MapJoin的用法是在查詢/子查詢的SELECT關鍵字后面添加/*+ MAPJOIN(tablelist) */提示優化器轉化為MapJoin(目前Hive的優化器不能自動優化MapJoin)
其中tablelist可以是一個表,或以逗號連接的表的列表。tablelist中的表將會讀入內存,應該將小表寫在這里
在大表和小表做笛卡爾積時,規避笛卡爾積的方法是,給Join添加一個Join key,原理很簡單:將小表擴充一列join key,並將小表的條目復制數倍,join key各不相同;將大表擴充一列join key為隨機數
(3)控制Hive的Map數
通常情況下,作業會通過input的目錄產生一個或者多個map任務
主要的決定因素有: input的文件總個數,input的文件大小,集群設置的文件塊大小(目前為128M, 可在hive中通過set dfs.block.size;命令查看到,該參數不能自定義修改)
是不是map數越多越好
答案是否定的。如果一個任務有很多小文件(遠遠小於塊大小128m),則每個小文件也會被當做一個塊,用一個map任務來完成,而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。
是不是保證每個map處理接近128m的文件塊,就高枕無憂了?
答案也是不一定。比如有一個127m的文件,正常會用一個map去完成,但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄,如果map處理的邏輯比較復雜,用一個map任務去做,肯定也比較耗時。
針對上面的問題3和4,我們需要采取兩種方式來解決:即減少map數和增加map數;
舉例
a) 假設input目錄下有1個文件a,大小為780M,那么hadoop會將該文件a分隔成7個塊(6個128m的塊和1個12m的塊),從而產生7個map數
b)假設input目錄下有3個文件a,b,c,大小分別為10m,20m,130m,那么hadoop會分隔成4個塊(10m,20m,128m,2m),從而產生4個map數
即如果文件大於塊大小(128m),那么會拆分,如果小於塊大小,則把該文件當成一個塊
(4)怎樣決定reducer個數
Hadoop MapReduce程序中,reducer個數的設定極大影響執行效率
不指定reducer個數的情況下,Hive會猜測確定一個reducer個數,基於以下兩個設定:
參數1:hive.exec.reducers.bytes.per.reducer(默認為1G)
參數2 :hive.exec.reducers.max(默認為999)
計算reducer數的公式
N=min(參數2,總輸入數據量/參數1)
依據Hadoop的經驗,可以將參數2設定為0.95*(集群中TaskTracker個數)
reduce個數並不是越多越好
同map一樣,啟動和初始化reduce也會消耗時間和資源;
另外,有多少個reduce,就會有多少個輸出文件,如果生成了很多個小文件,那么如果這些小文件作為下一個任務的輸入,則也會出現小文件過多的問題
什么情況下只有一個reduce
很多時候你會發現任務中不管數據量多大,不管你有沒有設置調整reduce個數的參數,任務中一直都只有一個reduce任務;
其實只有一個reduce任務的情況,除了數據量小於
hive.exec.reducers.bytes.per.reducer參數值的情況外,還有以下原因:
a)沒有group by的匯總
b)用了Order by
(5)合並 MapReduce 操作
Multi-group by
Multi-group by是Hive的一個非常好的特性,它使得Hive中利用中間結果變得非常方便
FROM log
insert overwrite table test1 select log.id group by log.id
insert overwrite table test2 select log.name group by log.name
上述查詢語句使用了Multi-group by特性連續group by了2次數據,使用不同的group by key。這一特性可以減少一次MapReduce操作。
Bucket 與 Sampling
Bucket是指將數據以指定列的值為key進行hash,hash到指定數目的桶中。這樣就可以支持高效采樣了
Sampling可以在全體數據上進行采樣,這樣效率自然就低,它還是要去訪問所有數據。而如果一個表已經對某一列制作了bucket,就可以采樣所有桶中指定序號的某個桶,這就減少了訪問量。
如下例所示就是采樣了test中32個桶中的第三個桶。
SELECT * FROM test 、、、TABLESAMPLE(BUCKET 3 OUT OF 32);
(6)JOIN 原則
在使用寫有 Join 操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在 Join 操作符的左邊
原因是在 Join 操作的 Reduce 階段,位於 Join 操作符左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生 OOM 錯誤的幾率
Map Join
Join 操作在 Map 階段完成,不再需要Reduce,前提條件是需要的數據在 Map 的過程中可以訪問到
例如:
INSERT OVERWRITE TABLE phone_traffic
SELECT /*+ MAPJOIN(phone_location) */ l.phone,p.location,l.traffic from phone_location p join log l on (p.phone=l.phone)
相關的參數為:
hive.join.emit.interval = 1000 How many rows in the right-most join operand Hive should buffer before emitting the join result.
hive.mapjoin.size.key = 10000
hive.mapjoin.cache.numrows = 10000
(7)Group By
Map 端部分聚合
並不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端進行部分聚合,最后在 Reduce 端得出最終結果
基於 Hash
參數包括:
hive.map.aggr = true 是否在 Map 端進行聚合,默認為 True
hive.groupby.mapaggr.checkinterval = 100000 在 Map 端進行聚合操作的條目數目
有數據傾斜的時候進行負載均衡
hive.groupby.skewindata = false
當選項設定為 true,生成的查詢計划會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。
(8)合並小文件
文件數目過多,會給 HDFS 帶來壓力,並且會影響處理效率,可以通過合並 Map 和 Reduce 的結果文件來消除這樣的影響:
hive.merge.mapfiles = true 是否和並 Map 輸出文件,默認為 True
hive.merge.mapredfiles = false 是否合並 Reduce 輸出文件,默認為 False
hive.merge.size.per.task = 256*1000*1000 合並文件的大小