Hive默認使用的計算框架是MapReduce,在我們使用Hive的時候通過寫SQL語句,Hive會自動將SQL語句轉化成MapReduce作業去執行,但是MapReduce的執行速度遠差與Spark。通過搭建一個Hive On Spark可以修改Hive底層的計算引擎,將MapReduce替換成Spark,從而大幅度提升計算速度。接下來就如何搭建Hive On Spark展開描述。
注:本人使用的是CDH5.9.1,使用的Spark版本是1.6.0,使用的集群配置為4個節點,每台內存32+G,4 Core。
1. 配置Yarn
Yarn需要配置兩個參數:yarn.nodemanager.resource.cpu-vcores和yarn.nodemanager.resource.memory-mb。yarn.nodemanager.resource.cpu-vcores代表可以為container分配的CPU 內核的數量。yarn.nodemanager.resource.memory-mb代表可分配給容器的物理內存大小。
1) 配置cpu core
為每個服務分配一個core,為操作系統預留2個core,剩余的可用的core分配給yarn。我使用的集群共有16個core,留出4個,剩余的12個core分配給yarn。

2) 配置內存
設置Yarn內存為36G

2. 配置Spark
給Yarn分配完資源后,需要配置一些Spark的參數,設置Spark可使用的資源。包括executor和Driver的內存,分配executor和設置並行度。
3) 配置executor內存
在配置executor的內存大小的時候,需要考慮以下因素:
- 增加executor的內存可以優化map join。但是會增加GC的時間。
- 在某些情況下,HDFS客戶端沒有並行處理多個寫請求,在有多個請求競爭資源的時候會出現一個executor使用過多的core。
- 盡可能的減少空閑的core的個數,cloudera推薦設置spark.executor.cores為4,5,6,這取決於給yarn分配的資源。
比如說,因為我們有12個core可用,我們可以設置為4,這樣12/4余數為0,設置為5的話會剩余兩個空閑。設置4個可使得空閑的core盡可能的少。
這樣配置之后我們可以同時運行三個executor,每個executor最多可以運行4個任務(每個core一個)。
還有一點是要求spark.executor.memoryOverhead和spark.executor.memory的和不能超過yarn.scheduler.maximum-allocation-mb設置的值。我的scheduler請求最大內存分配的是12G。

4) 配置Driver內存
Spark Driver端的配置如下:
- spark.driver.memory---當hive運行在spark上時,driver端可用的最大Java堆內存。
- spark.yarn.driver.memoryOverhead---每個driver可以額外從yarn請求的堆內存大小。這個參數加上spark.driver.memory就是yarn為driver端的JVM分配的總內存。
Spark在Driver端的內存不會直接影響性能,但是在沒有足夠內存的情況下在driver端強制運行Spark任務需要調整。
5) 設置executor個數
集群的executor個數設置由集群中每個節點的executor個數和集群的worker個數決定,如果集群中有3個worker,則Hive On Spark可以使用的executor最大個數是12個(3 * 4)。
Hive的性能受可用的executor的個數影響很明顯,一般情況下,性能和executor的個數成正比,4個executor的性能大約是2個executor性能的一倍,但是性能在executor設置為一定數量的時候會達到極值,達到這個極值之后再增加executor的個數不會增加性能,反而有可能會為集群增加負擔。
6) 動態分配executor
設置spark.executor.instances到最大值可以使得Spark集群發揮最大性能。但是這樣有個問題是當集群有多個用戶運行Hive查詢時會有問題,應避免為每個用戶的會話分配固定數量的executor,因為executor分配后不能回其他用戶的查詢使用,如果有空閑的executor,在生產環境中,計划分配好executor可以更充分的利用Spark集群資源。
Spark允許動態的給Spark作業分配集群資源,cloudera推薦開啟動態分配。
7) 設置並行度
為了更加充分的利用executor,必須同時允許足夠多的並行任務。在大多數情況下,hive會自動決定並行度,但是有時候我們可能會手動的調整並行度。在輸入端,map task的個數等於輸入端按照一定格式切分的生成的數目,Hive On Spark的輸入格式是CombineHiveInputFormat,可以根據需要切分底層輸入格式。調整hive.exec.reducers.bytes.per.reducer控制每個reducer處理多少數據。但是實際情況下,Spark相比於MapReduce,對於指定的hive.exec.reducers.bytes.per.reducer不敏感。我們需要足夠的任務讓可用的executor保持工作不空閑,當Hive能夠生成足夠多的任務,盡可能的利用空閑的executor。
3. 配置Hive
Hive on Spark的配置大部分即使不使用Hive,也可以對這些參數調優。但是hive.auto.convert.join.noconditionaltask.size這個參數是將普通的join轉化成map join的閾值,這個參數調優對於性能有很大影響。MapReduce和Spark都可以通過這個參數進行調優,但是這個參數在Hive On MR上的含義不同於Hive On Spark。
數據的大小由兩個統計量標識:
- ·totalSize 磁盤上數據的大小
- ·rawDataSize 內存中數據的大小
Hive On MapReduce使用的是totalSize,Spark使用rawDataSize。數據由於經過一系列壓縮、序列化等操作,即使是相同的數據集,也會有很大的不同,對於Hive On Spark,需要設置 hive.auto.convert.join.noconditionaltask.size,將普通的join操作轉化成map join來提升性能,集群資源充足的情況下可以把這個參數的值適當調大,來更多的觸發map join。但是設置太高的話,小表的數據會占用過多的內存導致整個任務因為內存耗盡而失敗,所有這個參數需要根據集群的資源來進行調整。
Cloudera推薦配置兩個額外的配置項:
hive.stats.fetch.column.stats=true
hive.optimize.index.filter=true
以下還整理了一些配置項用於hive調優:
|
hive.optimize.reducededuplication.min.reducer=4 hive.optimize.reducededuplication=true hive.merge.mapfiles=true hive.merge.mapredfiles=false hive.merge.smallfiles.avgsize=16000000 hive.merge.size.per.task=256000000 hive.merge.sparkfiles=true hive.auto.convert.join=true hive.auto.convert.join.noconditionaltask=true hive.auto.convert.join.noconditionaltask.size=20M(might need to increase for Spark, 200M) hive.optimize.bucketmapjoin.sortedmerge=false hive.map.aggr.hash.percentmemory=0.5 hive.map.aggr=true hive.optimize.sort.dynamic.partition=false hive.stats.autogather=true hive.stats.fetch.column.stats=true hive.compute.query.using.stats=true hive.limit.pushdown.memory.usage=0.4 (MR and Spark) hive.optimize.index.filter=true hive.exec.reducers.bytes.per.reducer=67108864 hive.smbjoin.cache.rows=10000 hive.fetch.task.conversion=more hive.fetch.task.conversion.threshold=1073741824 hive.optimize.ppd=true |
8) 設置Pre-warming Yarn Container
我們使用Hive On Spark的時候,提交第一個查詢時,看到查詢結果可能會有比較長的延遲,但是再次運行相同的SQL查詢,完成速度要比第一個查詢快得多。
當Spark使用yarn管理資源調度時,Spark executor需要額外的時間來啟動和初始化,在程序運行之前,Spark不會等待所有的executor准備好之后運行,所有在任務提交到集群之后,仍有一些executor處於啟動狀態。在Spark上運行的作業運行速度與executor個數相關,當可用的executor的個數沒有達到最大值的時候,作業達不到最大的並行性,所有Hive上提交的第一個SQL查詢會慢。
如果是在長時間會話這個應該問題影響很小,因為只有執行第一個SQL的時候會慢,問題不大,但是很多時候我們寫的Hive腳本,需要用一些調度框架去啟動(如Oozie)。這時候我們需要考慮進行優化。
為了減少啟動時間,我們可以開啟container pre-warming機制,開啟后只有當任務請求的所有executor准備就緒,作業才會開始運行。這樣會提升Spark作業的並行度。
