概述
研究Spark作業調度,是為了合理使用集群的資源。更具體一點,是看看是否提供了可以個性化配置的點,然后根據應用的具體情況制定配置或者使用方案。
本文參考官網作業調度文檔。
spark的作業調度分為兩個場景:跨應用的調度和應用內部的調度,下面分別介紹。
聲明:文中配圖是自己的理解,並不敢保證其准確性。
跨應用調度
跨應用的調度是由底層的集群管理器負責的,有兩種資源分配策略。
一種是靜態資源分隔,即一個應用一開始就申請所有的資源,並在程序運行期間使用持有這些資源。
一種是動態資源分配,應用根據自己的負載情況動態請求或釋放資源。這種策略默認是不開啟的。
靜態資源分隔
所有的集群管理器都支持靜態資源分隔,只是具體的配置策略不同:
Standalone mode
提交到Standalone mode集群的應用會以FIFO的順序運行,每一個正在運行的應用都會嘗試占用所有的可用資源。使用下面的配置項可以限制每個應用申請的資源:
spark.cores.max
應用可以申請的最大數量的CPU核的數量,如果沒有設置,取spark.deploy.defaultCores
的值。
spark.executor.memory
分配給每個executor進程的內存資源。
Mesos
為了使用靜態資源隔離,需要設置spark.mesos.coarse
為true,這稱為粗粒度的Mesos模式。
另外,spark.cores.max
和spark.executor.memory
在Mesos模式下同樣有效。
YARN
--num-executors
在使用spark-submit提交作業時,可以使用--num-executors
選項請求指定的executor個數。
在程序內部,可以通過設置spark.executor.instances
屬性達到同樣的目的。
--executor-memory
在使用spark-submit提交作業時,可以使用--executor-memory
選項設置每個executor申請的內存。
在程序內部,可以通過設置spark.executor.memory
屬性達到同樣的目的。
--executor-cores
在使用spark-submit提交作業時,可以使用--executor-cores
選項設置每個executor申請的CPU核。
在程序內部,可以通過設置spark.executor.cores
屬性達到同樣的目的。
動態資源分配
spark的運行模型是基於executor的,executor是資源的實際持有者。所以動態資源分配,是通過動態的申請executor和釋放executor來實現的。
動態資源分配涉及到兩個方面,如何在需要的時候動態申請資源,以及如何在空閑的時候動態釋放資源。
動態請求策略:如果一個應用有tasks在等待,超過一定的時間(spark.dynamicAllocation.schedulerBacklogTimeout
秒)就會申請1個executor。此后每隔一定的時間(spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
秒)就檢測應用是否有tasks在等待,有就繼續申請executor。
動態請求資源的數量是指數級的,第一次申請1個,第二次申請2個,接着是4, 8 ...這種考慮是為了在謹慎申請資源的同時,又可以在允許的時間范圍內獲得真正需要的資源量。
動態釋放資源:是通過檢查應用占據的executor是否超過了指定的時間(spark.dynamicAllocation.executorIdleTimeout
秒)來決定的,超過了就釋放。
釋放資源的條件和請求資源的條件是互斥的,即如果一個應用有tasks在排隊,就不應該會有空閑的executor。
how to do
為了使用動態資源分配,需要做兩件事:
- 設置
spark.dynamicAllocation.enabled
值為true - 在每一個工作節點啟動
external shuffle service
,並設置spark.shuffle.service.enabled
為true
external shuffle service
的作用在后面會介紹,不同集群模式下啟動external shuffle service
的方式不同:
- 在Standalone模式,不需要額外的工作來啟動
external shuffle service
,只需要設置spark.shuffle.service.enabled
為true即可。 - 在Mesos粗粒度模式,在每一個slave nodes運行腳本
$SPARK_HOME/sbin/start-mesos-shuffle-service.sh
來啟動external shuffle service
。 - 在YARN模式,參考Configuring the External Shuffle Service On Yarn。
動態移除executor面對的問題
動態釋放資源需要額外的支持,因為executor可能會產生中間結果並輸出到本地,在需要的時候需要通過這個executor獲取它的中間結果。冒然移除executor會丟失它計算的中間結果,導致在真正需要的時候又要重新計算。
比如在map階段executor輸出map結果,在shuffle階段這些map結果又需要通過executor讀出來傳送到負責reduce的executor。
spark通過external shuffle service
來解決這個問題。external shuffle service
是指在每一個node都運行的一個長期進程,這個進程獨立於應用和executor,負責提供executor的輸出數據的獲取服務。原來executor之間相互請求來獲取對方的輸出結果,變成了統一從external shuffle service獲取結果。
即使executor已經被移除了,它所輸出的數據依然可以通過external shuffle service
來獲取。
另外,executor還可能會把中間結果緩存到內存,目前的策略是不移除此類的executor。未來可能采取將緩存持久化的方式,進而釋放executor。
應用內部調度
一個spark應用可以支持多個不同線程的job同時提交,這常見於spark應用提供網絡服務的場景。
spark默認的調度策略是FIFO,如果隊列頭部的job比較大,占用了集群的所有資源,后面的小任務將遲遲得不到運行的機會。
另外,spark還支持配置FAIR調度,spark循環調度每個job的task。這樣即使有大job在運行,剛提交的小job也可以及時獲得資源,而不需要等到大job結束。
通過設置屬性spark.scheduler.mode
來啟用公平調度:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
公平調度池
spark支持公平調度池的概念,每個線程可以指定將jobs提交到哪個池子,最細粒度的場景下是每個線程對應一個池,也可以多個線程使用同一個池。
每個線程默認使用的池是default,也可以通過設置參數來明確指定池。
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")
如果想重置當前線程綁定的池子,調用sc.setLocalProperty("spark.scheduler.pool", null)
。
可以通過配置文件將資源按照一定的比重分配到池,配置文件的模板:conf/fairscheduler.xml.template
。
通過conf.set("spark.scheduler.allocation.file", "/path/to/file")
指定配置文件。
每個池可支持的參數有三個:
- schedulingMode:FIFO 或 FAIR,FIFO是默認的策略。
- weight:每個池子分配資源的權重,默認情況下所有的權重為1。
- minShare:最小資源,CPU核的數量,默認為0。在進行資源分配時,總是最先滿足所有池的minShare,再根據weight分配剩下的資源。
配置文件示例:
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
沒有出現在配置文件中的池,所有參數取默認值(schedulingMode=FIFO,weight=1,minShare=0)。
概念澄清
executor到底指什么?和容器、JVM的關系是怎樣的?
executor是負責一定職責的程序組件,可以在已有的JVM中運行(比如local mode),也可以在新的JVM中運行。使用YARN時,executor是在YARN容器中運行的。
spark的job - stage - task的划分是怎么樣的?
spark的job可以划分為多個stage,這些stage構成了DAG。每一個stage又可以划分為多個tasks。stage的划分是根據shuffle map task來的,這一類的task相當於MapReduce中shuffle的map端,負責在本地RDD分區進行計算,並將結果輸出到新的分區,供后續的使用。在划分stage時,shuffle map任務作為階段的結束的邊界。
Mesos的粗粒度和細粒度
Mesos可以啟用CPU核的共享,即同一個節點executor在不使用核的情況下可以讓給另一個executor來使用。
不啟用CPU核共享稱為粗粒度,啟用則稱為細粒度,相關的配置項為spark.mesos.coarse
,值為true表示粗粒度。