基於CDH 5.9.1 搭建 Hive on Spark 及相關配置和調優


 

  Hive默認使用的計算框架是MapReduce,在我們使用Hive的時候通過寫SQL語句,Hive會自動將SQL語句轉化成MapReduce作業去執行,但是MapReduce的執行速度遠差與Spark。通過搭建一個Hive On Spark可以修改Hive底層的計算引擎,將MapReduce替換成Spark,從而大幅度提升計算速度。接下來就如何搭建Hive On Spark展開描述。

  注:本人使用的是CDH5.9.1,使用的Spark版本是1.6.0,使用的集群配置為4個節點,每台內存32+G,4 Core。

1.   配置Yarn

  Yarn需要配置兩個參數:yarn.nodemanager.resource.cpu-vcores和yarn.nodemanager.resource.memory-mb。yarn.nodemanager.resource.cpu-vcores代表可以為container分配的CPU 內核的數量。yarn.nodemanager.resource.memory-mb代表可分配給容器的物理內存大小。

1)   配置cpu core

  為每個服務分配一個core,為操作系統預留2個core,剩余的可用的core分配給yarn。我使用的集群共有16個core,留出4個,剩余的12個core分配給yarn。

   

 

2)   配置內存

  設置Yarn內存為36G

   

 

2.   配置Spark

  給Yarn分配完資源后,需要配置一些Spark的參數,設置Spark可使用的資源。包括executor和Driver的內存,分配executor和設置並行度。

3)   配置executor內存

  在配置executor的內存大小的時候,需要考慮以下因素:

  • 增加executor的內存可以優化map join。但是會增加GC的時間。
  • 在某些情況下,HDFS客戶端沒有並行處理多個寫請求,在有多個請求競爭資源的時候會出現一個executor使用過多的core。
  • 盡可能的減少空閑的core的個數,cloudera推薦設置spark.executor.cores為4,5,6,這取決於給yarn分配的資源。

  比如說,因為我們有12個core可用,我們可以設置為4,這樣12/4余數為0,設置為5的話會剩余兩個空閑。設置4個可使得空閑的core盡可能的少。

  這樣配置之后我們可以同時運行三個executor,每個executor最多可以運行4個任務(每個core一個)。

  還有一點是要求spark.executor.memoryOverhead和spark.executor.memory的和不能超過yarn.scheduler.maximum-allocation-mb設置的值。我的scheduler請求最大內存分配的是12G。

   

 

4)   配置Driver內存

  Spark Driver端的配置如下:

  • spark.driver.memory---當hive運行在spark上時,driver端可用的最大Java堆內存。
  • spark.yarn.driver.memoryOverhead---每個driver可以額外從yarn請求的堆內存大小。這個參數加上spark.driver.memory就是yarn為driver端的JVM分配的總內存。

  Spark在Driver端的內存不會直接影響性能,但是在沒有足夠內存的情況下在driver端強制運行Spark任務需要調整。

5)   設置executor個數

  集群的executor個數設置由集群中每個節點的executor個數和集群的worker個數決定,如果集群中有3個worker,則Hive On Spark可以使用的executor最大個數是12個(3 * 4)。

  Hive的性能受可用的executor的個數影響很明顯,一般情況下,性能和executor的個數成正比,4個executor的性能大約是2個executor性能的一倍,但是性能在executor設置為一定數量的時候會達到極值,達到這個極值之后再增加executor的個數不會增加性能,反而有可能會為集群增加負擔。

6)   動態分配executor

  設置spark.executor.instances到最大值可以使得Spark集群發揮最大性能。但是這樣有個問題是當集群有多個用戶運行Hive查詢時會有問題,應避免為每個用戶的會話分配固定數量的executor,因為executor分配后不能回其他用戶的查詢使用,如果有空閑的executor,在生產環境中,計划分配好executor可以更充分的利用Spark集群資源。

  Spark允許動態的給Spark作業分配集群資源,cloudera推薦開啟動態分配。

7)   設置並行度

  為了更加充分的利用executor,必須同時允許足夠多的並行任務。在大多數情況下,hive會自動決定並行度,但是有時候我們可能會手動的調整並行度。在輸入端,map task的個數等於輸入端按照一定格式切分的生成的數目,Hive On Spark的輸入格式是CombineHiveInputFormat,可以根據需要切分底層輸入格式。調整hive.exec.reducers.bytes.per.reducer控制每個reducer處理多少數據。但是實際情況下,Spark相比於MapReduce,對於指定的hive.exec.reducers.bytes.per.reducer不敏感。我們需要足夠的任務讓可用的executor保持工作不空閑,當Hive能夠生成足夠多的任務,盡可能的利用空閑的executor。

3.   配置Hive

  Hive on Spark的配置大部分即使不使用Hive,也可以對這些參數調優。但是hive.auto.convert.join.noconditionaltask.size這個參數是將普通的join轉化成map join的閾值,這個參數調優對於性能有很大影響。MapReduce和Spark都可以通過這個參數進行調優,但是這個參數在Hive On MR上的含義不同於Hive On Spark。

  數據的大小由兩個統計量標識:

  • ·totalSize 磁盤上數據的大小
  • ·rawDataSize 內存中數據的大小

  Hive On MapReduce使用的是totalSize,Spark使用rawDataSize。數據由於經過一系列壓縮、序列化等操作,即使是相同的數據集,也會有很大的不同,對於Hive On Spark,需要設置   hive.auto.convert.join.noconditionaltask.size,將普通的join操作轉化成map join來提升性能,集群資源充足的情況下可以把這個參數的值適當調大,來更多的觸發map join。但是設置太高的話,小表的數據會占用過多的內存導致整個任務因為內存耗盡而失敗,所有這個參數需要根據集群的資源來進行調整。

  Cloudera推薦配置兩個額外的配置項:

  hive.stats.fetch.column.stats=true

  hive.optimize.index.filter=true

  以下還整理了一些配置項用於hive調優:

hive.optimize.reducededuplication.min.reducer=4

hive.optimize.reducededuplication=true

hive.merge.mapfiles=true

hive.merge.mapredfiles=false

hive.merge.smallfiles.avgsize=16000000

hive.merge.size.per.task=256000000

hive.merge.sparkfiles=true

hive.auto.convert.join=true

hive.auto.convert.join.noconditionaltask=true

hive.auto.convert.join.noconditionaltask.size=20M(might need to increase for Spark, 200M)

hive.optimize.bucketmapjoin.sortedmerge=false

hive.map.aggr.hash.percentmemory=0.5

hive.map.aggr=true

hive.optimize.sort.dynamic.partition=false

hive.stats.autogather=true

hive.stats.fetch.column.stats=true

hive.compute.query.using.stats=true

hive.limit.pushdown.memory.usage=0.4 (MR and Spark)

hive.optimize.index.filter=true

hive.exec.reducers.bytes.per.reducer=67108864

hive.smbjoin.cache.rows=10000

hive.fetch.task.conversion=more

hive.fetch.task.conversion.threshold=1073741824

hive.optimize.ppd=true

 

8)   設置Pre-warming Yarn Container

  我們使用Hive On Spark的時候,提交第一個查詢時,看到查詢結果可能會有比較長的延遲,但是再次運行相同的SQL查詢,完成速度要比第一個查詢快得多。

  當Spark使用yarn管理資源調度時,Spark executor需要額外的時間來啟動和初始化,在程序運行之前,Spark不會等待所有的executor准備好之后運行,所有在任務提交到集群之后,仍有一些executor處於啟動狀態。在Spark上運行的作業運行速度與executor個數相關,當可用的executor的個數沒有達到最大值的時候,作業達不到最大的並行性,所有Hive上提交的第一個SQL查詢會慢。

  如果是在長時間會話這個應該問題影響很小,因為只有執行第一個SQL的時候會慢,問題不大,但是很多時候我們寫的Hive腳本,需要用一些調度框架去啟動(如Oozie)。這時候我們需要考慮進行優化。

  為了減少啟動時間,我們可以開啟container pre-warming機制,開啟后只有當任務請求的所有executor准備就緒,作業才會開始運行。這樣會提升Spark作業的並行度。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM