Spark任務提交方式和執行流程


轉自:http://www.cnblogs.com/frankdeng/p/9301485.html

一、Spark集群模式概述

Spark 應用在集群上作為獨立的進程組來運行,在您的main程序中通過SparkContext來協調(稱之為driver程序)。

一、Spark中的基本概念

(1)Application:表示你的應用程序

(2)Driver:表示main()函數,創建SparkContext。由SparkContext負責與ClusterManager通信,進行資源的申請,任務的分配和監控等。程序執行完畢后關閉SparkContext

(3)Executor:某個Application運行在Worker節點上的一個進程,該進程負責運行某些task,並且負責將數據存在內存或者磁盤上。在Spark on Yarn模式下,其進程名稱為 CoarseGrainedExecutor Backend,一個CoarseGrainedExecutor Backend進程有且僅有一個executor對象,它負責將Task包裝成taskRunner,並從線程池中抽取出一個空閑線程運行Task,這樣,每個CoarseGrainedExecutorBackend能並行運行Task的數據就取決於分配給它的CPU的個數。

(4)Worker:集群中可以運行Application代碼的節點。在Standalone模式中指的是通過slave文件配置的worker節點,在Spark on Yarn模式中指的就是NodeManager節點。

(5)Task:在Executor進程中執行任務的工作單元,多個Task組成一個Stage

(6)Job:包含多個Task組成的並行計算,是由Action行為觸發的

(7)Stage:每個Job會被拆分很多組Task,作為一個TaskSet,其名稱為Stage

(8)DAGScheduler:根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler,其划分Stage的依據是RDD之間的依賴關系

(9)TaskScheduler:將TaskSet提交給Worker(集群)運行,每個Executor運行什么Task就是在此處分配的。

二、Spark的運行流程

2.1 Spark的基本運行流程

1、說明

(1)構建Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊並申請運行Executor資源;

(2)資源管理器分配Executor資源並啟動StandaloneExecutorBackend,Executor運行情況將隨着心跳發送到資源管理器上;

(3)SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task

(4)Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor。

(5)Task在Executor上運行,運行完畢釋放所有資源。

2、圖解

3、Spark運行架構特點

(1)每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行tasks。這種Application隔離機制有其優勢的,無論是從調度角度看(每個Driver調度它自己的任務),還是從運行角度看(來自不同Application的Task運行在不同的JVM中)。當然,這也意味着Spark Application不能跨應用程序共享數據,除非將數據寫入到外部存儲系統。

(2)Spark與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了。

(3)提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息交換;如果想在遠程集群中運行,最好使用RPC將SparkContext提交給集群,不要遠離Worker運行SparkContext。

(4)Task采用了數據本地性和推測執行的優化機制。

4、DAGScheduler

Job=多個stage

Stage=多個同種task,

Task分為ShuffleMapTask和ResultTask

Dependency分為ShuffleDependency和NarrowDependency

面向stage的切分,切分依據為寬依賴維護waiting jobs和active jobs,維護waiting stages、active stages和failed stages,以及與jobs的映射關系

 主要職能:

1、接收提交Job的主入口,submitJob(rdd, ...)runJob(rdd, ...)。在SparkContext里會調用這兩個方法。 

  • 生成一個Stage並提交,接着判斷Stage是否有父Stage未完成,若有,提交並等待父Stage,以此類推。結果是:DAGScheduler里增加了一些waiting stage和一個running stage。
  • running stage提交后,分析stage里Task的類型,生成一個Task描述,即TaskSet。
  • 調用TaskScheduler.submitTask(taskSet, ...)方法,把Task描述提交給TaskScheduler。TaskScheduler依據資源量和觸發分配條件,會為這個TaskSet分配資源並觸發執行。
  • DAGScheduler提交job后,異步返回JobWaiter對象,能夠返回job運行狀態,能夠cancel job,執行成功后會處理並返回結果

2、處理TaskCompletionEvent 

  • 如果task執行成功,對應的stage里減去這個task,做一些計數工作: 
    • 如果task是ResultTask,計數器Accumulator加一,在job里為該task置true,job finish總數加一。加完后如果finish數目與partition數目相等,說明這個stage完成了,標記stage完成,從running stages里減去這個stage,做一些stage移除的清理工作
    • 如果task是ShuffleMapTask,計數器Accumulator加一,在stage里加上一個output location,里面是一個MapStatus類。MapStatusShuffleMapTask執行完成的返回,包含location信息和block size(可以選擇壓縮或未壓縮)。同時檢查該stage完成,向MapOutputTracker注冊本stage里的shuffleId和location信息。然后檢查stage的output location里是否存在空,若存在空,說明一些task失敗了,整個stage重新提交;否則,繼續從waiting stages里提交下一個需要做的stage
  • 如果task是重提交,對應的stage里增加這個task
  • 如果task是fetch失敗,馬上標記對應的stage完成,從running stages里減去。如果不允許retry,abort整個stage;否則,重新提交整個stage。另外,把這個fetch相關的location和map任務信息,從stage里剔除,從MapOutputTracker注銷掉。最后,如果這次fetch的blockManagerId對象不為空,做一次ExecutorLost處理,下次shuffle會換在另一個executor上去執行。
  • 其他task狀態會由TaskScheduler處理,如Exception, TaskResultLost, commitDenied等。

3、其他與job相關的操作還包括:cancel job, cancel stage, resubmit failed stage等

其他職能:

 cacheLocations 和 preferLocation

5、TaskScheduler

維護task和executor對應關系,executor和物理資源對應關系,在排隊的task和正在跑的task。

內部維護一個任務隊列,根據FIFO或Fair策略,調度任務。

TaskScheduler本身是個接口,spark里只實現了一個TaskSchedulerImpl,理論上任務調度可以定制。

1、submitTasks(taskSet),接收DAGScheduler提交來的tasks 

  • 為tasks創建一個TaskSetManager,添加到任務隊列里。TaskSetManager跟蹤每個task的執行狀況,維護了task的許多具體信息。
  • 觸發一次資源的索要。 
    • 首先,TaskScheduler對照手頭的可用資源和Task隊列,進行executor分配(考慮優先級、本地化等策略),符合條件的executor會被分配給TaskSetManager
    • 然后,得到的Task描述交給SchedulerBackend,調用launchTask(tasks),觸發executor上task的執行。task描述被序列化后發給executor,executor提取task信息,調用task的run()方法執行計算。

2、cancelTasks(stageId),取消一個stage的tasks 

  • 調用SchedulerBackendkillTask(taskId, executorId, ...)方法。taskId和executorId在TaskScheduler里一直維護着。

3、resourceOffer(offers: Seq[Workers]),這是非常重要的一個方法,調用者是SchedulerBacnend,用途是底層資源SchedulerBackend把空余的workers資源交給TaskScheduler,讓其根據調度策略為排隊的任務分配合理的cpu和內存資源,然后把任務描述列表傳回給SchedulerBackend 

  • 從worker offers里,搜集executor和host的對應關系、active executors、機架信息等等
  • worker offers資源列表進行隨機洗牌,任務隊列里的任務列表依據調度策略進行一次排序
  • 遍歷每個taskSet,按照進程本地化、worker本地化、機器本地化、機架本地化的優先級順序,為每個taskSet提供可用的cpu核數,看是否滿足 
    • 默認一個task需要一個cpu,設置參數為"spark.task.cpus=1"
    • 為taskSet分配資源,校驗是否滿足的邏輯,最終在TaskSetManagerresourceOffer(execId, host, maxLocality)方法里
    • 滿足的話,會生成最終的任務描述,並且調用DAGSchedulertaskStarted(task, info)方法,通知DAGScheduler,這時候每次會觸發DAGScheduler做一次submitMissingStage的嘗試,即stage的tasks都分配到了資源的話,馬上會被提交執行

4、statusUpdate(taskId, taskState, data),另一個非常重要的方法,調用者是SchedulerBacnend,用途是SchedulerBacnend會將task執行的狀態匯報給TaskScheduler做一些決定 

    • TaskLost,找到該task對應的executor,從active executor里移除,避免這個executor被分配到其他task繼續失敗下去。
    • task finish包括四種狀態:finished, killed, failed, lost。只有finished是成功執行完成了。其他三種是失敗。
    • task成功執行完,調用TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data),否則調用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)TaskResultGetter內部維護了一個線程池,負責異步fetch task執行結果並反序列化。默認開四個線程做這件事,可配參數"spark.resultGetter.threads"=4

 TaskResultGetter取task result的邏輯

1、對於success task,如果taskResult里的數據是直接結果數據,直接把data反序列出來得到結果;如果不是,會調用blockManager.getRemoteBytes(blockId)從遠程獲取。如果遠程取回的數據是空的,那么會調用TaskScheduler.handleFailedTask,告訴它這個任務是完成了的但是數據是丟失的。否則,取到數據之后會通知BlockManagerMaster移除這個block信息,調用TaskScheduler.handleSuccessfulTask,告訴它這個任務是執行成功的,並且把result data傳回去。

2、對於failed task,從data里解析出fail的理由,調用TaskScheduler.handleFailedTask,告訴它這個任務失敗了,理由是什么。

6、SchedulerBackend

TaskScheduler下層,用於對接不同的資源管理系統,SchedulerBackend是個接口,需要實現的主要方法如下:

def start(): Unit
def stop(): Unit
def reviveOffers(): Unit // 重要方法:SchedulerBackend把自己手頭上的可用資源交給TaskScheduler,TaskScheduler根據調度策略分配給排隊的任務嗎,返回一批可執行的任務描述,SchedulerBackend負責launchTask,即最終把task塞到了executor模型上,executor里的線程池會執行task的run()
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException

粗粒度:進程常駐的模式,典型代表是standalone模式,mesos粗粒度模式,yarn

細粒度:mesos細粒度模式

這里討論粗粒度模式,更好理解:CoarseGrainedSchedulerBackend

維護executor相關信息(包括executor的地址、通信端口、host、總核數,剩余核數),手頭上executor有多少被注冊使用了,有多少剩余,總共還有多少核是空的等等。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM