SparkSubmit參數及參數性能調優


首先擺出常用的參數設定

bin/spark-submit \
--class com.xyz.bigdata.calendar.PeriodCalculator \
--master yarn \
--deploy-mode cluster \
--queue default_queue \
--num-executors 50 \
--executor-cores 2 \
--executor-memory 4G \
--driver-memory 2G \
--conf "spark.default.parallelism=250" \
--conf "spark.shuffle.memoryFraction=0.3" \
--conf "spark.storage.memoryFraction=0.5" \
--conf "spark.driver.extraJavaOptions=-XX:+UseG1GC" \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
--verbose \
${PROJECT_DIR}/bigdata-xyz-0.1.jar

 

關於spark-submit的執行過程,讀Spark Core的源碼能夠獲得一個大致的印象。今天事情比較多,所以之后會另寫文章專門敘述關於Spark on YARN的事情(又挖了一個坑,上一個坑是關於Java String和JVM的,需要盡快填上了)。

num-executors

  • 含義:設定Spark作業要用多少個Executor進程來執行。
  • 設定方法:根據我們的實踐,設定在30~100個之間為最佳。如果不設定,默認只會啟動非常少的Executor。如果設得太小,無法充分利用計算資源。設得太大的話,又會搶占集群或隊列的資源,導致其他作業無法順利執行。

executor-cores

  • 含義:設定每個Executor能夠利用的CPU核心數(這里核心指的是vCore)。核心數越多,並行執行Task的效率也就越高。
  • 設定方法:根據我們的實踐,設定在2~6之間都是可以的,主要是根據業務類型和數據處理邏輯的復雜程度來定,一般來講設2或者3就夠用了。需要注意的是,num-executors * executor-cores不能將隊列中的CPU資源耗盡,最好不要超過總vCore數的1/3,以給其他作業留下剩余資源。

executor-memory

  • 含義:設定每個Executor的內存量(堆內內存)。這個參數比executor-cores更為重要,因為Spark作業的本質就是內存計算,內存的大小直接影響性能,並且與磁盤溢寫、OOM等都相關。
  • 設定方法:一般設定在2G~8G之間,需要根據數據量慎重做決定。如果作業執行非常慢,出現頻繁GC或者OOM,就得適當調大內存。並且與上面相同,num-executors * executor-memory也不能過大,最好不要超過隊列總內存量的一半。
    另外,還有一個配置項spark.executor.memoryOverhead,用來設定每個Executor可使用的堆外內存大小,默認值是executor-memory的0.1倍,最小值384M。一般來講都夠用,不用特意設置。

driver-memory

  • 含義:設定Driver進程的內存量(堆內內存)。
  • 設定方法:由於我們幾乎不會使用collect()之類的算子把大量RDD數據都拉到Driver上來處理,所以它的內存可以不用設得過大,2G可以應付絕大多數情況。但是,如果Spark作業處理完后數據膨脹比較多,那么還是應該酌情加大這個值。
    與上面一項相同,spark.driver.memoryOverhead用來設定Driver可使用的堆外內存大小。

spark.default.parallelism

  • 含義:對於shuffle算子,如reduceByKey()或者join(),這個參數用來指定父RDD中最大分區數。由於分區與Task有一一對應關系,因此也可以理解為Task數。其名稱的字面意義是“並行度”,不能直接表達出這種含義。
  • 設定方法:Spark官方文檔中推薦每個CPU core執行2~3個Task比較合適,因此這個值要設定為(num-executors * executor-cores)的2~3倍。這個參數同樣非常重要,因為如果不設定的話,分區數就會由RDD本身的分區來決定,這樣往往會使得計算效率低下。

spark.shuffle.memoryFraction

  • 含義:shuffle操作(聚合、連接、分組等等)能夠使用的可用堆內存(堆大小減去300MB保留空間)的比例,默認值是0.2。如果shuffle階段使用的內存比例超過這個值,就會溢寫到磁盤。
  • 設定方法:取決於計算邏輯中shuffle邏輯的復雜度,如果會產生大量數據,那么一定要調高。在我們的實踐中,一般都設定在0.3左右。但是,如果調太高之后發現頻繁GC,那么就是執行用戶代碼的execution內存不夠用了,適當降低即可。

spark.storage.memoryFraction

  • 含義:緩存操作(persist/cache)能夠使用的可用堆內存的比例,默認值是0.6。
  • 設定方法:如果經常需要緩存非常大的RDD,那么就需要調高。否則,如果shuffle操作更為重量級,適當調低也無妨。我們一般設定在0.5左右。

其實,spark.shuffle/storage.memoryFraction是舊版的靜態內存管理(StaticMemoryManager)的遺產。在Spark 1.6版本之后的文檔中已經標記成了deprecated。目前取代它們的是spark.memory.fraction和spark.memory.storageFraction這兩項,參考新的統一內存管理(UnifiedMemoryManager)機制可以得到更多細節。
前者的含義是總內存占堆的比例,即execution+storage+shuffle內存的總量。后者則是storage內存占前者的比例。默認值分別為0.75(最新版變成了0.6)和0.5。

spark.driver/executor.extraJavaOptions

  • 含義:Driver或Executor進程的其他JVM參數。
  • 設定方法:一般可以不設置。如果設置,常見的情景是使用-Xmn加大年輕代內存的大小,或者手動指定垃圾收集器(最上面的例子中使用了G1,也有用CMS的時候)及其相關參數。

舉例

舉例,以我上一個老東家的參數命令,假設數據量400G,要求1個半小時內跑完
 
離線:

spark-submit --master yarn \
--class com.sddt.spark.web.WebETL \
--driver-memory 10G \
--executor-memory 24G \
--num-executors 20 \
--executor-cores 4 \
--conf spark.web.etl.inputBaseDir=hdfs://master:9999/user/hive/warehouse/rawdata.db/web \
--conf spark.web.etl.outputBaseDir=hdfs://master:9999/user/hadoop-twq/traffic-analysis/web \
--conf spark.web.etl.startDate=20180617 \
--conf spark.driver.extraJavaOptions="-Dweb.metadata.mongodbAddr=192.168.1.102 -Dweb.etl.hbase.zk.quorums=master" \
--conf spark.executor.extraJavaOptions="-Dweb.metadata.mongodbAddr=192.168.1.102 -Dweb.etl.hbase.zk.quorums=master -Dcom.sun.management.jmxremote.port=1119 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
/home/hadoop-twq/traffice-analysis/jars/spark-sessionization-etl-1.0-SNAPSHOT-jar-with-dependencies.jar prod

實時流:

#!/usr/bin/env bash

# the two most important settings:
# 取決於如下因素:
# 1:每一秒接收到的events,尤其是在高峰時間
# 2:數據源的緩沖能力
# 3:可以忍受的最大的滯后時間
# 我們通過將我們的Streaming程序在准生產環境中跑幾天來確定以上的因素
# 進而確定我們的executors的個數
num_executors=6
# 取決於如下因素:
# 1:每一個batch需要處理的數據的大小
# 2:transformations API的種類,如果使用的transformations需要shuffle的話,則需要的內存更大一點
# 3:使用狀態Api的話,需要的內存更加大一點,因為需要內存緩存每一個key的狀態
executor_memory=6g

# 每一個executor配置3到5個cores是比較好的,因為3到5個並發寫HDFS是最優的
# see http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
executor_cores=3

# backpressure
receiver_max_rate=100
receiver_initial_rate=30

spark-submit
--master yarn -
-deploy-mode cluster \
--name "Real_Time_SessionizeData" \
--class com.sddt.sessionize.SessionizeData \
--driver-memory 2g \
--num-executors ${num_executors} --executor-cores ${executor_cores} --executor-memory ${executor_memory} \
--queue "realtime_queue" \
--files "hdfs:master:9999/user/hadoop-master/real-time-analysis/log4j-yarn.properties" \
--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \
--conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer `# Kryo Serializer is much faster than the default Java Serializer`\
--conf spark.locality.wait=10 `# 減少Spark Delay Scheduling從而提高數據處理的並行度, 默認是3000ms` \
--conf spark.task.maxFailures=8 `# 增加job失敗前最大的嘗試次數, 默認是4`\
--conf spark.ui.killEnabled=false `# 禁止從Spark UI上殺死Spark Streaming的程序`\
--conf spark.logConf=true `# 在driver端打印Spark Configuration的日志`\
`# SPARK STREAMING CONFIGURATION` \
--conf spark.streaming.blockInterval=200 `# 生成數據塊的時間, 默認是200ms`\
--conf spark.streaming.backpressure.enabled=true `# 打開backpressure的功能`\
--conf spark.streaming.kafka.maxRatePerPartition=${receiver_max_rate} `# direct模式讀取kafka每一個分區數據的速度`\
`# YARN CONFIGURATION` \
--conf spark.yarn.driver.memoryOverhead=512 `# Set if --driver-memory < 5GB`\
--conf spark.yarn.executor.memoryOverhead=1024 `# Set if --executor-memory < 10GB`\
--conf spark.yarn.maxAppAttempts=4 `# Increase max application master attempts`\
--conf spark.yarn.am.attemptFailuresValidityInterval=1h `# Attempt counter considers only the last hour`\
--conf spark.yarn.max.executor.failures=$((8 * ${num_executors})) `# Increase max executor failures`\
--conf spark.yarn.executor.failuresValidityInterval=1h `# Executor failure counter considers only the last hour`\
--driver-java-options hdfs://master:9999/user/hadoop-master/real-time-analysis/output real_time_session s hdfs://master:9999/user/hadoop-master/real-time-analysis/checkpoint session master:9092
/home/hadoop-master/real-time-analysis/realtime-streaming-sessionization-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://master:9999/user/hadoop-master/real-time-analysis/output real_time_session s hdfs://master:9999/user/hadoop-master/real-time-analysis/checkpoint session master:9092

  

 

性能調優

Spark性能調優的第一步,就是為任務分配更多的資源,在一定范圍內,增加資源的分配與性能的提升是成正比的,實現了最優的資源配置后,在此基礎上再考慮進行后面論述的性能調優策略。

資源的分配在使用腳本提交Spark任務時進行指定,標准的Spark任務提交腳本如代碼清單2-1所示:

/usr/opt/modules/spark/bin/spark-submit \
--class com.sddt.spark.Analysis \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
/usr/opt/modules/spark/jar/spark.jar \

 

可以進行分配的資源如表2-1所示:

名稱

說明

--num-executors

配置Executor的數量

--driver-memory

配置Driver內存(影響不大)

--executor-memory

配置每個Executor的內存大小

--executor-cores

配置每個Executor的CPU core數量

 

調節原則:盡量將任務分配的資源調節到可以使用的資源的最大限度。

對於具體資源的分配,我們分別討論Spark的兩種Cluster運行模式:

第一種是Spark Standalone模式,你在提交任務前,一定知道或者可以從運維部門獲取到你可以使用的資源情況,在編寫submit腳本的時候,就根據可用的資源情況進行資源的分配,比如說集群有15台機器,每台機器為8G內存,2個CPU core,那么就指定15個Executor,每個Executor分配8G內存,2個CPU core。

第二種是Spark Yarn模式,由於Yarn使用資源隊列進行資源的分配和調度,在表寫submit腳本的時候,就根據Spark作業要提交到的資源隊列,進行資源的分配,比如資源隊列有400G內存,100個CPU core,那么指定50個Executor,每個Executor分配8G內存,2個CPU core。

對表2-1中的各項資源進行了調節后,得到的性能提升如表2-2所示:

名稱

解析

 

 

增加Executor·個數

在資源允許的情況下,增加Executor的個數可以提高執行task的並行度。比如有4個Executor,每個Executor有2個CPU core,那么可以並行執行8個task,如果將Executor的個數增加到8個(資源允許的情況下),那么可以並行執行16個task,此時的並行能力提升了一倍。

 

 

 

增加每個Executor的CPU core個數

  在資源允許的情況下,增加每個Executor的Cpu core個數,可以提高執行task的並行度。比如有4個Executor,每個Executor有2個CPU core,那么可以並行執行8個task,如果將每個Executor的CPU core個數增加到4個(資源允許的情況下),那么可以並行執行16個task,此時的並行能力提升了一倍。

 

 

 

 

 

 

 

增加每個Executor的內存量

  在資源允許的情況下,增加每個Executor的內存量以后,對性能的提升有三點:

  1. 可以緩存更多的數據(即對RDD進行cache),寫入磁盤的數據相應減少,甚至可以不寫入磁盤,減少了可能的磁盤IO;
  2. 可以為shuffle操作提供更多內存,即有更多空間來存放reduce端拉取的數據,寫入磁盤的數據相應減少,甚至可以不寫入磁盤,減少了可能的磁盤IO;
  3. 可以為task的執行提供更多內存,在task的執行過程中可能創建很多對象,內存較小時會引發頻繁的GC,增加內存后,可以避免頻繁的GC,提升整體性能。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM