一、Hive-sql 常用優化
MapReduce 流程:
Input->split->map->buffer(此處調整其大小)->spill->spill過多合並->merge->combine(減少reduce壓力)->shuffle(copy、merge)->spill->disk->reduce->Output
1.1、常用參數設置
#增加reducer任務數量(拉取數量分流) set mapred.reduce.tasks=20; #在同一個sql中的不同的job是否可以同時運行,默認為false set hive.exec.parallel=true; #增加同一個sql允許並行任務的最大線程數 set hive.exec.parallel.thread.number=8; #設置reducer內存大小 set mapreduce.reduce.memory.mb=4096; set mapreduce.reduce.java.opts=-Xmx3584m; -- -Xmx 設置堆的最大空間大小。
#設置執行引擎
set hive.execution.engine=mr; -- 執行MapReduce任務,也可以設置為spark
-- 設置內存大小
set mapreduce.reduce.memory.mb=8192; -- reduce 設置的是 Container 的內存上限,這個參數由 NodeManager 讀取並進行控制,當 Container 的內存大小超過了這個參數值,NodeManager 會負責 kill 掉 Container
set mapreduce.reduce.java.opts=-Xmx6144m; -- reduce Java 程序可以使用的最大堆內存數,要小於 mapreduce.reduce.memory.mb
set mapreduce.map.memory.mb=8192; -- map申請內存大小
set mapreduce.map.java.opts=-Xmx6144m;
#動態分區設置,參考:https://www.cnblogs.com/cssdongl/p/6831884.html
set hive.exec.dynamic.partition=true; 是開啟動態分區
set hive.exec.dynamic.partition.mode=nonstrict; 這個屬性默認值是strict,就是要求分區字段必須有一個是靜態的分區值,當前設置為nonstrict,那么可以全部動態分區
#其他
-- 開始負載均衡
set hive.groupby.skewindata=true
-- 開啟map端combiner
set hive.map.aggr=true
1.2、mapjoin
#mapjoin相關設置,小表加載到內存,無reduce set hive.mapjoin.smalltable.filesize=25000000; -- 刷入內存表的大小(字節)。注意:設置太大也不會校驗,所以要根據自己的數據集調整 set hive.auto.convert.join = true; -- 開啟mapjoin,默認false set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.6 ;--map join做group by操作時,可使用多大的內存來存儲數據。若數據太大則不會保存在內存里,默認0.55 set hive.mapjoin.localtask.max.memory.usage=0.90; -- 本地任務可以使用內存的百分比,默認值:0.90 -- 在設置成false時,可以手動的指定mapjoin /*+ MAPJOIN(c) */ 。-->c:放到內存中的表 select /*+ MAPJOIN(c) */ * from user_install_status u inner join country_dict c on u.country=c.code -- 如果不是做innerjoin, 做left join 、right join -- A left join B, 把B放到內存 -- A right join B, 把A放到內存
1.3、小文件合並
產生原因:
- hive動態分區插入數據,分區數據量小,產生大量的小文件
- reduce數量越多,輸出大量小文件
- 數據源本身就就是一些小文件
影響:
- MapReduce原理上理解,小文件會開很多map,一個map開一個JVM去執行,每個任務的初始化、啟動、執行,都會消耗資源。
- 在HDFS中,每個小文件對象約占150byte,如果小文件過多會占用大量內存。這樣NameNode內存容量限制了集群規模。
解決方案:
在hive里有兩種比較常見的處理辦法
第一是使用Combinefileinputformat,將多個小文件打包作為一個整體的 inputsplit,減少map任務數
set mapred.max.split.size=256000000; # 每個 Map 最大分割大小
set mapred.min.split.size.per.node=256000000; # 一個節點上 split 的最小值
set Mapred.min.split.size.per.rack=256000000; #一個交換機下 split的最小值
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; #執行 map 前進行小文件的合並
第二是設置hive參數,將額外啟動一個MR Job打包小文件
set hive.merge.mapfiles = true; # 是否合並Map輸出文件
hive.merge.mapredfiles = false ; #是否合並 Reduce 輸出文件,默認為 False
hive.merge.size.per.task = 256*1000*1000;# 合並文件的大小
1.4、JVM 重用
- MapReduce 執行過程中每個任務/task 會啟用一個 JVM 來執行 map 和 reduce 任務,這時 JVM 的啟動過程可能會造成資源的消耗,如果一個job包含大量的task任務的情況,此優化效果比較明顯。通過參數 mapred.job.reuse.jvm.num.tasks 來設置
- 默認情況下,它被設置為 +1,這意味着每個 Map/Reduce 任務都會啟動一個新的JVM 。 相反,如果將它設置為 -1,那么可以通過無限數量的任務來使用 jvm 。 在這種情況下,任務連續執行一個,以使用相同的JVM 。
- 為每個task啟動一個新的 JVM 將耗時1秒左右,對於運行時間較長(比如1分鍾以上)的job影響不大,但如果都是時間很短的task,那么頻繁啟停JVM會有開銷。
- 如果我們想使用JVM重用技術來提高性能,那么可以將mapred.job.reuse.jvm.num.tasks設置成大於1的數。這表示屬於同一job的順序執行的task可以共享一個JVM,也就是說第二輪的map可以重用前一輪的JVM,而不是第一輪結束后關閉JVM,第二輪再啟動新的JVM。
- 注意:
- JVM重用技術不是指同一Job的兩個或兩個以上的task可以同時運行於同一JVM上,而是排隊按順序執行。
- 如果task屬於不同的job,那么JVM重用機制無效,不同job的task需要不同的JVM來運行。
1.5、常見 sql 優化
- 減少使用distinct
- 查詢條件中減少使用函數
- 避免使用select *
- 多個union all可以使用insert into替換
- 盡量避免一個SQL包含復雜邏輯,可以使用中間表來完成復雜的邏輯
- 減少每個階段的數據量,對於分區表要加分區,同時只選擇需要使用到的字段
二、數據傾斜
2.1、數據傾斜的表現
- 任務進度長時間維持在99%(或100%),查看任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的數據量和其他reduce差異過大。
- 單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。 最長時長遠大於平均時長。
2.2、數據傾斜的解決方案
參數調節:
- 對於group by 產生傾斜的問題
- 開啟map端combiner:【set hive.map.aggr=true;】
- 開啟負載均衡:【set hive.groupby.skewindata=true;】
- 有數據傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計划會有兩個 MR Job。
- 第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;
- 第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。
SQL 語句調節:
- 大小表Join:
- 使用map join讓小的維度表先進內存。在map端完成join,不經過reduce。
- 大表Join大表:
- 非法數據太多,比如null,可以把空值的key變成一個字符串加上隨機數,把傾斜的數據分到不同的reduce上,由於null值關聯不上,處理后並不影響最終結果。
- 如果null值不要,可以通過where條件篩選掉;
- 將大量的非法數據轉化成隨機數+字符串,這樣兩個表的數據不會join在一起。
- count distinct大量相同特殊值:
- count distinct時,將值為空的情況單獨處理
- 如果是計算count distinct,可以不用處理,直接過濾,在最后結果中加1。
- 如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
- 采用sum group by的方式來替換 count(distinct) 完成計算。
- 空值過多,集中到一個reduce處理--設置隨機數(case when a.column is null then concat('test',rand()) else column end = b.column)或過濾。
- 特殊情況特殊處理:
- 在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的數據單獨拿出來處理。最后union回去
- 比如:group by時維度過小,數據過於集中,數據自身傾斜,比如 北京的用戶比其它地方的用戶多很多
- 此時可以把北京的數據單獨處理:先把北京的數據分成N塊,每塊的數據進行局部統計,再將每塊的局部統計結果進行匯總,最終統計出結果