Spark Job調度


Spark Job調度

1、概覽

Spark有幾種用於在計算之間調度資源的工具。首先,回想一下,如集群模式概述中所述,每個Spark應用程序(SparkContext的實例)都運行一組獨立的executor進程。Spark運行的集群管理器提供了跨應用程序的調度工具。其次, 每個Spark應用程序中,如果多個“job”(每個Spark action都是一個job)由不同的線程提交,則它們可以同時運行。如果您的應用程序通過網絡提供請求,則這很常見。Spark包含一個公平的調度程序來調度每個SparkContext中的資源。

2、跨應用程序進行調度

在集群上運行時,每個Spark應用程序都會獲得一組獨立的executor JVM,它們只運行task並為該應用程序存儲數據。如果多個用戶需要共享您的群集,則可以使用不同的選項來管理分配,具體取決於群集管理器。

所有集群管理器上都可以使用的最簡單選項是資源的靜態分區。通過這種方法,每個應用程序都可以使用最大量的資源,並在整個持續時間內持有它們。這是Spark standalone、YARN以及粗粒度Mesos模式模式中使用的方法。可以根據群集類型配置資源分配如下:

  • 獨立模式

    默認情況下,提交到獨立模式群集的應用程序將以FIFO(先進先出)順序運行,每個應用程序將嘗試使用所有可用節點。您可以通過spark.cores.max在其中設置配置屬性來限制應用程序使用的節點數,或者更改未設此條目的應用程序的默認值spark.deploy.defaultCores。最后,除了控制core外,每個應用程序的spark.executor.memory設置還控制其內存使用。

  • Mesos

    要在Mesos上使用靜態分區,請將spark.mesos.coarse配置屬性設置為true,並可選擇設置spark.cores.max為限制每個應用程序的資源共享,如獨立模式。您還應該設置spark.executor.memory控制執行程序內存。

  • YARN

    --num-executors* Spark YARN客戶端的選項控制它將在集群上分配的執行程序數(spark.executor.instances作為配置屬性),而--executor-memoryspark.executor.memory配置屬性)和--executor-coresspark.executor.cores配置屬性)控制每個執行程序的資源。有關更多信息,請參閱 YARN Spark屬性

Mesos上的第二個選項是動態共享 CPU內核。在此模式下,每個Spark應用程序仍然具有固定且獨立的內存分配(設置方式spark.executor.memory),但是當應用程序未在計算機上運行任務時,其他應用程序可能會在這些核心上運行任務。當您期望大量不過度活躍的應用程序(例如來自不同用戶的shell會話)時,此模式非常有用。但是,它存在延遲可預測性較低的風險,因為應用程序可能需要一段時間才能在一個節點上獲取內核。要使用此模式,只需使用 mesos://URL並設置spark.mesos.coarse為false。

注意,目前沒有一種模式可以跨應用程序提供內存共享。如果您希望以這種方式共享數據,我們建議運行單個服務器應用程序,通過查詢相同的RDD來提供多個請求。

2.1 動態資源分配

Spark提供了一種機制,可根據工作負載動態調整應用程序占用的資源。這意味着如果不再使用它們時會將資源返回給群集,並在有需求時再次請求它們。這一特性在多個應用程序間共享群集資源尤為有用。

默認情況下禁用此功能,並且可在所有粗粒度集群管理器上使用,即 獨立模式YARN模式Mesos粗粒度模式

2.1.1 配置和設置

使用此功能有兩個要求。首先,您的應用程序必須設置 spark.dynamicAllocation.enabledtrue。其次,必須 在同一群集中的每個工作節點上設置外部shuffle服務,並spark.shuffle.service.enabled在應用程序中設置為true。外部shuffle服務的目的是允許刪除執行程序而不刪除由它們寫入的shuffle文件(更詳細描述 如下)。該屬性配置在spark-default.conf文件中,不是spark-env.sh文件中。

#spark-default.conf
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true

設置此服務的方式因集群管理器而異:

  • 在獨立模式下,只需啟動spark.shuffle.service.enabled設置為的工作人員即可true
  • 在Mesos粗粒度模式下,$SPARK_HOME/sbin/start-mesos-shuffle-service.shspark.shuffle.service.enabled設置為的所有從屬節點上運行true。例如,你可以通過Marathon這樣做。
  • 在YARN模式下,請按照此處的說明操作。

所有其他相關配置都是可選的,位於spark.dynamicAllocation.和 spark.shuffle.servic.稱空間下。有關更多詳細信息,請參閱 配置頁面

2.1.2 資源分配策略

在較高的層面上,Spark在不再使用executor時放棄它,並在需要時重新獲取。由於沒有確定的方法可以預測即將被刪除的執行程序是否會在不久的將來運行任務,或者即將添加的新執行程序是否實際上是空閑的,我們需要一組啟發式來確定何時刪除並請求執行者。

a) 請求策略

啟用動態分配的Spark應用程序在等待計划等待掛起的任務時請求其他執行程序。這種情況必然意味着現有的執行者集合不足以同時使已提交但尚未完成的所有任務飽和。

Spark會輪次請求執行程序。當有待處理的任務持續spark.dynamicAllocation.schedulerBacklogTimeout幾秒鍾時會觸發實際請求,spark.dynamicAllocation.sustainedSchedulerBacklogTimeout如果待處理任務隊列仍然存在,則會在此后每隔幾秒再次觸發。此外,每輪請求的執行者數量與上一輪相比呈指數增長。例如,一個應用程序將在第一輪中添加1個執行程序,然后在后續輪次中添加執行程序中的2,4,8等。

指數增長政策的動機是雙重的。首先,應用程序應該在開始時謹慎地請求執行程序,以防只有少數額外的執行程序就足夠了。這與TCP慢啟動的理由相呼應。其次,應用程序應該能夠及時提高其資源使用率,以防萬一實際需要許多執行程序。

b) 刪除策略

刪除執行程序的策略要簡單得多。當Spark應用程序空閑超過spark.dynamicAllocation.executorIdleTimeout幾秒鍾時,它會刪除執行程序。請注意,在大多數情況下,此條件與請求條件互斥,因為如果仍有待安排的待處理任務,則執行程序不應處於空閑狀態。

2.1.3 executor的優雅退役

在動態分配之前,Spark executor會在失敗時或關聯的應用程序退出時也會退出。在這兩種情況下,不再需要與executor關聯的所有狀態,並且可以安全地丟棄。但是,通過動態分配,當顯式刪除executor時,應用程序仍在運行。如果應用程序嘗試訪問存儲在執行程序中或由執行程序寫入的狀態,則必須執行重新計算狀態。因此,Spark需要一種機制來通過在刪除executor之前保留其狀態來優雅地停用executor。

這一要求對於shuffle尤其重要。在shuffle期間,Spark executor首先將其自己的map輸出寫入本地磁盤,然后在其他executor嘗試獲取這些文件時充當這些文件的服務器。如果straggler是比同行運行更長時間的任務,動態分配可能會在shuffle完成之前刪除executor,在這種情況下,必須重新計算由該executor寫入的shuffle文件。

保留shuffle文件的解決方案是使用外部shuffle服務,也在Spark 1.2中引入。此服務指的是一個長期運行的進程,它獨立於Spark應用程序及其執行程序在集群的每個節點上運行。如果啟用該服務,Spark執行程序將從服務而不是相互之間獲取隨機文件。這意味着遺囑執行人所寫的任何shuffle狀態可能會繼續在遺囑執行人的有效期內提供。

除了編寫shuffle文件外,執行程序還會將數據緩存在磁盤或內存中。但是,刪除執行程序后,將無法再訪問所有緩存的數據。為了緩解這種情況,默認情況下,永遠不會刪除包含緩存數據的執行程序。您可以使用配置此行為 spark.dynamicAllocation.cachedExecutorIdleTimeout。在將來的版本中,緩存的數據可以通過堆外存儲來保存,這類存儲的精神與通過外部shuffle服務保存shuffle文件的方式類似。

3、應用程序內的調度

在給定的Spark應用程序(SparkContext實例)中,如果從不同線程提交多個並行job,則它們可以同時運行。Job這里指的是action,並在需要運行,以評估行動的任何任務。Spark的調度程序是完全線程安全的,並支持此用例,以支持提供多個請求的應用程序(例如,查詢多個用戶)。

默認情況下,Spark的調度程序以FIFO方式運行作業。每個Job分為“Stage”(例如map和reduce階段),第一個job優先sh使用所有可用資源,而其階段有任務要啟動,然后第二個工作獲得優先權等。如果工作在頭部隊列不需要使用整個集群,以后的作業可以立即開始運行,但如果隊列頭部的作業很大,則后續作業可能會顯着延遲。

從Spark 0.8開始,還可以在作業之間配置公平共享。在公平共享下,Spark以“循環”方式在作業之間分配任務,以便所有作業獲得大致相等的群集資源份額。這意味着在長時間工作運行時提交的短工作可以立即開始接收資源,並且仍然可以獲得良好的響應時間,而無需等待長時間工作完成。此模式最適合多用戶設置。

要啟用公平調度程序,只需在配置SparkContext時將spark.scheduler.mode屬性設置為FAIR

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平調度池

公平調度程序還支持將作業分組到池中,並為每個池設置不同的調度選項(例如權重)。例如,這可以用於為更重要的作業創建“高優先級”池,或者將每個用戶的作業組合在一起,並為用戶提供相同的份額,而不管他們擁有多少並發作業,而不是給予作業相等的份額。此方法以Hadoop Fair Scheduler為模型 。

在沒有任何干預的情況下,新提交的作業將進入默認池,但可以通過將spark.scheduler.pool“本地屬性” 添加到提交它們的線程中的SparkContext 來設置作業池。這樣做如下:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

設置這個本地屬性后,所有的工作(在此線程通過調用這個線程內提交RDD.savecountcollect,等)將使用此池名稱。該設置是每個線程,以便讓線程代表同一個用戶運行多個作業變得容易。如果您想清除與線程關聯的池,只需調用:

sc.setLocalProperty("spark.scheduler.pool", null)

池的默認行為

默認情況下,每個池獲得的集群份額相等(默認池中的每個作業的份額也相等),但在每個池中,作業按FIFO順序運行。例如,如果您為每個用戶創建一個池,這意味着每個用戶將獲得相同的群集份額,並且每個用戶的查詢將按順序運行,而不是以后的查詢從該用戶的早期查詢中獲取資源。

配置池屬性

還可以通過配置文件修改特定池的屬性。每個池支持三個屬性:

  • schedulingMode:這可以是FIFO或FAIR,用於控制池中的作業是否相互排隊(默認)或公平地共享池的資源。
  • weight:這將控制池相對於其他池的共享。默認情況下,所有池的權重均為1.例如,如果為特定池提供權重2,則其資源將比其他活動池多2倍。設置高權重(例如1000)也可以實現 池之間的優先級 - 實質上,只要有活動作業,weight-1000池就會始終首先啟動任務。
  • minShare:除了總體權重之外,每個池都可以獲得管理員希望擁有的最小份額(作為許多CPU核心)。公平調度程序始終嘗試滿足所有活動池的最小份額,然后根據權重重新分配額外資源。minShare因此,該屬性可以是另一種確保池可以總是快速獲得一定數量的資源(例如10個核心)的方法,而不會為集群的其余部分賦予高優先級。默認情況下,每個池minShare為0。

可以通過創建XML文件來設置池屬性,類似於conf/fairscheduler.xml.template,並且要么fairscheduler.xml在類路徑中放置一個文件,要么spark.scheduler.allocation.fileSparkConf中設置屬性 。

conf.set("spark.scheduler.allocation.file", "/path/to/file")

XML文件的格式只是 每個池的一個元素,其中包含不同的元素用於各種設置。例如:

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

一個完整的例子也可用於conf/fairscheduler.xml.template。請注意,未在XML文件中配置的任何池將僅獲取所有設置的默認值(調度模式FIFO,權重1和minShare 0)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM