spark動態資源調整其實也就是說的executor數目支持動態增減,動態增減是根據spark應用的實際負載情況來決定。
開啟動態資源調整需要(on yarn情況下)
1.將spark.dynamicAllocation.enabled設置為true。意思就是啟動動態資源功能
2.將spark.shuffle.service.enabled設置為true。 在每個nodeManager上設置外部shuffle服務
2.1 將spark-<version>-yarn-shuffle.jar拷貝到每台nodeManager的${HADOOP_HOME}/share/hadoop/yarn/lib/下。
2.2 配置yarn-site.xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>spark.shuffle.service.port</name>
<value>7337</value>
</property>
2.3 重啟所有nodeManager
關於資源(executor)的Request與Remove策略
Request策略
當有被掛起的任務(pending task)的時候,也就表示當前的executor數量還不足夠所有的task並行運行,這時候spark會申請增加資源,
但是並不是出現pending task就立刻請求增加executor。由下面兩個參數決定,如下:
- 1.spark.dynamicAllocation.schedulerBacklogTimeout:
如果啟用了動態資源分配功能,如果有pending task並且等待了一段時間(默認1秒),則增加executor
- 2.spark.dynamicAllocation.sustainedSchedulerBacklogTimeout:
隨后每隔N秒(默認1秒),再檢測pending task,如果仍然存在,增加executor。
此外每輪請求的executor數量是指數增長的。 比如,在第一輪中添加1個executor,然后在隨后的輪中添加2、4、8,依此類推。
Remove策略
如果某executor空閑超過了一段時間,則remove此executor,由下面參數決定:
spark.dynamicAllocation.executorIdleTimeout:默認60秒
此外關於動態資源分配還有以下相關參數
- spark.dynamicAllocation.initialExecutors:
初始executor數量,如果--num-executors設置的值比這個值大,那么將使用--num-executors設置的值作為初始executor數量。
- spark.dynamicAllocation.maxExecutors:
executor數量的上限,默認是無限制的。
- spark.dynamicAllocation.minExecutors:
executor數量的下限,默認是0個
- spark.dynamicAllocation.cachedExecutorIdleTimeout:
如果executor內有緩存數據(cache data),並且空閑了N秒。則remove該executor。默認值無限制。也就是如果有緩存數據,則不會remove該executor
為什么?比如在寫shuffle數據時候,executor可能會寫到磁盤也可能會保存在內存中,如果保存在內存中,該executor又remove掉了,那么數據也就丟失了。
spark動態資源分配機制的應用
使用spark thriftserver將spark作為一個長期運行的服務。用戶通過JDBC來提交sql查詢:
$SPARK_HOME/sbin/start-thriftserver.sh --executor-memory 20g --executor-cores 5 --driver-memory 10g --driver-cores 5 \ --conf spark.dynamicAllocation.enabled=true \ --conf spark.shuffle.service.enabled=true \ --conf spark.dynamicAllocation.initialExecutors=20 \ --conf spark.dynamicAllocation.minExecutors=20 \ --conf spark.dynamicAllocation.maxExecutors=400 \ --conf spark.dynamicAllocation.executorIdleTimeout=300s \ --conf spark.dynamicAllocation.schedulerBacklogTimeout=10s \
動態調整資源面臨的問題
我們先看看,動態資源調整需要解決哪幾個問題:
-
Cache問題。如果需要移除的Executor含有RDD cache該如何辦?
-
Shuffle問題。如果需要移除的Executor包含了Shuffle Write先關數據該怎么辦?
-
添加和刪除之后都需要告知DAGSchedule進行相關信息更新。
Cache去掉了重算即可。為了防止數據抖動,默認包含有Cache的Executor是不會被刪除的,因為默認的Idle時間設置的非常大:
private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds( "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Integer.MAX_VALUE}s")
你可以自己設置從而去掉這個限制。
而對於Shuffle,則需要和Yarn集成,需要配置yarn.nodemanager.aux-services
。具體配置方式,大家可以Google。這樣Spark Executor就不用保存Shuffle狀態了。
使用動態資源分配的建議
我們發現,DRA(Dynamic Resource Allocation)涉及到的點還是很多的,雖然邏輯比較簡單,但是和任務調度密切相關,是一個非常動態的過程。這個設計本身也是面向一個通用的調度方式。
建議如果采用了DRA,可以注意如下幾點:
-
設置一個合理的minExecutors-maxExecutors值
-
將Executor對應的cpuCore 最好設置為<=3 ,避免Executor數目下降時,等不及新申請到資源,已有的Executor就因為任務過重而導致集群掛掉。
-
如果程序中有shuffle,例如(reduce,groupBy),建議設置一個合理的並行數,避免殺掉過多的Executors。
-
對於每個Stage持續時間很短的應用,其實不適合這套機制。這樣會頻繁增加和殺掉Executors,造成系統顛簸。而Yarn對資源的申請處理速度並不快。
官網關於動態資源分配的文檔:
http://spark.apache.org/docs/2.3.1/job-scheduling.html#dynamic-resource-allocation
http://spark.apache.org/docs/2.3.1/configuration.html#dynamic-allocation