Hive on MR調優


當HiveQL跑不出來時,基本上是數據傾斜了,比如出現count(distinct),groupby,join等情況,理解 MR 底層原理,同時結合實際的業務,數據的類型,分布,質量狀況等來實際的考慮如何進行系統性的優化。

Hive on MR 調優主要從三個層面進行,分別是基於MapReduce優化、Hive架構層優化和HiveQL層優化。

MapReduce調優

  如果能夠根據情況對shuffle過程進行調優,對於提供MapReduce性能很有幫助。一個通用的原則是給shuffle過程分配盡可能大的內存,當然你需要確保map和reduce有足夠的內存來運行業務邏輯。因此在實現Mapper和Reducer時,應該盡量減少內存的使用,例如避免在Map中不斷地疊加。
運行map和reduce任務的JVM,內存通過mapred.child.java.opts屬性來設置,盡可能設大內存。容器的內存大小通過mapreduce.map.memory.mb和mapreduce.reduce.memory.mb來設置,默認都是1024M。

1 map調優

  在map階段主要包括:數據的讀取、map處理以及寫出操作(排序和合並/sort&merge),其中可以針對spill文件輸出數量、Combiner的merge過程和數據壓縮進行優化,避免寫入多個spill文件可能達到最好的性能,一個spill文件是最好的。通過估計map的輸出大小,設置合理的mapreduce.task.io.sort.*屬性,使得spill文件數量最小。例如盡可能調大mapreduce.task.io.sort.mb。其次增加combine階段以及對輸出進行壓縮設置進行mapper調優。

(1) 合理設置map數

在執行map函數之前會先將HDFS上文件進行分片,得到的分片做為map函數的輸入,所以map數量取決於map的輸入分片(inputsplit),一個輸入分片對應於一個map task,輸入分片由三個參數決定:

參數名 默認值 備注
dfs.block.size 128M HDFS上數據塊的大小
mapreduce.min.split.size 0 最小分片數
mapreduce.max.split.size 256M 最大分片數

公式:分片大小=max(mapreduce.min.split.size,min(dfs.block.size, mapreduce.max.split.size)),默認情況下分片大小和dfs.block.size是一致的,即一個HDFS數據塊對應一個輸入分片,對應一個map task。這時候一個map task中只處理一台機器上的一個數據塊,不需要將數據跨網絡傳輸,提高了數據處理速度。

(2)spill文件輸出數量

--用於mapper輸出排序的內存大小,調大的話,會減少磁盤spill的次數,此時如果內存足夠的話,一般都會顯著提升性能
mapreduce.task.io.sort.mb(default100--開始spill的緩沖池閥值,默認0.80,spill一般會在Buffer空間大小的80%開始進行spill
mapreduce.map.sort.spill.percentdefault0.80

(3)combine(排序和合並/sort&merge)

--運行combiner的最低文件數,與reduce共用;調大來減少merge的次數,從而減少磁盤的操作;
mapreduce.task.io.sort.factor(default10--spill的文件數默認情況下由三個的時候就要進行combine操作,最終減少磁盤數據;
min.num.spill.for.combine 默認是3

(4)壓縮設置

--設置為true進行壓縮,數據會被壓縮寫入磁盤,,壓縮一般可以10倍的減少IO操作
mapreduce.map.output.compress(default:false)
--壓縮算法,推薦使用SnappyCodec;
mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)

2 reduce調優

  在reduce端,如果能夠讓所有數據都保存在內存中,可以達到最佳的性能。通常情況下,內存都保留給reduce函數,但是如果reduce函數對內存需求不是很高,將mapreduce.reduce.merge.inmem.threshold(觸發合並的map輸出文件數)設為0,mapreduce.reduce.input.buffer.percent(用於保存map輸出文件的堆內存比例)設為1.0,可以達到很好的性能提升。在2008年的TB級別數據排序性能測試中,Hadoop就是通過將reduce的中間數據都保存在內存中勝利的。

(1)對mapper端輸出數據的獲取

--mr程序reducer copy數據的線程數。當map很多並且完成的比較快的job的情況下調大,有利於reduce更快的獲取屬於自己部分的數據
mapreduce.reduce.shuffle.parallelcopies 默認5

(2)數據合並(sort&merge)

--reduce復制map數據的時候指定的內存堆大小百分比,適當的增加該值可以減少map數據的磁盤溢出,能夠提高系統能。
mapreduce.reduce.shuffle.input.buffer.percent 默認0.70;
--reduce進行shuffle的時候,用於啟動合並輸出和磁盤溢寫的過程的閥值。
--如果允許,適當增大其比例能夠減少磁盤溢寫次數,提高系統性能。同mapreduce.reduce.shuffle.input.buffer.percent一起使用。
mapreduce.reduce.shuffle.merge.percent 默認0.66;

(3)reduce處理以及寫出操作

--reduce函數開始運行時,內存中的map輸出所占的堆內存比例不得高於這個值,默認情況內存都用於reduce函數,也就是map輸出都寫入到磁盤
set mapreduce.reduce.input.buffer.percent 默認0.0;
--開始spill的map輸出文件數閾值,小於等於0表示沒有閾值,此時只由緩沖池比例來控制
set mapreduce.reduce.merge.inmem.threshold 默認1000;
--服務於reduce提取結果的線程數量
mapreduce.shuffle.max.threads 默認0;
--修改reducer的個數,可以通過job.setNumReduceTasks方法來進行更改。
mapreduce.job.reduces 默認為1;

 (4)合理設置reduce數

reduce數決定參數

參數名 默認值 備注
hive.exec.reducers.bytes.per.reducer 1G 一個reduce數據量的大小
hive.exec.reducers.max 999 hive 最大的個數
mapred.reduce.tasks -1 reduce task 的個數,-1 是根據hive.exec.reducers.bytes.per.reducer 自動調整

所以可以用set mapred.reduce.tasks手動調整reduce task個數。

Hive架構層優化

1 不執行mapreduce

hive從HDFS讀取數據,有兩種方式:啟用mapreduce讀取、直接抓取。

set hive.fetch.task.conversion=more

hive.fetch.task.conversion參數設置成more,可以在 select、where 、limit 時啟用直接抓取方式,能明顯提升查詢速度。

2 本地執行mapreduce

hive在集群上查詢時,默認是在集群上N台機器上運行,需要多個機器進行協調運行,這個方式很好地解決了大數據量的查詢問題。但是當hive查詢處理的數據量比較小時,其實沒有必要啟動分布式模式去執行,因為以分布式方式執行就涉及到跨網絡傳輸、多節點協調等,並且消耗資源。這個時間可以只使用本地模式來執行mapreduce job,只在一台機器上執行,速度會很快。

3 JVM重用

因為hive語句最終要轉換為一系列的mapreduce job的,而每一個mapreduce job是由一系列的map task和Reduce task組成的,默認情況下,mapreduce中一個map task或者一個Reduce task就會啟動一個JVM進程,一個task執行完畢后,JVM進程就退出。這樣如果任務花費時間很短,又要多次啟動JVM的情況下,JVM的啟動時間會變成一個比較大的消耗,這個時候,就可以通過重用JVM來解決。這個設置就是制定一個jvm進程在運行多次任務之后再退出,這樣一來,節約了很多的 JVM的啟動時間。

--JVM重用特別是對於小文件場景或者task特別多的場景
set mapred.job.reuse.jvm.num.tasks=10; 

--啟動JVM虛擬機時,傳遞給虛擬機的啟動參數,表示這個 Java 程序可以使用的最大堆內存數,一旦超過這個大小,JVM 就會拋出 Out of Memory 異常,並終止進程。
--設置的是 Container 的內存上限,這個參數由 NodeManager 讀取並進行控制,當 Container 的內存大小超過了這個參數值,NodeManager 會負責 kill 掉 Container。
--mapreduce.map.java.opts一定要小於mapreduce.map.memory.mb。
mapreduce.map.java.opts 默認 -Xmx200m;
mapreduce.map.memory.mb
--同上
mapreduce.reduce.java.opts;
mapreduce.map.java.opts;

--是否啟動map階段的推測執行,其實一般情況設置為false比較好。可通過方法job.setMapSpeculativeExecution來設置。
mapreduce.map.speculative 默認為true;
--是否需要啟動reduce階段的推測執行,其實一般情況設置為fase比較好。可通過方法job.setReduceSpeculativeExecution來設置。
mapreduce.reduce.speculative 默認為true;

4 並行化

一個hive sql語句可能會轉為多個mapreduce job,每一個job就是一個stage,這些 job 順序執行,這個在hue的運行日志中也可以看到。但是有時候這些任務之間並不是是相互依賴的,如果集群資源允許的話,可以讓多個並不相互依賴stage並發執行,這樣就節約了時間,提高了執行速度,但是如果集群資源匱乏時,啟用並行化反倒是會導致各個job相互搶占資源而導致整體執行性能的下降。

--開啟任務並行執行
 set hive.exec.parallel=true;
--同一個sql允許並行任務的最大線程數 
set hive.exec.parallel.thread.number 默認為8;

HiveQL調優

1 利用分區表優化

分區表是在某一個或者某幾個維度上對數據進行分類存儲,一個分區對應於一個目錄。在這中的存儲方式,當查詢時,如果篩選條件里有分區字段,那么hive只需要遍歷對應分區目錄下的文件即可,不用全局遍歷數據,使得處理的數據量大大減少,提高查詢效率。
當一個hive表的查詢大多數情況下,會根據某一個字段進行篩選時,那么非常適合創建為分區表。

2 利用桶表優化

就是指定桶的個數后,存儲數據時,根據某一個字段進行哈希后,確定存儲再哪個桶里,這樣做的目的和分區表類似,也是使得篩選時不用全局遍歷所有的數據,只需要遍歷所在桶就可以了。

hive.optimize.bucketmapJOIN=true;
hive.input.format=org.apache.hadoop.hive.ql.io.bucketizedhiveInputFormat; 
hive.optimize.bucketmapjoin=true; 
hive.optimize.bucketmapjoin.sortedmerge=true;

3 對於整個sql的優化

(1)where 條件優化
where只在map端階段執行,不會在reduce階段執行,盡早地過濾數據,減少每個階段的數據量,對於分區表要加分區,同時只選擇需要使用到的字段。
(2)join優化

① 優先過濾后再join,最大限度地減少參與join的數據量。
② 小表join大表原則 
應該遵守小表join大表原則,原因是join操作的reduce階段,位於join左邊的表內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生內存溢出的幾率。join中執行順序是從做到右生成job,應該保證連續查詢中的表的大小從左到右是依次增加的。 
③ join on條件相同的放入一個job 
hive中,當多個表進行join時,如果join on的條件相同,那么他們會合並為一個mapreduce job,所以利用這個特性,可以將相同的join on的放入一個job來節省執行時間。
  select pt.page_id,count(t.url) PV 
  from rpt_page_type pt 
  join (select url_page_id,url from trackinfo where ds='2016-10-11' ) t on pt.page_id=t.url_page_id 
  join (select page_id from rpt_page_kpi_new where ds='2016-10-11' ) r on t.url_page_id=r.page_id group by pt.page_id;
④Common/shuffle/Reduce JOIN
發生在reduce 階段, 適用於大表 連接 大表(默認的方式)
⑤Map JOIN
連接發生在map階段 ,適用於小表 連接 大表,大表的數據從文件中讀取,小表的數據存放在內存中(hive中已經自動進行了優化,自動判斷小表,然后進行緩存)
  set hive.auto.convert.join=true; 
⑥SMB JOIN,Sort -Merge -Bucket Join 對大表連接大表的優化,用桶表的概念來進行優化。在一個桶內發送生笛卡爾積連接(需要是兩個桶表進行join)
  set hive.auto.convert.sortmerge.join=true; 
  set hive.optimize.bucketmapjoin = true;   set hive.optimize.bucketmapjoin.sortedmerge = true;   set hive.auto.convert.sortmerge.join.noconditionaltask=true;

(3) Group By數據傾斜優化
  Group By很容易導致數據傾斜問題,因為實際業務中,通常是數據集中在某些點上,這也符合常見的2/8原則,這樣會造成對數據分組后,某一些分組上數據量非常大,而其他的分組上數據量很小,而在mapreduce程序中,同一個分組的數據會分配到同一個reduce操作上去,導致某一些reduce壓力很大,其他的reduce壓力很小,這就是數據傾斜,整個job 執行時間取決於那個執行最慢的那個reduce。
解決這個問題的方法是配置一個參數:set hive.groupby.skewindata=true。 當選項設定為true,生成的查詢計划會有兩個MR job。第一個MR job 中,map的輸出結果會隨機分布到Reduce中,每個Reduce做部分聚合操作,並輸出結果,這樣處理的結果是相同的Group By Key有可能被分發到不同的Reduce中,從而達到負載均衡的目的;第二個MR job再根據預處理的數據結果按照Group By Key分布到Reduce中(這個過程可以保證相同的GroupBy Key被分布到同一個Reduce中),最后完成最終的聚合操作。
(4) Order By 優化
  因為order by只能是在一個reduce進程中進行的,所以如果對一個大數據集進行order by,會導致一個reduce進程中處理的數據相當大,造成查詢執行超級緩慢。
(5)mapjoin
  mapjoin是將join雙方比較小的表直接分發到各個map進程的內存中,在map進程中進行join操作,這樣就省掉了reduce步驟,提高了速度。
但慎重使用mapjoin,一般行數小於2000行,大小小於1M(擴容后可以適當放大)的表才能使用,小表要注意放在join的左邊。否則會引起磁盤和內存的大量消耗。
(6) 桶表mapjoin
  當兩個分桶表join時,如果join on的是分桶字段,小表的分桶數時大表的倍數時,可以啟用map join來提高效率。啟用桶表mapjoin要啟用hive.optimize.bucketmapjoin參數。
(7) 消滅子查詢內的 group by 、 COUNT(DISTINCT),MAX,MIN。 可以減少job的數量。
(8) 不要使用count (distinct cloumn) ,改使用子查詢。
(9) 如果union all的部分個數大於2,或者每個union部分數據量大,應該拆成多個insert into 語句,這樣會提升執行的速度。盡量不要使用union (union 去掉重復的記錄)而是使用 union all 然后在用group by 去重。
(10) 中間臨時表使用orc、parquet等列式存儲格式。
(11) Join字段顯示類型轉換。
(12) 單個SQL所起的JOB個數盡量控制在5個以下。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM