1. 概述
1.1 hive的特征:
- 可以通過SQL輕松訪問數據的工具,從而實現數據倉庫任務,如提取/轉換/加載(ETL),報告和數據分析;
- 它可以使已經存儲的數據結構化;
- 可以直接訪問存儲在Apache HDFS或其他數據存儲系統(如Apache HBase)中的文件;
- Hive除了支持MapReduce計算引擎,還支持Spark和Tez這兩種分布式計算引擎;
- 它提供類似sql的查詢語句HiveQL對數據進行分析處理;
- 數據的存儲格式有多種,比如數據源是二進制格式,普通文本格式等等;
1.2 hive的優勢:
hive強大之處不要求數據轉換成特定的格式,而是利用hadoop本身InputFormat API來從不同的數據源讀取數據,同樣地使用OutputFormat API將數據寫成不同的格式。所以對於不同的數據源,或者寫出不同的格式就需要不同的對應的InputFormat和OutputFormat類的實現。以stored as textFile為例,其在底層java API中表現是輸入InputFormat格式:TextInputFormat以及輸出OutputFormat格式:HiveIgnoreKeyTextOutputFormat。這里InputFormat中定義了如何對數據源文本進行讀取划分,以及如何將切片分割成記錄存入表中。而OutputFormat定義了如何將這些切片寫回到文件里或者直接在控制台輸出。
Hive擁有統一的元數據管理,所以和Spark、Impala等SQL引擎是通用的。通用是指,在擁有了統一的metastore之后,在Hive中創建一張表,在Spark/Impala中是能用的;反之在Spark中創建一張表,在Hive中也是能用的,只需要共用元數據,就可以切換SQL引擎,涉及到了Spark sql和Hive On Spark。
不僅如此Hive使用SQL語法,提供快速開發的能力,還可以通過用戶定義的函數(UDF),用戶定義的聚合(UDAF)和用戶定義的表函數(UDTF)進行擴展,避免了去寫mapreducce,減少開發人員的學習成本。Hive中不僅可以使用逗號和制表符分隔值(CSV/TSV)文本文件,還可以使用Sequence File、RC、ORC、Parquet(知道這幾種存儲格式的區別)。當然Hive還可以通過用戶來自定義自己的存儲格式,基本上前面說到幾種格式完全夠了。Hive旨在最大限度地提高可伸縮性(通過向Hadoop集群動態田間更多機器擴展),性能,可擴展性,容錯性以及與其輸入格式的松散耦合。
數據離線處理,比如日志分析,海量數據結構化分析。
2. Hive函數
Hive的SQL還可以通過用戶定義的函數(UDF),用戶定義的聚合(UDAF)和用戶定義的表函數(UDTF)進行擴展。
當Hive提供的內置函數無法滿足你的業務處理需要時,此時就可以考慮使用用戶自定義函數(UDF)。
UDF、UDAF、UDTF的區別:
- UDF(User-Defined-Function)一進一出
- UDAF(User-Defined Aggregation Funcation)聚集函數,多進一出
- UDTF(User-Defined Table-Generating Functions)一進多出,如lateral view explore()
3. Hive優化
3.1 慎用api
我們知道大數據場景下不害怕數據量大,害怕的是數據傾斜,怎樣避免數據傾斜,找到可能產生數據傾斜的函數尤為關鍵,數據量較大的情況下,慎用count(distinct),count(distinct)容易產生傾斜問題。
3.2 自定義UDAF函數優化
sum,count,max,min等UDAF,不怕數據傾斜問題,hadoop在map端匯總合並優化,是數據傾斜不成問題。
3.3 設置合理的map reduce的task數量
3.3.1 map階段優化
mapred.min.split.size: 指的是數據的最小分割單元大小;min的默認值是1B mapred.max.split.size: 指的是數據的最大分割單元大小;max的默認值是256MB 通過調整max可以起到調整map數的作用,減小max可以增加map數,增大max可以減少map數。 需要提醒的是,直接調整mapred.map.tasks這個參數是沒有效果的。
舉例:
a) 假設input目錄下有1個文件a,大小為780M,那么hadoop會將該文件a分隔成7個塊(6個128M的塊和1個12M的塊),從而產生7個map書;
b) 假設input目錄下有3個文件a,b,c,大小分別為10M,20M,130M,那么hadoop會分隔成4個塊(10M,20M,128M,2M),從而產生4個map數;
注意:如果文件大於塊大小(128M),那么會拆分,如果小於塊大小,則把該文件當成一個塊。
其實這就涉及到小文件的問題:如果一個任務有很多小文件(遠遠小於塊大小128M),則每個小文件也會當做一個塊,用一個map任務來完成。
而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。那么,是不是保證每個map處理接近128M的文件塊,就高枕無憂了?答案也是不一定。比如有一個127M的文件,正常會用一個map去完成,但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄,如果map處理的邏輯比較復雜,用一個map任務去做,肯定也比較耗時。
我們該如何去解決呢???
我們需要采取兩種方式來解決:即減少map數和增加map數
- 減少map數量
假設一個SQL任務: Select count(1) from popt_tbaccountcopy_meswhere pt = '2012-07-04'; 該任務的inputdir : /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04 共有194個文件,其中很多事遠遠小於128M的小文件,總大小9G,正常執行會用194個map任務。 Map總共消耗的計算資源:SLOTS_MILLIS_MAPS= 623,020 通過以下方法來在map執行前合並小文件,減少map數: set mapred.max.split.size=100000000; set mapred.min.split.size.per.node=100000000; set mapred.min.split.size.per.rack=100000000; set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 再執行上面的語句,用了74個map任務,map消耗的計算資源:SLOTS_MILLIS_MAPS= 333,500 對於這個簡單SQL任務,執行時間上可能差不多,但節省了一半的計算資源。 大概解釋一下,100000000表示100M, set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;這個參數表示執行前進行小文件合並, 前面三個參數確定合並文件塊的大小,大於文件塊大小128m的,按照128m來分隔, 小於128m,大於100m的,按照100m來分隔,把那些小於100m的(包括小文件和分隔大文件剩下的), 進行合並,最終生成了74個塊。
- 增大map數量
如何適當的增加map數? 當input的文件都很大,任務邏輯復雜,map執行非常慢的時候,可以考慮增加Map數, 來使得每個map處理的數據量減少,從而提高任務的執行效率。 假設有這樣一個任務: Select data_desc, count(1), count(distinct id), sum(case when ...), sum(case when ...), sum(...) from a group by data_desc 如果表a只有一個文件,大小為120M,但包含幾千萬的記錄,如果用1個map去完成這個任務,肯定是比較耗時的, 這種情況下,我們要考慮將這一個文件合理的拆分成多個, 這樣就可以用多個map任務去完成。 set mapred.reduce.tasks=10; create table a_1 as select * from a distribute by rand(123); 這樣會將a表的記錄,隨機的分散到包含10個文件的a_1表中,再用a_1代替上面sql中的a表,則會用10個map任務去完成。 每個map任務處理大於12M(幾百萬記錄)的數據,效率肯定會好很多。
注意:看上去,貌似這兩種有些矛盾,一個是要合並小文件,一個是要把大文件拆成小文件,這點正是重點需要關注的地方,使單個map任務處理合適的數據量;
3.3.2 reduce階段優化
Reduce的個數對整個作業的運行性能有很大影響。如果Reduce設置的過大,那么將會產生很多小文件,對NameNode會產生一定的影響,而且整個作業的運行時間未必會減少;如果Reduce設置的過小,那么單個Reduce處理的數據將會加大,很可能會引起OOM異常。
如果設置了mapred.reduce.tasks/mapreduce.job.reduces參數,那么Hive會直接使用它的值作為Reduce的個數;如果mapred.reduce.tasks/mapreduce.job.reduces的值沒有設置(也就是-1),那么Hive會根據輸入文件的大小估算出Reduce的個數。根據輸入文件估算Reduce的個數可能未必很准確,因為Reduce的輸入是Map的輸出,而Map的輸出可能會比輸入要小,所以最准確的數根據Map的輸出估算Reduce的個數。
1. Hive自己如何確定reduce數:
reduce個數的設定極大影響任務執行效率,不指定reduce個數的情況下,Hive會猜測確定一個reduce個數,基於以下兩個設定:
hive.exec.reducers.bytes.per.reducer(每個reduce任務處理的數據量,默認為1000^3=1G)
hive.exec.reducers.max(每個任務最大的reduce數,默認為999)
計算reducer數的公式很簡單N=min(參數2,總輸入數據量/參數1)
即,如果reduce的輸入(map的輸出)總大小不超過1G,那么只會有一個reduce任務;
如:select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04 總大小為9G多, 因此這句有10個reduce
2. 調整reduce個數方法一:
調整hive.exec.reducers.bytes.per.reducer參數的值;
set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
select pt, count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt;
這次有20個reduce
3. 調整reduce個數方法二:
set mapred.reduce.tasks=15;
select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt;
這次有15個reduce
4. reduce個數並不是越多越好;
同map一樣,啟動和初始化reduce也會消耗時間和資源;
另外,有多少個reduce,就會有個多少個輸出文件,如果生成了很多個小文件,那么如果這些小文件作為下一個任務的輸入,則也會出現小文件過多的問題;
5. 什么情況下只有一個reduce;
很多時候你會發現任務中不管數據量多大,不管你有沒有調整reduce個數的參數,任務中一直都只有一個reduce任務;其實只有一個reduce任務的情況,除了數據量小於hive.exec.reducers.bytes.per.reducer參數值的情況外,還有以下原因:
- 沒有group by的匯總,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’ group by pt; 寫成select count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’; 這點非常常見,希望大家盡量改寫。
- 用了Order by
- 有笛卡爾積。
注意:在設置reduce個數的時候也需要考慮這兩個原則:使大數據量利用合適的reduce數;是單個reduce任務處理合適的數據量;
3.4 小文件合並優化
我們知道文件數目小,容易在文件存儲端造成瓶頸,給HDFS帶來壓力,影響處理效率。對此,可以通過合並Map和Reduce的結果文件來消除這樣的影響。
用於設置合並的參數有:
-
- 是否合並Map輸出文件:hive.merge.mapfiles=true(默認值為true)
- 是否合並Reduce端輸出文件:hive.merge.mapredfiles=false(默認值為false)
- 合並文件的大小:hive.merge.size.per.task=256*1000*1000(默認值為256000000)
3.4.1 Hive優化之小文件問題及其解決方案:
小文件是如何產生的:
-
- 動態分區插入數據,產生大量的小文件,從而導致map數量劇增;
- reduce數量越多,小文件也越多(reduce的個數和輸出文件是對應的);
- 數據源本身就包含大量的小文件。
小文件問題的影響:
-
- 從Hive的角度看,小文件會開很多map,一個map開一個JVM去執行,所以這些任務的初始化,啟動,執行會浪費大量的資源,嚴重影響性能。
- 在HDFS中,每個小文件對象約占150byte,如果小文件過多會占用大量內存。這樣NameNode內存容量嚴重制約了集群的擴展。
小文件問題的解決方案:
從小文件產生的途徑就可以從源頭上控制小文件數量,方法如下:
-
- 使用Sequencefile作為表存儲格式,不要用textfile,在一定程度上可以減少小文件;
- 減少reduce的數量(可以使用參數進行控制);
- 少用動態分區,用時記得按distribute by分區;
對於已有的小文件,我們可以通過以下幾種方案解決:
-
- 使用hadoop archive命令把小文件進行歸檔;
- 重建表,建表時減少reduce數量;
- 通過參數進行調節,設置map/reduce端的相關參數,如下:
//每個Map最大輸入大小(這個值決定了合並后文件的數量) set mapred.max.split.size=256000000; //一個節點上split的至少的大小(這個值決定了多個DataNode上的文件是否需要合並) set mapred.min.split.size.per.node=100000000; //一個交換機下split的至少的大小(這個值決定了多個交換機上的文件是否需要合並) set mapred.min.split.size.per.rack=100000000; //執行Map前進行小文件合並 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 設置map輸出和reduce輸出進行合並的相關參數: [java] view plain copy //設置map端輸出進行合並,默認為true set hive.merge.mapfiles = true //設置reduce端輸出進行合並,默認為false set hive.merge.mapredfiles = true //設置合並文件的大小 set hive.merge.size.per.task = 256*1000*1000 //當輸出文件的平均大小小於該值時,啟動一個獨立的MapReduce任務進行文件merge。 set hive.merge.smallfiles.avgsize=16000000
3.5 SQL優化
3.5.1 列裁剪
Hive在讀數據的時候,可以只讀取查詢中所需要用到的列,而忽略其他列。例如,若有以下查詢:
SELECT a,b FROM q WHERE e<10;
在實施此項查詢中,Q表有5列(a,b,c,d,e),Hive只讀取查詢邏輯中真實需要的3列a、b、e, 而忽略列c,d;這樣做節省了讀取開銷,中間表存儲開銷和數據整合開銷。
裁剪對應的參數項為:hive.optimize.cp=true(默認值為真)
3.5.2 分區裁剪
可以在查詢的過程中減少不必要的分區。例如,若有以下查詢:
SELECT * FROM (SELECT a1, COUNT(1) FROM T GROUP BY a1) subq WHERE subq.prtn=100; # (多余分區) SELECT * FROM T1 JOIN (SELECT * FROM T2) subq ON (T1.a1=subq.a2) WHERE subq.prtn=100;
查詢語句若將"subq.prtn=100"條件放入子查詢中更為高效,可以減少讀入的分區數目。Hive自動執行這種裁剪優化。
分區參數為:hive.optimize.pruner=true(默認值為真)
3.5.3 熟練使用SQL提高查詢
熟練地使用SQL,能寫出高效率的查詢語句。
場景:有一張user表,為賣家每天收到表,user_id,ds(日期)為key,屬性有主營類目,指標有交易金額,交易筆數。每天要取前10天的總收入,總筆數,和最近一天的主營類目。
解決方法 1 如下所示:常用方法
INSERT OVERWRITE TABLE t1 SELECT user_id, substr(MAX(CONCAT(ds,cat),9) AS main_cat) FROM users WHERE ds=20120329 // 20120329 為日期列的值,實際代碼中可以用函數表示當天日期GROUP BY user_id; INSERT OVERWRITE TABLE t2 SELECT user_id,sum(qty) AS qty, SUM(amt) AS amt FROM users WHERE ds BETWEEN 20120301 AND 20120329 GROUP BY user_id; SELECT t1.user_id, t1.main_cat, t2.qty, t2.amt FROM t1 JOIN t2 ON t1.user_id=t2.user_id
下面給出方法1的思路,實現步驟如下:
第一步:利用分析函數,取每個user_id最近一天的主營類目,存入臨時表t1;
第二步:匯總10天的總交易金額,交易筆數,存入臨時表t2;
第三步:關聯t1、t2,得到最終的結果。
解決方法 2 如下所示:優化方法
SELECT user_id, substr(MAX(CONCAT(ds, cat)), 9) AS main_cat, SUM(qty), SUM(amt) FROM users WHERE ds BETWEEN 20120301 AND 20120329 GROUP BY user_id
在工作中我們總結出:方案2的開銷等於方案1的第二步開銷,性能提升,由原有的25分鍾完成,縮短為10分鍾以內完成。節省了兩個臨時表的讀寫是一個關鍵原因,這種方式也適用於Oracle中的數據查找工作。
SQL具有普適性,很多SQL通用的優化方案在Hadoop分布式計算方式中也可以達到效果。
3.5.5 不同數據類型關聯產生的傾斜問題
問題:不同數據類型id的關聯會產生數據傾斜問題。
一張表的s8的日志,每個商品一條記錄,要和商品表關聯。但關聯卻碰到傾斜的問題。s8的日志中有32位字符串商品id,也有數值商品id,日志中類型是string的,但商品中的數值id是bigint的。猜想問題的原因是把s8的商品id轉成數值id做hash來分配Reduce,所以字符串id的s8日志,都到一個Reduce上了,解決的方法驗證了這個猜測。
解決方法:把數據類型轉換成字符串類型
SELECT * FROM s8_log a LEFT OUTER
JOIN r_auction_auctions b ON a.auction_id=CAST(b.auction_id AS STRING)
調優結果顯示:數據表處理由1小時30分鍾經代碼調整后可以在20分鍾內完成。
3.5.6 利用Hive對UNION ALL優化的特性
多表union all會優化成一個job。
問題:比如推廣效果表要和商品表關聯,效果表中的auction_id列既有32位字符串商品id,也有數字id,和商品表關聯得到商品的信息。
解決方法:Hive SQL性能會比較好
SELECT * FROM effect a
JOIN
(SELECT auction_id AS auction_id FROM auctions
UNION ALL
SELECT auction_string_id AS auction_id FROM auctions) b
ON a.auction_id=b.auction_id
比分別過濾數字id,字符串id然后分別和商品表關聯性能要好。
這樣寫的好處:1個MapReduce作業,商品表只讀一次,推廣效果表只讀取一次。把這個SQL換成Map/Reduce代碼的話,Map的時候,把a表的記錄打上標簽a,商品表記錄每讀取一條,打上標簽b,變成兩個<key, value>對,<(b,數字id),value>,<(b,字符串id),value>。
所以商品表的HDFS讀取只會是一次。
3.5.7 解決Hive對UNION ALL優化的短板
Hive對union all的優化的特性:對union all優化只局限於非嵌套查詢
- 消滅子查詢內的group by
示例1:子查詢內有group by
SELECT * FROM (SELECT * FROM t1 GROUP BY c1,c2,c3 UNION ALL SELECT * FROM t2 GROUP BY c1,c2,c3) t3 GROUP BY c1,c2,c3
從業務邏輯上說,子查詢內的GROUP BY怎么看都是多余(功能上的多余,除非有COUNT(DISTINCT)),如果不是因為Hive Bug或者性能上的考量(曾經出現如果不執行子查詢GROUP BY,數據得不到正確的結果的Hive Bug)。所以這個Hive按經驗轉換成如下所示:
SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2) t3 GROUP BY c1,c2,c3
調優結果:經過測試,並未出現union all的Hive Bug,數據是一致的。MapReduce的作業數由3減少到1。
t1相當於一個目錄,t2相當於一個目錄,對Map/Reduce程序來說,t1、t2可以作為Map/Reduce作業的mutli inputs。這可以通過一個Map/Reduce來解決這個問題。Hadoop的計算框架,不怕數據多,就怕作業數多。
但如果換成是其他計算平台如Oracle,那就不一定了,因為把大輸入拆成兩個輸入,分別排序匯總成merge(假如兩個子排序是並行的話),是有可能性能更優的(比如希爾排序比冒泡排序的性能更優)。
- 消滅子查詢內的COUNT(DISTINCT),MAX,MIN
SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT c1,c2,c3 count(DISTINCT c4) FROM t2 GROUP BY c1,c2,c3) t3 GROUP BY c1,c2,c3
由於子查詢里頭有COUNT(DISTINCT)操作,直接去GROUP BY將達不到業務目標。這時采用臨時表消滅COUNT(DISTINCT)作業不但能解決傾斜問題,還能有效減少jobs。
INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3; SELECT c1,c2,c3,SUM(income),SUM(uv) FROM (SELECT c1,c2,c3,income,0 AS uv FROM t1 UNION ALL SELECT c1,c2,c3,0 AS income, 1 AS uv FROM t2) t3 GROUP BY c1,c2,c3;
job數是2,減少一半,而且兩次Map/Reduce比COUNT(DISTINCT)效率更高。
調優結果:千萬級別的類目表,member表,與10億級的商品表關聯。原先1963s的任務經過調整,1152s即完成。
- 消滅子查詢內的JOIN
SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t4 UNION ALL SELECT * FROM t2 JOIN t3 ON t2.id=t3.id) x GROUP BY c1,c2;
上面代碼運行會有5個jobs。加入先JOIN生存臨時表的話t5,然后UNION ALL,會變成2個jobs。
INSERT OVERWRITE TABLE t5 SELECT * FROM t2 JOIN t3 ON t2.id=t3.id; SELECT * FROM (t1 UNION ALL t4 UNION ALL t5);
調優結果顯示:針對千萬級別的廣告位表,由原先5個Job共15分鍾,分解為2個job,一個8-10分鍾,一個3分鍾。
3.5.8 COUNT(DISTINCT)
計算uv的時候,經常會用到COUNT(DISTINCT),但在數據比較傾斜的時候COUNT(DISTINCT)會比較慢。這時可以嘗試用GROUP BY改寫代碼計算uv。數據量小的時候無所謂,數據量大的情況下,由於COUNT DISTINCT操作需要用一個Reduce Task來完成,這一個Reduce需要處理的數據量太大,就會導致整個Job很難完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替換:
- 原有代碼
① INSERT OVERWRITE TABLE s_dw_tanx_adzone_uv PARTITION (ds=20120329) SELECT 20120329 AS thedate,adzoneid,COUNT(DISTINCT acookie) AS uv FROM s_ods_log_tanx_pv t WHERE t.ds=20120329 GROUP BY adzoneid; ② select count(distinct id) from bigtable;
關於COUNT(DISTINCT)的數據傾斜問題不能一概而論,要依情況而定,下面是我測試的一組數據:
測試數據:169857條
① #統計每日IP CREATE TABLE ip_2014_12_29 AS SELECT COUNT(DISTINCT ip) AS FROM logdfs WHERE logdate='2014_12_29'; 耗時:24.805 seconds #統計每日IP(改造) CREATE TABLE ip_2014_12_29 AS SELECT COUNT(1) AS IP FROM (SELECT DISTINCT ip from logdfs WHERE logdate='2014_12_29') tmp; 耗時:46.833 seconds ② select count(id) from (select id from bigtable group by id) a;
測試結果表明:明顯改造后的語句比之前耗時,這時因為改造后的語句有2個SELECT,多了一個job,這樣在數據量小的時候,數據不會存在傾斜問題。
3.5.9 JOIN操作
3.5.9.1 小表、大表JOIN
在使用寫有Join操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在Join操作符的左邊。原因是在Join操作的Reduce階段,位於Join操作符左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生OOM錯誤的幾率;再進一步,可以使用Group讓小的維度表(1000條以下的記錄條數)先進內存。在map端完成reduce。
實際測試發現:新版的hive已經對小表JOIN大表和大表JOIN小表進行了優化。小表放在左邊和右邊已經沒有明顯區別。
案例實操:
(1)關閉mapjoin功能(默認是打開的)
set hive.auto.convert.join=false;
(2)執行小表JOIN大表語句
insert overwrite table jointable select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from smalltable s left join bigtable b on b.id = s.id;
Time taken: 35.921 seconds
(3)執行大表JOIN大表語句
insert overwrite table jointable select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from bigtable b left join smalltable s on s.id = b.id;
Time taken: 34.196 seconds;
3.5.9.2 大表JOIN大表
1)空Key過濾
問題:日志中常會出現信息丟失,比如每日約為20億的全網日志,其中的user_id為主鍵,在日志收集過程中會丟失,出現主鍵為null的情況,如果取其中的user_id和bmw_users關聯,就會碰到數據傾斜的問題。原因是Hive中,主鍵為null值的項會被當做相同的Key而分配進同一個計算Map。
解決方法1:user_id為空的不參與關聯,子查詢過濾null
SELECT * FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id=b.user_id UNION ALL SELECT * FROM log a WHERE a.user_id IS NULL
解決方法2 如下所示:函數過濾null
SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive', RAND()) ELSE a.user_id END = b.user_id;
調優結果:原先由於數據傾斜導致運行時長超過1小時,解決方法1運行每日平均時長25分鍾,解決方法2運行的每日平均時長在20分鍾左右。優化效果很明顯。
我們在工作中總結出:解決方法2比解決方法1效果更好,不但IO少了,而且作業數也少了。解決方法1中log讀取兩次,job數為2。解決方法2中job數是1。這個優化適合無效id(比如-99,‘’,null等)產生的傾斜問題。把空值的key變成一個字符串加上隨機數,就能把傾斜的數據分到不同的Reduce上,從而解決數據傾斜問題。因為空值不參與關聯,即使分到不同的Reduce上,也不會影響最終的結果。附上Hadoop通用關聯的實現方法是:關聯通過二次排序實現的,關聯的列為partition key,關聯的列和表的tag組成排序的group key,根據partition key分配Reduce。同一Reduce內根據group key排序。
3.5.9.3 MAP JOIN操作
如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join。容易發生數據傾斜。可以用MapJoin把小表全部加載到內存在map端進行join,避免reducer處理。
- 開啟MapJoin參數設置:
1) 設置自動選擇MapJoin
set hive.auto.convert.join = true;默認為true
2) 大表小表的閥值設置(默認25M一下認為是小表):
set hive.mapjoin.smalltable.filesize=25000000;
- MapJoin工作機制
上圖是Hive MapJoin的原理圖,從圖中可以看出MapJoin分為兩個階段:
(1)通過MapReduce Local Task,將小表讀入內存,生成內存HashTableFiles上傳至Distributed Cache中,這里會對HashTableFiles進行壓縮。
(2)MapReduce Job在Map階段,每個Mapper從Distributed Cache讀取HashTableFiles到內存中,順序掃描大表,在Map階段直接進行Join,將數據傳遞給下一個MapReduce任務。也就是在map端進行join避免了shuffle。
Join操作在Map階段完成,不再需要Reduce,有多少個Map Task,就有多少個結果文件。
實例:
(1)開啟MapJoin功能
set hive.auto.convert.join = true; 默認為true
(2)執行小表JOIN大表語句
insert overwrite table jointable select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from smalltable s join bigtable b on s.id = b.id;
Time taken: 24.594 seconds
(3)執行大表JOIN小表語句
insert overwrite table jointable select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from bigtable b join smalltable s on s.id = b.id;
Time taken: 24.315 seconds
3.5.9.3 GROUP BY操作
默認情況下,Map階段同一Key數據分發給一個reduce,當一個key數據過大時就傾斜了。進行GROUP BY操作時需要注意以下幾點:
- Map端部分聚合
事實上並不是所有的聚合操作都需要在reduce部分進行,很多聚合操作都可以先在Map端進行部分聚合,然后reduce端得出最終結果。
(1)開啟Map端聚合參數設置
set hive.map.aggr=true
(2)在Map端進行聚合操作的條目數目
set hive.grouby.mapaggr.checkinterval=100000
(3)有數據傾斜的時候進行負載均衡(默認是false)
set hive.groupby.skewindata = true
- 有數據傾斜時進行負載均衡
此處需要設定hive.groupby.skewindata,當選項設定為true時,生成的查詢計划有兩個MapReduce任務。在第一個MapReduce中,map的輸出結果集合會隨機分布到reduce中,每個reduce做部分聚合操作,並輸出結果。這樣處理的結果是,相同的Group By Key有可能分發到不同的reduce中,從而達到負載均衡的目的;第二個MapReduce任務再根據預處理的數據結果按照Group By Key分布到reduce中(這個過程可以保證相同的Group By Key分布到同一個reduce中),最后完成最終的聚合操作。
3.5.10 優化in/exists語句
雖然經過測驗,hive1.2.1也支持in/exists操作,但還是推薦使用hive的一個高效替代方案:left semi join
比如說:
select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id = b.id);
應該轉換成:
select a.id, a.name from a left semi join b on a.id = b.id;
3.5.11 排序選擇
- cluster by: 對同一字段分桶並排序,不能和sort by連用;
- distribute by + sort by: 分桶,保證同一字段值只存在一個結果文件當中,結合sort by 保證每個reduceTask結果有序;
- sort by: 單機排序,單個reduce結果有序
- order by:全局排序,缺陷是只能使用一個reduce
3.6 存儲格式
可以使用列裁剪,分區裁剪,orc,parquet等這些列式存儲格式,因為列式存儲的表,每一列的數據在物理上是存儲在一起的,Hive查詢時會只遍歷需要列數據,大大減少處理的數據量。
Hive支持ORCfile,這是一種新的表格存儲格式,通過諸如謂詞下推,壓縮等技術來提高執行速度提升。對於每個HIVE表使用ORCfile應該是一件容易的事情,並且對於獲得HIVE查詢的快速響應時間非常有益。
作為一個例子,考慮兩個大表A和B(作為文本存儲,其中一些列未在此處指定,即行式存儲的缺點)以及一個簡單的查詢,如:
SELECT A.customerID,A.name,A.age,A.address join B.role,B.department,B.salary ON A.customerID=B.customerID;
此查詢可能需要很長時間才能執行,因為表A和B都以TEXT形式存儲,進行全表掃描。
將這些表格轉換為ORCFile格式通常會顯着減少查詢時間;
ORC支持壓縮存儲(使用ZLIB或如上所示使用SNAPPY),但也支持未壓縮的存儲。
CREATE TABLE A_ORC ( customerID int,name string,age int, address string ) STORED AS ORC tblproperties ("orc.compress" = "SNAPPY"); INSERT INTO TABLE A_ORC SELECT * FROM A; CREATE TABLE B_ORC ( customerID int, role string, salary float, department string ) STORED AS ORC tblproperties ("orc.compress" = "SNAPPY"); INSERT INTO TABLE B_ORC SELECT * FROM B; SELECT A_ORC.customerID, A_ORC.name, A_ORC.age, A_ORC.address join B_ORC.role,B_ORC.department, B_ORC.salary ON A_ORC.customerID=B_ORC.customerID;
3.7 壓縮格式
大數據場景下存儲格式壓縮格式尤為關鍵,可以提升計算速度,減少存儲空間,降低網絡io,磁盤io,所以要選擇合適的壓縮格式和存儲格式,那么首先就了解這些東西。參考該博客
3.7.1 壓縮的原因
Hive最終是轉為MapReduce程序來執行的,而MapReduce的性能瓶頸在於網絡IO和磁盤IO,要解決性能瓶頸,最主要的是減少數據量,對數據進行壓縮是個好的方式。壓縮雖然是減少了數據量,但是壓縮過程要消耗CPU的,但是在Hadoop中,往往性能瓶頸不在於CPU,CPU壓力並不大,所以壓縮充分利用了比較空閑的CPU。
3.7.2 常用壓縮方法對比
各個壓縮方式所對應的Class類:
3.7.3 壓縮方式的選擇
壓縮比率,壓縮解壓縮速度,是否支持Split
3.7.4 壓縮使用
Job輸出文件按照block以Gzip的方式進行壓縮:
set mapreduce.output.fileoutputformat.compress=true // 默認值是 false set mapreduce.output.fileoutputformat.compress.type=BLOCK // 默認值是 Record set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec
Map輸出結果也以Gzip進行壓縮:
set mapred.map.output.compress=true set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec
對Hive輸出結果和中間都進行壓縮:
set hive.exec.compress.output=true // 默認值是 false,不壓縮 set hive.exec.compress.intermediate=true // 默認值是 false,為 true 時 MR 設置的壓縮才啟用
3.8 引擎的選擇
Hive可以使用Apache Tez執行引擎而不是古老的Map-Reduce引擎。沒有在環境中沒有默認打開,在Hive查詢開頭將以下內容設置為‘true’來使用Tez:“設置hive.execution.engine = tez; ”,通過上述設置,你執行的每個HIVE查詢都將利用Tez。目前Hive On Spark還處於試驗階段,慎用。
3.9 使用向量化查詢
向量化查詢執行通過一次性批量執行1024行而不是每次單行執行,從而提供掃描、聚合、篩選器和連接等操作的性能。在Hive 0.13中引入,此功能顯着提高了查詢執行時間,並可通過兩個參數設置輕松啟用:
設置hive.vectorized.execution.enabled = true;
設置hive.vectorized.execution.reduce.enabled = true;
3.10 設置cost based query optimization
Hive自0.14.0開始,加入了一項“Cost based Optimizer”來對HQL執行計划進行優化,這個功能通過“hive.cbo.enable”來開啟。在Hive 1.1.0之后,這個feature是默認開啟的,它可以自動優化HQL中多個JOIN的順序,並選擇合適的JOIN算法。
Hive在提供最終執行前,優化每個查詢的執行邏輯和物理執行計划。這些優化工作是交給底層來完成的。根據查詢成本執行進一步的優化,從而產生潛在的不同決策:如何排序連接,執行哪種類型的連接,並行度等等。要使用基於成本的優化(也稱為CBO),請在查詢開始設置以下參數:
設置hive.cbo.enable = true;
設置hive.compute.query.using.stats = true;
設置hive.stats.fetch.column.stats = true;
設置hive.stats.fetch.partition.stats = true;
3.11 模式選擇
- 本地模式
對於大多數情況,Hive可以通過本地模式在單台機器上處理所有任務。對於小數據,執行時間可以明顯被縮短。通過set hive.exec.mode.local.auto = true(默認為false)設置為本地模式,本地模式涉及到三個參數:
set hive.exec.mode.local.auto=true; 是打開hive自動判斷是否啟動本地模式的開關,但是只是打開這個參數不能保證啟動本地模式,要當map任務數不超過hive.exec.mode.local.auto.input.files.max的個數並且map輸入文件大小不超過hive.exec.mode.local.auto.inputbytes.max所指定的大小時,才能啟動本地模式。
如下:用戶可以通過設置hive.exec.mode.local.auto的值為true,來讓Hive在適當的時候自動啟動這個優化。
set hive.exec.mode.local.auto=true; //開啟本地mr //設置local mr的最大輸入數據量,當輸入數據量小於這個值時采用local mr的方式,默認為134217728,即128M set hive.exec.mode.local.auto.inputbytes.max=50000000; //設置local mr的最大輸入文件個數,當輸入文件個數小於這個值時采用local mr的方式,默認為4 set hive.exec.mode.local.auto.input.files.max=10;
- 並行模式
Hive會將一個查詢轉化成一個或多個階段。這樣的階段可以是MapReduce階段、抽樣階段、合並階段、limit階段。默認情況下,Hive一次只會執行一個階段,由於job包含多個階段,而這些階段並非完全相互依賴,即:這些階段可以並行執行,可以縮短整個job的執行時間。設置參數,set hive.exec.parallel=true,或者通過配置文件來完成:
hive> set hive.exec.parallel;
- 嚴格模式
Hive提供一個嚴格模式,可以防止用戶執行那些可能產生意想不到的影響查詢,通過設置Hive.mapred.modestrict來完成。
set Hive.mapred.modestrict;
3.12 JVM重用
Hadoop通常是使用派生JVM來執行map和reduce任務的。這時JVM的啟動過程可能會造成相當大的開銷,尤其是執行的job包含成百上千的task任務的情況。JVM重用可以使得JVM示例在同一個job中時候,通過參數mapred.job.reuse.jvm.num.tasks來設置。
3.13 推測執行
Hadoop推測執行可以觸發執行一些重復的任務,盡管因對重復的數據進行計算而導致消耗更多的計算資源,不過這個功能的目標是通過加快獲取單個task的結果以偵測執行慢的TaskTracker加入到沒名單的方式來提高整體的任務執行效率。Hadoop的推測執行功能由2個配置控制着,通過mapred-site.xml中配置
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
4. 總結
【參考資料】
https://blog.csdn.net/yu0_zhang0/article/details/81776459
https://www.cnblogs.com/smartloli/p/4356660.html
https://blog.csdn.net/zdy0_2004/article/details/81613230
https://blog.51cto.com/12445535/2352789
https://blog.csdn.net/BabyFish13/article/details/52055927