hive的調優經驗


規范:

1.開發規范

  1. SQL子查詢嵌套不宜超過3層。
  2. 少用或者不用Hint,hive2.0以后增強HiveSQL對於成本調優(CBO)的支持
  3. 避免SQL 代碼的復制、粘貼。如果有多處邏輯一致的代碼,可以將執行結果存儲到臨時表中。
  4. 盡可能使用SQL 自帶的高級命令做操作。在多維統計分析中使用cube、grouping set和rollup等命令去替代多個SQL子句的union all。
  5. 使用set命令,進行配置屬性的更改,要有注釋。
  6. 保持一個查詢語句所處理的表類型單一。例如,一個SQL語句中的表都是ORC類型的表,或者都是Parquet表。
  7. 關注NULL值的數據處理。
  8.  多層嵌套,內層嵌套表的過濾條件不要寫到外層

2.設計規范

  1. 表結構要有注釋。
  2. 列等屬性字段需要有注釋。
  3. 盡量不要使用索引。hive的索引不會自動維護,且hive的數據量很大,一般通過數據存儲格式為orc和parquet來代替。
  4. 創建內部表(托管表)不允許指定數據存儲路徑。使用默認即可。
  5. 分區表分桶表的使用。

3. 命名規范

  1. 表以tb_開頭。
  2. 臨時表以tmp_開頭。
  3. 視圖以v_開頭。
  4. 自定義函數以udf_開頭。
  5. 原始數據所在的庫以db_org_開頭,明細數據所在庫以db_detail_開頭,數據倉庫以db_dw_開頭。

1.改寫sql進行調優

1. hive本身對union all這樣的命令進行了優化。

2. 通過改寫sql,啟動較少的job任務,每啟動一個job,就說明集群多執行了一次MapReduce作業,MapReduce作業越多則代表數據就要經歷更多次的磁盤讀寫和網絡通信。隨着數據量增多,磁盤和網絡的負載會越來越大,耗在每個MapReduce過程的時間延遲也會越來越長。

例如執行多插入模式相比union all的語法會少啟動job任務。

 

 3. 使用grouping sets代替union的SQL優化

 


 

2.hdfs數據本地化率對hive性能產生影響

    在數據大小一定的情況下,500個128M的文件和2個30G的文件 跑hive任務,性能是有差異的,兩者最大的區別在於,后者在讀取文件時,需要跨網絡傳輸,而前者為本地讀寫。數據本地化率問題。

 


 

3.不同數據格式對性能的提升

  hive提供text,sequenceFile,RCFile,ORC,Parquest等格式。

  sequenceFile是一個二進制key/value對結構的平面文件,廣泛應用於MapReduce中。

  Parquet時一種列式存儲格式,兼容多種數據引擎,MapReduce和Spark。

  ORC時對RCFile的一種優化,主流選擇之一。

 


 

4.分區表和分桶表對性能的提升

 分區表:

總結:分區表的意思,其實想明白了就很簡單。就是在系統上建立文件夾,把分類數據放在不同文件夾下面,加快查詢速度。 關鍵點1:partitioned by (dt String,country string); 創建表格時,指明了這是一個分區表。將建立雙層目錄,第一次目錄的名字和第二層目錄名字規則 PARTITIONED BY子句中定義列,是表中正式的列,成為分區列。但是數據文件中並沒有這些值,僅代表目錄。 關鍵點2: partition (dt='2001-01-01',country='GB'); 上傳數據時,把數據分別上傳到不同分區中。也就是分別放在不同的子目錄下。 理解分區就是文件夾分而治之,查詢的時候可以當作列名來顯示查詢的范圍。

 

 動態分區表: 

關閉嚴格分區模式 動態分區模式時是嚴格模式,也就是至少有一個靜態分區。 set hive.exec.dynamic.partition.mode=nonstrict    //分區模式,默認nostrict set hive.exec.dynamic.partition=true            //開啟動態分區,默認true set hive.exec.max.dynamic.partitions=1000        //最大動態分區數,默認1000

 

為什么要使用動態分區呢,我們舉個例子,假如中國有50個省,每個省有50個市,每個市都有100個區,那我們都要使用靜態分區要使用多久才能搞完。所有我們要使用動態分區。

動態分區默認是沒有開啟。開啟后默認是以嚴格模式執行的,在這種模式下需要至少一個分區字段是靜態的。
這有助於阻止因設計錯誤導致導致查詢差生大量的分區。列如:用戶可能錯誤使用時間戳作為分區表字段。然后導致每秒都對應一個分區!這樣我們也可以采用相應的措施:

 

分桶表:

每一個表或者分區,Hive可以進一步組織成桶。也就是說,桶為細粒度的數據范圍划分。 分桶規則:對分桶字段值進行哈希,哈希值除以桶的個數求余,余數決定了該條記錄在哪個桶中,也就是余數相同的在一個桶中。分桶不會改變原有表和原有分區目錄的組織方式。只是更改了數據在文件中的分布。 優點:1、提高join查詢效率 2、提高抽樣效率

 

可以用 desc formatted [表名] 來查看目錄組織方式

 


 

5.干預sql的運行方式

  • 改寫sql,實現對計算引擎執行過程的調優
  • 通過sql-hint語法,實現對計算引擎執行過程的干預
  • 通過數據庫開放的一些配置,實現對計算引擎的干預

  具體如下:

1.使用grouping sets grouping__id rollup cube等代替group by +union all

hive對group by+ union all的寫法進行了優化

 

2.使用 group by 來代替 distinct

在數據沒有發生數據傾斜的情況下,或者數據量較小時,采用distinct要比group by要好

默認情況下,distinct會被hive翻譯成一個全局唯一reduce任務來做去重操作,因而並行度為1,而且有導致數據傾斜的可能。

而group by則會被hive翻譯成分組聚合運算,會有多個reduce任務並行處理,每個reduce對收到的一部分數據組,進行每組聚合(去重)

注意:

最新的hive版本中:新增了對count(distinct)的優化,通過配置hive.optimize.countdistinct
即使真的出現數據傾斜也可以自動優化。

 

3.使用Hinit

使用mapjoin(b) 括號中指定的是數據量較小的表,表示在map階段完成a,b兩表的連接。

將原來在Reduce中進行的連接操作,前推到了Map階段。

SELECT /* + MAPJOIN(b) */ a.key,a.value FROM a JOIN b ON a.key = b.key;

 

大表在右邊使用streamtable的sql

--STRSEAMTABLE(),括號中指定數據量大的表 --默認情況下,在reduce階段進行連接,hive把坐標中的數據放在緩存中,右表的數據作為流數據表
SELECT /*+ STREAMTABLE(a) */ a.val,b.val,c.val
FROM a
JOIN b ON (a.key = b.key)
JOIN c ON (c.key = b.key)

 

普通表的join又被稱為 Replartition Join,通常shuflle操作發生在此階段

也可以通過設置hive.smalltable.filesize or hive.mapjoin.smalltable.filesize 
如果大小表在進行連接時,小表連接小於這個默認值,則自動開啟Mapjoin優化,

 


 

4.配置的一些優化

開啟向量化

set hive.vectorized.execution.enabled=true;

目前mapreduce只支持map端的向量化,tez和spark可以支持map和reduce端的向量化操作

默認是關閉的,將一個普通的查詢轉化為向量化查詢。大大減少了掃描,過濾等查詢,標准查詢時系統一次處理一行,矢量化查詢可以一次性查詢1024行數據,減少了系統上下文切換的開銷。

開啟並行化

並行執行是大數據分布式計算的核心概念。SQL開發者提交的每個SQL都會盡量被分解成各個可以並行的任務去執行。

--開啟並行執行 set hive.exec.parallel=true;

 開啟map端聚合

hive.map.aggr 默認值為true

 


 

 

5.調整mapTask數量

set mapred.map.tasks= task數量

 

但是這個並不能完全控制mapTask數量,調節task數量需要一套完整的算法。於mapreduce的切片大小有關。
顧名思義就是將數據進行切分,切分為數據片,其實這個切片關乎於map階段的map個數,以及每個map處理的數據量的大小。
mapreduce中,一個job的map個數, 每個map處理的數據量是如何決定的呢? 另外每個map又是如何讀取輸入文件的內容呢?
用戶是否可以自己決定輸入方式, 決定map個數呢?


mapreduce作業會根據輸入目錄產生多個map任務, 通過多個map任務並行執行來提高作業運行速度,
但如果map數量過少, 並行量低, 作業執行慢, 如果map數過多, 資源有限,也
會增加調度開銷.

因此, 根據輸入產生合理的map數, 為每個map分配合適的數據量, 能有效的提升資源利用率, 並使作業運行速度加快。

 

1.默認情況下,Map的個數defaultNum =目標文件或數據的總大小 totalSize/hdfs 集群文件塊的大小blockSize. 2.當用戶指定mapred.map.tasks,即為用戶期望的Map大小,用expNum表示,但是這個值並不
    會被立即采納。他會獲取mapred.map.tasks與defaultNum的較大值,作為待定選項。
3.獲取文件分片的大小和分片個數,分片大小參數為 mapred.min.split.size 和blockSize間的較大值,
    用splitMaxSize表示,將目標文件或數據總大小除以splitMaxSize 即為真是分片個數,用realSplitNum表示。
4.獲取realSplitNum於expMaxNum 較小值為實際的Map個數。

通過上面的邏輯:

減少Map個數,需要增大mapred.min.split.size的值,減少mapred.map.tasks的值
增大Map個數,需要減少mapred.min.split.size的值,增大mapred.map.tasks的值

在之前的學習的union all案例中,單純的減少,增大map.tasks的數量,並不能改變map個數,讀者可以自行嘗試。

 


 

6.調整reduce相關配置

設置reducer的數量

mapred.reduce.tasks 默認值為-1,代表有系統根據需要自行決定reducer的數量

設置每個reducer能處理的數據量

hive.exec.reducers.bytes.per.reducer  設置每個reducer處理的處理量,默認256M

表示數據量需要按相同的鍵再次聚合,可減少重復的聚合操作

hive.optimize.reducededuplication=true;

 


 

7.使用explain dependency查看數據輸入依賴

explain dependency用於描述一段sql需要的數據來源

explain dependency 有兩個使用場景

注意在使用join時,不同的join,如inner join left join中有非等值過濾條件,過濾效果不同。

場景一:快速排除 快速排除因為讀取不到相應分區的數據而島主任務數據輸出異常,上游任務因為生產過程中不可控因素出現異常或者空跑,導致下游任務引發異常。

場景二:幫助清理表的輸入,特別是有助於理解有多重自查詢,多表連接的依賴輸入。

案例:

下面有兩個sql:

select a.s_no from student_orc_partition a inner join student_orc_partition_only b on a.s_no=b.s_no and a.part=b.part and a.part>=1 and b.part<=2;

 

select a.s_no from student_orc_partition a inner join student_orc_partition_only b on a.s_no=b.s_no and a.part = b.part where a.part>=1 and b.part<=2;

 

通過explain dependency,其實上述的兩個sql並不等價,在內連接中連接條件中假如非等值的過濾條件后,並沒有將內連接的左右兩個表按照過濾條件進行過濾,內連接在執行過程中會多讀取part=0的分區數據

 

案例二:

select a.s_no from student_orc_partition a leftjoin student_orc_partition_only b on a.s_no=b.s_no and a.part=b.part and a.part>=1 and b.part<=2;

 

select a.s_no from student_orc_partition a leftjoin student_orc_partition_only b on a.s_no=b.s_no and a.part=b.part and a.part>=1 and a.part<=2;

 

通過expalin dependency,對於左外連接在連接條件中加入非等值過濾的條件,如果過濾條件是作用於右表(b表)有起到過濾效果,右表只掃描了2個分區,但是左表(a表)會進行全表掃描,

如果過濾條件是針對的是左表,則完全沒有起到過濾的作用,那么兩個表將會進行全表掃描。

所以通常的優化的是盡早過濾掉不需要的數據。

select a.s_no from ( select s_no,part from student_orc_partiton where part>=1 and part<=2 ) a left outer join student_orc_partition_only b on a.s_no=b.s_no and a.part = b.part;    

 


 

Map join的原理

一般的join 都是Repartition Join,發生在shuffle 和Reduce 階段,如果不特殊聲明,就是Repartition Join。

Map join是先啟動一個作業,讀取小表的數據,在內存中構建哈希表,將哈希表寫入本地磁盤,然后將哈希表上傳到HDFS上並添加到分布式緩存中,再啟動一個任務讀取表B的數據,在進行連接時Map對獲取緩存中的數據並存入到哈希表中,B表會與哈希表的數據進行匹配,時間復雜度是O(1),匹配完后將結果進行輸出。

一般不建議使用 hinit /*+mapjoin(b) */ 這樣的用法,最壞的情況下容易發生內存溢出問題。

可以使用配置來嘗試將repartition連接轉化為Map連接,hive.smalltable.filesize

桶的Map 連接將普通的Map連接轉化為桶連接,分桶的Hive表會將桶列的值計算Hash值取桶數的模,余數相同會發往相同的桶,每個桶對應一個文件。在兩表進行連接的時候,可以快速過濾掉不要的數據,

注意使用 桶的map連接要保證連接的兩張表的分桶數之前是倍數關系。


 

Skew Join傾斜連接

當有數據傾斜時的表連接。出現數據傾斜時,會引起個別任務花費大量時間和資源在處理傾斜鍵的數據,從而變為整個作業的瓶頸。Skew Join在工作是會將數據分為兩部分,一部分為傾斜鍵數據,一部分是余下的所有的數據,由兩個作業分別處理。

 

set hive.optimize.skewjoin = true;

 


ORC與hive相關配置

orc.compress 表示orc的文件壓縮類型,可選類型有NONE,ZLIB,SNAPPY

orc.bloom.filter.columns 需要創建布隆過濾的組

orc.bloom.filter.fpp 使用布隆過濾器的假正概率 默認0.05

 

hive中使用bloom過濾器,可以用較少的文件空間快速判定數據是否存在於表中

 


 

hive相關組件的調優

yarn的配置:

YARN Client一般用於測試,YARN Cluster用於實際生產環境。

yarn.nodemanager.resource.cpu-vcores 默認表示集群中每個節點可被分配的虛擬CPU個數為8。

 

為什么這里不是物理CPU個數?

 

因為考慮一個集群中所有的機器配置不可能一樣,即使同樣是16核心的CPU性能也會有所差異,所以YARN在物理CPU和用戶之間加了一層虛擬CPU,一個物理CPU可以被划分成多個虛擬的CPU。

 

yarn.nodemanager.resource.memory-mb 默認表示集群中每個節點可被分配的物理內存是8GB。

yarn.scheduler.minimum-allocation-mb 每個容器請求被分配的最小內存

yarn.scheduler.maximum-allocation-mb 每個容器請求被分配的最大內存

yarn.scheduler.minimum-allocation-vcores 每個容器請求被分配的最小cpu數

yarn.scheduler.minimum-allocation-vcores 每個容器請求被分配的最大cpu數

yarn.nodemanager.resource.percentage-physical-cpu-limit 一個節點內所有容器所能使用的物理CPU的占比,默認為100%即如果一台機器有16核,CPU的使用率最大為1600%,且該比值為100%,則所有容器最多能使用的CPU資源為1600%

 

hdfs配置

  • Hive 作業生成的小文件,過多的小文件會加重NameNode 的負擔,導致集群整體性能下降。
  • 設置合理的HDFS文件塊的大小,可以減輕NameNode的負擔,增加數據本地化操作的概率,提升程序性能。
  • 適當增大NameNode的Java堆,調整JVM的參數可以提升NameNode性能。
  • HDFS 短路讀,Client會繞開DataNode自己去讀取數據

 


從數據處理的角度來說,這些語法本質上可以被分成3種模式,即過濾模式、聚合模式和連接模式。

  1. 過濾模式,即對數據的過濾,從過濾的粒度來看,分為數據行過濾、數據列過濾、文件過濾和目錄過濾4種方式。
  2. 聚合模式,即數據的聚合,數據聚合的同時也意味着在處理數據過程中存在Shuffle的過程。
  3. 連接模式,即表連接的操作,這類操作分為兩大類:有Shuffle的連接操作和無Shuffle的連接操作。

 

count(列):如果列中有null 值,那么這一列不會被記入統計的行數。

count(*):不會出現count(列)在行是null值的情況下,不計入行數的問題。

count(1)和count(*)類似

 

 


數據傾斜

現象就是任務需要處理大量相同鍵的數據,這種情況有以下4中表現:

  • 數據含有大量無意義的數據,如空值(NULL)、空字符串
  • 含有傾斜數據在進行聚合計算時,無法聚合中間結果,大量數據都需要經過Shuffle階段的處理,引起數據傾斜
  • 數據在計算時做多維數據集合,導致維度膨脹引起的數據傾斜
  • 兩表進行join,都含有大量相同的傾斜數據鍵

不可拆分大文件引發的數據傾斜

當對文件使用Gzip壓縮等不支持分揀分割操作的壓縮方式,當以后有作業讀取壓縮文件時,改文件只會被一個任務所讀取,如果該壓縮文件很大,則該map會成為性能瓶頸。

假如一個文件為200M,預先設置每個Map處理數據量為128M,但是計算引擎無法切分這個文件,鎖這個文件不會交給兩個Map任務去讀取,有且只有一個Map任務在操作。

可以采用bzip2和zip等支持文件切分的壓縮算法

 

業務無關的數據引發的數據傾斜

對於空值,NULL這樣的,需要在計算過程中排除這些即可。

解決方案 1:user_id 為空的不參與關聯

select * from log a join user b on a.user_id is not null and a.user_id = b.user_id union all select * from log c where c.user_id is null;

解決方案 2:賦予空值新的 key 值

select * from log a left outer join user b on
case when a.user_id is null then concat('hive',rand()) else a.user_id end = b.user_id

總結

方法 2 比方法 1 效率更好,不但 IO 少了,而且作業數也少了,方案 1 中,log 表 讀了兩次,jobs 肯定是 2,而方案 2 是 1。這個優化適合無效 id(比如-99,’’,null)產 生的數據傾斜,把空值的 key 變

成一個字符串加上一個隨機數,就能把造成數據傾斜的 數據分到不同的 reduce 上解決數據傾斜的問題。

改變之處:使本身為 null 的所有記錄不會擁擠在同一個 reduceTask 了,會由於有替代的 隨機字符串值,而分散到了多個 reduceTask 中了,由於 null 值關聯不上,處理后並不影響最終結果。

 

多維度聚合計算數據膨脹引起的數據傾斜

對於如下場景 select a,b,c count(1) from T group by a,b,c with rollup;

對於上面這個sql 可以拆分為 (a,b,c),(a,b,null),(a,null,null),(null,null,null)

方法一:手動拆分這個sql

方法二:可以通過參數來自動控制作業的拆解,hive.new.job.grouping.cardinality 針對grouping sets ,rollup,cubes 這類多維度聚合操作,如果最后拆解的組合大於默認配置,會啟動信的任務去處理大於該值之外的組合

 

兩個hive數據表連接時引發的數據傾斜

兩個普通表進行Repartition join時,如果表連接鍵存在傾斜,那么shuffle階段必然會引起數據傾斜

通常這種情況還是啟用兩個作業,第一個作業處理沒有傾斜的數據,第二個作業將傾斜的數據存到分布式緩存中,分到各個Map任務所在節點,在Map階段完成join操作,避免shuffle,從而避免數據傾斜。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM