首先擺出常用的參數設定
bin/spark-submit \ --class com.xyz.bigdata.calendar.PeriodCalculator \ --master yarn \ --deploy-mode cluster \ --queue default_queue \ --num-executors 50 \ --executor-cores 2 \ --executor-memory 4G \ --driver-memory 2G \ --conf "spark.default.parallelism=250" \ --conf "spark.shuffle.memoryFraction=0.3" \ --conf "spark.storage.memoryFraction=0.5" \ --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC" \ --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \ --verbose \ ${PROJECT_DIR}/bigdata-xyz-0.1.jar
關於spark-submit的執行過程,讀Spark Core的源碼能夠獲得一個大致的印象。今天事情比較多,所以之后會另寫文章專門敘述關於Spark on YARN的事情(又挖了一個坑,上一個坑是關於Java String和JVM的,需要盡快填上了)。
num-executors
- 含義:設定Spark作業要用多少個Executor進程來執行。
- 設定方法:根據我們的實踐,設定在30~100個之間為最佳。如果不設定,默認只會啟動非常少的Executor。如果設得太小,無法充分利用計算資源。設得太大的話,又會搶占集群或隊列的資源,導致其他作業無法順利執行。
executor-cores
- 含義:設定每個Executor能夠利用的CPU核心數(這里核心指的是vCore)。核心數越多,並行執行Task的效率也就越高。
- 設定方法:根據我們的實踐,設定在2~6之間都是可以的,主要是根據業務類型和數據處理邏輯的復雜程度來定,一般來講設2或者3就夠用了。需要注意的是,num-executors * executor-cores不能將隊列中的CPU資源耗盡,最好不要超過總vCore數的1/3,以給其他作業留下剩余資源。
executor-memory
- 含義:設定每個Executor的內存量(堆內內存)。這個參數比executor-cores更為重要,因為Spark作業的本質就是內存計算,內存的大小直接影響性能,並且與磁盤溢寫、OOM等都相關。
- 設定方法:一般設定在2G~8G之間,需要根據數據量慎重做決定。如果作業執行非常慢,出現頻繁GC或者OOM,就得適當調大內存。並且與上面相同,num-executors * executor-memory也不能過大,最好不要超過隊列總內存量的一半。
另外,還有一個配置項spark.executor.memoryOverhead,用來設定每個Executor可使用的堆外內存大小,默認值是executor-memory的0.1倍,最小值384M。一般來講都夠用,不用特意設置。
driver-memory
- 含義:設定Driver進程的內存量(堆內內存)。
- 設定方法:由於我們幾乎不會使用collect()之類的算子把大量RDD數據都拉到Driver上來處理,所以它的內存可以不用設得過大,2G可以應付絕大多數情況。但是,如果Spark作業處理完后數據膨脹比較多,那么還是應該酌情加大這個值。
與上面一項相同,spark.driver.memoryOverhead用來設定Driver可使用的堆外內存大小。
spark.default.parallelism
- 含義:對於shuffle算子,如reduceByKey()或者join(),這個參數用來指定父RDD中最大分區數。由於分區與Task有一一對應關系,因此也可以理解為Task數。其名稱的字面意義是“並行度”,不能直接表達出這種含義。
- 設定方法:Spark官方文檔中推薦每個CPU core執行2~3個Task比較合適,因此這個值要設定為(num-executors * executor-cores)的2~3倍。這個參數同樣非常重要,因為如果不設定的話,分區數就會由RDD本身的分區來決定,這樣往往會使得計算效率低下。
spark.shuffle.memoryFraction
- 含義:shuffle操作(聚合、連接、分組等等)能夠使用的可用堆內存(堆大小減去300MB保留空間)的比例,默認值是0.2。如果shuffle階段使用的內存比例超過這個值,就會溢寫到磁盤。
- 設定方法:取決於計算邏輯中shuffle邏輯的復雜度,如果會產生大量數據,那么一定要調高。在我們的實踐中,一般都設定在0.3左右。但是,如果調太高之后發現頻繁GC,那么就是執行用戶代碼的execution內存不夠用了,適當降低即可。
spark.storage.memoryFraction
- 含義:緩存操作(persist/cache)能夠使用的可用堆內存的比例,默認值是0.6。
- 設定方法:如果經常需要緩存非常大的RDD,那么就需要調高。否則,如果shuffle操作更為重量級,適當調低也無妨。我們一般設定在0.5左右。
其實,spark.shuffle/storage.memoryFraction是舊版的靜態內存管理(StaticMemoryManager)的遺產。在Spark 1.6版本之后的文檔中已經標記成了deprecated。目前取代它們的是spark.memory.fraction和spark.memory.storageFraction這兩項,參考新的統一內存管理(UnifiedMemoryManager)機制可以得到更多細節。
前者的含義是總內存占堆的比例,即execution+storage+shuffle內存的總量。后者則是storage內存占前者的比例。默認值分別為0.75(最新版變成了0.6)和0.5。
spark.driver/executor.extraJavaOptions
- 含義:Driver或Executor進程的其他JVM參數。
- 設定方法:一般可以不設置。如果設置,常見的情景是使用-Xmn加大年輕代內存的大小,或者手動指定垃圾收集器(最上面的例子中使用了G1,也有用CMS的時候)及其相關參數。
舉例
舉例,以我上一個老東家的參數命令,假設數據量400G,要求1個半小時內跑完spark-submit --master yarn \
--class com.sddt.spark.web.WebETL \
--driver-memory 10G \
--executor-memory 24G \
--num-executors 20 \
--executor-cores 4 \
--conf spark.web.etl.inputBaseDir=hdfs://master:9999/user/hive/warehouse/rawdata.db/web \
--conf spark.web.etl.outputBaseDir=hdfs://master:9999/user/hadoop-twq/traffic-analysis/web \
--conf spark.web.etl.startDate=20180617 \
--conf spark.driver.extraJavaOptions="-Dweb.metadata.mongodbAddr=192.168.1.102 -Dweb.etl.hbase.zk.quorums=master" \
--conf spark.executor.extraJavaOptions="-Dweb.metadata.mongodbAddr=192.168.1.102 -Dweb.etl.hbase.zk.quorums=master -Dcom.sun.management.jmxremote.port=1119 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
/home/hadoop-twq/traffice-analysis/jars/spark-sessionization-etl-1.0-SNAPSHOT-jar-with-dependencies.jar prod
實時流:
#!/usr/bin/env bash
# the two most important settings:
# 取決於如下因素:
# 1:每一秒接收到的events,尤其是在高峰時間
# 2:數據源的緩沖能力
# 3:可以忍受的最大的滯后時間
# 我們通過將我們的Streaming程序在准生產環境中跑幾天來確定以上的因素
# 進而確定我們的executors的個數
num_executors=6
# 取決於如下因素:
# 1:每一個batch需要處理的數據的大小
# 2:transformations API的種類,如果使用的transformations需要shuffle的話,則需要的內存更大一點
# 3:使用狀態Api的話,需要的內存更加大一點,因為需要內存緩存每一個key的狀態
executor_memory=6g
# 每一個executor配置3到5個cores是比較好的,因為3到5個並發寫HDFS是最優的
# see http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
executor_cores=3
# backpressure
receiver_max_rate=100
receiver_initial_rate=30
spark-submit
--master yarn -
-deploy-mode cluster \
--name "Real_Time_SessionizeData" \
--class com.sddt.sessionize.SessionizeData \
--driver-memory 2g \
--num-executors ${num_executors} --executor-cores ${executor_cores} --executor-memory ${executor_memory} \
--queue "realtime_queue" \
--files "hdfs:master:9999/user/hadoop-master/real-time-analysis/log4j-yarn.properties" \
--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \
--conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer `# Kryo Serializer is much faster than the default Java Serializer`\
--conf spark.locality.wait=10 `# 減少Spark Delay Scheduling從而提高數據處理的並行度, 默認是3000ms` \
--conf spark.task.maxFailures=8 `# 增加job失敗前最大的嘗試次數, 默認是4`\
--conf spark.ui.killEnabled=false `# 禁止從Spark UI上殺死Spark Streaming的程序`\
--conf spark.logConf=true `# 在driver端打印Spark Configuration的日志`\
`# SPARK STREAMING CONFIGURATION` \
--conf spark.streaming.blockInterval=200 `# 生成數據塊的時間, 默認是200ms`\
--conf spark.streaming.backpressure.enabled=true `# 打開backpressure的功能`\
--conf spark.streaming.kafka.maxRatePerPartition=${receiver_max_rate} `# direct模式讀取kafka每一個分區數據的速度`\
`# YARN CONFIGURATION` \
--conf spark.yarn.driver.memoryOverhead=512 `# Set if --driver-memory < 5GB`\
--conf spark.yarn.executor.memoryOverhead=1024 `# Set if --executor-memory < 10GB`\
--conf spark.yarn.maxAppAttempts=4 `# Increase max application master attempts`\
--conf spark.yarn.am.attemptFailuresValidityInterval=1h `# Attempt counter considers only the last hour`\
--conf spark.yarn.max.executor.failures=$((8 * ${num_executors})) `# Increase max executor failures`\
--conf spark.yarn.executor.failuresValidityInterval=1h `# Executor failure counter considers only the last hour`\
--driver-java-options hdfs://master:9999/user/hadoop-master/real-time-analysis/output real_time_session s hdfs://master:9999/user/hadoop-master/real-time-analysis/checkpoint session master:9092
/home/hadoop-master/real-time-analysis/realtime-streaming-sessionization-1.0-SNAPSHOT-jar-with-dependencies.jar \
hdfs://master:9999/user/hadoop-master/real-time-analysis/output real_time_session s hdfs://master:9999/user/hadoop-master/real-time-analysis/checkpoint session master:9092
性能調優
Spark性能調優的第一步,就是為任務分配更多的資源,在一定范圍內,增加資源的分配與性能的提升是成正比的,實現了最優的資源配置后,在此基礎上再考慮進行后面論述的性能調優策略。
資源的分配在使用腳本提交Spark任務時進行指定,標准的Spark任務提交腳本如代碼清單2-1所示:
/usr/opt/modules/spark/bin/spark-submit \ --class com.sddt.spark.Analysis \ --num-executors 80 \ --driver-memory 6g \ --executor-memory 6g \ --executor-cores 3 \ /usr/opt/modules/spark/jar/spark.jar \
可以進行分配的資源如表2-1所示:
名稱 |
說明 |
--num-executors |
配置Executor的數量 |
--driver-memory |
配置Driver內存(影響不大) |
--executor-memory |
配置每個Executor的內存大小 |
--executor-cores |
配置每個Executor的CPU core數量 |
調節原則:盡量將任務分配的資源調節到可以使用的資源的最大限度。
對於具體資源的分配,我們分別討論Spark的兩種Cluster運行模式:
第一種是Spark Standalone模式,你在提交任務前,一定知道或者可以從運維部門獲取到你可以使用的資源情況,在編寫submit腳本的時候,就根據可用的資源情況進行資源的分配,比如說集群有15台機器,每台機器為8G內存,2個CPU core,那么就指定15個Executor,每個Executor分配8G內存,2個CPU core。
第二種是Spark Yarn模式,由於Yarn使用資源隊列進行資源的分配和調度,在表寫submit腳本的時候,就根據Spark作業要提交到的資源隊列,進行資源的分配,比如資源隊列有400G內存,100個CPU core,那么指定50個Executor,每個Executor分配8G內存,2個CPU core。
對表2-1中的各項資源進行了調節后,得到的性能提升如表2-2所示:
名稱 |
解析 |
增加Executor·個數 |
在資源允許的情況下,增加Executor的個數可以提高執行task的並行度。比如有4個Executor,每個Executor有2個CPU core,那么可以並行執行8個task,如果將Executor的個數增加到8個(資源允許的情況下),那么可以並行執行16個task,此時的並行能力提升了一倍。 |
增加每個Executor的CPU core個數 |
在資源允許的情況下,增加每個Executor的Cpu core個數,可以提高執行task的並行度。比如有4個Executor,每個Executor有2個CPU core,那么可以並行執行8個task,如果將每個Executor的CPU core個數增加到4個(資源允許的情況下),那么可以並行執行16個task,此時的並行能力提升了一倍。 |
增加每個Executor的內存量 |
在資源允許的情況下,增加每個Executor的內存量以后,對性能的提升有三點:
|