spark submit啟動到Application執行過程大致分為兩個階段,一個是注冊階段:Driver、Application、worker的注冊;另一個是任務的調度、分配、執行階段:DagScheduler對stage的划分、TaskScheduler對TaskSets的分配以及Executor對Task的執行。
在我們日常開發中,我們常常會在IDEA中編寫好Spark應用程序,然后打包到集群上執行,上面的這幅圖就是以standalone模式提交應用執行的流程。
- 首先是提交打包的應用程序,使用Spark submit或者spark shell工具執行。
- 提交應用程序后后台會在后台啟動Driver進程(注意:這里的Driver是在Client上啟動,如果使用cluster模式提交任務,Driver進程會在Worker節點啟動)。
- 開始構建Spark應用上下文。一般的一個Spark應用程序都會先創建一個Sparkconf,然后來創建SparkContext。如下代碼所示:val conf=new SparkConf() val sc=new SparkContext(conf)。在創建SparkContext對象時有兩個重要的對象,DAGScheduler和TaskScheduler(具體作用后面會詳細講解)。
- 構建好TaskScheduler后,它對應着一個后台進程,接着它會去連接Master集群,向Master集群注冊Application。
- Master節點接收到應用程序之后,會向該Application分配資源,啟動一個或者多個Worker節點。
- 每一個Worker節點會為該應用啟動一個Executor進程來執行該應用程序。
- 向Master節點注冊應用之后,master為應用分配了節點資源,在Worker啟動Executor完成之后,此時,Executo會向TaskScheduler反向注冊,以讓它知道Master為應用程序分配了哪幾台Worker節點和Executor進程來執行任務。到此時為止,整個SparkContext創建完成。
- 創建好SparkContext之后,繼續執行我們的應用程序,每執行一個action操作就創建為一個job,將job交給DAGScheduler執行,然后DAGScheduler會將多個job划分為stage(這里涉及到stage的划分算法,比較復雜)。然后每一個stage創建一個TaskSet。
- 實際上TaskScheduler有自己的后台進程會處理創建好的TaskSet。
- 然后就會將TaskSet中的每一個task提交到Executor上去執行。(這里也涉及到task分配算法,提交到哪幾個worker節點的executor中去執行)。
- Executor會創建一個線程池,當executor接收到一個任務時就從線程池中拿出來一個線程將Task封裝為一個TaskRunner。
- 在TaskRunner中會將我們程序的拷貝,反序列化等操作,然后執行每一個Task。對於這個Task一般有兩種,ShufflerMapTask和ResultTask,只有最后一個stage的task是ResultTask,其它的都是ShufflerMapTask。
- 最后會執行完所有的應用程序,將stage的每一個task分批次提交到executor中去執行,每一個Task針對一個RDD的partition,執行我們定義的算子和函數,直到全部執行完成。