Spark基本工作流程及YARN cluster模式原理
轉載請注明出處:http://www.cnblogs.com/BYRans/
Spark基本工作流程
相關術語解釋
Spark應用程序相關的幾個術語:
- Worker:集群中任何可以運行Application代碼的節點,類似於YARN中的NodeManager節點。在Spark on Yarn模式中指的就是NodeManager節點;
- Executor:Application運行在Worker 節點上的一個進程,該進程負責運行Task,並且負責將數據存在內存或者磁盤上,每個Application都有各自獨立的一批Executor。
- SparkContext:由用戶程序啟動,通過資源調度模塊與Executor通信。
- Driver:運行Application的main()函數,並創建SparkContext。其中創建SparkContext的目的是為了准備Spark應用程序的運行環境。在Spark中由SparkContext負責和ClusterManager通信,進行資源的申請、任務的分配和監控等;當Executor部分運行完畢后,Driver負責將SparkContext關閉。通常用SparkContext代表Drive;
基本運行流程
Spark應用程序有多種運行模式。SparkContext和Executor這兩部分的核心代碼實現在各種運行模式中都是公用的,在這兩部分之上,根據運行部署模式(例如:Local[N]、Yarn cluster等)的不同,有不同的調度模塊以及對應的適配代碼。
具體來說,以SparkContext為程序運行的總入口,在SparkContext的初始化過程中,Spark會分別創建DAGScheduler作業和TaskScheduler任務調度兩級調度模塊。
其中作業調度模塊是基於任務階段的高層調度模塊,它為每個Spark作業計算具有依賴關系的多個調度階段(通常根據shuffle來划分),然后為每個階段構建出一組具體的任務(通常會考慮數據的本地性等),然后以TaskSets(任務組)的形式提交給任務調度模塊來具體執行。而任務調度模塊則負責具體啟動任務、監控和匯報任務運行情況。
詳細的運行流程為:
- 構建Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊並申請運行Executor資源;
- 資源管理器分配Executor資源並啟動StandaloneExecutorBackend,Executor運行情況將隨着心跳發送到資源管理器上;
- SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task,Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor。
- Task在Executor上運行,運行完畢釋放所有資源。
作業調度模塊和具體的部署運行模式無關,在各種運行模式下邏輯相同。不同運行模式的區別主要體現在任務調度模塊。不同的部署和運行模式,根據底層資源調度方式的不同,各自實現了自己特定的任務調度模塊,用來將任務實際調度給對應的計算資源。接下來重點介紹下YARN cluster模式的實現原理和實現細節。
YARN cluster運行模式的內部實現原理
Spark有多種運行模式,在這里主要介紹下YARN cluster模式的內部實現原理。如下圖是YARN cluster模式的原理框圖,相對於其他模式,該模式比較特殊的是它需要由外部程序輔助啟動APP。用戶的應用程序通過輔助的YARN Client類啟動。YARN cluster模式和YARN client模式的區別在於:YARN client模式的AM是運行在提交任務的節點,而YARN cluster模式的AM是由YARN在集群中選取一個節點運行,不一定是在提交任務的節點運行。例如spark-shell如果需要使用YARN模式運行,只能為yarn-client
模式,啟動命令可以使用spark-shell --master yarn-client
。
Client類通過YARN Client API提交請求,在Hadoop集群上啟動一個Spark ApplicationMaster,Spark ApplicationMaster首先注冊自己為一個YARN ApplicationMaster,之后啟動用戶程序,SparkContext在用戶程序中初始化時,使用CoarseGrainedSchedulerBackend配合YARNClusterScheduler,YARNClusterScheduler只是對TaskSchedulerImpl的一個簡單包裝,增加了對Executor的等待邏輯等。
根據Client類傳遞的參數,Spark ApplicationMaster通過YARN ResourceManager/NodeManager的接口在集群中啟動若干個Container,用於運行CoarseGrainedExecutorBackend.CoarseGrainedExecutorBackend在啟動過程中會向CoarseGrainedSchedulerBackend注冊。
CoarseGrainedSchedulerBackend是一個基於Akka Actor實現的粗粒度的資源調度類,在整個Spark作業運行期間,CoarseGrainedSchedulerBackend主要負責如下功能:
- 監聽並持有注冊給它的Executor資源
- 根據現有的Executor資源,進行Executor的注冊、狀態更新、相應Scheduler的請求等任務的調度
模式的實現細節
Spark的各種運行模式雖然在啟動方式、運行為之、調度手段上有所不同,但它們所要完成的任務基本是一致的,就是在合適的位置安全可靠的根據用戶的配置和作業的需要管理和運行任務,在運行調度過程中需要考慮的問題主要為:
- 環境變量的傳遞
- JAR包和各種依賴文件的分發
- 任務的管理和序列化等
- 用戶參數配置
- 用戶及權限控制
環境變量的傳遞
Spark的運行參數有很大一部分是通過環境變量來設置的,例如Executor的內存設置、Library路徑等。在Cluster模式下就涉及到環境變量在各個Worker節點的傳遞問題。不同的運行模式有不同的傳遞方式。需要指出的是,在Local模式下,不存在環境變量的傳遞問題。
在這里主要說明一下再YARN相關模式下的參數傳遞。在YARN相關模式中,這些環境變量首先要通過YARN client設置到Spark AM的運行環境中,之后Spark AM在啟動Executor時再將環境變量設置到Executor中。
JAR包和依賴文件的分發
Spark程序的運行主要有兩類依賴:
- Spark運行庫及其依賴
- 應用程序自身的額外依賴
在Local模式下,不存在JAR包分發的問題。在這里主要介紹下YARN模式下的文件分發。
在YARN相關模式中,運行庫和程序運行所以來的其他文件首先通過HDFS客戶端API上傳到作業的.sparkStaing目錄下,然后將對應的文件和URL映射關系通知YARN,YARN的Node Manager在啟動Container的時候會從指定URL處下載相關文件作為運行環境的一部分。
對於需要進一步分發到Executor運行環境的文件,Spark YARN客戶端將需要分發的文件的相關屬性(例:URL、時間戳、尺寸等)打包成字符串,通過特定的環境變量(SPARK_YARN_CACHE_XXXX)傳遞給Spark AM,Spark AM在創建Executor的Container時還原特定環境變中的各個文件,並通過調用setLocalResources函數初始化Container。
任務管理和序列化
Spark任務的運行要解決的問題為:
- 以正確的順序運行任務,有效地管理和分派任務
- 將任務及運行所需相關數據有效地發送到遠端
- 收集運行結果
Spark任務通過DAGScheduler調用TaskScheduler.submitTasks進行派發,該接口將相關的一組任務一起提交並進行調度。
任務的運行結果在Executor端被序列化並發送回SchedulerBackend,由於受到Akka幀尺寸的限制,如果運行結果數據過大,結果會存儲到BlockManager中,這時候發送到SchedulerBackend的是對應數據的BlockID,TaskScheduler最終會調用TaskResultGetter在線程池中以異步的方式讀取結果,TaskSetManager再根據運行結果更新任務狀態(比如失敗重試等)並匯報給DAGScheduler等。