轉自:http://www.cnblogs.com/frankdeng/p/9301485.html
一、Spark集群模式概述
Spark 應用在集群上作為獨立的進程組來運行,在您的main程序中通過SparkContext來協調(稱之為driver程序)。
一、Spark中的基本概念
(1)Application:表示你的應用程序
(2)Driver:表示main()函數,創建SparkContext。由SparkContext負責與ClusterManager通信,進行資源的申請,任務的分配和監控等。程序執行完畢后關閉SparkContext
(3)Executor:某個Application運行在Worker節點上的一個進程,該進程負責運行某些task,並且負責將數據存在內存或者磁盤上。在Spark on Yarn模式下,其進程名稱為 CoarseGrainedExecutor Backend,一個CoarseGrainedExecutor Backend進程有且僅有一個executor對象,它負責將Task包裝成taskRunner,並從線程池中抽取出一個空閑線程運行Task,這樣,每個CoarseGrainedExecutorBackend能並行運行Task的數據就取決於分配給它的CPU的個數。
(4)Worker:集群中可以運行Application代碼的節點。在Standalone模式中指的是通過slave文件配置的worker節點,在Spark on Yarn模式中指的就是NodeManager節點。
(5)Task:在Executor進程中執行任務的工作單元,多個Task組成一個Stage
(6)Job:包含多個Task組成的並行計算,是由Action行為觸發的
(7)Stage:每個Job會被拆分很多組Task,作為一個TaskSet,其名稱為Stage
(8)DAGScheduler:根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler,其划分Stage的依據是RDD之間的依賴關系
(9)TaskScheduler:將TaskSet提交給Worker(集群)運行,每個Executor運行什么Task就是在此處分配的。
二、Spark的運行流程
2.1 Spark的基本運行流程
1、說明
(1)構建Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊並申請運行Executor資源;
(2)資源管理器分配Executor資源並啟動StandaloneExecutorBackend,Executor運行情況將隨着心跳發送到資源管理器上;
(3)SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task
(4)Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor。
(5)Task在Executor上運行,運行完畢釋放所有資源。
2、圖解
3、Spark運行架構特點
(1)每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行tasks。這種Application隔離機制有其優勢的,無論是從調度角度看(每個Driver調度它自己的任務),還是從運行角度看(來自不同Application的Task運行在不同的JVM中)。當然,這也意味着Spark Application不能跨應用程序共享數據,除非將數據寫入到外部存儲系統。
(2)Spark與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了。
(3)提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息交換;如果想在遠程集群中運行,最好使用RPC將SparkContext提交給集群,不要遠離Worker運行SparkContext。
(4)Task采用了數據本地性和推測執行的優化機制。
4、DAGScheduler
Job=多個stage
Stage=多個同種task,
Task分為ShuffleMapTask和ResultTask
Dependency分為ShuffleDependency和NarrowDependency
面向stage的切分,切分依據為寬依賴維護waiting jobs和active jobs,維護waiting stages、active stages和failed stages,以及與jobs的映射關系
主要職能:
1、接收提交Job的主入口,
submitJob(rdd, ...)
或runJob(rdd, ...)
。在SparkContext
里會調用這兩個方法。
- 生成一個Stage並提交,接着判斷Stage是否有父Stage未完成,若有,提交並等待父Stage,以此類推。結果是:DAGScheduler里增加了一些waiting stage和一個running stage。
- running stage提交后,分析stage里Task的類型,生成一個Task描述,即TaskSet。
- 調用
TaskScheduler.submitTask(taskSet, ...)
方法,把Task描述提交給TaskScheduler。TaskScheduler依據資源量和觸發分配條件,會為這個TaskSet分配資源並觸發執行。DAGScheduler
提交job后,異步返回JobWaiter
對象,能夠返回job運行狀態,能夠cancel job,執行成功后會處理並返回結果2、處理
TaskCompletionEvent
- 如果task執行成功,對應的stage里減去這個task,做一些計數工作:
- 如果task是ResultTask,計數器
Accumulator
加一,在job里為該task置true,job finish總數加一。加完后如果finish數目與partition數目相等,說明這個stage完成了,標記stage完成,從running stages里減去這個stage,做一些stage移除的清理工作- 如果task是ShuffleMapTask,計數器
Accumulator
加一,在stage里加上一個output location,里面是一個MapStatus
類。MapStatus
是ShuffleMapTask
執行完成的返回,包含location信息和block size(可以選擇壓縮或未壓縮)。同時檢查該stage完成,向MapOutputTracker
注冊本stage里的shuffleId和location信息。然后檢查stage的output location里是否存在空,若存在空,說明一些task失敗了,整個stage重新提交;否則,繼續從waiting stages里提交下一個需要做的stage- 如果task是重提交,對應的stage里增加這個task
- 如果task是fetch失敗,馬上標記對應的stage完成,從running stages里減去。如果不允許retry,abort整個stage;否則,重新提交整個stage。另外,把這個fetch相關的location和map任務信息,從stage里剔除,從
MapOutputTracker
注銷掉。最后,如果這次fetch的blockManagerId對象不為空,做一次ExecutorLost
處理,下次shuffle會換在另一個executor上去執行。- 其他task狀態會由
TaskScheduler
處理,如Exception, TaskResultLost, commitDenied等。3、其他與job相關的操作還包括:cancel job, cancel stage, resubmit failed stage等
其他職能:
cacheLocations 和 preferLocation
5、TaskScheduler
維護task和executor對應關系,executor和物理資源對應關系,在排隊的task和正在跑的task。
內部維護一個任務隊列,根據FIFO或Fair策略,調度任務。
TaskScheduler
本身是個接口,spark里只實現了一個TaskSchedulerImpl
,理論上任務調度可以定制。
1、submitTasks(taskSet)
,接收DAGScheduler
提交來的tasks
- 為tasks創建一個
TaskSetManager
,添加到任務隊列里。TaskSetManager
跟蹤每個task的執行狀況,維護了task的許多具體信息。 - 觸發一次資源的索要。
- 首先,
TaskScheduler
對照手頭的可用資源和Task隊列,進行executor分配(考慮優先級、本地化等策略),符合條件的executor會被分配給TaskSetManager
。 - 然后,得到的Task描述交給
SchedulerBackend
,調用launchTask(tasks)
,觸發executor上task的執行。task描述被序列化后發給executor,executor提取task信息,調用task的run()
方法執行計算。
- 首先,
2、cancelTasks(stageId)
,取消一個stage的tasks
- 調用
SchedulerBackend
的killTask(taskId, executorId, ...)
方法。taskId和executorId在TaskScheduler
里一直維護着。
3、resourceOffer(offers: Seq[Workers])
,這是非常重要的一個方法,調用者是SchedulerBacnend
,用途是底層資源SchedulerBackend
把空余的workers資源交給TaskScheduler
,讓其根據調度策略為排隊的任務分配合理的cpu和內存資源,然后把任務描述列表傳回給SchedulerBackend
- 從worker offers里,搜集executor和host的對應關系、active executors、機架信息等等
- worker offers資源列表進行隨機洗牌,任務隊列里的任務列表依據調度策略進行一次排序
- 遍歷每個taskSet,按照進程本地化、worker本地化、機器本地化、機架本地化的優先級順序,為每個taskSet提供可用的cpu核數,看是否滿足
- 默認一個task需要一個cpu,設置參數為
"spark.task.cpus=1"
- 為taskSet分配資源,校驗是否滿足的邏輯,最終在
TaskSetManager
的resourceOffer(execId, host, maxLocality)
方法里 - 滿足的話,會生成最終的任務描述,並且調用
DAGScheduler
的taskStarted(task, info)
方法,通知DAGScheduler
,這時候每次會觸發DAGScheduler
做一次submitMissingStage
的嘗試,即stage的tasks都分配到了資源的話,馬上會被提交執行
- 默認一個task需要一個cpu,設置參數為
4、statusUpdate(taskId, taskState, data)
,另一個非常重要的方法,調用者是SchedulerBacnend
,用途是SchedulerBacnend
會將task執行的狀態匯報給TaskScheduler
做一些決定
- 若
TaskLost
,找到該task對應的executor,從active executor里移除,避免這個executor被分配到其他task繼續失敗下去。 - task finish包括四種狀態:finished, killed, failed, lost。只有finished是成功執行完成了。其他三種是失敗。
- task成功執行完,調用
TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data)
,否則調用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)
。TaskResultGetter
內部維護了一個線程池,負責異步fetch task執行結果並反序列化。默認開四個線程做這件事,可配參數"spark.resultGetter.threads"=4
。
TaskResultGetter取task result的邏輯
1、對於success task,如果taskResult里的數據是直接結果數據,直接把data反序列出來得到結果;如果不是,會調用blockManager.getRemoteBytes(blockId)
從遠程獲取。如果遠程取回的數據是空的,那么會調用TaskScheduler.handleFailedTask
,告訴它這個任務是完成了的但是數據是丟失的。否則,取到數據之后會通知BlockManagerMaster
移除這個block信息,調用TaskScheduler.handleSuccessfulTask
,告訴它這個任務是執行成功的,並且把result data傳回去。
2、對於failed task,從data里解析出fail的理由,調用TaskScheduler.handleFailedTask
,告訴它這個任務失敗了,理由是什么。
6、SchedulerBackend
在TaskScheduler
下層,用於對接不同的資源管理系統,SchedulerBackend
是個接口,需要實現的主要方法如下:
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit // 重要方法:SchedulerBackend把自己手頭上的可用資源交給TaskScheduler,TaskScheduler根據調度策略分配給排隊的任務嗎,返回一批可執行的任務描述,SchedulerBackend負責launchTask,即最終把task塞到了executor模型上,executor里的線程池會執行task的run()
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
粗粒度:進程常駐的模式,典型代表是standalone模式,mesos粗粒度模式,yarn
細粒度:mesos細粒度模式
這里討論粗粒度模式,更好理解:CoarseGrainedSchedulerBackend
。
維護executor相關信息(包括executor的地址、通信端口、host、總核數,剩余核數),手頭上executor有多少被注冊使用了,有多少剩余,總共還有多少核是空的等等。