Spark 資源調度及任務調度


1、  資源分配

 

         通過SparkSubmit進行提交應用后,首先會創建Client將應用程序(字節碼文件.class)包裝成Driver,並將其注冊到Master。Master收到Client的注冊請求后將其加入待調度隊列waitingDrivers,並等待分配執行資源。

1.1 Dirver調度(分配Driver執行容器,1個)

         Master中調度程序執行時會為Driver分配一滿足其執行要求的Worker, 並通知Worker啟動將Driver。Worker接到執行Driver指令后創建DriverRunner執行Driver(應用程序mainClass,mainClass執行時其會創建Spark執行上下文環境:SparkContext。伴隨SparkContext會創建DAGScheduler和TaskScheduler分別用於Stage調度和任務調度,並會觸發RDD的Action算子提交job)。

1.2 APP調度(分配Executor, 多個)

         若想Job運行就需要得到執行資源,Dirver成功執行后,會通過SparkDeployScheduler-Backend創建AppClient(包裝App信息,包含可以創建CoarseGrainedExecutorBackend實例Command),用於向Master匯報資源需求。Master接到AppClient的匯報后,將其加入waittingApps隊列,等待調度。

App調度時會為app分配滿足條件的資源-----Worker(State是Alive,其上並沒有該Application的executor,可用內存滿足要求(spark.executor.memory指定,默認512), 核滿足要求(spark.cores.max, 最大可用core數,若未指定,則為全部資源)),然后通知Woker啟動Excutor. 及向AppClient發送ExecutorAdded消息。

進行調度時,調度程序會根據配制SpreadOutApps = spark.deploy.spreadOut情況決定資源分配方式,若

SpreadOutApps方式:將每個app分配到盡可能多的worker中執行。

         1 從列表中取下一app,根據CPU情況找出合適的woker,按核從小到大排序

2 如果worker節點存在可以分配的core 則進行預分配處理(輪循一次分一個直至滿足app需求),並在分配列表(assigned = Array[Int](numUsable))中記數。

3根據assinged列表中的預分配信息,進行分配Executor(真實分配)

4 啟動Executor並設置app.state =  ApplicationState.RUNNING

非SpreadOutApps方式: 將每個app分配到盡可能少的worker中執行。

                   1 從可用的worker列表中取下一work. (worker <- workers if worker.coresFree > 0)

                   2 遍歷waitingApps 找到滿足app運行條件的app,進行分配

3啟動Executor(launchExecutor(w,e))並設置app.state =  ApplicationState.RUNNING

         其中:launchExcutor(worker, exec) 具體內容如下:

                   向executor分配給worker

                                     通知worker啟動executor

由分配過程可知, 分配的Excutor個數與CPU核心數有關。當指定完Worker節點后,會在Worker節點創建ExecutorRunner,並啟動,執行App中的Command 去創建並啟動CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend啟動后,會首先通過傳入的driverUrl這個參數向在CoarseGrainedSchedulerBackend::DriverActor(用於與Master通信,及調度任務)發送RegisterExecutor(executorId, hostPort, cores),DriverActor會創建executorData(executor信息)加入executorDataMap供后續task使用,並回復RegisteredExecutor,此時CoarseGrainedExecutorBackend會創建一個org.apache.spark.executor.Executor。至此,Executor創建完畢。Executor是直接用於task執行, 是集群中的直接勞動者。

         至此,資源分配結束。當分配完資源后,就可以為依本地性為任務分配具體的執行資源。

        

2、Stage划分

         當執行mainClass時,執行到RDD的action算子時,會觸發執行作業(sc.runJob),最終通過調用DAGScheduler的runJob方法根據RDD信息及action算子要做的操作創建ResultStage(FinalStage)及ActiveJob。

         若ResultStage創建成功的話,根據配制信息及RDD特征可分為本地執行,集群執行。

若“spark.localExecution.enable”指定允許本地運行(默認為:false,不允許),具RDD的action算了允許本地運行allowLocal=true,且RDD只有一個partition的話可以直接以本地線程執行job,無需划分stage。否則要將job分成多個Stage提交到集群去執行(通過提交ResultStage進行)。

         因為ResultStage提交時,首先會去判斷其是否存在缺失的ParentStage(也就是說是否存在未完成的父Stage)。若有,則其需要等待其父Stage執行完成,才能進行提交執行。
       判斷是否存在Stage的標准是看是否存在ShuffeDependency(Stage的分界線)。提交ResultStage時會根據其finalRDD 的依賴遞歸的尋找其DAG圖中是否存在ShuffeDependency, 若存在,則創建ShuffleMapStage做為finalStage的父Stage以此類似。但至此,只能說存在父Stage並不能說存在缺失的父Stage. 判斷缺失的標准是看其結果成功的輸出信息(status)個數與其處理的分區個數是否相同,如若相同,則說明父Stage已經執行完成, 不存在missing;否則,說明還未完成,存在missing.  因為將ShuffleMapStage划分成maptask時,每個Partition對應一個maptask, 每個task會得到一個status輸出結果信息,並在執行結束時將輸出結果上報mapOutputTracker,並更新shuffleStage狀態(將status增加進行其outputLocs列表,並將numAvailableOutputs加1),若numAvailableOutputs 與 Stage所要處理的partitions一致,說明所有的task都已經執行完成,即Stage執行完成;否則,說明還有task未完成,即Stage未完成。
       由上述分析可知,存在依賴關系的兩個Stage,如果父Stage未執行完成,子Stage不能提交,也就是不能轉變為Taskset加入任務調度隊列。因此其先后順序是嚴格控制的。我們知道只有存在ShuffleDependency時,才會划分Stage,這也就是說兩個Stage之間是要做Shuffle操作的。根據上述分析可知Shuffle時ShuffleWrite做不完,ShuffleRead不能進行.

3. Task調度

         當Stage不存在缺失的ParentStage時,會將其轉換為TaskSet並提交。轉換時依Stage類型進行轉換:將ResultStage轉換成ResultTask, ShuffleMapStage轉換成ShuffleMapTask. Task個數由Stage中finalRDD 的分區數決定。

         當轉換成的TaskSet提交之后,將其通過taskScheduler包裝成TaskSetManager並添加至調度隊列中(Pool),等待調度。在包裝成TaskSetManager時,根據task的preferredLocatitions將任務分類存放在pendingTasksForExecutor, pendingTaskForHost, pendingTasksForRack, pendingTaskWithNoPrefs及allPendingTasks中, 前三個列表是是包含關系(本地性越來越低),范圍起來越大,例如:在pendingTasksForExecutor也在pendingTaskForHost,pendingTasksForRack中, 分類的目的是在調度時,依次由本地性高à低的查找task。

         在進行Task調度時,首先根據調度策略將可調度所有taskset進行排序,然后對排好序的taskset待調度列表中的taskset,按序進行分配Executor。再分配Executor時,然后逐個為Executor列表中可用的Executor在此次選擇的taskset中按本地性由高到低查找適配任務。此處任務調度為延遲調度,即若本次調度時間距上一任務結束時間小於當前本地性配制時間則等待,若過了配制時間,本地性要求逐漸降低,再去查找適配的task。當選定某一task后后將其加入runningtask列表,當其執行完成時會加入success列表,下次調度時就會過濾過存在這兩個列表中的任務,避免重復調度。

         當一個任務執行結束時,會將其從runningtask中移除,並加入success,並會適放其占用的執行資源,供后序task使用, 將判斷其執行成功的task數與此taskset任務總數相等時,意為taskset中所有任務執行結束,也就是taskset結束。此時會將taskset移除出可調度隊列。

         重復上述過程直到taskset待調度列表為空。即所有作業(job)執行完成。

3.1 spark調度策略

         上文任務調度時提到,在調度任務時,首先后依據調度策略對任務按優先級進行排序。下面就調度策略就行介紹。

         Spark現有的調度策略有FIFO 及 Fair兩種。采用何種調度策略由“spark.scheduler.mode”參數指定,默認為FIFO類型。

   下小節進行分析……

        ……………………

       文章出處:http://www.cnblogs.com/barrenlake/p/4550800.html                       

            ……………………


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM