不上博客園有一段時間,回過頭來翻看之前寫的幾篇博客,都是寫了一半就丟了,又一次更深的認識自己的懶惰。
做大數據項目快三個月了,一個新的領域一邊探索一邊實現,特別有意思,現在回來梳理對Spark的認識。專題的資料來源是官網和網絡上的博客,暫時還不會涉及對源碼的閱讀。這是專題的第一篇文章,寫寫我對Spark工作流的整體理解,接下來的專題內容會對工作流中的各個組成部分作探究,主要思路:
- 定義,即是什么?
- 為什么要在Spark中這么實現?
- 在Spark中是如何實現的?
- 如若涉及到調優,該如何調優?
首先,說明spark的一個application組成:

圖1
這張是Spark集群工作圖,負責調度的是Cluster Manager,一個Appliaction,即用戶自己寫的 Spark 程序(driver program),比如 WordCount.scala,它包含一個driver(驅動)和多個executor(執行器),其中:
- 驅動(driver)持有應用(SparkContext),為工作調度任務;
- 執行器(executor)獨立於應用(SparkContext),在應用的持續時間內運行,執行應用的任務。
接着,看一個Application具體如何流轉。

圖2 圖3
Application的工作流程分為Job提交和Task任務執行兩個部分,對應圖2、圖3的左半圖和右半圖。Job提交具體的是每個stage的提交,其流程步驟如下:
- 根據rdd.transformation()操作建立 computing chain(一系列的 RDD),這是Job的邏輯執行圖;
- 執行rdd.action(),調用SparkContext的runJob()方法,生成DAGScheduler;
- DAGScheduler運行自己的runJob()方法,生成stages【備注:可能生成多個stage】;
- 接着,DAGScheduler運行自己的submitStage()方法,在這個方法中,定義ShuffleMapTasks 或 ResultTasks任務,然后將任務打包到一個TaskSet,提交給taskScheduler這個任務調度器;
- sparkDeploySchedulerBackend 接收到TaskSet的任務后,會進行序列化,然后通過DriverActor發送給Worker節點Executor的CoarseGrainedExecutorBackend Actor【備注:一個 CoarseGrainedExecutorBackend 進程有且僅有一個 executor 對象】;
- 最后,Executor把task包裝成taskRunner,每個taskRunner從線程池中抽取出一個空閑線程運行。
對於Task任務的執行流程,看一張圖:

圖4
Task任務執行的流程圖具體步驟:
- Executor反序列化接收到的任務,運行任務返回結果directResult/indirectResult給driver:如果directResult比較小,直接返回給driver;否則將結果通過blockManager的“memory + hard disk”模式管理結果,返回給driver的只是包含存儲任務運行結果位置的indirectResult,由driver通過HTTP主動拉取任務運行結果;【備注:是否直接返回任務運行結果的閾值屬性設置為spark.akka.frameSize = 10MB】
- driver獲得該task任務的計算結果result后,它會通知taskScheduler,任務已經處理完成,並分析結果;
- 如果計算結果result來自於ResultTask,在driver中通過resultHandler 統計這個result,如果是ShuffleMapTask這類結果,會存儲到mapOutputTrackerMaster做下一步處理;
- 如果task任務是stage中的最后一個,那么開始提交下一個stage;
- 如果stage是一個job中的最后一個,則通知DAGScheduler這個job已經完成處理。
參考資料:https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/5-Architecture.md
