一個 Spark 應用程序的完整執行流程
1、編寫 Spark Application 應用程序
2、打 jar 包,通過 spark-submit 提交執行
3、SparkSubmit 提交執行
4、執行 Spark Application 的 main 方法
5、初始化 SparkContext,這一步主要是把執行 Application 所需要的一個 Driver 和多個 Executor 啟動起來
6、執行到 Action 算子,這個階段會產生 DAG 血緣依賴關系,但是並沒有真正執行
7、執行 Action 算子,生成一個 Job 提交執行
8、DAGScheduler 會對提交的 Job 進行 Stage 切分
9、TaskSchedule 通過 TaskSet 獲取 job 的所有 Task,然后序列化分給 Exector
....
shuffle
Application、Job、Stage 和 Task
1、Application:初始化一個 SparkContext 即生成一個 Application;
2、Job:一個 Action 算子就會生成一個 Job;
3、Stage:Stage 等於寬依賴的個數加 1;
4、Task:一個 Stage 階段中,最后一個 RDD 的分區個數就是 Task 的個數。
注意:Application->Job->Stage->Task每一層都是1對n的關系
Spark Application 提交分析
入口:spark application 中的 action 算子!(SparkPi 程序中的 reduce 函數)
以 SparkPi 程序舉例:reduce() 算子就是提交 job 的入口
最后到:
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
從此,任務的提交就交給了 dagScheduler
Spark App Stage 切分分析
入口:EventLoop 中的 eventQueue.take() 方法
如果任務提交,則有 JobSubmitted 事件提交到 eventQueue 中,則 eventQueue.take() 阻塞返回,此時的 event 就是 JobSubmitted。
根據事件機制,跳轉到:DAGScheduler.handleJobSubmitted()
兩個核心的方法:
// stage切分入口
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// 提交stage執行入口
submitStage(finalStage)
方法依賴關系:
1、createResultStage(傳入finalRDD獲得ResultStage) ->2
2、getOrCreateParentStages(傳入rdd獲得父stage) ->3->4
3、getShuffleDependencies(傳入rdd獲得寬依賴)
4、getOrCreateShuffleMapStage(傳入寬依賴獲得ShuffleMapStage) ->5->6
5、getMissingAncestorShuffleDependencies(傳入一個rdd獲得所有寬依賴) ->3
6、createShuffleMapStage(傳入寬依賴獲得ShuffleMapStage) ->2
Spark Task 分發和執行分析
入口:
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
backend.reviveOffers()
總結一下:
1、用戶編寫 spark 應用程序
2、達成jar包
3、通過spark-submit 提交執行
4、sparkSessioin sparkContext 初始化
5、執行action算子
6、sparkContext.runJob()
7、dagScheduler.handleJobSubmitted()
8、dagScheduler.runJob()
createResultStage() stage切分
submitStage()
9、taskScheduler.submitTasks(new TaskSet())
10、schedulerBackEnd.reviveOffers();
11、Driver發送 LaunchTask 消息給 Executor
12、Executor 就會封裝Task 為一個 TaskRunner 對象,提交給該 Executor 的線程池執行
13、Executor 執行的Task 有可能是 ShuffleMapTask,也有可能是ResultTask
14、ShuffleMapTask 會后續的 Shuffle操作,具體有 Writer 完成
Spark Suffle 源碼分析
入口:
Task.runTask()