1. Hive基本概念
1.1 Hive簡介
1.1.1 什么是Hive
Hive是基於Hadoop的一個數據倉庫工具,可以將結構化的數據文件映射為一張數據庫表,並提供類SQL查詢功能。
1.1.2 為什么使用Hive
- 面臨的問題
人員學習成本太高
項目周期要求太短
我只是需要一個簡單的環境
MapReduce如何搞定
復雜查詢好難
Join如何實現
- 為什么要使用Hive
操作接口采用類SQL語法,提供快速開發的能力。
避免了去寫MapReduce,減少開發人員的學習成本。
擴展功能很方便。
1.1.3 Hive的特點
- 可擴展
Hive可以自由的擴展集群的規模,一般情況下不需要重啟服務。
- 延展性
Hive支持用戶自定義函數,用戶可以根據自己的需求來實現自己的函數。
- 容錯
良好的容錯性,節點出現問題SQL仍可完成執行。
1.2 Hive架構
1.2.1 架構圖
1.2.2 基本組成
- 用戶接口:包括 CLI、JDBC/ODBC、WebGUI。
- 元數據存儲:通常是存儲在關系數據庫如 mysql , derby中。
- 解釋器、編譯器、優化器、執行器。
- hive用 HDFS 進行存儲,利用 MapReduce 進行計算。
1.2.3 各組件的基本功能
- 用戶接口主要由三個:CLI、JDBC/ODBC和WebGUI。其中,CLI為shell命令行;JDBC/ODBC是Hive的JAVA實現,與傳統數據庫JDBC類似;WebGUI是通過瀏覽器訪問Hive。
- 元數據存儲:Hive 將元數據存儲在數據庫中,目前只支持 mysql、derby,下一版本會支持更多的數據庫。Hive 中的元數據包括表的名字,表的列和分區及其屬性,表的屬性(是否為外部表等),表的數據所在目錄等。
- 解釋器、編譯器、優化器完成 HQL 查詢語句從詞法分析、語法分析、編譯、優化以及查詢計划的生成。生成的查詢計划存儲在 HDFS 中,並在隨后有 MapReduce 調用執行。
- Hive的數據存儲在 HDFS 中,大部分的查詢由MapReduce 完成。
1.3 Hive與Hadoop的關系
1.4 Hive與傳統數據庫對比
- 查詢語言。由於 SQL 被廣泛的應用在數據倉庫中,因此,專門針對 Hive 的特性設計了類 SQL 的查詢語言 HQL。熟悉 SQL 開發的開發者可以很方便的使用 Hive 進行開發。
- 數據存儲位置。Hive 是建立在 Hadoop 之上的,所有 Hive 的數據都是存儲在 HDFS 中的。而數據庫則可以將數據保存在塊設備或者本地文件系統中。
- 數據格式。Hive 中沒有定義專門的數據格式,數據格式可以由用戶指定,用戶定義數據格式需要指定三個屬性:列分隔符(通常為空格、”\t”、”\x001″)、行分隔符(”\n”)以及讀取文件數據的方法(Hive 中默認有三個文件格式 TextFile,SequenceFile 以及 RCFile)。由於在加載數據的過程中,不需要從用戶數據格式到 Hive 定義的數據格式的轉換,因此,Hive 在加載的過程中不會對數據本身進行任何修改,而只是將數據內容復制或者移動到相應的 HDFS 目錄中。而在數據庫中,不同的數據庫有不同的存儲引擎,定義了自己的數據格式。所有數據都會按照一定的組織存儲,因此,數據庫加載數據的過程會比較耗時。
- 數據更新。由於 Hive 是針對數據倉庫應用設計的,而數據倉庫的內容是讀多寫少的。因此,Hive 中不支持對數據的改寫和添加,所有的數據都是在加載的時候中確定好的。而數據庫中的數據通常是需要經常進行修改的,因此可以使用 INSERT INTO ... VALUES 添加數據,使用 UPDATE ... SET 修改數據。
- 索引。之前已經說過,Hive 在加載數據的過程中不會對數據進行任何處理,甚至不會對數據進行掃描,因此也沒有對數據中的某些 Key 建立索引。Hive 要訪問數據中滿足條件的特定值時,需要暴力掃描整個數據,因此訪問延遲較高。由於 MapReduce 的引入, Hive 可以並行訪問數據,因此即使沒有索引,對於大數據量的訪問,Hive 仍然可以體現出優勢。數據庫中,通常會針對一個或者幾個列建立索引,因此對於少量的特定條件的數據的訪問,數據庫可以有很高的效率,較低的延遲。由於數據的訪問延遲較高,決定了 Hive 不適合在線數據查詢。
- 執行。Hive 中大多數查詢的執行是通過 Hadoop 提供的 MapReduce 來實現的,而數據庫通常有自己的執行引擎。
- 執行延遲。之前提到,Hive 在查詢數據的時候,由於沒有索引,需要掃描整個表,因此延遲較高。另外一個導致 Hive 執行延遲高的因素是 MapReduce 框架。由於 MapReduce 本身具有較高的延遲,因此在利用 MapReduce 執行 Hive 查詢時,也會有較高的延遲。相對的,數據庫的執行延遲較低。當然,這個低是有條件的,即數據規模較小,當數據規模大到超過數據庫的處理能力的時候,Hive 的並行計算顯然能體現出優勢。
- 可擴展性。由於 Hive 是建立在 Hadoop 之上的,因此 Hive 的可擴展性是和 Hadoop 的可擴展性是一致的(世界上最大的 Hadoop 集群在 Yahoo!,2009年的規模在 4000 台節點左右)。而數據庫由於 ACID 語義的嚴格限制,擴展行非常有限。目前最先進的並行數據庫 Oracle 在理論上的擴展能力也只有 100 台左右。
- 數據規模。由於 Hive 建立在集群上並可以利用 MapReduce 進行並行計算,因此可以支持很大規模的數據;對應的,數據庫可以支持的數據規模較小。
1.5 Hive的數據存儲
首先,Hive沒有專門的數據存儲格式,也沒有為數據建立索引,用戶可以非常自由的組織 Hive 中的表,只需要在創建表的時候告訴 Hive 數據中的列分隔符和行分隔符,Hive 就可以解析數據。其次,Hive 中所有的數據都存儲在 HDFS 中,Hive 中包含以下數據模型:Table,External Table,Partition,Bucket。
- Hive 中的 Table 和數據庫中的 Table 在概念上是類似的,每一個 Table 在 Hive 中都有一個相應的目錄存儲數據。例如,一個表 xiaojun,它在 HDFS 中的路徑為:/ warehouse /xiaojun,其中,wh 是在 hive-site.xml 中由 ${hive.metastore.warehouse.dir} 指定的數據倉庫的目錄,所有的 Table 數據(不包括 External Table)都保存在這個目錄中。
- Partition 對應於數據庫中的 Partition 列的密集索引,但是 Hive 中 Partition 的組織方式和數據庫中的很不相同。在 Hive 中,表中的一個 Partition 對應於表下的一個目錄,所有的 Partition 的數據都存儲在對應的目錄中。例如:xiaojun 表中包含 dt 和 city 兩個 Partition,則對應於 dt = 20100801, ctry = US 的 HDFS 子目錄為:/ warehouse /xiaojun/dt=20100801/ctry=US;對應於 dt = 20100801, ctry = CA 的 HDFS 子目錄為;/ warehouse /xiaojun/dt=20100801/ctry=CA
- Buckets 對指定列計算 hash,根據 hash 值切分數據,目的是為了並行,每一個 Bucket 對應一個文件。將 user 列分散至 32 個 bucket,首先對 user 列的值計算 hash,對應 hash 值為 0 的 HDFS 目錄為:/ warehouse /xiaojun/dt =20100801/ctry=US/part-00000;hash 值為 20 的 HDFS 目錄為:/ warehouse /xiaojun/dt =20100801/ctry=US/part-00020
- External Table 指向已經在 HDFS 中存在的數據,可以創建 Partition。它和 Table 在元數據的組織上是相同的,而實際數據的存儲則有較大的差異。
- Table 的創建過程和數據加載過程(這兩個過程可以在同一個語句中完成),在加載數據的過程中,實際數據會被移動到數據倉庫目錄中;之后對數據對訪問將會直接在數據倉庫目錄中完成。刪除表時,表中的數據和元數據將會被同時刪除。
- External Table 只有一個過程,加載數據和創建表同時完成(CREATE EXTERNAL TABLE ……LOCATION),實際數據是存儲在 LOCATION 后面指定的 HDFS 路徑中,並不會移動到數據倉庫目錄中。當刪除一個 External Table 時,僅刪除元數據。
2. Hive基本操作
2.1 DDL操作
2.1.1 創建表
建表語法
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
說明:
1、 CREATE TABLE 創建一個指定名字的表。如果相同名字的表已經存在,則拋出異常;用戶可以用 IF NOT EXISTS 選項來忽略這個異常。
2、 EXTERNAL關鍵字可以讓用戶創建一個外部表,在建表的同時指定一個指向實際數據的路徑(LOCATION),Hive 創建內部表時,會將數據移動到數據倉庫指向的路徑;若創建外部表,僅記錄數據所在的路徑,不對數據的位置做任何改變。在刪除表的時候,內部表的元數據和數據會被一起刪除,而外部表只刪除元數據,不刪除數據。
3、 LIKE 允許用戶復制現有的表結構,但是不復制數據。
4、 ROW FORMAT
DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char]
[MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
| SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]
用戶在建表的時候可以自定義 SerDe 或者使用自帶的 SerDe。如果沒有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,將會使用自帶的 SerDe。在建表的時候,用戶還需要為表指定列,用戶在指定表的列的同時也會指定自定義的 SerDe,Hive 通過 SerDe 確定表的具體的列的數據。
5、 STORED AS
SEQUENCEFILE|TEXTFILE|RCFILE
如果文件數據是純文本,可以使用 STORED AS TEXTFILE。如果數據需要壓縮,使用 STORED AS SEQUENCE。
具體實例
1、 創建內部表mytable。
2、 創建外部表pageview。
3、 創建分區表invites。
4、 創建帶桶的表student。
2.1.2 修改表
增加/刪除分區
ü 語法結構
ALTER TABLE table_name ADD [IF NOT EXISTS] partition_spec [ LOCATION 'location1' ] partition_spec [ LOCATION 'location2' ] ...
partition_spec:
: PARTITION (partition_col = partition_col_value, partition_col = partiton_col_value, ...)
ALTER TABLE table_name DROP partition_spec, partition_spec,...
ü 具體實例
重命名表
ü 語法結構
ALTER TABLE table_name RENAME TO new_table_name
ü 具體實例
增加/更新列
ü 語法結構
ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)
ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment] [FIRST|AFTER column_name]
ü 具體實例
2.1.3 顯示命令
show tables
show databases
show partitions
show functions
2.2 DML操作
2.2.1 Load
語法結構
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO
TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
說明:
1、 Load 操作只是單純的復制/移動操作,將數據文件移動到 Hive 表對應的位置。
2、 filepath:
相對路徑,例如:project/data1
絕對路徑,例如:/user/hive/project/data1
包含模式的完整 URI,列如:
hdfs://namenode:9000/user/hive/project/data1
3、 LOCAL關鍵字
如果指定了 LOCAL,那么:
load 命令會去查找本地文件系統中的 filepath。如果發現是相對路徑,則路徑會被解釋為相對於當前用戶的當前路徑。
load 命令會將 filepath中的文件復制到目標文件系統中。目標文件系統由表的位置屬性決定。被復制的數據文件移動到表的數據對應的位置。
如果沒有指定 LOCAL 關鍵字,如果 filepath 指向的是一個完整的 URI,hive 會直接使用這個 URI。 否則:如果沒有指定 schema 或者 authority,Hive 會使用在 hadoop 配置文件中定義的 schema 和 authority,fs.default.name 指定了 Namenode 的 URI。
如果路徑不是絕對的,Hive 相對於/user/進行解釋。
Hive 會將 filepath 中指定的文件內容移動到 table (或者 partition)所指定的路徑中。
4、 OVERWRITE 關鍵字
如果使用了 OVERWRITE 關鍵字,則目標表(或者分區)中的內容(如果有)會被刪除,然后再將 filepath 指向的文件/目錄中的內容添加到表/分區中。
如果目標表(分區)已經有一個文件,並且文件名和 filepath 中的文件名沖突,那么現有的文件會被新文件所替代。
具體實例
1、 加載相對路徑數據。
2、 加載絕對路徑數據。
3、 加載包含模式數據。
4、 OVERWRITE關鍵字使用。
2.2.2 Insert
將查詢結果插入Hive表
ü 語法結構
Standard syntax:
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement
Multiple inserts:
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ...] select_statement2] ...
Dynamic partition inserts:
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement
ü 具體實例
1、基本模式插入。
2、多插入模式。
3、自動分區模式。
導出表數據
ü 語法結構
Standard syntax:
INSERT OVERWRITE [LOCAL] DIRECTORY directory1 SELECT ... FROM ...
Hive extension (multiple inserts):
FROM from_statement
INSERT OVERWRITE [LOCAL] DIRECTORY directory1 select_statement1
[INSERT OVERWRITE [LOCAL] DIRECTORY directory2 select_statement2] ...
ü 具體實例
1、導出文件到本地。
說明:
數據寫入到文件系統時進行文本序列化,且每列用^A來區分,\n為換行符。用more命令查看時不容易看出分割符,可以使用: sed -e 's/\x01/|/g' filename來查看。
2、導出數據到HDFS。
2.2.3 SELECT
基本的Select操作
ü 語法結構
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list [HAVING condition]]
[CLUSTER BY col_list
| [DISTRIBUTE BY col_list] [SORT BY| ORDER BY col_list] |
[LIMIT number]
ü 具體實例
1、獲取年齡大的3個學生。
2、查詢學生信息按年齡,降序排序。
3、按學生名稱匯總學生年齡。
2.3 Hive Join
語法結構
join_table:
table_reference JOIN table_factor [join_condition]
| table_reference {LEFT|RIGHT|FULL} [OUTER] JOIN table_reference join_condition
| table_reference LEFT SEMI JOIN table_reference join_condition
Hive 只支持等值連接(equality joins)、外連接(outer joins)和(left/right joins)。Hive 不支持所有非等值的連接,因為非等值連接非常難轉化到 map/reduce 任務。另外,Hive 支持多於 2 個表的連接。
寫 join 查詢時,需要注意幾個關鍵點:
1、只支持等值join
例如:
SELECT a.* FROM a JOIN b ON (a.id = b.id)
SELECT a.* FROM a JOIN b
ON (a.id = b.id AND a.department = b.department)
是正確的,然而:
SELECT a.* FROM a JOIN b ON (a.id b.id)
是錯誤的。
- 可以 join 多於 2 個表。
例如
SELECT a.val, b.val, c.val FROM a JOIN b
ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
如果join中多個表的 join key 是同一個,則 join 會被轉化為單個 map/reduce 任務,例如:
SELECT a.val, b.val, c.val FROM a JOIN b
ON (a.key = b.key1) JOIN c
ON (c.key = b.key1)
被轉化為單個 map/reduce 任務,因為 join 中只使用了 b.key1 作為 join key。
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1)
JOIN c ON (c.key = b.key2)
而這一 join 被轉化為 2 個 map/reduce 任務。因為 b.key1 用於第一次 join 條件,而 b.key2 用於第二次 join。
3.join 時,每次 map/reduce 任務的邏輯:
reducer 會緩存 join 序列中除了最后一個表的所有表的記錄,再通過最后一個表將結果序列化到文件系統。這一實現有助於在 reduce 端減少內存的使用量。實踐中,應該把最大的那個表寫在最后(否則會因為緩存浪費大量內存)。例如:
SELECT a.val, b.val, c.val FROM a
JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
所有表都使用同一個 join key(使用 1 次 map/reduce 任務計算)。Reduce 端會緩存 a 表和 b 表的記錄,然后每次取得一個 c 表的記錄就計算一次 join 結果,類似的還有:
SELECT a.val, b.val, c.val FROM a
JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
這里用了 2 次 map/reduce 任務。第一次緩存 a 表,用 b 表序列化;第二次緩存第一次 map/reduce 任務的結果,然后用 c 表序列化。
4.LEFT,RIGHT 和 FULL OUTER 關鍵字用於處理 join 中空記錄的情況。
例如:
SELECT a.val, b.val FROM a LEFT OUTER
JOIN b ON (a.key=b.key)
對應所有 a 表中的記錄都有一條記錄輸出。輸出的結果應該是 a.val, b.val,當 a.key=b.key 時,而當 b.key 中找不到等值的 a.key 記錄時也會輸出 a.val, NULL。“FROM a LEFT OUTER JOIN b”這句一定要寫在同一行——意思是 a 表在 b 表的左邊,所以 a 表中的所有記錄都被保留了;“a RIGHT OUTER JOIN b”會保留所有 b 表的記錄。OUTER JOIN 語義應該是遵循標准 SQL spec的。
Join 發生在 WHERE 子句之前。如果你想限制 join 的輸出,應該在 WHERE 子句中寫過濾條件——或是在 join 子句中寫。這里面一個容易混淆的問題是表分區的情況:
SELECT a.val, b.val FROM a
LEFT OUTER JOIN b ON (a.key=b.key)
WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'
會 join a 表到 b 表(OUTER JOIN),列出 a.val 和 b.val 的記錄。WHERE 從句中可以使用其他列作為過濾條件。但是,如前所述,如果 b 表中找不到對應 a 表的記錄,b 表的所有列都會列出 NULL,包括 ds 列。也就是說,join 會過濾 b 表中不能找到匹配 a 表 join key 的所有記錄。這樣的話,LEFT OUTER 就使得查詢結果與 WHERE 子句無關了。解決的辦法是在 OUTER JOIN 時使用以下語法:
SELECT a.val, b.val FROM a LEFT OUTER JOIN b
ON (a.key=b.key AND
b.ds='2009-07-07' AND
a.ds='2009-07-07')
這一查詢的結果是預先在 join 階段過濾過的,所以不會存在上述問題。這一邏輯也可以應用於 RIGHT 和 FULL 類型的 join 中。
Join 是不能交換位置的。無論是 LEFT 還是 RIGHT join,都是左連接的。
SELECT a.val1, a.val2, b.val, c.val
FROM a
JOIN b ON (a.key = b.key)
LEFT OUTER JOIN c ON (a.key = c.key)
先 join a 表到 b 表,丟棄掉所有 join key 中不匹配的記錄,然后用這一中間結果和 c 表做 join。這一表述有一個不太明顯的問題,就是當一個 key 在 a 表和 c 表都存在,但是 b 表中不存在的時候:整個記錄在第一次 join,即 a JOIN b 的時候都被丟掉了(包括a.val1,a.val2和a.key),然后我們再和 c 表 join 的時候,如果 c.key 與 a.key 或 b.key 相等,就會得到這樣的結果:NULL, NULL, NULL, c.val
具體實例
1、 獲取已經分配班級的學生姓名。
2、 獲取尚未分配班級的學生姓名。
3、 LEFT SEMI JOIN是IN/EXISTS的高效實現。
2.4 Hive函數
2.4.1 內置運算符
關系運算符
運算符 |
類型 |
說明 |
A = B |
原始類型 |
如果A與B相等,返回TRUE,否則返回FALSE |
A == B |
無 |
失敗,因為無效的語法。 SQL使用”=”,不使用”==”。 |
A <> B |
原始類型 |
如果A不等於B返回TRUE,否則返回FALSE。如果A或B值為”NULL”,結果返回”NULL”。 |
A < B |
原始類型 |
如果A小於B返回TRUE,否則返回FALSE。如果A或B值為”NULL”,結果返回”NULL”。 |
A <= B |
原始類型 |
如果A小於等於B返回TRUE,否則返回FALSE。如果A或B值為”NULL”,結果返回”NULL”。 |
A > B |
原始類型 |
如果A大於B返回TRUE,否則返回FALSE。如果A或B值為”NULL”,結果返回”NULL”。 |
A >= B |
原始類型 |
如果A大於等於B返回TRUE,否則返回FALSE。如果A或B值為”NULL”,結果返回”NULL”。 |
A IS NULL |
所有類型 |
如果A值為”NULL”,返回TRUE,否則返回FALSE |
A IS NOT NULL |
所有類型 |
如果A值不為”NULL”,返回TRUE,否則返回FALSE |
A LIKE B |
字符串 |
如果A或B值為”NULL”,結果返回”NULL”。字符串A與B通過sql進行匹配,如果相符返回TRUE, |
不符返回FALSE。 |
||
B字符串中 的”_”代表任一字符,”%”則代表多個任意字符。 |
||
例如: (‘foobar’ like ‘foo’)返回FALSE,( ‘foobar’ like ‘foo_ _ _’或者 ‘foobar’ like ‘foo%’) |
||
則返回TURE |
||
A RLIKE B |
字符串 |
如果A或B值為”NULL”,結果返回”NULL”。字符串A與B通過java進行匹配,如果相符返回TRUE, |
不符返回FALSE。 |
||
例如:( ‘foobar’ rlike ‘foo’)返回FALSE,(’foobar’ rlike ‘^f.*r$’ )返回TRUE。 |
||
A REGEXP B |
字符串 |
與RLIKE相同。 |
算術運算符
運算符 |
類型 |
說明 |
A + B |
數字類型 |
A和B相加。結果的與操作數值有共同類型。例如每一個整數是一個浮點數,浮點數包含整數。 |
所以,一個浮點數和一個整數相加結果也是一個浮點數。 |
||
A – B |
數字類型 |
A和B相減。結果的與操作數值有共同類型。 |
A * B |
數字類型 |
A和B相乘,結果的與操作數值有共同類型。需要說明的是,如果乘法造成溢出,將選擇更高的類型。 |
A / B |
數字類型 |
A和B相除,結果是一個double(雙精度)類型的結果。 |
A % B |
數字類型 |
A除以B余數與操作數值有共同類型。 |
A & B |
數字類型 |
運算符查看兩個參數的二進制表示法的值,並執行按位”與”操作。兩個表達式的一位均為1時,則結果的該位為 1。 |
否則,結果的該位為 0。 |
||
A|B |
數字類型 |
運算符查看兩個參數的二進制表示法的值,並執行按位”或”操作。只要任一表達式的一位為 1,則結果的該位為 1。 |
否則,結果的該位為 0。 |
||
A ^ B |
數字類型 |
運算符查看兩個參數的二進制表示法的值,並執行按位”異或”操作。當且僅當只有一個表達式的某位上為 1 時, |
結果的該位才為 1。 |
||
否則結果的該位為 0。 |
||
~A |
數字類型 |
對一個表達式執行按位”非”(取反)。 |
邏輯運算符
運算符 |
類型 |
說明 |
A AND B |
布爾值 |
A和B同時正確時,返回TRUE,否則FALSE。如果A或B值為NULL,返回NULL。 |
A && B |
布爾值 |
與”A AND B”相同。 |
A OR B |
布爾值 |
A或B正確,或兩者同時正確返返回TRUE,否則FALSE。如果A和B值同時為NULL,返回NULL。 |
A | B |
布爾值 |
與”A OR B”相同 |
NOT A |
布爾值 |
如果A為NULL或錯誤的時候返回TURE,否則返回FALSE。 |
! A |
布爾值 |
與”NOT A”相同 |
復制類型函數
函數 |
類型 |
說明 |
map |
(key1, value1, key2, value2, …) |
通過指定的鍵/值對,創建一個map。 |
struct |
(val1, val2, val3, …) |
通過指定的字段值,創建一個結構。結構字段名稱將COL1,COL2,… |
array |
(val1, val2, …) |
通過指定的元素,創建一個數組。 |
復制類型函數操作
函數 |
類型 |
說明 |
A[n] |
A是一個數組,n為int型 |
返回數組A的第n個元素,第一個元素的索引為0。如果A數組為['foo','bar'], |
M[key] |
M是Map<K, V>,關鍵K型 |
返回關鍵值對應的值,例如mapM為 \{‘f’ -> ‘foo’, ‘b’ -> ‘bar’, ‘all’ -> ‘foobar’\},則M['all'] 返回’foobar’。 |
S.x |
S為struct |
返回結構x字符串在結構S中的存儲位置。 |
2.4.2 內置函數
數學函數
返回類型 |
函數 |
說明 |
BIGINT |
round(double a) |
四舍五入 |
DOUBLE |
round(double a, int d) |
小數部分d位之后數字四舍五入,例如round(21.263,2),返回21.26 |
BIGINT |
floor(double a) |
對給定數據進行向下舍入最接近的整數。例如floor(21.2),返回21。 |
BIGINT |
ceil(double a) |
將參數向上舍入為最接近的整數。例如ceil(21.2),返回23. |
ceiling(double a) |
||
double |
rand(), rand(int seed) |
返回大於或等於0且小於1的平均分布隨機數(依重新計算而變) |
double |
exp(double a) |
返回e的n次方 |
double |
ln(double a) |
返回給定數值的自然對數 |
double |
log10(double a) |
返回給定數值的以10為底自然對數 |
double |
log2(double a) |
返回給定數值的以2為底自然對數 |
double |
log(double base, double a) |
返回給定底數及指數返回自然對數 |
double |
pow(double a, double p) |
返回某數的乘冪 |
power(double a, double p) |
||
double |
sqrt(double a) |
返回數值的平方根 |
string |
bin(BIGINT a) |
返回二進制格式, |
參考:http://dev.mysql.com/doc/refman/5.0/en/string-functions.html#function_hex |
||
string |
hex(BIGINT a) |
將整數或字符轉換為十六進制格式。 |
hex(string a) |
參考:http://dev.mysql.com/doc/refman/5.0/en/string-functions.html#function_hex |
|
string |
unhex(string a) |
十六進制字符轉換由數字表示的字符。 |
string |
conv(BIGINT num, int from_base, int to_base) |
將指定數值,由原來的度量體系轉換為指定的試題體系。例如CONV(‘a’,16,2),返回。 |
參考:’1010′ http://dev.mysql.com/doc/refman/5.0/en/mathematical-functions.html#function_conv |
||
double |
abs(double a) |
取絕對值 |
int double |
pmod(int a, int b) |
返回a除b的余數的絕對值 |
pmod(double a, double b) |
||
double |
sin(double a) |
返回給定角度的正弦值 |
double |
asin(double a) |
返回x的反正弦,即是X。如果X是在-1到1的正弦值,返回NULL。 |
double |
cos(double a) |
返回余弦 |
double |
acos(double a) |
返回X的反余弦,即余弦是X,,如果-1<= A <= 1,否則返回null. |
int double |
positive(int a) |
返回A的值,例如positive(2),返回2。 |
positive(double a) |
||
int double |
negative(int a) |
返回A的相反數,例如negative(2),返回-2。 |
negative(double a) |
收集函數
回類型 |
函數 |
說明 |
int |
size(Map<K.V>) |
返回的map類型的元素的數量 |
int |
size(Array<T>) |
返回數組類型的元素數量 |
類型轉換函數
返回類型 |
函數 |
說明 |
指定 “type” |
cast(expr as <type>) |
類型轉換。例如將字符”1″轉換為整數:cast(’1′ as bigint),如果轉換失敗返回NULL。 |
日期函數
返回類型 |
函數 |
說明 |
string |
from_unixtime(bigint unixtime[, string format]) |
UNIX_TIMESTAMP參數表示返回一個值’YYYY- MM – DD HH:MM:SS’ |
或YYYYMMDDHHMMSS.uuuuuu格式,這取決於是否是在一個字符串或數字語境中 |
||
使用的功能。 |
||
該值表示在當前的時區。 |
||
bigint |
unix_timestamp() |
如果不帶參數的調用,返回一個Unix時間戳(從’1970- 01 – 0100:00:00′到現在的 |
UTC秒數) |
||
為無符號整數。 |
||
bigint |
unix_timestamp(string date) |
指定日期參數調用UNIX_TIMESTAMP(),它返回參數值’1970- 01 – 0100:00:00′到 |
指定日期的秒數。 |
||
bigint |
unix_timestamp(string date, string pattern) |
指定時間輸入格式,返回到1970年秒數: |
unix_timestamp(’2009-03-20′, ‘yyyy-MM-dd’) = 1237532400 |
||
參考:http://java.sun.com/j2se/1.4.2/docs/api/java/text/SimpleDateFormat.html |
||
string |
to_date(string timestamp) |
返回時間中的年月日: to_date(“1970-01-01 00:00:00″) = “1970-01-01″ |
string |
to_dates(string date) |
給定一個日期date,返回一個天數(0年以來的天數) |
int |
year(string date) |
返回指定時間的年份,范圍在1000到9999,或為”零”日期的0。 |
int |
month(string date) |
返回指定時間的月份,范圍為1至12月,或0一個月的一部分,如’0000-00-00′ |
或’2008-00-00′的日期。 |
||
int |
day(string date) dayofmonth(date) |
返回指定時間的日期 |
int |
hour(string date) |
返回指定時間的小時,范圍為0到23。 |
int |
minute(string date) |
返回指定時間的分鍾,范圍為0到59。 |
int |
second(string date) |
返回指定時間的秒,范圍為0到59。 |
int |
weekofyear(string date) |
返回指定日期所在一年中的星期號,范圍為0到53。 |
int |
datediff(string enddate, string startdate) |
兩個時間參數的日期之差。 |
int |
date_add(string startdate, int days) |
給定時間,在此基礎上加上指定的時間段。 |
int |
date_sub(string startdate, int days) |
給定時間,在此基礎上減去指定的時間段。 |
條件函數
返回類型 |
函數 |
說明 |
T |
if(boolean testCondition, T valueTrue, T valueFalseOrNull) |
判斷是否滿足條件,如果滿足返回一個值,如果不滿足則返回另一個值。 |
T |
COALESCE(T v1, T v2, …) |
返回一組數據中,第一個不為NULL的值,如果均為NULL,返回NULL。 |
T |
CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END |
當a=b時,返回c;當a=d時,返回e,否則返回f。 |
T |
CASE WHEN a THEN b [WHEN c THEN d]* [ELSE e] END |
當值為a時返回b,當值為c時返回d。否則返回e。 |
字符函數
返回類型 |
函數 |
說明 |
int |
length(string A) |
返回字符串的長度 |
string |
reverse(string A) |
返回倒序字符串 |
string |
concat(string A, string B…) |
連接多個字符串,合並為一個字符串,可以接受任意數量的輸入字符串 |
string |
concat_ws(string SEP, string A, string B…) |
鏈接多個字符串,字符串之間以指定的分隔符分開。 |
string |
substr(string A, int start) substring(string A, int start) |
從文本字符串中指定的起始位置后的字符。 |
string |
substr(string A, int start, int len) substring(string A, int start, int len) |
從文本字符串中指定的位置指定長度的字符。 |
string |
upper(string A) ucase(string A) |
將文本字符串轉換成字母全部大寫形式 |
string |
lower(string A) lcase(string A) |
將文本字符串轉換成字母全部小寫形式 |
string |
trim(string A) |
刪除字符串兩端的空格,字符之間的空格保留 |
string |
ltrim(string A) |
刪除字符串左邊的空格,其他的空格保留 |
string |
rtrim(string A) |
刪除字符串右邊的空格,其他的空格保留 |
string |
regexp_replace(string A, string B, string C) |
字符串A中的B字符被C字符替代 |
string |
regexp_extract(string subject, string pattern, int index) |
通過下標返回正則表達式指定的部分。regexp_extract(‘foothebar’, ‘foo(.*?)(bar)’, 2) returns ‘bar.’ |
string |
parse_url(string urlString, string partToExtract [, string keyToExtract]) |
返回URL指定的部分。parse_url(‘http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1′, ‘HOST’) 返回:’facebook.com’ |
string |
get_json_object(string json_string, string path) |
select a.timestamp, get_json_object(a.appevents, ‘$.eventid’), get_json_object(a.appenvets, ‘$.eventname’) from log a; |
string |
space(int n) |
返回指定數量的空格 |
string |
repeat(string str, int n) |
重復N次字符串 |
int |
ascii(string str) |
返回字符串中首字符的數字值 |
string |
lpad(string str, int len, string pad) |
返回指定長度的字符串,給定字符串長度小於指定長度時,由指定字符從左側填補。 |
string |
rpad(string str, int len, string pad) |
返回指定長度的字符串,給定字符串長度小於指定長度時,由指定字符從右側填補。 |
array |
split(string str, string pat) |
將字符串轉換為數組。 |
int |
find_in_set(string str, string strList) |
返回字符串str第一次在strlist出現的位置。如果任一參數為NULL,返回NULL;如果第一個參數包含逗號,返回0。 |
array<array<string>> |
sentences(string str, string lang, string locale) |
將字符串中內容按語句分組,每個單詞間以逗號分隔,最后返回數組。 例如sentences(‘Hello there! How are you?’) 返回:( (“Hello”, “there”), (“How”, “are”, “you”) ) |
array<struct<string,double>> |
ngrams(array<array<string>>, int N, int K, int pf) |
SELECT ngrams(sentences(lower(tweet)), 2, 100 [, 1000]) FROM twitter; |
array<struct<string,double>> |
context_ngrams(array<array<string>>, array<string>, int K, int pf) |
SELECT context_ngrams(sentences(lower(tweet)), array(null,null), 100, [, 1000]) FROM twitter; |
內置聚合函數
返回類型 |
函數 |
說明 |
bigint |
count(*) , count(expr), count(DISTINCT expr[, expr_., expr_.]) |
返回記錄條數。 |
double |
sum(col), sum(DISTINCT col) |
求和 |
double |
avg(col), avg(DISTINCT col) |
求平均值 |
double |
min(col) |
返回指定列中最小值 |
double |
max(col) |
返回指定列中最大值 |
double |
var_pop(col) |
返回指定列的方差 |
double |
var_samp(col) |
返回指定列的樣本方差 |
double |
stddev_pop(col) |
返回指定列的偏差 |
double |
stddev_samp(col) |
返回指定列的樣本偏差 |
double |
covar_pop(col1, col2) |
兩列數值協方差 |
double |
covar_samp(col1, col2) |
兩列數值樣本協方差 |
double |
corr(col1, col2) |
返回兩列數值的相關系數 |
double |
percentile(col, p) |
返回數值區域的百分比數值點。0<=P<=1,否則返回NULL,不支持浮點型數值。 |
array<double> |
percentile(col, array(p~1,,\ [, p,,2,,]…)) |
返回數值區域的一組百分比值分別對應的數值點。0<=P<=1,否則返回NULL,不支持浮點型數值。 |
double |
percentile_approx(col, p[, B]) |
Returns an approximate p^th^ percentile of a numeric column (including floating point types) in the group. |
The B parameter controls approximation accuracy at the cost of memory. Higher values yield better approximations, and the default is 10,000. When the number of distinct values in col is smaller than B, |
||
this gives an exact percentile value. |
||
array<double> |
percentile_approx(col, array(p~1,, [, p,,2_]…) [, B]) |
Same as above, but accepts and returns an array of percentile values instead of a single one. |
array<struct\{‘x’,'y’\}> |
histogram_numeric(col, b) |
Computes a histogram of a numeric column in the group using b non-uniformly spaced bins. |
The output is an array of size b of double-valued (x,y) coordinates that represent the bin centers and heights |
||
array |
collect_set(col) |
返回無重復記錄 |
內置表函數
返回類型 |
函數 |
說明 |
數組 |
explode(array<TYPE> a) |
數組一條記錄中有多個參數,將參數拆分,每個參數生成一列。 |
|
json_tuple |
get_json_object語句: |
select a.timestamp, get_json_object(a.appevents, ‘$.eventid’), get_json_object(a.appenvets, ‘$.eventname’) |
||
from log a; |
||
json_tuple語句: |
||
select a.timestamp, b.* from log a lateral view json_tuple(a.appevent, ‘eventid’, ‘eventname’) b as f1, f2 |
2.5 Hive Shell基本操作
2.5.1 Hive命令行
語法結構
hive [-hiveconf x=y]* [<-i filename>]* [<-f filename>|<-e query-string>] [-S]
說明:
1、 -i 從文件初始化HQL。
2、 -e從命令行執行指定的HQL
3、 -f 執行HQL腳本
4、 -v 輸出執行的HQL語句到控制台
5、 -p <port> connect to Hive Server on port number -hiveconf x=y Use this to set hive/hadoop configuration variables.
具體實例
1、運行一個查詢。
2、運行一個文件。
3、運行參數文件。
3. Hive參數配置說明
開發Hive應用時,不可避免地需要設定Hive的參數。設定Hive的參數可以調優HQL代碼的執行效率,或幫助定位問題。然而實踐中經常遇到的一個問題是,為什么設定的參數沒有起作用?
這通常是錯誤的設定方式導致的。
對於一般參數,有以下三種設定方式:
- 配置文件
- 命令行參數
- 參數聲明
配置文件:Hive的配置文件包括
- 用戶自定義配置文件:$HIVE_CONF_DIR/hive-site.xml
- 默認配置文件:$HIVE_CONF_DIR/hive-default.xml
用戶自定義配置會覆蓋默認配置。另外,Hive也會讀入Hadoop的配置,因為Hive是作為Hadoop的客戶端啟動的,Hadoop的配置文件包括
- $HADOOP_CONF_DIR/hive-site.xml
- $HADOOP_CONF_DIR/hive-default.xml
Hive的配置會覆蓋Hadoop的配置。
配置文件的設定對本機啟動的所有Hive進程都有效。
命令行參數:啟動Hive(客戶端或Server方式)時,可以在命令行添加-hiveconf param=value來設定參數,例如:
bin/hive -hiveconf hive.root.logger=INFO,console
這一設定對本次啟動的Session(對於Server方式啟動,則是所有請求的Sessions)有效。
參數聲明:可以在HQL中使用SET關鍵字設定參數,例如:
set mapred.reduce.tasks=100;
這一設定的作用域也是Session級的。
上述三種設定方式的優先級依次遞增。即參數聲明覆蓋命令行參數,命令行參數覆蓋配置文件設定。注意某些系統級的參數,例如log4j相關的設定,必須用前兩種方式設定,因為那些參數的讀取在Session建立以前已經完成了。
另外,SerDe參數必須寫在DDL(建表)語句中。例如:
create table if not exists t_dummy(
dummy string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim'='\t',
'escape.delim'='\\',
'serialization.null.format'=' '
) STORED AS TEXTFILE;
類似serialization.null.format這樣的參數,必須和某個表或分區關聯。在DDL外部聲明將不起作用。
4. Hive自定義函數和定義Transform
當Hive提供的內置函數無法滿足你的業務處理需要時,此時就可以考慮使用用戶自定義函數(UDF:user-defined function)。
4.1 自定義函數類別
UDF操作作用於單個數據行,產生一個數據行作為輸出。(數學函數,字符串函數)
UDAF(用戶定義聚集函數):接收多個輸入數據行,並產生一個輸出數據行。(count,max)
UDTF(用戶定義表生成函數):操作作用於單個數據行,並產生多個數據行。(explode)
4.2 添加函數到hive庫
add jar /home/hadoop/workspace/hiveext/target/hive-test.jar;
create temporary function strip as 'com.simple.udf.Strip';
4.3 UDF開發實例
4.4 Transform實現
Hive的 TRANSFORM 關鍵字提供了在SQL中調用自寫腳本的功能,適合實現Hive中沒有的功能又不想寫UDF的情況。
如下面這句sql就是借用了weekday_mapper.py對數據進行了處理.
CREATE TABLE u_data_new (
userid INT,
movieid INT,
rating INT,
weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
add FILE weekday_mapper.py;
INSERT OVERWRITE TABLE u_data_new
SELECT
TRANSFORM (userid, movieid, rating, unixtime)
USING 'python weekday_mapper.py'
AS (userid, movieid, rating, weekday)
FROM u_data;
,其中weekday_mapper.py內容如下
import sys
import datetime
for line in sys.stdin:
line = line.strip()
userid, movieid, rating, unixtime = line.split('\t')
weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print '\t'.join([userid, movieid, rating, str(weekday)])
如下面的例子則是使用了shell的cat命令來處理數據
FROM invites a INSERT OVERWRITE TABLE events SELECT TRANSFORM(a.foo, a.bar) AS (oof, rab) USING '/bin/cat' WHERE a.ds > '2008-08-09';
5. Hive執行過程實例分析
5.1 JOIN
對於 JOIN 操作:
INSERT OVERWRITE TABLE pv_users SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON (pv.userid = u.userid); |
實現過程為:
- Map:
- 以 JOIN ON 條件中的列作為 Key,如果有多個列,則 Key 是這些列的組合
- 以 JOIN 之后所關心的列作為 Value,當有多個列時,Value 是這些列的組合。在 Value 中還會包含表的 Tag 信息,用於標明此 Value 對應於哪個表。
- 按照 Key 進行排序。
- Shuffle:
- 根據 Key 的值進行 Hash,並將 Key/Value 對按照 Hash 值推至不同對 Reduce 中。
- Reduce:
- Reducer 根據 Key 值進行 Join 操作,並且通過 Tag 來識別不同的表中的數據。
具體實現過程如圖:
5.2 GROUP BY
SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age; |
具體實現過程如圖:
5.3 DISTINCT
SELECT age, count(distinct pageid) FROM pv_users GROUP BY age; |
實現過程如圖:
6. Hive使用注意點
6.1字符集
Hadoop和Hive都是用UTF-8編碼的,所以, 所有中文必須是UTF-8編碼, 才能正常使用
備注:中文數據load到表里面, 如果字符集不同,很有可能全是亂碼需要做轉碼的, 但是hive本身沒有函數來做這個
6.2壓縮
hive.exec.compress.output 這個參數, 默認是 false,但是很多時候貌似要單獨顯式設置一遍
否則會對結果做壓縮的,如果你的這個文件后面還要在hadoop下直接操作, 那么就不能壓縮了
6.3 count(distinct)
當前的 Hive 不支持在一條查詢語句中有多 Distinct。如果要在 Hive 查詢語句中實現多Distinct,需要使用至少 n+1 條查詢語句(n為distinct的數目),前 n 條查詢分 別對 n 個列去重,最后一條查詢語句對 n 個去重之后的列做 Join 操作,得到最終結果。
6.4 JOIN
只支持等值連接
6.5 DML操作
只支持INSERT/LOAD操作,新版本支持UPDATE和DELTE。
6.6 HAVING
不支持HAVING操作。如果需要這個功能要嵌套一個子查詢用where限制
新版本支持。
6.7 子查詢
Hive不支持where子句中的子查詢
6.8 Join中處理null值的語義區別
SQL標准中,任何對null的操作(數值比較,字符串操作等)結果都為null。Hive對null值處理的邏輯和標准基本一致,除了Join時的特殊邏輯。
這里的特殊邏輯指的是,Hive的Join中,作為Join key的字段比較,null=null是有意義的,且返回值為true。檢查以下查詢:
select u.uid, count(u.uid)
from t_weblog l join t_user u on (l.uid = u.uid) group by u.uid;
查詢中,t_weblog表中uid為空的記錄將和t_user表中uid為空的記錄做連接,即l.uid = u.uid=null成立。
如果需要與標准一致的語義,我們需要改寫查詢手動過濾null值的情況:
select u.uid, count(u.uid)
from t_weblog l join t_user u
on (l.uid = u.uid and l.uid is not null and u.uid is not null)
group by u.uid;
實踐中,這一語義區別也是經常導致數據傾斜的原因之一。
6.9分號字符
分號是SQL語句結束標記,在HiveQL中也是,但是在HiveQL中,對分號的識別沒有那么智慧,例如:
select concat(cookie_id,concat(';',’zoo’)) from c02_clickstat_fatdt1 limit 2;
FAILED: Parse Error: line 0:-1 cannot recognize input '<EOF>' in function specification
可以推斷,Hive解析語句的時候,只要遇到分號就認為語句結束,而無論是否用引號包含起來。
解決的辦法是,使用分號的八進制的ASCII碼進行轉義,那么上述語句應寫成:
select concat(cookie_id,concat('\073','zoo')) from c02_clickstat_fatdt1 limit 2;
為什么是八進制ASCII碼?
我嘗試用十六進制的ASCII碼,但Hive會將其視為字符串處理並未轉義,好像僅支持八進制,原因不詳。這個規則也適用於其他非SELECT語句,如CREATE TABLE中需要定義分隔符,那么對不可見字符做分隔符就需要用八進制的ASCII碼來轉義。
6.10 Insert
6.10.1新增數據
根據語法Insert必須加“OVERWRITE”關鍵字,也就是說每一次插入都是一次重寫。那如何實現表中新增數據呢?
假設Hive中有表xiaojun1,
hive> DESCRIBE xiaojun1;
OK
id int
value int
hive> SELECT * FROM xiaojun1;
OK
3 4
1 2
2 3
現增加一條記錄:
hive> INSERT OVERWRITE TABLE xiaojun1
SELECT id, value FROM (
SELECT id, value FROM xiaojun1
UNION ALL
SELECT 4 AS id, 5 AS value FROM xiaojun1 limit 1
) u;
結果是:
hive>SELECT * FROM p1;
OK
3 4
4 5
2 3
1 2
其中的關鍵在於, 關鍵字UNION ALL的應用, 即將原有數據集和新增數據集進行結合, 然后重寫表.
6.10.2 插入次序
INSERT OVERWRITE TABLE在插入數據時,是按照后面的SELECT語句中的字段順序插入的. 也就說, 當id 和value 的位置互換, 那么value將被寫入id, 同id被寫入value.
6.10.3 初始值
INSERT OVERWRITE TABLE在插入數據時, 后面的字段的初始值應注意與表定義中的一致性. 例如, 當為一個STRING類型字段初始為NULL時:
NULL AS field_name // 這可能會被提示定義類型為STRING, 但這里是void
CAST(NULL AS STRING) AS field_name // 這樣是正確的
又如, 為一個BIGINT類型的字段初始為0時:
CAST(0 AS BIGINT) AS field_name
7. Hive優化策略
7.1 HADOOP計算框架特性
- 數據量大不是問題,數據傾斜是個問題。
- jobs數比較多的作業運行效率相對比較低,比如即使有幾百行的表,如果多次關聯多次匯總,產生十幾個jobs,耗時很長。原因是map reduce作業初始化的時間是比較長的。
- sum,count,max,min等UDAF,不怕數據傾斜問題,hadoop在map端的匯總合並優化,使數據傾斜不成問題。
- count(distinct ),在數據量大的情況下,效率較低,如果是多count(distinct )效率更低,因為count(distinct)是按group by 字段分組,按distinct字段排序,一般這種分布方式是很傾斜的,比如男uv,女uv,淘寶一天30億的pv,如果按性別分組,分配2個reduce,每個reduce處理15億數據。
7.2 優化的常用手段
- 好的模型設計事半功倍。
- 解決數據傾斜問題。
- 減少job數。
- 設置合理的map reduce的task數,能有效提升性能。(比如,10w+級別的計算,用160個reduce,那是相當的浪費,1個足夠)。
- 了解數據分布,自己動手解決數據傾斜問題是個不錯的選擇。set hive.groupby.skewindata=true;這是通用的算法優化,但算法優化有時不能適應特定業務背景,開發人員了解業務,了解數據,可以通過業務邏輯精確有效的解決數據傾斜問題。
- 數據量較大的情況下,慎用count(distinct),count(distinct)容易產生傾斜問題。
- 對小文件進行合並,是行至有效的提高調度效率的方法,假如所有的作業設置合理的文件數,對雲梯的整體調度效率也會產生積極的正向影響。
- 優化時把握整體,單個作業最優不如整體最優。
7.3 全排序
Hive的排序關鍵字是SORT BY,它有意區別於傳統數據庫的ORDER BY也是為了強調兩者的區別–SORT BY只能在單機范圍內排序。
7.3.1 例1
set mapred.reduce.tasks=2;
原值
select cookie_id,page_id,id from c02_clickstat_fatdt1
where cookie_id IN('1.193.131.218.1288611279693.0','1.193.148.164.1288609861509.2')
1.193.148.164.1288609861509.2 113181412886099008861288609901078194082403 684000005
1.193.148.164.1288609861509.2 127001128860563972141288609859828580660473 684000015
1.193.148.164.1288609861509.2 113181412886099165721288609915890452725326 684000018
1.193.131.218.1288611279693.0 01c183da6e4bc50712881288611540109914561053 684000114
1.193.131.218.1288611279693.0 01c183da6e4bc22412881288611414343558274174 684000118
1.193.131.218.1288611279693.0 01c183da6e4bc50712881288611511781996667988 684000121
1.193.131.218.1288611279693.0 01c183da6e4bc22412881288611523640691739999 684000126
1.193.131.218.1288611279693.0 01c183da6e4bc50712881288611540109914561053 684000128
hive> select cookie_id,page_id,id from c02_clickstat_fatdt1 where
cookie_id IN('1.193.131.218.1288611279693.0','1.193.148.164.1288609861509.2')
SORT BY COOKIE_ID,PAGE_ID;
SORT排序后的值
1.193.131.218.1288611279693.0 684000118 01c183da6e4bc22412881288611414343558274174 684000118
1.193.131.218.1288611279693.0 684000114 01c183da6e4bc50712881288611540109914561053 684000114
1.193.131.218.1288611279693.0 684000128 01c183da6e4bc50712881288611540109914561053 684000128
1.193.148.164.1288609861509.2 684000005 113181412886099008861288609901078194082403 684000005
1.193.148.164.1288609861509.2 684000018 113181412886099165721288609915890452725326 684000018
1.193.131.218.1288611279693.0 684000126 01c183da6e4bc22412881288611523640691739999 684000126
1.193.131.218.1288611279693.0 684000121 01c183da6e4bc50712881288611511781996667988 684000121
1.193.148.164.1288609861509.2 684000015 127001128860563972141288609859828580660473 684000015
select cookie_id,page_id,id from c02_clickstat_fatdt1
where cookie_id IN('1.193.131.218.1288611279693.0','1.193.148.164.1288609861509.2')
ORDER BY PAGE_ID,COOKIE_ID;
1.193.131.218.1288611279693.0 684000118 01c183da6e4bc22412881288611414343558274174 684000118
1.193.131.218.1288611279693.0 684000126 01c183da6e4bc22412881288611523640691739999 684000126
1.193.131.218.1288611279693.0 684000121 01c183da6e4bc50712881288611511781996667988 684000121
1.193.131.218.1288611279693.0 684000114 01c183da6e4bc50712881288611540109914561053 684000114
1.193.131.218.1288611279693.0 684000128 01c183da6e4bc50712881288611540109914561053 684000128
1.193.148.164.1288609861509.2 684000005 113181412886099008861288609901078194082403 684000005
1.193.148.164.1288609861509.2 684000018 113181412886099165721288609915890452725326 684000018
1.193.148.164.1288609861509.2 684000015 127001128860563972141288609859828580660473 684000015
可以看到SORT和ORDER排序出來的值不一樣。一開始我指定了2個reduce進行數據分發(各自進行排序)。結果不一樣的主要原因是上述查詢沒有reduce key,hive會生成隨機數作為reduce key。這樣的話輸入記錄也隨機地被分發到不同reducer機器上去了。為了保證reducer之間沒有重復的cookie_id記錄,可以使用DISTRIBUTE BY關鍵字指定分發key為cookie_id。
select cookie_id,country,id,page_id,id from c02_clickstat_fatdt1 where cookie_id IN('1.193.131.218.1288611279693.0','1.193.148.164.1288609861509.2') distribute by cookie_id SORT BY COOKIE_ID,page_id;
1.193.131.218.1288611279693.0 684000118 01c183da6e4bc22412881288611414343558274174 684000118
1.193.131.218.1288611279693.0 684000126 01c183da6e4bc22412881288611523640691739999 684000126
1.193.131.218.1288611279693.0 684000121 01c183da6e4bc50712881288611511781996667988 684000121
1.193.131.218.1288611279693.0 684000114 01c183da6e4bc50712881288611540109914561053 684000114
1.193.131.218.1288611279693.0 684000128 01c183da6e4bc50712881288611540109914561053 684000128
1.193.148.164.1288609861509.2 684000005 113181412886099008861288609901078194082403 684000005
1.193.148.164.1288609861509.2 684000018 113181412886099165721288609915890452725326 684000018
1.193.148.164.1288609861509.2 684000015 127001128860563972141288609859828580660473 684000015
7.3.2 例2
CREATE TABLE if not exists t_order(
id int, -- 訂單編號
sale_id int, -- 銷售ID
customer_id int, -- 客戶ID
product _id int, -- 產品ID
amount int -- 數量
) PARTITIONED BY (ds STRING);
在表中查詢所有銷售記錄,並按照銷售ID和數量排序:
set mapred.reduce.tasks=2;
Select sale_id, amount from t_order
Sort by sale_id, amount;
這一查詢可能得到非期望的排序。指定的2個reducer分發到的數據可能是(各自排序):
Reducer1:
Sale_id | amount
0 | 100
1 | 30
1 | 50
2 | 20
Reducer2:
Sale_id | amount
0 | 110
0 | 120
3 | 50
4 | 20
使用DISTRIBUTE BY關鍵字指定分發key為sale_id。改造后的HQL如下:
set mapred.reduce.tasks=2;
Select sale_id, amount from t_order
Distribute by sale_id
Sort by sale_id, amount;
這樣能夠保證查詢的銷售記錄集合中,銷售ID對應的數量是正確排序的,但是銷售ID不能正確排序,原因是hive使用hadoop默認的HashPartitioner分發數據。
這就涉及到一個全排序的問題。解決的辦法無外乎兩種:
1.) 不分發數據,使用單個reducer:
set mapred.reduce.tasks=1;
這一方法的缺陷在於reduce端成為了性能瓶頸,而且在數據量大的情況下一般都無法得到結果。但是實踐中這仍然是最常用的方法,原因是通常排序的查詢是為了得到排名靠前的若干結果,因此可以用limit子句大大減少數據量。使用limit n后,傳輸到reduce端(單機)的數據記錄數就減少到n* (map個數)。
2.) 修改Partitioner,這種方法可以做到全排序。這里可以使用Hadoop自帶的TotalOrderPartitioner(來自於Yahoo!的TeraSort項目),這是一個為了支持跨reducer分發有序數據開發的Partitioner,它需要一個SequenceFile格式的文件指定分發的數據區間。如果我們已經生成了這一文件(存儲在/tmp/range_key_list,分成100個reducer),可以將上述查詢改寫為
set mapred.reduce.tasks=100;
set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
set total.order.partitioner.path=/tmp/ range_key_list;
Select sale_id, amount from t_order
Cluster by sale_id
Sort by amount;
有很多種方法生成這一區間文件(例如hadoop自帶的o.a.h.mapreduce.lib.partition.InputSampler工具)。這里介紹用Hive生成的方法,例如有一個按id有序的t_sale表:
CREATE TABLE if not exists t_sale (
id int,
name string,
loc string
);
則生成按sale_id分發的區間文件的方法是:
create external table range_keys(sale_id int)
row format serde
'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
stored as
inputformat
'org.apache.hadoop.mapred.TextInputFormat'
outputformat
'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
location '/tmp/range_key_list';
insert overwrite table range_keys
select distinct sale_id
from source t_sale sampletable(BUCKET 100 OUT OF 100 ON rand())
sort by sale_id;
生成的文件(/tmp/range_key_list目錄下)可以讓TotalOrderPartitioner按sale_id有序地分發reduce處理的數據。區間文件需要考慮的主要問題是數據分發的均衡性,這有賴於對數據深入的理解。
7.4 怎樣做笛卡爾積
當Hive設定為嚴格模式(hive.mapred.mode=strict)時,不允許在HQL語句中出現笛卡爾積,這實際說明了Hive對笛卡爾積支持較弱。因為找不到Join key,Hive只能使用1個reducer來完成笛卡爾積。
當然也可以用上面說的limit的辦法來減少某個表參與join的數據量,但對於需要笛卡爾積語義的需求來說,經常是一個大表和一個小表的Join操作,結果仍然很大(以至於無法用單機處理),這時MapJoin才是最好的解決辦法。
MapJoin,顧名思義,會在Map端完成Join操作。這需要將Join操作的一個或多個表完全讀入內存。
MapJoin的用法是在查詢/子查詢的SELECT關鍵字后面添加/*+ MAPJOIN(tablelist) */提示優化器轉化為MapJoin(目前Hive的優化器不能自動優化MapJoin)。其中tablelist可以是一個表,或以逗號連接的表的列表。tablelist中的表將會讀入內存,應該將小表寫在這里。
PS:有用戶說MapJoin在子查詢中可能出現未知BUG。在大表和小表做笛卡爾積時,規避笛卡爾積的方法是,給Join添加一個Join key,原理很簡單:將小表擴充一列join key,並將小表的條目復制數倍,join key各不相同;將大表擴充一列join key為隨機數。
7.5 怎樣寫exist/in子句
Hive不支持where子句中的子查詢,SQL常用的exist in子句需要改寫。這一改寫相對簡單。考慮以下SQL查詢語句:
SELECT a.key, a.value
FROM a
WHERE a.key in
(SELECT b.key
FROM B);
可以改寫為
SELECT a.key, a.value
FROM a LEFT OUTER JOIN b ON (a.key = b.key)
WHERE b.key <> NULL;
一個更高效的實現是利用left semi join改寫為:
SELECT a.key, a.val
FROM a LEFT SEMI JOIN b on (a.key = b.key);
left semi join是0.5.0以上版本的特性。
7.6 怎樣決定reducer個數
Hadoop MapReduce程序中,reducer個數的設定極大影響執行效率,這使得Hive怎樣決定reducer個數成為一個關鍵問題。遺憾的是Hive的估計機制很弱,不指定reducer個數的情況下,Hive會猜測確定一個reducer個數,基於以下兩個設定:
1. hive.exec.reducers.bytes.per.reducer(默認為1000^3)
2. hive.exec.reducers.max(默認為999)
計算reducer數的公式很簡單:
N=min(參數2,總輸入數據量/參數1)
通常情況下,有必要手動指定reducer個數。考慮到map階段的輸出數據量通常會比輸入有大幅減少,因此即使不設定reducer個數,重設參數2還是必要的。依據Hadoop的經驗,可以將參數2設定為0.95*(集群中TaskTracker個數)。
7.7 合並MapReduce操作
Multi-group by
Multi-group by是Hive的一個非常好的特性,它使得Hive中利用中間結果變得非常方便。例如,
FROM (SELECT a.status, b.school, b.gender
FROM status_updates a JOIN profiles b
ON (a.userid = b.userid and
a.ds='2009-03-20' )
) subq1
INSERT OVERWRITE TABLE gender_summary
PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary
PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school
上述查詢語句使用了Multi-group by特性連續group by了2次數據,使用不同的group by key。這一特性可以減少一次MapReduce操作。
Multi-distinct
Multi-distinct是淘寶開發的另一個multi-xxx特性,使用Multi-distinct可以在同一查詢/子查詢中使用多個distinct,這同樣減少了多次MapReduce操作
7.7 Bucket 與 Sampling
Bucket是指將數據以指定列的值為key進行hash,hash到指定數目的桶中。這樣就可以支持高效采樣了。
如下例就是以userid這一列為bucket的依據,共設置32個buckets
CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '1'
COLLECTION ITEMS TERMINATED BY '2'
MAP KEYS TERMINATED BY '3'
STORED AS SEQUENCEFILE;
Sampling可以在全體數據上進行采樣,這樣效率自然就低,它還是要去訪問所有數據。而如果一個表已經對某一列制作了bucket,就可以采樣所有桶中指定序號的某個桶,這就減少了訪問量。
如下例所示就是采樣了page_view中32個桶中的第三個桶。
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);
7.8 Partition
Partition就是分區。分區通過在創建表時啟用partition by實現,用來partition的維度並不是實際數據的某一列,具體分區的標志是由插入內容時給定的。當要查詢某一分區的內容時可以采用where語句,形似where tablename.partition_key > a來實現。
創建含分區的表
CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
ip STRING COMMENT 'IP Address of the User')
PARTITIONED BY(date STRING, country STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '1'
STORED AS TEXTFILE;
載入內容,並指定分區標志
LOAD DATA LOCAL INPATH `/tmp/pv_2008-06-08_us.txt` INTO TABLE page_view PARTITION(date='2008-06-08', country='US');
查詢指定標志的分區內容
SELECT page_views.*
FROM page_views
WHERE page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31' AND page_views.referrer_url like '%xyz.com';
7.9 JOIN
7.9.1 JOIN原則
在使用寫有 Join 操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在 Join 操作符的左邊。原因是在 Join 操作的 Reduce 階段,位於 Join 操作符左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生 OOM 錯誤的幾率。對於一條語句中有多個 Join 的情況,如果 Join 的條件相同,比如查詢:
INSERT OVERWRITE TABLE pv_users SELECT pv.pageid, u.age FROM page_view p JOIN user u ON (pv.userid = u.userid) JOIN newuser x ON (u.userid = x.userid); |
- 如果 Join 的 key 相同,不管有多少個表,都會則會合並為一個 Map-Reduce
- 一個 Map-Reduce 任務,而不是 ‘n’ 個
- 在做 OUTER JOIN 的時候也是一樣
如果 Join 的條件不相同,比如:
INSERT OVERWRITE TABLE pv_users SELECT pv.pageid, u.age FROM page_view p JOIN user u ON (pv.userid = u.userid) JOIN newuser x on (u.age = x.age); |
Map-Reduce 的任務數目和 Join 操作的數目是對應的,上述查詢和以下查詢是等價的:
INSERT OVERWRITE TABLE tmptable SELECT * FROM page_view p JOIN user u ON (pv.userid = u.userid); INSERT OVERWRITE TABLE pv_users SELECT x.pageid, x.age FROM tmptable x JOIN newuser y ON (x.age = y.age); |
7.9.2 Map Join
Join 操作在 Map 階段完成,不再需要Reduce,前提條件是需要的數據在 Map 的過程中可以訪問到。比如查詢:
INSERT OVERWRITE TABLE pv_users SELECT /*+ MAPJOIN(pv) */ pv.pageid, u.age FROM page_view pv JOIN user u ON (pv.userid = u.userid); |
可以在 Map 階段完成 Join,如圖所示:
相關的參數為:
- hive.join.emit.interval = 1000 How many rows in the right-most join operand Hive should buffer before emitting the join result.
- hive.mapjoin.size.key = 10000
- hive.mapjoin.cache.numrows = 10000
7.10 數據傾斜
7.10.1 空值數據傾斜
場景:如日志中,常會有信息丟失的問題,比如全網日志中的user_id,如果取其中的user_id和bmw_users關聯,會碰到數據傾斜的問題。
解決方法1: user_id為空的不參與關聯
Select * From log a
Join bmw_users b
On a.user_id is not null
And a.user_id = b.user_id
Union all
Select * from log a
where a.user_id is null;
解決方法2 :賦與空值分新的key值
Select *
from log a
left outer join bmw_users b
on case when a.user_id is null then concat(‘dp_hive’,rand() ) else a.user_id end = b.user_id;
結論:方法2比方法效率更好,不但io少了,而且作業數也少了。方法1 log讀取兩次,jobs是2。方法2 job數是1 。這個優化適合無效id(比如-99,’’,null等)產生的傾斜問題。把空值的key變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。附上hadoop通用關聯的實現方法(關聯通過二次排序實現的,關聯的列為parition key,關聯的列c1和表的tag組成排序的group key,根據parition key分配reduce。同一reduce內根據group key排序)
7.10.2 不同數據類型關聯產生數據傾斜
場景:一張表s8的日志,每個商品一條記錄,要和商品表關聯。但關聯卻碰到傾斜的問題。s8的日志中有字符串商品id,也有數字的商品id,類型是string的,但商品中的數字id是bigint的。猜測問題的原因是把s8的商品id轉成數字id做hash來分配reduce,所以字符串id的s8日志,都到一個reduce上了,解決的方法驗證了這個猜測。
解決方法:把數字類型轉換成字符串類型
Select * from s8_log a
Left outer join r_auction_auctions b
On a.auction_id = cast(b.auction_id as string);
7.10.3 大表Join的數據偏斜
MapReduce編程模型下開發代碼需要考慮數據偏斜的問題,Hive代碼也是一樣。數據偏斜的原因包括以下兩點:
1. Map輸出key數量極少,導致reduce端退化為單機作業。
2. Map輸出key分布不均,少量key對應大量value,導致reduce端單機瓶頸。
Hive中我們使用MapJoin解決數據偏斜的問題,即將其中的某個表(全量)分發到所有Map端進行Join,從而避免了reduce。這要求分發的表可以被全量載入內存。
極限情況下,Join兩邊的表都是大表,就無法使用MapJoin。
這種問題最為棘手,目前已知的解決思路有兩種:
1. 如果是上述情況1,考慮先對Join中的一個表去重,以此結果過濾無用信息。這樣一般會將其中一個大表轉化為小表,再使用MapJoin 。
一個實例是廣告投放效果分析,例如將廣告投放者信息表i中的信息填充到廣告曝光日志表w中,使用投放者id關聯。因為實際廣告投放者數量很少(但是投放者信息表i很大),因此可以考慮先在w表中去重查詢所有實際廣告投放者id列表,以此Join過濾表i,這一結果必然是一個小表,就可以使用MapJoin。
2. 如果是上述情況2,考慮切分Join中的一個表為多片,以便將切片全部載入內存,然后采用多次MapJoin得到結果。
一個實例是商品瀏覽日志分析,例如將商品信息表i中的信息填充到商品瀏覽日志表w中,使用商品id關聯。但是某些熱賣商品瀏覽量很大,造成數據偏斜。例如,以下語句實現了一個inner join邏輯,將商品信息表拆分成2個表:
select * from
(
select w.id, w.time, w.amount, i1.name, i1.loc, i1.cat
from w left outer join i sampletable(1 out of 2 on id) i1
)
union all
(
select w.id, w.time, w.amount, i2.name, i2.loc, i2.cat
from w left outer join i sampletable(1 out of 2 on id) i2
)
);
以下語句實現了left outer join邏輯:
select t1.id, t1.time, t1.amount,
coalease(t1.name, t2.name),
coalease(t1.loc, t2.loc),
coalease(t1.cat, t2.cat)
from (
select w.id, w.time, w.amount, i1.name, i1.loc, i1.cat
from w left outer join i sampletable(1 out of 2 on id) i1
) t1 left outer join i sampletable(2 out of 2 on id) t2;
上述語句使用Hive的sample table特性對表做切分。
7.11 合並小文件
文件數目過多,會給 HDFS 帶來壓力,並且會影響處理效率,可以通過合並 Map 和 Reduce 的結果文件來消除這樣的影響:
hive.merge.mapfiles = true 是否和並 Map 輸出文件,默認為 True
hive.merge.mapredfiles = false 是否合並 Reduce 輸出文件,默認為 False
hive.merge.size.per.task = 256*1000*1000 合並文件的大小
7.12 Group By
7.12.1 Map 端部分聚合:
並不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端進行部分聚合,最后在 Reduce 端得出最終結果。
基於 Hash
參數包括:
- hive.map.aggr = true 是否在 Map 端進行聚合,默認為 True
- hive.groupby.mapaggr.checkinterval = 100000 在 Map 端進行聚合操作的條目數目
7.12.2 有數據傾斜的時候進行負載均衡
hive.groupby.skewindata = false
當選項設定為 true,生成的查詢計划會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。
8. Hive實戰(實戰中所需表由數據倉庫建模時提供)
8.1 互聯網公司Order表相關常見Hql分析
8.1.1 管理報表-訂單基本統計信息(日粒度)
查看每個城市每天完成訂單數,取消訂單數,下單訂單數,下單用戶數。
select city_id,sum(case when order_status=5 then 1 else 0 end) as cnt_ord_succ_d,
sum(case when order_status=3 then 1 else 0 end) as cnt_ord_cacel_d,
sum(1) as cnt_ord_d,
count(distinct CUST_ID) as cnt_ord_user
FROM dw.dw_order WHERE dt='${day_01}'
group by city_id;
8.1.2 運營團隊-訂單數據信息(活動分析)
8.1.3 商品復購率Hql分析
需求列出的商品的7日,15日,30復購率,目的了解這幾款商品的周期.
計算口徑:當日購買部分商品的用戶數/7日重復購買此商品的用戶數。
每天查看每個城市每個商品當日購買用戶數,7日15日30日復購率。
SELECT t3.atdate AS cdate,t3.city_id,t3.goods_id,
COUNT(DISTINCT CASE WHEN days=0 THEN t3.cust_id END) AS cnt_buy_cust_d,
COUNT(DISTINCT CASE WHEN days>0 AND days<=7 THEN t3.cust_id END) AS cnt_buy_cust_7_d,
COUNT(DISTINCT CASE WHEN days>0 AND days<=15 THEN t3.cust_id END) AS cnt_buy_cust_15_d,
COUNT(DISTINCT CASE WHEN days>0 AND days<=30 THEN t3.cust_id END) AS cnt_buy_cust_30_d
FROM (
SELECT t1.atdate,t1.city_id,t1.cust_id,t1.goods_id,
DATEDIFF(t2.atdate, t1.atdate) days
FROM (
SELECT o.order_date AS atdate,o.city_id,
o.cust_id,og.goods_id
FROM dw.dw_order o INNER JOIN dw.dw_order_goods og
ON o.order_id=og.order_id
AND o.ORDER_STATUS = 5
AND og.source_id=1
AND o.dt = '20151010'
) t1 INNER JOIN (
SELECT o.order_date AS atdate,o.city_id,
o.cust_id,og.goods_id,
og.goods_name
FROM dw.dw_order o INNER JOIN dw.dw_order_goods og
ON o.order_id=og.order_id
AND o.ORDER_STATUS = 5
AND og.source_id=1
) t2 ON t1.cust_id=t2.cust_id AND t1.goods_id=t2.goods_id
) t3 GROUP BY t3.atdate,t3.city_id,t3.goods_id;
8.1.4 月平均日客戶數Hql分析
目前有一個合作資源,北京某度假酒店,價值幾百到8000不等的酒店套房,一共100套,可以給到購買200元以上訂單用戶,用於抽獎獎品,比如設置的獲獎條件:凡在9月,10月,11月的用戶,下單200元以上的訂單,即可獲得北京某度假酒店。目的帶動銷量,刺激用戶參與活動,同時給合作方導流。
合作方需要知道我們訂單金額在200以上的每天平均的用戶量是多少.
#客戶id是int類型 需注意用count
SELECT
SUM(CASE WHEN t.COMPLETION_DATE>='20151001' AND t.COMPLETION_DATE<='20151031' THEN 1 ELSE 0 END) AS cnt_ord_10_m
,COUNT(DISTINCT CASE WHEN t.COMPLETION_DATE>='20151001' AND t.COMPLETION_DATE<='20151031' THEN CUST_ID END) AS cnt_cust_10_m
FROM dw.dw_order t
WHERE t.COMPLETION_DATE>='20151001'
AND t.COMPLETION_DATE<='20151031'
AND CITY_ID=2
AND ORDER_TYPE <>6
AND PAYABLE_AMOUNT>100
AND t.ORDER_STATUS=5;
8.2 互聯網公司User相關常見Hql分析
8.2.1用戶總體數據信息Hql分析(歷史累計)
求每個用戶累計訂單數,累計應付金額
select nvl(t1.cust_id,t2.cust_id),
nvl(t2.order_cnt,0)+nvl(t1.order_cnt,0) as order_cnt,
nvl(t2.amount_sum,0)+nvl(t1.amount_sum,0) as amount_sum
from dw.dw_customer t1
full outer join (
select cust_id,count(1) as order_cnt,sum(payable_amount) as amount_sum from
dw.dw_order where dt='20151011' and order_status=5
group by cust_id
) t2 on t1.cust_id=t2.cust_id
and t1.dt=20151210 limit 100;
8.2.2新用戶統計信息Hql分析(日粒度)
select count(1) from dw.dw_customer
where dt='20151210' and
from_unixtime(unix_timestamp(register_time,'yyyy/MM/dd HH:mm'),'yyyyMMdd')='20140610';
8.2.3求5,6月各個渠道帶來的新用戶,以此來考核運營部門的kpi
select source_no,count(1) from dw.dw_customer
where dt=20151211 and
from_unixtime(unix_timestamp(register_time,'yyyy/MM/dd HH:mm'),'yyyyMMdd')>='20141201'
and from_unixtime(unix_timestamp(register_time,'yyyy/MM/dd HH:mm'),'yyyyMMdd')<='20150131'
and source_no is not null
group by source_no;
8.2.4統計各個渠道帶來的用戶,top10完成訂單數
#主要掌握求top 10 hql
select source_no,mobile,order_cnt,rn from (
select source_no,order_cnt,mobile,
row_number() over(partition by source_no order by order_cnt desc) as rn
from dw.dw_customer
where dt=20151211 and source_no is not null and order_cnt is not null
) t2 where rn <10;
8.2.4 分時段提取僵屍用戶數據Hql分析
僵屍用戶口徑定義:注冊后未下單及未成功下過訂單的用戶
select count(1) from dw.dw_customer where last_order_time is null;
8.3 大胃王項目數據統計
8.3.1 2015年消費賬單,打敗了多少人
8.3.2 2015年購買商品的top 10
9. ETL任務調度設計
9.1通過Liunx Crontab實現任務調度
9.1.1 Shell腳步實現ETL作業之間依賴關系
9.1.2 Hive ETL作業開發模板
9.1.3 Crontab配置模板
對於每一個表(table)或者分區, Hive可以進一步組織成桶,也就是說桶是更為細粒度的數據范圍划分。Hive也是 針對某一列進行桶的組織。Hive采用對列值哈希,然后除以桶的個數求余的方式決定該條記錄存放在哪個桶當中。
把表(或者分區)組織成桶(Bucket)有兩個理由:
(1)獲得更高的查詢處理效率。桶為表加上了額外的結構,Hive 在處理有些查詢時能利用這個結構。具體而言,連接兩個在(包含連接列的)相同列上划分了桶的表,可以使用 Map 端連接 (Map-side join)高效的實現。比如JOIN操作。對於JOIN操作兩個表有一個相同的列,如果對這兩個表都進行了桶操作。那么將保存相同列值的桶進行JOIN操作就可以,可以大大較少JOIN的數據量。
(2)使取樣(sampling)更高效。在處理大規模數據集時,在開發和修改查詢的階段,如果能在數據集的一小部分數據上試運行查詢,會帶來很多方便。
ADD是代表新增一字段,字段位置在所有列后面(partition列前),REPLACE則是表示替換表中所有字段。
1、order by 會對輸入做全局排序,因此只有一個reducer,會導致當輸入規模較大時,需要較長的計算時間。
2、sort by不是全局排序,其在數據進入reducer前完成排序。因此,如果用sort by進行排序,並且設置mapred.reduce.tasks>1,
則sort by只保證每個reducer的輸出有序,不保證全局有序。
3、distribute by根據distribute by指定的內容將數據分到同一個reducer。
4、Cluster by 除了具有Distribute by的功能外,還會對該字段進行排序。因此,常常認為cluster by = distribute by + sort by
Hive數據導入導出
Hive三種不同的數據導出的方式
(1) 導出到本地文件系統
insert overwrite local directory '/home/anjianbing/soft/export_data/app_order_city_d'
row format delimited
fields terminated by '\t'
select * from app.app_order_city_d limit 10;
通過insert overwrite local directory將hive中的表app_order_city_d的內容到本地文件系統的/home/anjianbing/soft/export_data下的app_order_city_d目錄下,這條HQL的執行需要啟用Mapreduce完成,
運行完這條語句之后,將會在本地文件系統的/home/anjianbing/soft/export_data/app_order_city_d目錄下生成文件,這個文件是Reduce產生的結果(這里生成的文件名是000000_0)
(2) 導出到hive的另一個表中
insert into table hive_student_test
select id,name,sex,salary from student;
(3) $ hive -e "select * from employees" >> export/out.txt
Hive Load方式加載數據
load data local inpath '/home/joe/Desktop/employees.tsv' overwrite into table employees;
Sqoop與關系型數據庫導入導出命令
1.列出所有數據庫
$SQOOP_HOME/bin/sqoop list-databases --connect jdbc:mysql://192.168.209.1:3306/ --username root --password 123456
2.mysql導出表到hive並創建表(默認default庫,如需設置路徑:--warehouse-dir /user/hive/warehouse/t_etl_log)
$SQOOP_HOME/bin/sqoop import --connect jdbc:mysql://192.168.209.1:3306/world --username root --password 123456 --table city --hive-import -m 1
3.hdfs導出到mysql
$SQOOP_HOME/bin/sqoop export --connect jdbc:mysql://192.168.209.1:3306/world --username root --password 123456 --table city --fields-terminated-by '\001' --export-dir /user/hive/warehouse/city
MySQL
一、 把 employees.tsv 文件數據導入數據表:
$ mysql --local-infile –u hadoopuser –p
mysql> load data local infile '/home/joe/Desktop/employees.tsv' into table employees fields
terminated by '\t' lines terminated by '\n';
二、 利用 MySQL 導出數據:
mysql> select * from employees into outfile '/tmp/mysql_out.csv' fields terminated by ',' lines
terminated by '\n';
Hive常用操作命令
創建數據庫
>create database db_name;
>create database if not exists db_name;//創建一個不存在的數據庫final
查看數據庫
>show databases;
選擇性查看數據庫
>show databases like 'f.*';
查看某一個數據庫的詳細信息
>describe database db_name;
刪除非空數據庫
>drop database db_name CASCADE;
創建數據庫時,指定數據庫位置
>create database db_name location '/home/database/'
創建數據庫的時候希望能夠給數據庫增加一些描述性東西
>create database db_name comment 'helloworld';
創建數據庫的時候,需要為數據庫增加屬性信息,可以使用with dbproperties信息
>create database db_name with dbproperties<'createor'='hello','date'='2018-3-3');
如果要使用自己已經存在的數據庫
>use db_name;
修改數據庫的屬性信息
>alter database db_name set dbproperties('edited-by'='hello');
創建表
>create table tab_name(id int,name string) row format delimited fields terminated by '\t';
創建一個表,該表和已有的某一個表的結構一樣(復制表結構)
>create table if not exists emp like employeel;
查看當前數據庫下的所有表
>show tables;
刪除一個已經存在的表
>drop table employee;
修改一個表明,重命名
>alter table old_name rename to new_name;
將hdfs上面的文件信息導入到hive表中(/home/bigdata代表文件在在HDFS上位置)使用改命令時一定要注意數據與數據之間在txt文件編輯的時候一定要Tab間隔
>load data local inpath '/home/bigdata' into table hive.dep;
修改某一個表的某一列的信息
>alter table tab_name change column key key_1 int comment 'h' after value;
給某一個表增加某一列的信息
>alter table tab_name add columns(value1 string,value2 string);
如果想替換表中的某一個列
>alter table tab_name replace columns(values string,value11 string);
修改表中某一列的屬性
>alter table tab_name set tblproperties('value'='hello');
hive成批向某一表插入數據
>insert overwrite table tab_name select * from tab_name2;
將 查詢結果保留到一個新表中去
>create table tab_name as select * from t_name2;
將查詢結果保存到指定的文件目錄(可以是本地,也可以HDFS)
>insert overwrite local directory '/home/hadoop/test' select *from t_name;
>insert overwrite directory '/aaa/bbb/' select *from t_p;
兩表內連
>select *from dual a join dual b on a.key=b.key;
將hive查詢結果輸出到本地特定目錄
insert overwrite local directory '/home/bigdata1/mydir' select *from test;
將hive查詢結果輸出到HDFS特定目錄
insert overwrite directory '/home/mydir' select *from test;
Hive和HBase區別
1. 兩者分別是什么?
Apache Hive是一個構建在Hadoop基礎設施之上的數據倉庫。通過Hive可以使用HQL語言查詢存放在HDFS上的數據。HQL是一種類SQL語言,這種語言最終被轉化為Map/Reduce. 雖然Hive提供了SQL查詢功能,但是Hive不能夠進行交互查詢--因為它只能夠在Haoop上批量的執行Hadoop。
Apache HBase是一種Key/Value系統,它運行在HDFS之上。和Hive不一樣,Hbase的能夠在它的數據庫上實時運行,而不是運行MapReduce任務。Hive被分區為表格,表格又被進一步分割為列簇。列簇必須使用schema定義,列簇將某一類型列集合起來(列不要求schema定義)。例如,“message”列簇可能包含:“to”, ”from” “date”, “subject”, 和”body”. 每一個 key/value對在Hbase中被定義為一個cell,每一個key由row-key,列簇、列和時間戳。在Hbase中,行是key/value映射的集合,這個映射通過row-key來唯一標識。Hbase利用Hadoop的基礎設施,可以利用通用的設備進行水平的擴展。
2. 兩者的特點
Hive幫助熟悉SQL的人運行MapReduce任務。因為它是JDBC兼容的,同時,它也能夠和現存的SQL工具整合在一起。運行Hive查詢會花費很長時間,因為它會默認遍歷表中所有的數據。雖然有這樣的缺點,一次遍歷的數據量可以通過Hive的分區機制來控制。分區允許在數據集上運行過濾查詢,這些數據集存儲在不同的文件夾內,查詢的時候只遍歷指定文件夾(分區)中的數據。這種機制可以用來,例如,只處理在某一個時間范圍內的文件,只要這些文件名中包括了時間格式。
HBase通過存儲key/value來工作。它支持四種主要的操作:增加或者更新行,查看一個范圍內的cell,獲取指定的行,刪除指定的行、列或者是列的版本。版本信息用來獲取歷史數據(每一行的歷史數據可以被刪除,然后通過Hbase compactions就可以釋放出空間)。雖然HBase包括表格,但是schema僅僅被表格和列簇所要求,列不需要schema。Hbase的表格包括增加/計數功能。
3. 限制
Hive目前不支持更新操作。另外,由於hive在hadoop上運行批量操作,它需要花費很長的時間,通常是幾分鍾到幾個小時才可以獲取到查詢的結果。Hive必須提供預先定義好的schema將文件和目錄映射到列,並且Hive與ACID不兼容。
HBase查詢是通過特定的語言來編寫的,這種語言需要重新學習。類SQL的功能可以通過Apache Phonenix實現,但這是以必須提供schema為代價的。另外,Hbase也並不是兼容所有的ACID特性,雖然它支持某些特性。最后但不是最重要的--為了運行Hbase,Zookeeper是必須的,zookeeper是一個用來進行分布式協調的服務,這些服務包括配置服務,維護元信息和命名空間服務。
4. 應用場景
Hive適合用來對一段時間內的數據進行分析查詢,例如,用來計算趨勢或者網站的日志。Hive不應該用來進行實時的查詢。因為它需要很長時間才可以返回結果。
Hbase非常適合用來進行大數據的實時查詢。Facebook用Hbase進行消息和實時的分析。它也可以用來統計Facebook的連接數。
5. 總結
Hive和Hbase是兩種基於Hadoop的不同技術--Hive是一種類SQL的引擎,並且運行MapReduce任務,Hbase是一種在Hadoop之上的NoSQL 的Key/vale數據庫。當然,這兩種工具是可以同時使用的。就像用Google來搜索,用FaceBook進行社交一樣,Hive可以用來進行統計查詢,HBase可以用來進行實時查詢,數據也可以從Hive寫到Hbase,設置再從Hbase寫回Hive。