Hive優化(整理版)


1. 概述

1.1 hive的特征

  1. 可以通過SQL輕松訪問數據的工具,從而實現數據倉庫任務,如提取/轉換/加載(ETL),報告和數據分析;
  2. 它可以使已經存儲的數據結構化;
  3. 可以直接訪問存儲在Apache HDFS或其他數據存儲系統(如Apache HBase)中的文件;
  4. Hive除了支持MapReduce計算引擎,還支持Spark和Tez這兩種分布式計算引擎;
  5. 它提供類似sql的查詢語句HiveQL對數據進行分析處理;
  6. 數據的存儲格式有多種,比如數據源是二進制格式,普通文本格式等等;

1.2 hive的優勢

  hive強大之處不要求數據轉換成特定的格式,而是利用hadoop本身InputFormat API來從不同的數據源讀取數據,同樣地使用OutputFormat API將數據寫成不同的格式。所以對於不同的數據源,或者寫出不同的格式就需要不同的對應的InputFormat和OutputFormat類的實現。以stored as textFile為例,其在底層java API中表現是輸入InputFormat格式:TextInputFormat以及輸出OutputFormat格式:HiveIgnoreKeyTextOutputFormat。這里InputFormat中定義了如何對數據源文本進行讀取划分,以及如何將切片分割成記錄存入表中。而OutputFormat定義了如何將這些切片寫回到文件里或者直接在控制台輸出。

  Hive擁有統一的元數據管理,所以和Spark、Impala等SQL引擎是通用的。通用是指,在擁有了統一的metastore之后,在Hive中創建一張表,在Spark/Impala中是能用的;反之在Spark中創建一張表,在Hive中也是能用的,只需要共用元數據,就可以切換SQL引擎,涉及到了Spark sql和Hive On Spark。

  不僅如此Hive使用SQL語法,提供快速開發的能力,還可以通過用戶定義的函數(UDF),用戶定義的聚合(UDAF)和用戶定義的表函數(UDTF)進行擴展,避免了去寫mapreducce,減少開發人員的學習成本。Hive中不僅可以使用逗號和制表符分隔值(CSV/TSV)文本文件,還可以使用Sequence File、RC、ORC、Parquet(知道這幾種存儲格式的區別)。當然Hive還可以通過用戶來自定義自己的存儲格式,基本上前面說到幾種格式完全夠了。Hive旨在最大限度地提高可伸縮性(通過向Hadoop集群動態田間更多機器擴展),性能,可擴展性,容錯性以及與其輸入格式的松散耦合。

  數據離線處理,比如日志分析,海量數據結構化分析。

2. Hive函數

Hive的SQL還可以通過用戶定義的函數(UDF),用戶定義的聚合(UDAF)和用戶定義的表函數(UDTF)進行擴展。

當Hive提供的內置函數無法滿足你的業務處理需要時,此時就可以考慮使用用戶自定義函數(UDF)。

UDF、UDAF、UDTF的區別:

  • UDF(User-Defined-Function)一進一出
  • UDAF(User-Defined Aggregation Funcation)聚集函數,多進一出
  • UDTF(User-Defined Table-Generating Functions)一進多出,如lateral view explore()

3. Hive優化

3.1 慎用api

 我們知道大數據場景下不害怕數據量大,害怕的是數據傾斜,怎樣避免數據傾斜,找到可能產生數據傾斜的函數尤為關鍵,數據量較大的情況下,慎用count(distinct),count(distinct)容易產生傾斜問題

3.2 自定義UDAF函數優化

  sum,count,max,min等UDAF,不怕數據傾斜問題,hadoop在map端匯總合並優化,是數據傾斜不成問題。

3.3 設置合理的map reduce的task數量

3.3.1 map階段優化

mapred.min.split.size: 指的是數據的最小分割單元大小;min的默認值是1B
mapred.max.split.size: 指的是數據的最大分割單元大小;max的默認值是256MB
通過調整max可以起到調整map數的作用,減小max可以增加map數,增大max可以減少map數。
需要提醒的是,直接調整mapred.map.tasks這個參數是沒有效果的。

 舉例:

  a) 假設input目錄下有1個文件a,大小為780M,那么hadoop會將該文件a分隔成7個塊(6個128M的塊和1個12M的塊),從而產生7個map書;

  b) 假設input目錄下有3個文件a,b,c,大小分別為10M,20M,130M,那么hadoop會分隔成4個塊(10M,20M,128M,2M),從而產生4個map數;

  注意:如果文件大於塊大小(128M),那么會拆分,如果小於塊大小,則把該文件當成一個塊。

  其實這就涉及到小文件的問題:如果一個任務有很多小文件(遠遠小於塊大小128M),則每個小文件也會當做一個塊,用一個map任務來完成。

  而一個map任務啟動和初始化的時間遠遠大於邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。那么,是不是保證每個map處理接近128M的文件塊,就高枕無憂了?答案也是不一定。比如有一個127M的文件,正常會用一個map去完成,但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄,如果map處理的邏輯比較復雜,用一個map任務去做,肯定也比較耗時。 

   我們該如何去解決呢???

  我們需要采取兩種方式來解決:即減少map數和增加map數

  • 減少map數量
假設一個SQL任務:
Select count(1) from popt_tbaccountcopy_meswhere pt = '2012-07-04';
該任務的inputdir :  /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04
共有194個文件,其中很多事遠遠小於128M的小文件,總大小9G,正常執行會用194個map任務。
Map總共消耗的計算資源:SLOTS_MILLIS_MAPS= 623,020

通過以下方法來在map執行前合並小文件,減少map數:
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
再執行上面的語句,用了74個map任務,map消耗的計算資源:SLOTS_MILLIS_MAPS= 333,500
對於這個簡單SQL任務,執行時間上可能差不多,但節省了一半的計算資源。
大概解釋一下,100000000表示100M, 
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;這個參數表示執行前進行小文件合並,
前面三個參數確定合並文件塊的大小,大於文件塊大小128m的,按照128m來分隔,
小於128m,大於100m的,按照100m來分隔,把那些小於100m的(包括小文件和分隔大文件剩下的),
進行合並,最終生成了74個塊。
  • 增大map數量
如何適當的增加map數?
當input的文件都很大,任務邏輯復雜,map執行非常慢的時候,可以考慮增加Map數,
來使得每個map處理的數據量減少,從而提高任務的執行效率。

 假設有這樣一個任務:
    Select data_desc,
               count(1),
               count(distinct id),
               sum(case when ...),
               sum(case when ...),
               sum(...)
    from a group by data_desc

如果表a只有一個文件,大小為120M,但包含幾千萬的記錄,如果用1個map去完成這個任務,肯定是比較耗時的,
這種情況下,我們要考慮將這一個文件合理的拆分成多個,
這樣就可以用多個map任務去完成。
    set mapred.reduce.tasks=10;
      create table a_1 as 
      select * from a 
      distribute by rand(123);

這樣會將a表的記錄,隨機的分散到包含10個文件的a_1表中,再用a_1代替上面sql中的a表,則會用10個map任務去完成。
每個map任務處理大於12M(幾百萬記錄)的數據,效率肯定會好很多。

  注意:看上去,貌似這兩種有些矛盾,一個是要合並小文件,一個是要把大文件拆成小文件,這點正是重點需要關注的地方,使單個map任務處理合適的數據量;

3.3.2 reduce階段優化

  Reduce的個數對整個作業的運行性能有很大影響。如果Reduce設置的過大,那么將會產生很多小文件,對NameNode會產生一定的影響,而且整個作業的運行時間未必會減少;如果Reduce設置的過小,那么單個Reduce處理的數據將會加大,很可能會引起OOM異常。

  如果設置了mapred.reduce.tasks/mapreduce.job.reduces參數,那么Hive會直接使用它的值作為Reduce的個數;如果mapred.reduce.tasks/mapreduce.job.reduces的值沒有設置(也就是-1),那么Hive會根據輸入文件的大小估算出Reduce的個數。根據輸入文件估算Reduce的個數可能未必很准確,因為Reduce的輸入是Map的輸出,而Map的輸出可能會比輸入要小,所以最准確的數根據Map的輸出估算Reduce的個數。

1. Hive自己如何確定reduce數:

  reduce個數的設定極大影響任務執行效率,不指定reduce個數的情況下,Hive會猜測確定一個reduce個數,基於以下兩個設定:

  hive.exec.reducers.bytes.per.reducer(每個reduce任務處理的數據量,默認為1000^3=1G)

  hive.exec.reducers.max(每個任務最大的reduce數,默認為999)

  計算reducer數的公式很簡單N=min(參數2,總輸入數據量/參數1)

  即,如果reduce的輸入(map的輸出)總大小不超過1G,那么只會有一個reduce任務;

如:select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 
            /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04 總大小為9G多,
  因此這句有10個reduce

2. 調整reduce個數方法一:

  調整hive.exec.reducers.bytes.per.reducer參數的值;

  set hive.exec.reducers.bytes.per.reducer=500000000; (500M)

  select pt, count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt;

  這次有20個reduce

3. 調整reduce個數方法二:

  set mapred.reduce.tasks=15;

  select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt;

  這次有15個reduce

4. reduce個數並不是越多越好;

  同map一樣,啟動和初始化reduce也會消耗時間和資源;

  另外,有多少個reduce,就會有個多少個輸出文件,如果生成了很多個小文件,那么如果這些小文件作為下一個任務的輸入,則也會出現小文件過多的問題;

5. 什么情況下只有一個reduce;

  很多時候你會發現任務中不管數據量多大,不管你有沒有調整reduce個數的參數,任務中一直都只有一個reduce任務;其實只有一個reduce任務的情況,除了數據量小於hive.exec.reducers.bytes.per.reducer參數值的情況外,還有以下原因:

  • 沒有group by的匯總,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’ group by pt; 寫成select count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’; 這點非常常見,希望大家盡量改寫。
  • 用了Order by
  • 有笛卡爾積。

  注意:在設置reduce個數的時候也需要考慮這兩個原則:使大數據量利用合適的reduce數;是單個reduce任務處理合適的數據量;

3.4 小文件合並優化

  我們知道文件數目小,容易在文件存儲端造成瓶頸,給HDFS帶來壓力,影響處理效率。對此,可以通過合並Map和Reduce的結果文件來消除這樣的影響。

  用於設置合並的參數有:

    • 是否合並Map輸出文件:hive.merge.mapfiles=true(默認值為true)
    • 是否合並Reduce端輸出文件:hive.merge.mapredfiles=false(默認值為false)
    • 合並文件的大小:hive.merge.size.per.task=256*1000*1000(默認值為256000000)

3.4.1 Hive優化之小文件問題及其解決方案:

  小文件是如何產生的:

    • 動態分區插入數據,產生大量的小文件,從而導致map數量劇增;
    • reduce數量越多,小文件也越多(reduce的個數和輸出文件是對應的);
    • 數據源本身就包含大量的小文件。

  小文件問題的影響:

    • 從Hive的角度看,小文件會開很多map,一個map開一個JVM去執行,所以這些任務的初始化,啟動,執行會浪費大量的資源,嚴重影響性能。
    • 在HDFS中,每個小文件對象約占150byte,如果小文件過多會占用大量內存。這樣NameNode內存容量嚴重制約了集群的擴展。

  小文件問題的解決方案:

    從小文件產生的途徑就可以從源頭上控制小文件數量,方法如下:

    • 使用Sequencefile作為表存儲格式,不要用textfile,在一定程度上可以減少小文件;
    • 減少reduce的數量(可以使用參數進行控制);
    • 少用動態分區,用時記得按distribute by分區;

    對於已有的小文件,我們可以通過以下幾種方案解決:

    • 使用hadoop archive命令把小文件進行歸檔;
    • 重建表,建表時減少reduce數量;
    • 通過參數進行調節,設置map/reduce端的相關參數,如下:   
//每個Map最大輸入大小(這個值決定了合並后文件的數量)  
set mapred.max.split.size=256000000;    
//一個節點上split的至少的大小(這個值決定了多個DataNode上的文件是否需要合並)  
set mapred.min.split.size.per.node=100000000;  
//一個交換機下split的至少的大小(這個值決定了多個交換機上的文件是否需要合並)    
set mapred.min.split.size.per.rack=100000000;  
//執行Map前進行小文件合並  
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;   

設置map輸出和reduce輸出進行合並的相關參數:
[java] view plain copy
//設置map端輸出進行合並,默認為true  
set hive.merge.mapfiles = true  
//設置reduce端輸出進行合並,默認為false  
set hive.merge.mapredfiles = true  
//設置合並文件的大小  
set hive.merge.size.per.task = 256*1000*1000  
//當輸出文件的平均大小小於該值時,啟動一個獨立的MapReduce任務進行文件merge。  
set hive.merge.smallfiles.avgsize=16000000 

3.5 SQL優化

3.5.1 列裁剪

  Hive在讀數據的時候,可以只讀取查詢中所需要用到的列,而忽略其他列。例如,若有以下查詢:

SELECT a,b FROM q WHERE e<10;

  在實施此項查詢中,Q表有5列(a,b,c,d,e),Hive只讀取查詢邏輯中真實需要的3列a、b、e, 而忽略列c,d;這樣做節省了讀取開銷,中間表存儲開銷和數據整合開銷。

  裁剪對應的參數項為:hive.optimize.cp=true(默認值為真)

3.5.2 分區裁剪

  可以在查詢的過程中減少不必要的分區。例如,若有以下查詢:

SELECT * FROM (SELECT a1, COUNT(1) FROM T GROUP BY a1) subq WHERE subq.prtn=100; # (多余分區)
SELECT * FROM T1 JOIN (SELECT * FROM T2) subq ON (T1.a1=subq.a2) WHERE subq.prtn=100;

  查詢語句若將"subq.prtn=100"條件放入子查詢中更為高效,可以減少讀入的分區數目。Hive自動執行這種裁剪優化。

  分區參數為:hive.optimize.pruner=true(默認值為真)

3.5.3 熟練使用SQL提高查詢

  熟練地使用SQL,能寫出高效率的查詢語句。

  場景:有一張user表,為賣家每天收到表,user_id,ds(日期)為key,屬性有主營類目,指標有交易金額,交易筆數。每天要取前10天的總收入,總筆數,和最近一天的主營類目。

  解決方法 1 如下所示:常用方法

INSERT OVERWRITE TABLE t1
SELECT user_id, substr(MAX(CONCAT(ds,cat),9) AS main_cat) FROM users
WHERE ds=20120329 // 20120329 為日期列的值,實際代碼中可以用函數表示當天日期GROUP BY user_id;

INSERT OVERWRITE TABLE t2
SELECT user_id,sum(qty) AS qty, SUM(amt) AS amt FROM users
WHERE ds BETWEEN 20120301 AND 20120329
GROUP BY user_id;

SELECT t1.user_id, t1.main_cat, t2.qty, t2.amt FROM t1
JOIN t2 ON t1.user_id=t2.user_id

  下面給出方法1的思路,實現步驟如下:

  第一步:利用分析函數,取每個user_id最近一天的主營類目,存入臨時表t1;

  第二步:匯總10天的總交易金額,交易筆數,存入臨時表t2;

  第三步:關聯t1、t2,得到最終的結果。

  解決方法 2 如下所示:優化方法

SELECT user_id, substr(MAX(CONCAT(ds, cat)), 9) AS main_cat, SUM(qty), SUM(amt) FROM users
WHERE ds BETWEEN 20120301 AND 20120329
GROUP BY user_id

  在工作中我們總結出:方案2的開銷等於方案1的第二步開銷,性能提升,由原有的25分鍾完成,縮短為10分鍾以內完成。節省了兩個臨時表的讀寫是一個關鍵原因,這種方式也適用於Oracle中的數據查找工作。

  SQL具有普適性,很多SQL通用的優化方案在Hadoop分布式計算方式中也可以達到效果。

3.5.5 不同數據類型關聯產生的傾斜問題

  問題:不同數據類型id的關聯會產生數據傾斜問題。

  一張表的s8的日志,每個商品一條記錄,要和商品表關聯。但關聯卻碰到傾斜的問題。s8的日志中有32位字符串商品id,也有數值商品id,日志中類型是string的,但商品中的數值id是bigint的。猜想問題的原因是把s8的商品id轉成數值id做hash來分配Reduce,所以字符串id的s8日志,都到一個Reduce上了,解決的方法驗證了這個猜測。

  解決方法:把數據類型轉換成字符串類型

SELECT * FROM s8_log a LEFT OUTER
JOIN r_auction_auctions b ON a.auction_id=CAST(b.auction_id AS STRING)

  調優結果顯示:數據表處理由1小時30分鍾經代碼調整后可以在20分鍾內完成。

3.5.6 利用Hive對UNION ALL優化的特性

  多表union all會優化成一個job。

  問題:比如推廣效果表要和商品表關聯,效果表中的auction_id列既有32位字符串商品id,也有數字id,和商品表關聯得到商品的信息。

  解決方法:Hive SQL性能會比較好

SELECT * FROM effect a
JOIN
(SELECT auction_id AS auction_id FROM auctions
UNION ALL
SELECT auction_string_id AS auction_id FROM auctions) b
ON a.auction_id=b.auction_id

  比分別過濾數字id,字符串id然后分別和商品表關聯性能要好。

  這樣寫的好處:1個MapReduce作業,商品表只讀一次,推廣效果表只讀取一次。把這個SQL換成Map/Reduce代碼的話,Map的時候,把a表的記錄打上標簽a,商品表記錄每讀取一條,打上標簽b,變成兩個<key, value>對,<(b,數字id),value>,<(b,字符串id),value>。

  所以商品表的HDFS讀取只會是一次。

3.5.7 解決Hive對UNION ALL優化的短板

  Hive對union all的優化的特性:對union all優化只局限於非嵌套查詢

  • 消滅子查詢內的group by

  示例1:子查詢內有group by

SELECT * FROM
(SELECT * FROM t1 GROUP BY c1,c2,c3 UNION ALL SELECT * FROM t2 GROUP BY c1,c2,c3) t3
GROUP BY c1,c2,c3

  從業務邏輯上說,子查詢內的GROUP BY怎么看都是多余(功能上的多余,除非有COUNT(DISTINCT)),如果不是因為Hive Bug或者性能上的考量(曾經出現如果不執行子查詢GROUP BY,數據得不到正確的結果的Hive Bug)。所以這個Hive按經驗轉換成如下所示:

SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2) t3 GROUP BY c1,c2,c3

  調優結果:經過測試,並未出現union all的Hive Bug,數據是一致的。MapReduce的作業數由3減少到1。

  t1相當於一個目錄,t2相當於一個目錄,對Map/Reduce程序來說,t1、t2可以作為Map/Reduce作業的mutli inputs。這可以通過一個Map/Reduce來解決這個問題。Hadoop的計算框架,不怕數據多,就怕作業數多。

  但如果換成是其他計算平台如Oracle,那就不一定了,因為把大輸入拆成兩個輸入,分別排序匯總成merge(假如兩個子排序是並行的話),是有可能性能更優的(比如希爾排序比冒泡排序的性能更優)。

  • 消滅子查詢內的COUNT(DISTINCT),MAX,MIN
SELECT * FROM 
(SELECT * FROM t1
UNION ALL SELECT c1,c2,c3 count(DISTINCT c4) FROM t2 GROUP BY c1,c2,c3) t3
GROUP BY c1,c2,c3

  由於子查詢里頭有COUNT(DISTINCT)操作,直接去GROUP BY將達不到業務目標。這時采用臨時表消滅COUNT(DISTINCT)作業不但能解決傾斜問題,還能有效減少jobs。

INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3;
SELECT c1,c2,c3,SUM(income),SUM(uv) FROM
(SELECT c1,c2,c3,income,0 AS uv FROM t1
UNION ALL
SELECT c1,c2,c3,0 AS income, 1 AS uv FROM t2) t3
GROUP BY c1,c2,c3;

  job數是2,減少一半,而且兩次Map/Reduce比COUNT(DISTINCT)效率更高。

  調優結果:千萬級別的類目表,member表,與10億級的商品表關聯。原先1963s的任務經過調整,1152s即完成。

  • 消滅子查詢內的JOIN  
SELECT * FROM
(SELECT * FROM t1 UNION ALL SELECT * FROM t4 UNION ALL SELECT * FROM t2 JOIN t3 ON t2.id=t3.id) x
GROUP BY c1,c2;

  上面代碼運行會有5個jobs。加入先JOIN生存臨時表的話t5,然后UNION ALL,會變成2個jobs。

INSERT OVERWRITE TABLE t5
SELECT * FROM t2 JOIN t3 ON t2.id=t3.id;
SELECT * FROM (t1 UNION ALL t4 UNION ALL t5);

  調優結果顯示:針對千萬級別的廣告位表,由原先5個Job共15分鍾,分解為2個job,一個8-10分鍾,一個3分鍾。

3.5.8 COUNT(DISTINCT)

  計算uv的時候,經常會用到COUNT(DISTINCT),但在數據比較傾斜的時候COUNT(DISTINCT)會比較慢。這時可以嘗試用GROUP BY改寫代碼計算uv。數據量小的時候無所謂,數據量大的情況下,由於COUNT DISTINCT操作需要用一個Reduce Task來完成,這一個Reduce需要處理的數據量太大,就會導致整個Job很難完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替換:

  • 原有代碼
①
INSERT OVERWRITE TABLE s_dw_tanx_adzone_uv PARTITION (ds=20120329) 
SELECT 20120329 AS thedate,adzoneid,COUNT(DISTINCT acookie) AS uv FROM s_ods_log_tanx_pv t WHERE t.ds=20120329 GROUP BY adzoneid;

②
select count(distinct id) from bigtable;

  關於COUNT(DISTINCT)的數據傾斜問題不能一概而論,要依情況而定,下面是我測試的一組數據:

  測試數據:169857條

①
#統計每日IP
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(DISTINCT ip) AS FROM logdfs WHERE logdate='2014_12_29';
耗時:24.805 seconds
#統計每日IP(改造)
CREATE TABLE ip_2014_12_29 AS SELECT COUNT(1) AS IP FROM (SELECT DISTINCT ip from logdfs WHERE logdate='2014_12_29') tmp;
耗時:46.833 seconds

②
 select count(id) from (select id from bigtable group by id) a;

測試結果表明:明顯改造后的語句比之前耗時,這時因為改造后的語句有2個SELECT,多了一個job,這樣在數據量小的時候,數據不會存在傾斜問題。

3.5.9 JOIN操作

3.5.9.1 小表、大表JOIN

  在使用寫有Join操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在Join操作符的左邊。原因是在Join操作的Reduce階段,位於Join操作符左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生OOM錯誤的幾率;再進一步,可以使用Group讓小的維度表(1000條以下的記錄條數)先進內存。在map端完成reduce。

  實際測試發現:新版的hive已經對小表JOIN大表和大表JOIN小表進行了優化。小表放在左邊和右邊已經沒有明顯區別。

  案例實操:

  (1)關閉mapjoin功能(默認是打開的)

    set hive.auto.convert.join=false;

  (2)執行小表JOIN大表語句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
left join bigtable  b
on b.id = s.id;

  Time taken: 35.921 seconds

  (3)執行大表JOIN大表語句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable  b
left join smalltable  s
on s.id = b.id;

  Time taken: 34.196 seconds;

3.5.9.2 大表JOIN大表

1)空Key過濾

  問題:日志中常會出現信息丟失,比如每日約為20億的全網日志,其中的user_id為主鍵,在日志收集過程中會丟失,出現主鍵為null的情況,如果取其中的user_id和bmw_users關聯,就會碰到數據傾斜的問題。原因是Hive中,主鍵為null值的項會被當做相同的Key而分配進同一個計算Map。

  解決方法1:user_id為空的不參與關聯,子查詢過濾null

SELECT * FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id=b.user_id UNION ALL SELECT * FROM log a WHERE a.user_id IS NULL

  解決方法2 如下所示:函數過濾null

SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive', RAND()) ELSE a.user_id END = b.user_id;

  調優結果:原先由於數據傾斜導致運行時長超過1小時,解決方法1運行每日平均時長25分鍾,解決方法2運行的每日平均時長在20分鍾左右。優化效果很明顯。

  我們在工作中總結出:解決方法2比解決方法1效果更好,不但IO少了,而且作業數也少了。解決方法1中log讀取兩次,job數為2。解決方法2中job數是1。這個優化適合無效id(比如-99,‘’,null等)產生的傾斜問題。把空值的key變成一個字符串加上隨機數,就能把傾斜的數據分到不同的Reduce上,從而解決數據傾斜問題。因為空值不參與關聯,即使分到不同的Reduce上,也不會影響最終的結果。附上Hadoop通用關聯的實現方法是:關聯通過二次排序實現的,關聯的列為partition key,關聯的列和表的tag組成排序的group key,根據partition key分配Reduce。同一Reduce內根據group key排序。

3.5.9.3 MAP JOIN操作

  如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join。容易發生數據傾斜。可以用MapJoin把小表全部加載到內存在map端進行join,避免reducer處理。

  • 開啟MapJoin參數設置:

    1) 設置自動選擇MapJoin

      set hive.auto.convert.join = true;默認為true

    2) 大表小表的閥值設置(默認25M一下認為是小表):

      set hive.mapjoin.smalltable.filesize=25000000;

  • MapJoin工作機制

  

 

  上圖是Hive MapJoin的原理圖,從圖中可以看出MapJoin分為兩個階段:

  (1)通過MapReduce Local Task,將小表讀入內存,生成內存HashTableFiles上傳至Distributed Cache中,這里會對HashTableFiles進行壓縮。

  (2)MapReduce Job在Map階段,每個Mapper從Distributed Cache讀取HashTableFiles到內存中,順序掃描大表,在Map階段直接進行Join,將數據傳遞給下一個MapReduce任務。也就是在map端進行join避免了shuffle。

  Join操作在Map階段完成,不再需要Reduce,有多少個Map Task,就有多少個結果文件。

  實例:

  (1)開啟MapJoin功能

    set hive.auto.convert.join = true; 默認為true

  (2)執行小表JOIN大表語句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
join bigtable  b
on s.id = b.id;

  Time taken: 24.594 seconds

  (3)執行大表JOIN小表語句

insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable  b
join smalltable  s
on s.id = b.id;

  Time taken: 24.315 seconds

3.5.9.3 GROUP BY操作

  默認情況下,Map階段同一Key數據分發給一個reduce,當一個key數據過大時就傾斜了。進行GROUP BY操作時需要注意以下幾點:

  • Map端部分聚合

  事實上並不是所有的聚合操作都需要在reduce部分進行,很多聚合操作都可以先在Map端進行部分聚合,然后reduce端得出最終結果。

  (1)開啟Map端聚合參數設置

    set hive.map.aggr=true

  (2)在Map端進行聚合操作的條目數目

    set hive.grouby.mapaggr.checkinterval=100000

  (3)有數據傾斜的時候進行負載均衡(默認是false)

    set hive.groupby.skewindata = true

  • 有數據傾斜時進行負載均衡

  此處需要設定hive.groupby.skewindata,當選項設定為true時,生成的查詢計划有兩個MapReduce任務。在第一個MapReduce中,map的輸出結果集合會隨機分布到reduce中,每個reduce做部分聚合操作,並輸出結果。這樣處理的結果是,相同的Group By Key有可能分發到不同的reduce中,從而達到負載均衡的目的;第二個MapReduce任務再根據預處理的數據結果按照Group By Key分布到reduce中(這個過程可以保證相同的Group By Key分布到同一個reduce中),最后完成最終的聚合操作。

3.5.10 優化in/exists語句

  雖然經過測驗,hive1.2.1也支持in/exists操作,但還是推薦使用hive的一個高效替代方案:left semi join

  比如說:    

      select a.id, a.name from a where a.id in (select b.id from b);
      select a.id, a.name from a where exists (select id from b where a.id = b.id);

  應該轉換成:

      select a.id, a.name from a left semi join b on a.id = b.id;

3.5.11 排序選擇

  • cluster by: 對同一字段分桶並排序,不能和sort by連用;
  • distribute by + sort by: 分桶,保證同一字段值只存在一個結果文件當中,結合sort by 保證每個reduceTask結果有序;
  • sort by: 單機排序,單個reduce結果有序
  • order by:全局排序,缺陷是只能使用一個reduce

 

3.6 存儲格式

  可以使用列裁剪,分區裁剪,orc,parquet等這些列式存儲格式,因為列式存儲的表,每一列的數據在物理上是存儲在一起的,Hive查詢時會只遍歷需要列數據,大大減少處理的數據量。

  Hive支持ORCfile,這是一種新的表格存儲格式,通過諸如謂詞下推,壓縮等技術來提高執行速度提升。對於每個HIVE表使用ORCfile應該是一件容易的事情,並且對於獲得HIVE查詢的快速響應時間非常有益。

  作為一個例子,考慮兩個大表A和B(作為文本存儲,其中一些列未在此處指定,即行式存儲的缺點)以及一個簡單的查詢,如:

  SELECT A.customerID,A.name,A.age,A.address join B.role,B.department,B.salary ON A.customerID=B.customerID;

  此查詢可能需要很長時間才能執行,因為表A和B都以TEXT形式存儲,進行全表掃描。

  將這些表格轉換為ORCFile格式通常會顯着減少查詢時間;

  ORC支持壓縮存儲(使用ZLIB或如上所示使用SNAPPY),但也支持未壓縮的存儲。

CREATE TABLE A_ORC (
  customerID int,name string,age int, address string
) STORED AS ORC tblproperties ("orc.compress" = "SNAPPY");

INSERT INTO TABLE A_ORC SELECT * FROM A;

CREATE TABLE B_ORC (
       customerID int, role string, salary float, department string
) STORED AS ORC tblproperties ("orc.compress" = "SNAPPY");

INSERT INTO TABLE B_ORC SELECT * FROM B;

SELECT A_ORC.customerID, A_ORC.name, A_ORC.age, A_ORC.address join B_ORC.role,B_ORC.department, B_ORC.salary
ON A_ORC.customerID=B_ORC.customerID;

3.7 壓縮格式

  大數據場景下存儲格式壓縮格式尤為關鍵,可以提升計算速度,減少存儲空間,降低網絡io,磁盤io,所以要選擇合適的壓縮格式和存儲格式,那么首先就了解這些東西。參考該博客

3.7.1 壓縮的原因

  Hive最終是轉為MapReduce程序來執行的,而MapReduce的性能瓶頸在於網絡IO和磁盤IO,要解決性能瓶頸,最主要的是減少數據量,對數據進行壓縮是個好的方式。壓縮雖然是減少了數據量,但是壓縮過程要消耗CPU的,但是在Hadoop中,往往性能瓶頸不在於CPU,CPU壓力並不大,所以壓縮充分利用了比較空閑的CPU。

3.7.2 常用壓縮方法對比

    

  各個壓縮方式所對應的Class類:

  

3.7.3 壓縮方式的選擇

  壓縮比率,壓縮解壓縮速度,是否支持Split

3.7.4 壓縮使用

  Job輸出文件按照block以Gzip的方式進行壓縮:

set mapreduce.output.fileoutputformat.compress=true // 默認值是 false
set mapreduce.output.fileoutputformat.compress.type=BLOCK // 默認值是 Record
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec

  Map輸出結果也以Gzip進行壓縮:

set mapred.map.output.compress=true
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默認值是 org.apache.hadoop.io.compress.DefaultCodec 

  對Hive輸出結果和中間都進行壓縮:

set hive.exec.compress.output=true // 默認值是 false,不壓縮
set hive.exec.compress.intermediate=true // 默認值是 false,為 true 時 MR 設置的壓縮才啟用

3.8 引擎的選擇

  Hive可以使用Apache Tez執行引擎而不是古老的Map-Reduce引擎。沒有在環境中沒有默認打開,在Hive查詢開頭將以下內容設置為‘true’來使用Tez:“設置hive.execution.engine = tez; ”,通過上述設置,你執行的每個HIVE查詢都將利用Tez。目前Hive On Spark還處於試驗階段,慎用。

3.9 使用向量化查詢

  向量化查詢執行通過一次性批量執行1024行而不是每次單行執行,從而提供掃描、聚合、篩選器和連接等操作的性能。在Hive 0.13中引入,此功能顯着提高了查詢執行時間,並可通過兩個參數設置輕松啟用: 

  設置hive.vectorized.execution.enabled = true;
  設置hive.vectorized.execution.reduce.enabled = true;

3.10 設置cost based query optimization

  Hive自0.14.0開始,加入了一項“Cost based Optimizer”來對HQL執行計划進行優化,這個功能通過“hive.cbo.enable”來開啟。在Hive 1.1.0之后,這個feature是默認開啟的,它可以自動優化HQL中多個JOIN的順序,並選擇合適的JOIN算法。

  Hive在提供最終執行前,優化每個查詢的執行邏輯和物理執行計划。這些優化工作是交給底層來完成的。根據查詢成本執行進一步的優化,從而產生潛在的不同決策:如何排序連接,執行哪種類型的連接,並行度等等。要使用基於成本的優化(也稱為CBO),請在查詢開始設置以下參數:

  設置hive.cbo.enable = true;
  設置hive.compute.query.using.stats = true;
  設置hive.stats.fetch.column.stats = true;
  設置hive.stats.fetch.partition.stats = true;

3.11 模式選擇

  • 本地模式

  對於大多數情況,Hive可以通過本地模式在單台機器上處理所有任務。對於小數據,執行時間可以明顯被縮短。通過set hive.exec.mode.local.auto = true(默認為false)設置為本地模式,本地模式涉及到三個參數:

   set hive.exec.mode.local.auto=true; 是打開hive自動判斷是否啟動本地模式的開關,但是只是打開這個參數不能保證啟動本地模式,要當map任務數不超過hive.exec.mode.local.auto.input.files.max的個數並且map輸入文件大小不超過hive.exec.mode.local.auto.inputbytes.max所指定的大小時,才能啟動本地模式。

  如下:用戶可以通過設置hive.exec.mode.local.auto的值為true,來讓Hive在適當的時候自動啟動這個優化。

set hive.exec.mode.local.auto=true;  //開啟本地mr
//設置local mr的最大輸入數據量,當輸入數據量小於這個值時采用local  mr的方式,默認為134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
//設置local mr的最大輸入文件個數,當輸入文件個數小於這個值時采用local mr的方式,默認為4
set hive.exec.mode.local.auto.input.files.max=10;
  • 並行模式

  Hive會將一個查詢轉化成一個或多個階段。這樣的階段可以是MapReduce階段、抽樣階段、合並階段、limit階段。默認情況下,Hive一次只會執行一個階段,由於job包含多個階段,而這些階段並非完全相互依賴,即:這些階段可以並行執行,可以縮短整個job的執行時間。設置參數,set hive.exec.parallel=true,或者通過配置文件來完成:

  hive> set hive.exec.parallel;

  • 嚴格模式

  Hive提供一個嚴格模式,可以防止用戶執行那些可能產生意想不到的影響查詢,通過設置Hive.mapred.modestrict來完成。

  set Hive.mapred.modestrict;

3.12 JVM重用

  Hadoop通常是使用派生JVM來執行map和reduce任務的。這時JVM的啟動過程可能會造成相當大的開銷,尤其是執行的job包含成百上千的task任務的情況。JVM重用可以使得JVM示例在同一個job中時候,通過參數mapred.job.reuse.jvm.num.tasks來設置。

3.13 推測執行

  Hadoop推測執行可以觸發執行一些重復的任務,盡管因對重復的數據進行計算而導致消耗更多的計算資源,不過這個功能的目標是通過加快獲取單個task的結果以偵測執行慢的TaskTracker加入到沒名單的方式來提高整體的任務執行效率。Hadoop的推測執行功能由2個配置控制着,通過mapred-site.xml中配置 

  mapred.map.tasks.speculative.execution=true
  mapred.reduce.tasks.speculative.execution=true

4. 總結

【參考資料】

https://blog.csdn.net/yu0_zhang0/article/details/81776459

https://www.cnblogs.com/smartloli/p/4356660.html

https://blog.csdn.net/zdy0_2004/article/details/81613230

https://blog.51cto.com/12445535/2352789

https://blog.csdn.net/BabyFish13/article/details/52055927

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM