https://blog.csdn.net/rlnLo2pNEfx9c/article/details/81572016
hive on spark 性能遠比hive on mr 要好,而且提供了一樣的功能。用戶的sql無需修改就可以直接運行於hive on spark。 udf函數也是全部支持。本文主要是想講hive on spark 在運行於yarn模式的情況下如何調優。下文舉例講解的yarn節點機器配置,假設有32核,120GB內存。
1. yarn配置
yarn.nodemanager.resource.cpu-vcores和yarn.nodemanager.resource.memory-mb,這兩個參數決定這集群資源管理器能夠有多少資源用於運行yarn上的任務。 這兩個參數的值是由機器的配置及同時在機器上運行的其它進程共同決定。本文假設僅有hdfs的datanode和yarn的nodemanager運行於該節點。
1.1 配置cores
基本配置是datanode和nodemanager各一個核,操作系統兩個核,然后剩下28核配置作為yarn資源。也即是
yarn.nodemanager.resource.cpu-vcores=28
1.2 配置內存
對於內存,預留20GB給操作系統,datanode,nodemanager,剩余100GB作為yarn資源。也即是
yarn.nodemanager.resource.memory-mb=100*1024
2. spark配置
給yarn分配資源以后,那就要想着spark如何使用這些資源了,主要配置對象:execurtor 和driver內存,executro配額,並行度
2.1 executor內存
設置executor內存需要考慮如下因素:
- executor內存越多,越能為更多的查詢提供map join的優化。由於垃圾回收的壓力會導致開銷增加。
- 某些情況下hdfs的 客戶端不能很好的處理並發寫入,所以過多的核心可能會導致競爭。
為了最大化使用core,建議將core設置為4,5,6(多核心會導致並發問題,所以寫代碼的時候尤其是靜態的鏈接等要考慮並發問題)具體分配核心數要結合yarn所提供的核心數。 由於本文中涉及到的node節點是28核,那么很明顯分配為4的化可以被整除,spark.executor.cores設置為4 不會有多余的核剩下,設置為5,6都會有core剩余。 spark.executor.cores=4,由於總共有28個核,那么最大可以申請的executor數是7。總內存處以7,也即是 100/7,可以得到每個executor約14GB內存。
要知道 spark.executor.memory 和spark.executor.memoryOverhead 共同決定着 executor內存。建議 spark.executor.memoryOverhead占總內存的 15%-20%。 那么最終 spark.executor.memoryOverhead=2 G 和spark.executor.memory=12 G
根據上面的配置的化,每個主機就可以申請7個executor,每個executor可以運行4個任務,每個core一個task。那么每個task的平均內存是 14/4 = 3.5GB。在executor運行的task共享內存。 其實,executor內部是用newCachedThreadPool運行task的。
確保 spark.executor.memoryOverhead和 spark.executor.memory的和不超過yarn.scheduler.maximum-allocation-mb !!!
2.2 driver內存
對於drvier的內存配置,也包括兩個參數:
- spark.driver.memoryOverhead 每個driver能從yarn申請的堆外內存的大小。
- spark.driver.memory 當運行hive on spark的時候,每個spark driver能申請的最大jvm 堆內存。該參數結合 spark.driver.memoryOverhead共同決定着driver的內存大小。
driver的內存大小並不直接影響性能,但是也不要job的運行受限於driver的內存. 這里給出spark driver內存申請的方案,假設yarn.nodemanager.resource.memory-mb是 X。
- driver內存申請12GB,假設 X > 50GB
- driver內存申請 4GB,假設 12GB < X <50GB
- driver內存申請1GB,假設 1GB < X < 12 GB
- driver內存申請256MB,假設 X < 1GB
這些數值是 spark.driver.memory和 spark.driver.memoryOverhead內存的總和。對外內存站總內存的10%-15%。 假設 yarn.nodemanager.resource.memory-mb=100*1024MB,那么driver內存設置為12GB,此時 spark.driver.memory=10.5gb和spark.driver.memoryOverhead=1.5gb
注意,資源多少直接對應的是數據量的大小。所以要結合資源和數據量進行適當縮減和增加。
2.3 executor數
executor的數目是由每個節點運行的executor數目和集群的節點數共同決定。如果你有四十個節點,那么hive可以使用的最大executor數就是 280(40*7). 最大數目可能比這個小點,因為driver也會消耗1core和12GB。當前假設是沒有yarn應用在跑。
Hive性能與用於運行查詢的executor數量直接相關。 但是,不通查詢還是不通。 通常,性能與executor的數量成比例。 例如,查詢使用四個executor大約需要使用兩個executor的一半時間。 但是,性能在一定數量的executor中達到峰值,高於此值時,增加數量不會改善性能並且可能產生不利影響。
在大多數情況下,使用一半的集群容量(executor數量的一半)可以提供良好的性能。 為了獲得最佳性能,最好使用所有可用的executor。 例如,設置spark.executor.instances = 280。 對於基准測試和性能測量,強烈建議這樣做。
2.4 動態executor申請
雖然將spark.executor.instances設置為最大值通常可以最大限度地提高性能,但不建議在多個用戶運行Hive查詢的生產環境中這樣做。 避免為用戶會話分配固定數量的executor,因為如果executor空閑,executor不能被其他用戶查詢使用。 在生產環境中,應該好好計划executor分配,以允許更多的資源共享。
Spark允許您根據工作負載動態擴展分配給Spark應用程序的集群資源集。 要啟用動態分配,請按照動態分配中的步驟進行操作。 除了在某些情況下,強烈建議啟用動態分配
2.5 並行度
要使可用的executor得到充分利用,必須同時運行足夠的任務(並行)。在大多數情況下,Hive會自動確定並行度,但也可以在調優並發度方面有一些控制權。
在輸入端,map任務的數量等於輸入格式生成的split數。對於Hive on Spark,輸入格式為CombineHiveInputFormat,它可以根據需要對基礎輸入格式生成的split進行分組。 可以更好地控制stage邊界的並行度。
調整hive.exec.reducers.bytes.per.reducer以控制每個reducer處理的數據量,Hive根據可用的executor,執行程序內存,以及其他因素來確定最佳分區數。 實驗表明,只要生成足夠的任務來保持所有可用的executor繁忙,Spark就比MapReduce對hive.exec.reducers.bytes.per.reducer指定的值敏感度低。 為獲得最佳性能,請為該屬性選擇一個值,以便Hive生成足夠的任務以完全使用所有可用的executor。
3 hive配置
Hive on spark 共享了很多hive性能相關的配置。可以像調優hive on mapreduce一樣調優hive on spark。 然而,hive.auto.convert.join.noconditionaltask.size是基於統計信息將基礎join轉化為map join的閾值,可能會對性能產生重大影響。 盡管該配置可以用hive on mr和hive on spark,但是兩者的解釋不同。
數據的大小有兩個統計指標:
- totalSize- 數據在磁盤上的近似大小。
- rawDataSize- 數據在內存中的近似大小。
hive on mr用的是totalSize。hive on spark使用的是rawDataSize。由於可能存在壓縮和序列化,這兩個值會有較大的差別。 對於hive on spark 需要將 hive.auto.convert.join.noconditionaltask.size指定為更大的值,才能將與hive on mr相同的join轉化為map join。
可以增加此參數的值,以使地圖連接轉換更具凶猛。 將common join 轉換為 map join 可以提高性能。 如果此值設置得太大,則來自小表的數據將使用過多內存,任務可能會因內存不足而失敗。 根據群集環境調整此值。
通過參數 hive.stats.collect.rawdatasize 可以控制是否收集 rawDataSize 統計信息。
對於hiveserver2,建議再配置兩個額外的參數: hive.stats.fetch.column.stats=true 和 hive.optimize.index.filter=true.
Hive性能調優通常建議使用以下屬性:
hive.optimize.reducededuplication.min.reducer=4 hive.optimize.reducededuplication=true hive.merge.mapfiles=true hive.merge.mapredfiles=false hive.merge.smallfiles.avgsize=16000000 hive.merge.size.per.task=256000000 hive.merge.sparkfiles=true hive.auto.convert.join=true hive.auto.convert.join.noconditionaltask=true hive.auto.convert.join.noconditionaltask.size=20M(might need to increase for Spark, 200M) hive.optimize.bucketmapjoin.sortedmerge=false hive.map.aggr.hash.percentmemory=0.5 hive.map.aggr=true hive.optimize.sort.dynamic.partition=false hive.stats.autogather=true hive.stats.fetch.column.stats=true hive.compute.query.using.stats=true hive.limit.pushdown.memory.usage=0.4 (MR and Spark) hive.optimize.index.filter=true hive.exec.reducers.bytes.per.reducer=67108864 hive.smbjoin.cache.rows=10000 hive.fetch.task.conversion=more hive.fetch.task.conversion.threshold=1073741824 hive.optimize.ppd=true
4 預啟動YARN容器
在開始新會話后提交第一個查詢時,在查看查詢開始之前可能會遇到稍長的延遲。還會注意到,如果再次運行相同的查詢,它的完成速度比第一個快得多。
Spark執行程序需要額外的時間來啟動和初始化yarn上的Spark,這會導致較長的延遲。此外,Spark不會等待所有executor在啟動作業之前全部啟動完成,因此在將作業提交到群集后,某些executor可能仍在啟動。 但是,對於在Spark上運行的作業,作業提交時可用executor的數量部分決定了reducer的數量。當就緒executor的數量未達到最大值時,作業可能沒有最大並行度。這可能會進一步影響第一個查詢的性能。
在用戶較長期會話中,這個額外時間不會導致任何問題,因為它只在第一次查詢執行時發生。然而,諸如Oozie發起的Hive工作之類的短期session可能無法實現最佳性能。
為減少啟動時間,可以在作業開始前啟用容器預熱。只有在請求的executor准備就緒時,作業才會開始運行。這樣,在reduce那一側不會減少短會話的並行性。
要啟用預熱功能,請在發出查詢之前將hive.prewarm.enabled設置為true。還可以通過設置hive.prewarm.numcontainers來設置容器數量。默認值為10。
預熱的executor的實際數量受spark.executor.instances(靜態分配)或spark.dynamicAllocation.maxExecutors(動態分配)的值限制。 hive.prewarm.numcontainers的值不應超過分配給用戶會話的值。
注意:預熱需要幾秒鍾,對於短會話來說是一個很好的做法,特別是如果查詢涉及reduce階段。 但是,如果hive.prewarm.numcontainers的值高於群集中可用的值,則該過程最多可能需要30秒。 請謹慎使用預熱。