一個 Spark 應用程序的完整執行流程


一個 Spark 應用程序的完整執行流程

1、編寫 Spark Application 應用程序
2、打 jar 包,通過 spark-submit 提交執行
3、SparkSubmit 提交執行
4、執行 Spark Application 的 main 方法
5、初始化 SparkContext,這一步主要是把執行 Application 所需要的一個 Driver 和多個 Executor 啟動起來
6、執行到 Action 算子,這個階段會產生 DAG 血緣依賴關系,但是並沒有真正執行
7、執行 Action 算子,生成一個 Job 提交執行
8、DAGScheduler 會對提交的 Job 進行 Stage 切分
9、TaskSchedule 通過 TaskSet 獲取 job 的所有 Task,然后序列化分給 Exector
.... 
shuffle

Application、Job、Stage 和 Task

1、Application:初始化一個 SparkContext 即生成一個 Application;
2、Job:一個 Action 算子就會生成一個 Job;
3、Stage:Stage 等於寬依賴的個數加 1;
4、Task:一個 Stage 階段中,最后一個 RDD 的分區個數就是 Task 的個數。

注意:Application->Job->Stage->Task每一層都是1對n的關系

Spark Application 提交分析

入口:spark application 中的 action 算子!(SparkPi 程序中的 reduce 函數)

以 SparkPi 程序舉例:reduce() 算子就是提交 job 的入口

最后到:

dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

從此,任務的提交就交給了 dagScheduler

Spark App Stage 切分分析

入口:EventLoop 中的 eventQueue.take() 方法

如果任務提交,則有 JobSubmitted 事件提交到 eventQueue 中,則 eventQueue.take() 阻塞返回,此時的 event 就是 JobSubmitted。

根據事件機制,跳轉到:DAGScheduler.handleJobSubmitted()

兩個核心的方法:

// stage切分入口
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// 提交stage執行入口
submitStage(finalStage)

方法依賴關系:

1、createResultStage(傳入finalRDD獲得ResultStage) ->2
2、getOrCreateParentStages(傳入rdd獲得父stage) ->3->4
	3、getShuffleDependencies(傳入rdd獲得寬依賴)
	4、getOrCreateShuffleMapStage(傳入寬依賴獲得ShuffleMapStage) ->5->6
		5、getMissingAncestorShuffleDependencies(傳入一個rdd獲得所有寬依賴) ->3
		6、createShuffleMapStage(傳入寬依賴獲得ShuffleMapStage) ->2

Spark Task 分發和執行分析

入口:

taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
backend.reviveOffers()

總結一下:

1、用戶編寫 spark 應用程序
2、達成jar包
3、通過spark-submit 提交執行
4、sparkSessioin sparkContext 初始化
5、執行action算子
6、sparkContext.runJob()
7、dagScheduler.handleJobSubmitted()
8、dagScheduler.runJob()
	createResultStage() stage切分
	submitStage()
9、taskScheduler.submitTasks(new TaskSet())
10、schedulerBackEnd.reviveOffers();
11、Driver發送 LaunchTask 消息給 Executor 
12、Executor 就會封裝Task 為一個 TaskRunner 對象,提交給該 Executor 的線程池執行
13、Executor 執行的Task 有可能是 ShuffleMapTask,也有可能是ResultTask
14、ShuffleMapTask 會后續的 Shuffle操作,具體有 Writer 完成

Spark Suffle 源碼分析

入口:

Task.runTask()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM