Hadoop Hive概念學習系列之hive里的優化和高級功能(十四)


  在一些特定的業務場景下,使用hive默認的配置對數據進行分析,雖然默認的配置能夠實現業務需求,但是分析效率可能會很低。

Hive有針對性地對不同的查詢進行了優化。在Hive里可以通過修改配置的方式進行優化。

  以下,幾種方式調優的屬性。

 

1、列裁剪

  在通過Hive讀取數據的時候,並不是所有的需求都要獲取表內的所有的數據。有些只需要讀取所有列中的幾列,而忽略其他列的的數據。

例如,表Table1包含5個列Column1、Column2、Column3、Column4、Column5。下面的語句只會在表Table1中讀取Column1、Column2、Column5三列,

Column3和Column4將被忽略。

  SELECT Column1,Column2  FROM Table1 WHERE Column5<1000;

  列裁剪的設置為hive.optimize.cp,默認為true。

  

2、分區裁剪

  在Hive中,可以根據多個維度對Hive表進行分區操作,且分區也可以多層嵌套。當有需要對目標表的某一個區域內的數據進行分析而不需要設計其他區域時,可以使用分區裁剪,將目標區域以條件的形式放在HiveQL中。

SELECT * FROM(SELECT c1 FROM T GROUP BY c1) idi

  WHERE idi.prtn = 100;

SELECT * FROM T1 JOIN(SELECT * FROM T2) idi ON (T1.c1=idi.c2)

  WHERE idi.prtn=100;

  以上語句在執行時,會在子查詢中考慮idi.prtn=100這個條件,從而減少讀入的數據的分區數目。分區裁剪的配置為hive.optimize.pruner,默認為hive。

 

3、join

  Hive同樣支持join多表連接查詢,例如內連接、外連接(左外連接、右外連接、全外連接)、半連接等。對於一條語句中有多個join的情況,當join的條件相同時,即使有多張表,都會合並為一個MapReduce,並不是多個MapReduce。

SELECT pv.ppid, u.name FROM propertie_view pc
JOIN user u ON(pv.userid = u.userid)
JOIN usergroup x ON(u.userid = x.userid)

  如果join的條件不相同,MapReduce的任務數目和join操作的數據是對應的,例如:

SELECT pv.ppid, u.name FROM propertie_view pv
JOIN user u ON(pv.userid = u.userid)
JOIN usergroup x ON(u.age = x.age)

 

  在使用寫有join操作的查詢語句時,由於在join操作的Reduce階段,join執行時會將join操作符左邊的表的內容加載進內存,所以寫語句時應將條目少的表/子查詢放在join操作符的左邊,這樣可以有效減少內存溢出錯誤的幾率。

 

 

 

4、MapJoin

  MapJoin的合理使用同樣能起到調優的效果。在實際的應用中有可能遇到join執行效率很低甚至不能執行的狀況。例如,需要做不等值join,或者在join時有一個表極小。由於MapJoin會把小表全部讀入內存中,Join操作在Map階段完成,在Map階段直接將另外一個表的數據和內存中表數據做匹配,所以不會對任務進行速度產生很大影響。即使笛卡爾積也是如此。

  例如:

SELECT /*+ MAPJOIN(pc) */ pv.ppid, u.name
FROM propertie_view pv
JOIN user u ON (pv.userid=u.userid)

 

 

5、Group By優化

  (1)Map端局部聚合

  眾所周知,MapReduce計算框架中Reduce起到聚合操作的作用,但並不是所有的聚合操作都需要在Reduce端完成,很多聚合操作可以先在Map端進行局部聚合,最后在Reduce端做全局聚合得出最終結果。是否在Map端進行聚合由hive.map.agger控制,默認為True。

  而hive.groupby.mapagger.checkinterval用於控制在Map端進行聚合操作的條目數目,默認為100000。

  (2)數據傾斜

  有數據傾斜的時候需要進行負載均衡。是否需要進行負載均衡由hive.groupby.skewindata控制,默認為false。

  當選項設定為true時,編譯器生成的查詢計划會有兩個MR Job。

  第一個MR Job中的Map的輸出結果集合會隨機分布到第一個MR Job的Reduce中,每個Reduce做局部聚合操作作為輸出結果,這樣處理的結果是相同的Group By Key有可能被分發到不同的Reduce中,從而實現負載均衡的目的;

  第二個MR Job再根據預處理的數據結果按照Group By Key分布到相應的Reduce中,最后完成最終的聚合操作。

 

 

 

 

 

Hive性能調優

1、優化的常用手段 

2、Hive的數據類型方面的優化

3、Hive的數據類型方面的優化

 

 

 

什么是數據傾斜?  

     由於數據的不均衡原因,導致數據分布不均勻,造成數據大量的集中到一點,造成數據熱點

 

 

Hadoop框架的特性 

  不怕數據大,怕數據傾斜
  jobs數比較多的作業運行效率相對比較低,比如即使有幾百行的表,如果多次關聯多次匯總,產生十幾個jobs,耗時很長。
  原因是map reduce作業初始化的時間是比較長的
  sum,count,max,min等UDAF,不怕數據傾斜問題,hadoop在map端的匯總合並優化,使數據傾斜不成問題
  count(distinct ),在數據量大的情況下,效率較低,因為count(distinct)是按group by 字段分組,按distinct字段排序,一般這種分布方式是很傾斜的

 

 

1、優化的常用手段 

  解決數據傾斜問題

  減少job數

  設置合理的map reduce的task數,能有效提升性能。

  了解數據分布,自己動手解決數據傾斜問題是個不錯的選擇

  數據量較大的情況下,慎用count(distinct)。

  對小文件進行合並,是行至有效的提高調度效率的方法。

  優化時把握整體,單個作業最優不如整體最優。

 

 

 

 

 

2、Hive的數據類型方面的優化

優化原則

  合理的設置Buckets。在一些大數據join的情況下,map join有時候會內存不夠。
  如果使用Bucket Map Join的話,可以只把其中的一個bucket放到內存中,內存中原來放不下的內存表就變得可以放下。
  這需要使用buckets的鍵進行join的條件連結,並且需要如下設置

  set hive.optimize.bucketmapjoin = true

 

 

 

 

3、Hive的操作方面的優化

3.1 全排序

  Hive的排序關鍵字是SORT BY,
  它有意區別於傳統數據庫的ORDER BY也是為了強調兩者的區別–SORT BY只能在單機范圍內排序。

 

3.2 做笛卡爾積

  當Hive設定為嚴格模式(hive.mapred.mode=strict)時,不允許在HQL語句中出現笛卡爾積
  MapJoin是的解決辦法
  MapJoin,顧名思義,會在Map端完成Join操作。這需要將Join操作的一個或多個表完全讀入內存
  MapJoin的用法是在查詢/子查詢的SELECT關鍵字后面添加/*+ MAPJOIN(tablelist) */提示優化器轉化為MapJoin(目前Hive的優化器不能自動優化MapJoin)
  其中tablelist可以是一個表,或以逗號連接的表的列表。tablelist中的表將會讀入內存,應該將小表寫在這里
  在大表和小表做笛卡爾積時,規避笛卡爾積的方法是,給Join添加一個Join key,原理很簡單:將小表擴充一列join key,並將小表的條目復制數倍,join key各不相同;將大表擴充一列join key為隨機數。

 

 

3.3 控制Hive的Map數

  通常情況下,作業會通過input的目錄產生一個或者多個map任務
  主要的決定因素有:
  input的文件總個數,
  input的文件大小,
集群設置的文件塊大小(目前為128M, 可在hive中通過set dfs.block.size;命令查看到,該參數不能自定義修改)

 

是不是map數越多越好?
  答案是否定的。如果一個任務有很多小文件(遠遠小於塊大小128m),則每個小文件也會被當做一個塊,用一個map任務來完成,而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。

 

是不是保證每個map處理接近128m的文件塊,就高枕無憂了?
  答案也是不一定。比如有一個127m的文件,正常會用一個map去完成,但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄,如果map處理的邏輯比較復雜,用一個map任務去做,肯定也比較耗時。

 

針對上面的問題3和4,我們需要采取兩種方式來解決:即減少map數和增加map數;

  舉例
a) 假設input目錄下有1個文件a,大小為780M,那么hadoop會將該文件a分隔成7個塊(6個128m的塊和1個12m的塊),從而產生7個map數
b)假設input目錄下有3個文件a,b,c,大小分別為10m,20m,130m,那么hadoop會分隔成4個塊(10m,20m,128m,2m),從而產生4個map數
即如果文件大於塊大小(128m),那么會拆分,如果小於塊大小,則把該文件當成一個塊

 

 

 

3.4 決定reducer個數

  Hadoop MapReduce程序中,reducer個數的設定極大影響執行效率

  不指定reducer個數的情況下,Hive會猜測確定一個reducer個數,基於以下兩個設定:
參數1:hive.exec.reducers.bytes.per.reducer(默認為1G)
參數2 :hive.exec.reducers.max(默認為999)

 

  計算reducer數的公式
N=min(參數2,總輸入數據量/參數1)

  依據Hadoop的經驗,可以將參數2設定為0.95*(集群中TaskTracker個數)
  reduce個數並不是越多越好
  同map一樣,啟動和初始化reduce也會消耗時間和資源;

  另外,有多少個reduce,就會有多少個輸出文件,如果生成了很多個小文件,那么如果這些小文件作為下一個任務的輸入,則也會出現小文件過多的問題。


什么情況下只有一個reduce?
  很多時候你會發現任務中不管數據量多大,不管你有沒有設置調整reduce個數的參數,任務中一直都只有一個reduce任務;

  其實只有一個reduce任務的情況,除了數據量小於hive.exec.reducers.bytes.per.reducer參數值的情況外,還有以下原因:
a)沒有group by的匯總
b)用了Order by

 

 

 

 

3.5 合並 MapReduce 操作

Multi-group by
  Multi-group by是Hive的一個非常好的特性,它使得Hive中利用中間結果變得非常方便。

 

FROM log
insert overwrite table test1 select log.id group by log.id
insert overwrite table test2 select log.name group by log.name

  上述查詢語句使用了Multi-group by特性連續group by了2次數據,使用不同的group by key。這一特性可以減少一次MapReduce操作。

 

Bucket 與 Sampling
  Bucket是指將數據以指定列的值為key進行hash,hash到指定數目的桶中。這樣就可以支持高效采樣了。

 

  Sampling可以在全體數據上進行采樣,這樣效率自然就低,它還是要去訪問所有數據。而如果一個表已經對某一列制作了bucket,就可以采樣所有桶中指定序號的某個桶,這就減少了訪問量。

  如下例所示就是采樣了test中32個桶中的第三個桶。
SELECT * FROM test 、、、TABLESAMPLE(BUCKET 3 OUT OF 32);

 

 

 

3.6 JOIN 原則

  在使用寫有 Join 操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在 Join 操作符的左邊

  原因是在 Join 操作的 Reduce 階段,位於 Join 操作符左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生 OOM 錯誤的幾率。

 

Map Join
Join 操作在 Map 階段完成,不再需要Reduce,前提條件是需要的數據在 Map 的過程中可以訪問到。

 

例如:
INSERT OVERWRITE TABLE phone_traffic
SELECT /*+ MAPJOIN(phone_location) */ l.phone,p.location,l.traffic from phone_location p join log l on (p.phone=l.phone)

相關的參數為:

hive.join.emit.interval = 1000 How many rows in the right-most join operand Hive should buffer before emitting the join result.
hive.mapjoin.size.key = 10000
hive.mapjoin.cache.numrows = 10000

 

 

 

 

3.7 Group By

  Map 端部分聚合
並不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端進行部分聚合,最后在 Reduce 端得出最終結果基於 Hash

  參數包括:
hive.map.aggr = true 是否在 Map 端進行聚合,默認為 True
hive.groupby.mapaggr.checkinterval = 100000 在 Map 端進行聚合操作的條目數目。

 

  有數據傾斜的時候進行負載均衡
hive.groupby.skewindata = false
  當選項設定為 true,生成的查詢計划會有兩個 MR Job。第一個 MR Job 中,
  Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,
  這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;
  第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),
最后完成最終的聚合操作。

 

 

 

3.8 合並小文件

  文件數目過多,會給 HDFS 帶來壓力,並且會影響處理效率,可以通過合並 Map 和 Reduce 的結果文件來消除這樣的影響:

hive.merge.mapfiles = true 是否和並 Map 輸出文件,默認為 True
hive.merge.mapredfiles = false 是否合並 Reduce 輸出文件,默認為 False
hive.merge.size.per.task = 256*1000*1000 合並文件的大小

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM