3、Hive-sql優化,數據傾斜處理


一、Hive-sql 常用優化

MapReduce 流程:

Input->split->map->buffer(此處調整其大小)->spill->spill過多合並->merge->combine(減少reduce壓力)->shuffle(copy、merge)->spill->disk->reduce->Output

1.1、常用參數設置

#增加reducer任務數量(拉取數量分流)
set mapred.reduce.tasks=20;

#在同一個sql中的不同的job是否可以同時運行,默認為false
set hive.exec.parallel=true;

#增加同一個sql允許並行任務的最大線程數
set hive.exec.parallel.thread.number=8;

#設置reducer內存大小
set mapreduce.reduce.memory.mb=4096;
set mapreduce.reduce.java.opts=-Xmx3584m; -- -Xmx 設置堆的最大空間大小。

#設置執行引擎
set hive.execution.engine=mr; -- 執行MapReduce任務,也可以設置為spark
-- 設置內存大小
set mapreduce.reduce.memory.mb=8192; -- reduce 設置的是 Container 的內存上限,這個參數由 NodeManager 讀取並進行控制,當 Container 的內存大小超過了這個參數值,NodeManager 會負責 kill 掉 Container
set mapreduce.reduce.java.opts=-Xmx6144m; -- reduce Java 程序可以使用的最大堆內存數,要小於 mapreduce.reduce.memory.mb
set mapreduce.map.memory.mb=8192; -- map申請內存大小
set mapreduce.map.java.opts=-Xmx6144m;

#動態分區設置,參考:https://www.cnblogs.com/cssdongl/p/6831884.html
set hive.exec.dynamic.partition=true; 是開啟動態分區
set hive.exec.dynamic.partition.mode=nonstrict; 這個屬性默認值是strict,就是要求分區字段必須有一個是靜態的分區值,當前設置為nonstrict,那么可以全部動態分區

#其他
-- 開始負載均衡
set hive.groupby.skewindata=true
-- 開啟map端combiner
set hive.map.aggr=true

1.2、mapjoin

#mapjoin相關設置,小表加載到內存,無reduce
set hive.mapjoin.smalltable.filesize=25000000; -- 刷入內存表的大小(字節)。注意:設置太大也不會校驗,所以要根據自己的數據集調整
set hive.auto.convert.join = true; -- 開啟mapjoin,默認false
set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.6 ;--map join做group by操作時,可使用多大的內存來存儲數據。若數據太大則不會保存在內存里,默認0.55
set hive.mapjoin.localtask.max.memory.usage=0.90; -- 本地任務可以使用內存的百分比,默認值:0.90
-- 在設置成false時,可以手動的指定mapjoin /*+ MAPJOIN(c) */  。-->c:放到內存中的表
select /*+ MAPJOIN(c) */ * from user_install_status u
inner join country_dict c
on u.country=c.code
-- 如果不是做innerjoin, 做left join 、right join
-- A left join B,  把B放到內存
-- A right join B,  把A放到內存

1.3、小文件合並

產生原因:

  • hive動態分區插入數據,分區數據量小,產生大量的小文件
  • reduce數量越多,輸出大量小文件
  • 數據源本身就就是一些小文件

影響:

  • MapReduce原理上理解,小文件會開很多map,一個map開一個JVM去執行,每個任務的初始化、啟動、執行,都會消耗資源。
  • 在HDFS中,每個小文件對象約占150byte,如果小文件過多會占用大量內存。這樣NameNode內存容量限制了集群規模。

解決方案:

在hive里有兩種比較常見的處理辦法

第一是使用Combinefileinputformat,將多個小文件打包作為一個整體的 inputsplit,減少map任務數

set mapred.max.split.size=256000000; # 每個 Map 最大分割大小

set mapred.min.split.size.per.node=256000000; # 一個節點上 split 的最小值

set  Mapred.min.split.size.per.rack=256000000; #一個交換機下 split的最小值

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; #執行 map 前進行小文件的合並

第二是設置hive參數,將額外啟動一個MR Job打包小文件

set hive.merge.mapfiles = true; # 是否合並Map輸出文件

hive.merge.mapredfiles = false ; #是否合並 Reduce 輸出文件,默認為 False 

hive.merge.size.per.task = 256*1000*1000;# 合並文件的大小 

1.4、JVM 重用

  • MapReduce 執行過程中每個任務/task 會啟用一個 JVM 來執行 map 和 reduce 任務,這時 JVM 的啟動過程可能會造成資源的消耗,如果一個job包含大量的task任務的情況,此優化效果比較明顯。通過參數 mapred.job.reuse.jvm.num.tasks 來設置
  • 默認情況下,它被設置為 +1,這意味着每個 Map/Reduce 任務都會啟動一個新的JVM 。 相反,如果將它設置為 -1,那么可以通過無限數量的任務來使用 jvm 。 在這種情況下,任務連續執行一個,以使用相同的JVM 。
  • 為每個task啟動一個新的 JVM 將耗時1秒左右,對於運行時間較長(比如1分鍾以上)的job影響不大,但如果都是時間很短的task,那么頻繁啟停JVM會有開銷。
  • 如果我們想使用JVM重用技術來提高性能,那么可以將mapred.job.reuse.jvm.num.tasks設置成大於1的數。這表示屬於同一job的順序執行的task可以共享一個JVM,也就是說第二輪的map可以重用前一輪的JVM,而不是第一輪結束后關閉JVM,第二輪再啟動新的JVM。
  • 注意:
    • JVM重用技術不是指同一Job的兩個或兩個以上的task可以同時運行於同一JVM上,而是排隊按順序執行。
    • 如果task屬於不同的job,那么JVM重用機制無效,不同job的task需要不同的JVM來運行。

1.5、常見 sql 優化

  • 減少使用distinct
  • 查詢條件中減少使用函數
  • 避免使用select *
  • 多個union all可以使用insert into替換
  • 盡量避免一個SQL包含復雜邏輯,可以使用中間表來完成復雜的邏輯
  • 減少每個階段的數據量,對於分區表要加分區,同時只選擇需要使用到的字段

二、數據傾斜

2.1、數據傾斜的表現

  • 任務進度長時間維持在99%(或100%),查看任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的數據量和其他reduce差異過大。
  • 單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。 最長時長遠大於平均時長。

2.2、數據傾斜的解決方案

參數調節:

  • 對於group by 產生傾斜的問題
    • 開啟map端combiner:【set hive.map.aggr=true;】
    • 開啟負載均衡:【set hive.groupby.skewindata=true;】
      • 有數據傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計划會有兩個 MR Job。
      • 第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;
      • 第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。

SQL 語句調節:

  • 大小表Join:
    • 使用map join讓小的維度表先進內存。在map端完成join,不經過reduce。
  • 大表Join大表:
    • 非法數據太多,比如null,可以把空值的key變成一個字符串加上隨機數,把傾斜的數據分到不同的reduce上,由於null值關聯不上,處理后並不影響最終結果。
    • 如果null值不要,可以通過where條件篩選掉;
    • 將大量的非法數據轉化成隨機數+字符串,這樣兩個表的數據不會join在一起。
  • count distinct大量相同特殊值:
    • count distinct時,將值為空的情況單獨處理
    • 如果是計算count distinct,可以不用處理,直接過濾,在最后結果中加1。
    • 如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
    • 采用sum group by的方式來替換 count(distinct) 完成計算。
  • 空值過多,集中到一個reduce處理--設置隨機數(case when a.column is null then concat('test',rand()) else column end = b.column)或過濾。
  • 特殊情況特殊處理:
    • 在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的數據單獨拿出來處理。最后union回去
    • 比如:group by時維度過小,數據過於集中,數據自身傾斜,比如 北京的用戶比其它地方的用戶多很多
    • 此時可以把北京的數據單獨處理:先把北京的數據分成N塊,每塊的數據進行局部統計,再將每塊的局部統計結果進行匯總,最終統計出結果
    •  

       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM