Hive的10種常用優化總結,再也不怕MapReduce分配不均了


Hive作為大數據領域常用的數據倉庫組件,在平時設計和查詢時要特別注意效率。影響Hive效率的幾乎從不是數據量過大,而是數據傾斜、數據冗余、job或I/O過多、MapReduce分配不合理等等。對Hive的調優既包含對HiveSQL語句本身的優化,也包含Hive配置項和MR方面的調整。

列裁剪和分區裁剪

最基本的操作。所謂列裁剪就是在查詢時只讀取需要的列,分區裁剪就是只讀取需要的分區。以我們的日歷記錄表為例:

select uid,event_type,record_data
from calendar_record_log
where pt_date >= 20190201 and pt_date <= 20190224
and status = 0;

當列很多或者數據量很大時,如果select *或者不指定分區,全列掃描和全表掃描效率都很低。
Hive中與列裁剪優化相關的配置項是hive.optimize.cp,與分區裁剪優化相關的則是hive.optimize.pruner,默認都是true。在HiveSQL解析階段對應的則是ColumnPruner邏輯優化器。

謂詞下推

在關系型數據庫如MySQL中,也有謂詞下推(Predicate Pushdown,PPD)的概念。它就是將SQL語句中的where謂詞邏輯都盡可能提前執行,減少下游處理的數據量。
例如以下HiveSQL語句:

select a.uid,a.event_type,b.topic_id,b.title
from calendar_record_log a
left outer join (
select uid,topic_id,title from forum_topic
where pt_date = 20190224 and length(content) >= 100
) b on a.uid = b.uid
where a.pt_date = 20190224 and status = 0;

對forum_topic做過濾的where語句寫在子查詢內部,而不是外部。Hive中有謂詞下推優化的配置項hive.optimize.ppd,默認值true,與它對應的邏輯優化器是PredicatePushDown。該優化器就是將OperatorTree中的FilterOperator向上提,見下圖。

Hive的10種常用優化總結,再也不怕MapReduce分配不均了

 

sort by代替order by

HiveSQL中的order by與其他SQL方言中的功能一樣,就是將結果按某字段全局排序,這會導致所有map端數據都進入一個reducer中,在數據量大時可能會長時間計算不完。

如果使用sort by,那么還是會視情況啟動多個reducer進行排序,並且保證每個reducer內局部有序。為了控制map端數據分配到reducer的key,往往還要配合distribute by一同使用。如果不加distribute by的話,map端數據就會隨機分配到reducer。

group by代替distinct

當要統計某一列的去重數時,如果數據量很大,count(distinct)就會非常慢,原因與order by類似,count(distinct)邏輯只會有很少的reducer來處理。這時可以用group by來改寫:

select count(1) from (
select uid from calendar_record_log
where pt_date >= 20190101
group by uid
) t;

但是這樣寫會啟動兩個MR job(單純distinct只會啟動一個),所以要確保數據量大到啟動job的overhead遠小於計算耗時,才考慮這種方法。當數據集很小或者key的傾斜比較明顯時,group by還可能會比distinct慢。
那么如何用group by方式同時統計多個列?下面是解決方法:

selectt.a,sum(t.b),count(t.c),count(t.d)from(select a,b,null c,null d from some_tableunion allselect a,0 b,c,null d from some_table group by a,cunion allselect a,0 b,null c,d from some_table group by a,d) t;

group by配置調整

map端預聚合

group by時,如果先起一個combiner在map端做部分預聚合,可以有效減少shuffle數據量。預聚合的配置項是hive.map.aggr,默認值true,對應的優化器為GroupByOptimizer,簡單方便。
通過hive.groupby.mapaggr.checkinterval參數也可以設置map端預聚合的行數閾值,超過該值就會分拆job,默認值100000。

傾斜均衡配置項

group by時如果某些key對應的數據量過大,就會發生數據傾斜。Hive自帶了一個均衡數據傾斜的配置項hive.groupby.skewindata,默認值false。
其實現方法是在group by時啟動兩個MR job。第一個job會將map端數據隨機輸入reducer,每個reducer做部分聚合,相同的key就會分布在不同的reducer中。第二個job再將前面預處理過的數據按key聚合並輸出結果,這樣就起到了均衡的效果。
但是,配置項畢竟是死的,單純靠它有時不能根本上解決問題,因此還是建議自行了解數據傾斜的細節,並優化查詢語句。

join基礎優化

join優化是一個復雜的話題,下面先說5點最基本的注意事項。

build table(小表)前置

在最常見的hash join方法中,一般總有一張相對小的表和一張相對大的表,小表叫build table,大表叫probe table。如下圖所示。

Hive的10種常用優化總結,再也不怕MapReduce分配不均了

 

Hive在解析帶join的SQL語句時,會默認將最后一個表作為probe table,將前面的表作為build table並試圖將它們讀進內存。如果表順序寫反,probe table在前面,引發OOM的風險就高了。
在維度建模數據倉庫中,事實表就是probe table,維度表就是build table。假設現在要將日歷記錄事實表和記錄項編碼維度表來join:

selecta.event_type,a.event_code,a.event_desc,b.upload_timefrom calendar_event_code ainner join (select event_type,upload_time from calendar_record_logwhere pt_date = 20190225)bona.event_type=b.event_type;

多表join時key相同

這種情況會將多個join合並為一個MR job來處理,例如:

selecta.event_type,a.event_code,a.event_desc,b.upload_timefrom calendar_event_code ainner join (select event_type,upload_time from calendar_record_logwhere pt_date = 20190225) b on a.event_type = b.event_typeinner join (select event_type,upload_time from calendar_record_log_2where pt_date = 20190225)cona.event_type=c.event_type;

如果上面兩個join的條件不相同,比如改成a.event_code = c.event_code,就會拆成兩個MR job計算。
負責這個的是相關性優化器CorrelationOptimizer.

利用map join特性

map join特別適合大小表join的情況。Hive會將build table和probe table在map端直接完成join過程,消滅了reduce,效率很高。

select  a.event_type,b.upload_timefrom calendar_event_code ainner join (select event_type,upload_time from calendar_record_logwhere pt_date = 20190225) b on a.event_type < b.event_type;

上面的語句中加了一條map join hint,以顯式啟用map join特性。早在Hive 0.8版本之后,就不需要寫這條hint了。map join還支持不等值連接,應用更加靈活。
map join的配置項是hive.auto.convert.join,默認值true,對應邏輯優化器是MapJoinProcessor。
還有一些參數用來控制map join的行為,比如hive.mapjoin.smalltable.filesize,當build table大小小於該值就會啟用map join,默認值25000000(25MB)。還有hive.mapjoin.cache.numrows,表示緩存build table的多少行數據到內存,默認值25000。

分桶表map join

map join對分桶表還有特別的優化。由於分桶表是基於一列進行hash存儲的,因此非常適合抽樣(按桶或按塊抽樣)。
它對應的配置項是hive.optimize.bucketmapjoin,優化器是BucketMapJoinOptimizer。但我們的業務中用分桶表較少,所以就不班門弄斧了,只是提一句。

傾斜均衡配置項

這個配置與上面group by的傾斜均衡配置項異曲同工,通過hive.optimize.skewjoin來配置,默認false。

如果開啟了,在join過程中Hive會將計數超過閾值hive.skewjoin.key(默認100000)的傾斜key對應的行臨時寫進文件中,然后再啟動另一個job做map join生成結果。通過hive.skewjoin.mapjoin.map.tasks參數還可以控制第二個job的mapper數量,默認10000。
再重復一遍,通過自帶的配置項經常不能解決數據傾斜問題。join是數據傾斜的重災區,后面還要介紹在SQL層面處理傾斜的各種方法。

優化SQL處理join數據傾斜

空值或無意義值

這種情況很常見,比如當事實表是日志類數據時,往往會有一些項沒有記錄到,我們視情況會將它置為null,或者空字符串、-1等。如果缺失的項很多,在做join時這些空值就會非常集中,拖累進度。
因此,若不需要空值數據,就提前寫where語句過濾掉。需要保留的話,將空值key用隨機方式打散,例如將用戶ID為null的記錄隨機改為負值:

select a.uid,a.event_type,b.nickname,b.agefrom (select  (case when uid is null then cast(rand()*-10240 as int) else uid end) as uid,  event_type from calendar_record_logwhere pt_date >= 20190201) a left outer join (select uid,nickname,age from user_info where status = 4) b on a.uid = b.uid;

單獨處理傾斜key

這其實是上面處理空值方法的拓展,不過傾斜的key變成了有意義的。一般來講傾斜的key都很少,我們可以將它們抽樣出來,對應的行單獨存入臨時表中,然后打上一個較小的隨機數前綴(比如0~9),最后再進行聚合。SQL語句與上面的相仿,不再贅述。

不同數據類型

這種情況不太常見,主要出現在相同業務含義的列發生過邏輯上的變化時。
舉個例子,假如我們有一舊一新兩張日歷記錄表,舊表的記錄類型字段是(event_type int),新表的是(event_type string)。為了兼容舊版記錄,新表的event_type也會以字符串形式存儲舊版的值,比如'17'。當這兩張表join時,經常要耗費很長時間。其原因就是如果不轉換類型,計算key的hash值時默認是以int型做的,這就導致所有“真正的”string型key都分配到一個reducer上。所以要注意類型轉換:

select a.uid,a.event_type,b.record_datafrom calendar_record_log aleft outer join (select uid,event_type from calendar_record_log_2where pt_date = 20190228) b on a.uid = b.uid and b.event_type = cast(a.event_type as string)where a.pt_date = 20190228;

build table過大

有時,build table會大到無法直接使用map join的地步,比如全量用戶維度表,而使用普通join又有數據分布不均的問題。這時就要充分利用probe table的限制條件,削減build table的數據量,再使用map join解決。代價就是需要進行兩次join。舉個例子:

select /*+mapjoin(b)*/ a.uid,a.event_type,b.status,b.extra_infofrom calendar_record_log aleft outer join (select /*+mapjoin(s)*/ t.uid,t.status,t.extra_infofrom (select distinct uid from calendar_record_log where pt_date = 20190228) sinner join user_info t on s.uid = t.uid) b on a.uid = b.uidwhere a.pt_date = 20190228;

MapReduce優化

Hive的10種常用優化總結,再也不怕MapReduce分配不均了

 

調整mapper數

mapper數量與輸入文件的split數息息相關,在Hadoop源碼org.apache.hadoop.mapreduce.lib.input.FileInputFormat類中可以看到split划分的具體邏輯。這里不貼代碼,直接敘述mapper數是如何確定的。

  • 可以直接通過參數mapred.map.tasks(默認值2)來設定mapper數的期望值,但它不一定會生效,下面會提到。
  • 設輸入文件的總大小為total_input_size。HDFS中,一個塊的大小由參數dfs.block.size指定,默認值64MB或128MB。在默認情況下,mapper數就是:
    default_mapper_num = total_input_size / dfs.block.size。
  • 參數mapred.min.split.size(默認值1B)和mapred.max.split.size(默認值64MB)分別用來指定split的最小和最大大小。split大小和split數計算規則是:
    split_size = MAX(mapred.min.split.size, MIN(mapred.max.split.size, dfs.block.size));
    split_num = total_input_size / split_size。
  • 得出mapper數:
    mapper_num = MIN(split_num, MAX(default_num, mapred.map.tasks))。

可見,如果想減少mapper數,就適當調高mapred.min.split.size,split數就減少了。如果想增大mapper數,除了降低mapred.min.split.size之外,也可以調高mapred.map.tasks。

一般來講,如果輸入文件是少量大文件,就減少mapper數;如果輸入文件是大量非小文件,就增大mapper數;至於大量小文件的情況,得參考下面“合並小文件”一節的方法處理。

調整reducer數

reducer數量的確定方法比mapper簡單得多。使用參數mapred.reduce.tasks可以直接設定reducer數量,不像mapper一樣是期望值。但如果不設這個參數的話,Hive就會自行推測,邏輯如下:

  • 參數hive.exec.reducers.bytes.per.reducer用來設定每個reducer能夠處理的最大數據量,默認值1G(1.2版本之前)或256M(1.2版本之后)。
  • 參數hive.exec.reducers.max用來設定每個job的最大reducer數量,默認值999(1.2版本之前)或1009(1.2版本之后)。
  • 得出reducer數:
    reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)。

reducer數量與輸出文件的數量相關。如果reducer數太多,會產生大量小文件,對HDFS造成壓力。如果reducer數太少,每個reducer要處理很多數據,容易拖慢運行時間或者造成OOM。

JVM重用

在MR job中,默認是每執行一個task就啟動一個JVM。如果task非常小而碎,那么JVM啟動和關閉的耗時就會很長。可以通過調節參數mapred.job.reuse.jvm.num.tasks來重用。例如將這個參數設成5,那么就代表同一個MR job中順序執行的5個task可以重復使用一個JVM,減少啟動和關閉的開銷。但它對不同MR job中的task無效。

並行執行與本地模式

  • 並行執行
    Hive中互相沒有依賴關系的job間是可以並行執行的,最典型的就是多個子查詢union all。在集群資源相對充足的情況下,可以開啟並行執行,即將參數hive.exec.parallel設為true。另外hive.exec.parallel.thread.number可以設定並行執行的線程數,默認為8,一般都夠用。
  • 本地模式
    Hive也可以不將任務提交到集群進行運算,而是直接在一台節點上處理。因為消除了提交到集群的overhead,所以比較適合數據量很小,且邏輯不復雜的任務。
    設置hive.exec.mode.local.auto為true可以開啟本地模式。但任務的輸入數據總量必須小於hive.exec.mode.local.auto.inputbytes.max(默認值128MB),且mapper數必須小於hive.exec.mode.local.auto.tasks.max(默認值4),reducer數必須為0或1,才會真正用本地模式執行。

嚴格模式

所謂嚴格模式,就是強制不允許用戶執行3種有風險的HiveSQL語句,一旦執行會直接失敗。這3種語句是:

  • 查詢分區表時不限定分區列的語句;
  • 兩表join產生了笛卡爾積的語句;
  • 用order by來排序但沒有指定limit的語句。

要開啟嚴格模式,需要將參數hive.mapred.mode設為strict。

結束

寫了這么多,肯定有遺漏或錯誤之處,歡迎各位大佬批評指正。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM