spark動態資源分配


spark動態資源調整其實也就是說的executor數目支持動態增減,動態增減是根據spark應用的實際負載情況來決定。

開啟動態資源調整需要(on yarn情況下)

1.將spark.dynamicAllocation.enabled設置為true。意思就是啟動動態資源功能
2.將spark.shuffle.service.enabled設置為true。 在每個nodeManager上設置外部shuffle服務
  2.1 將spark-<version>-yarn-shuffle.jar拷貝到每台nodeManager的${HADOOP_HOME}/share/hadoop/yarn/lib/下。
  2.2 配置yarn-site.xml
    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
      <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    <property>
      <name>spark.shuffle.service.port</name>
      <value>7337</value>
    </property>
   2.3 重啟所有nodeManager

 

關於資源(executor)的Request與Remove策略

Request策略

當有被掛起的任務(pending task)的時候,也就表示當前的executor數量還不足夠所有的task並行運行,這時候spark會申請增加資源,
但是並不是出現pending task就立刻請求增加executor。由下面兩個參數決定,如下:

  • 1.spark.dynamicAllocation.schedulerBacklogTimeout:

如果啟用了動態資源分配功能,如果有pending task並且等待了一段時間(默認1秒),則增加executor

  • 2.spark.dynamicAllocation.sustainedSchedulerBacklogTimeout:

隨后每隔N秒(默認1秒),再檢測pending task,如果仍然存在,增加executor。
此外每輪請求的executor數量是指數增長的。 比如,在第一輪中添加1個executor,然后在隨后的輪中添加2、4、8,依此類推。

Remove策略

如果某executor空閑超過了一段時間,則remove此executor,由下面參數決定:
spark.dynamicAllocation.executorIdleTimeout:默認60秒

此外關於動態資源分配還有以下相關參數

  • spark.dynamicAllocation.initialExecutors:

    初始executor數量,如果--num-executors設置的值比這個值大,那么將使用--num-executors設置的值作為初始executor數量。

  • spark.dynamicAllocation.maxExecutors:

    executor數量的上限,默認是無限制的。

  • spark.dynamicAllocation.minExecutors:

    executor數量的下限,默認是0個

  • spark.dynamicAllocation.cachedExecutorIdleTimeout:

    如果executor內有緩存數據(cache data),並且空閑了N秒。則remove該executor。默認值無限制。也就是如果有緩存數據,則不會remove該executor
為什么?比如在寫shuffle數據時候,executor可能會寫到磁盤也可能會保存在內存中,如果保存在內存中,該executor又remove掉了,那么數據也就丟失了。

 


 

spark動態資源分配機制的應用

使用spark thriftserver將spark作為一個長期運行的服務。用戶通過JDBC來提交sql查詢:

$SPARK_HOME/sbin/start-thriftserver.sh 
--executor-memory 20g --executor-cores 5 --driver-memory 10g --driver-cores 5 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=20 \
--conf spark.dynamicAllocation.minExecutors=20 \
--conf spark.dynamicAllocation.maxExecutors=400 \
--conf spark.dynamicAllocation.executorIdleTimeout=300s \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=10s \

 


 

動態調整資源面臨的問題

我們先看看,動態資源調整需要解決哪幾個問題:

  • Cache問題。如果需要移除的Executor含有RDD cache該如何辦?

  • Shuffle問題。如果需要移除的Executor包含了Shuffle Write先關數據該怎么辦?

  • 添加和刪除之后都需要告知DAGSchedule進行相關信息更新。

Cache去掉了重算即可。為了防止數據抖動,默認包含有Cache的Executor是不會被刪除的,因為默認的Idle時間設置的非常大:

private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.cachedExecutorIdleTimeout",
s"${Integer.MAX_VALUE}s")

你可以自己設置從而去掉這個限制。

而對於Shuffle,則需要和Yarn集成,需要配置yarn.nodemanager.aux-services。具體配置方式,大家可以Google。這樣Spark Executor就不用保存Shuffle狀態了。

使用動態資源分配的建議

我們發現,DRA(Dynamic Resource Allocation)涉及到的點還是很多的,雖然邏輯比較簡單,但是和任務調度密切相關,是一個非常動態的過程。這個設計本身也是面向一個通用的調度方式。

建議如果采用了DRA,可以注意如下幾點:

  • 設置一個合理的minExecutors-maxExecutors值

  • 將Executor對應的cpuCore 最好設置為<=3 ,避免Executor數目下降時,等不及新申請到資源,已有的Executor就因為任務過重而導致集群掛掉。

  • 如果程序中有shuffle,例如(reduce,groupBy),建議設置一個合理的並行數,避免殺掉過多的Executors。

  • 對於每個Stage持續時間很短的應用,其實不適合這套機制。這樣會頻繁增加和殺掉Executors,造成系統顛簸。而Yarn對資源的申請處理速度並不快。


 

官網關於動態資源分配的文檔:

http://spark.apache.org/docs/2.3.1/job-scheduling.html#dynamic-resource-allocation

http://spark.apache.org/docs/2.3.1/configuration.html#dynamic-allocation


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM