【注】該系列文章以及使用到安裝包/測試數據 可以在《傾情大奉送--Spark入門實戰系列》獲取
1、 Spark運行架構
1.1 術語定義
lApplication:Spark Application的概念和Hadoop MapReduce中的類似,指的是用戶編寫的Spark應用程序,包含了一個Driver 功能的代碼和分布在集群中多個節點上運行的Executor代碼;
lDriver:Spark中的Driver即運行上述Application的main()函數並且創建SparkContext,其中創建SparkContext的目的是為了准備Spark應用程序的運行環境。在Spark中由SparkContext負責和ClusterManager通信,進行資源的申請、任務的分配和監控等;當Executor部分運行完畢后,Driver負責將SparkContext關閉。通常用SparkContext代表Drive;
lExecutor:Application運行在Worker 節點上的一個進程,該進程負責運行Task,並且負責將數據存在內存或者磁盤上,每個Application都有各自獨立的一批Executor。在Spark on Yarn模式下,其進程名稱為CoarseGrainedExecutorBackend,類似於Hadoop MapReduce中的YarnChild。一個CoarseGrainedExecutorBackend進程有且僅有一個executor對象,它負責將Task包裝成taskRunner,並從線程池中抽取出一個空閑線程運行Task。每個CoarseGrainedExecutorBackend能並行運行Task的數量就取決於分配給它的CPU的個數了;
lCluster Manager:指的是在集群上獲取資源的外部服務,目前有:
Ø Standalone:Spark原生的資源管理,由Master負責資源的分配;
Ø Hadoop Yarn:由YARN中的ResourceManager負責資源的分配;
lWorker:集群中任何可以運行Application代碼的節點,類似於YARN中的NodeManager節點。在Standalone模式中指的就是通過Slave文件配置的Worker節點,在Spark on Yarn模式中指的就是NodeManager節點;
l作業(Job):包含多個Task組成的並行計算,往往由Spark Action催生,一個JOB包含多個RDD及作用於相應RDD上的各種Operation;
l階段(Stage):每個Job會被拆分很多組Task,每組任務被稱為Stage,也可稱TaskSet,一個作業分為多個階段;
l任務(Task): 被送到某個Executor上的工作任務;
1.2 Spark運行基本流程
Spark運行基本流程參見下面示意圖
1. 構建Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊並申請運行Executor資源;
2. 資源管理器分配Executor資源並啟動StandaloneExecutorBackend,Executor運行情況將隨着心跳發送到資源管理器上;
3. SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task,Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor。
4. Task在Executor上運行,運行完畢釋放所有資源。
Spark運行架構特點:
l每個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行tasks。這種Application隔離機制有其優勢的,無論是從調度角度看(每個Driver調度它自己的任務),還是從運行角度看(來自不同Application的Task運行在不同的JVM中)。當然,這也意味着Spark Application不能跨應用程序共享數據,除非將數據寫入到外部存儲系統。
lSpark與資源管理器無關,只要能夠獲取executor進程,並能保持相互通信就可以了。
l提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack里,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息交換;如果想在遠程集群中運行,最好使用RPC將SparkContext提交給集群,不要遠離Worker運行SparkContext。
lTask采用了數據本地性和推測執行的優化機制。
1.2.1 DAGScheduler
DAGScheduler把一個Spark作業轉換成Stage的DAG(Directed Acyclic Graph有向無環圖),根據RDD和Stage之間的關系找出開銷最小的調度方法,然后把Stage以TaskSet的形式提交給TaskScheduler,下圖展示了DAGScheduler的作用:
1.2.2 TaskScheduler
DAGScheduler決定了運行Task的理想位置,並把這些信息傳遞給下層的TaskScheduler。此外,DAGScheduler還處理由於Shuffle數據丟失導致的失敗,這有可能需要重新提交運行之前的Stage(非Shuffle數據丟失導致的Task失敗由TaskScheduler處理)。
TaskScheduler維護所有TaskSet,當Executor向Driver發送心跳時,TaskScheduler會根據其資源剩余情況分配相應的Task。另外TaskScheduler還維護着所有Task的運行狀態,重試失敗的Task。下圖展示了TaskScheduler的作用:
在不同運行模式中任務調度器具體為:
l Spark on Standalone模式為TaskScheduler;
l YARN-Client模式為YarnClientClusterScheduler
l YARN-Cluster模式為YarnClusterScheduler
1.3 RDD運行原理
那么 RDD在Spark架構中是如何運行的呢?總高層次來看,主要分為三步:
1.創建 RDD 對象
2.DAGScheduler模塊介入運算,計算RDD之間的依賴關系。RDD之間的依賴關系就形成了DAG
3.每一個JOB被分為多個Stage,划分Stage的一個主要依據是當前計算因子的輸入是否是確定的,如果是則將其分在同一個Stage,避免多個Stage之間的消息傳遞開銷。
以下面一個按 A-Z 首字母分類,查找相同首字母下不同姓名總個數的例子來看一下 RDD 是如何運行起來的。
步驟 1 :創建 RDD 上面的例子除去最后一個 collect 是個動作,不會創建 RDD 之外,前面四個轉換都會創建出新的 RDD 。因此第一步就是創建好所有 RDD( 內部的五項信息 ) 。
步驟 2 :創建執行計划 Spark 會盡可能地管道化,並基於是否要重新組織數據來划分 階段 (stage) ,例如本例中的 groupBy() 轉換就會將整個執行計划划分成兩階段執行。最終會產生一個 DAG(directed acyclic graph ,有向無環圖 ) 作為邏輯執行計划。
步驟 3 :調度任務 將各階段划分成不同的 任務 (task) ,每個任務都是數據和計算的合體。在進行下一階段前,當前階段的所有任務都要執行完成。因為下一階段的第一個轉換一定是重新組織數據的,所以必須等當前階段所有結果數據都計算出來了才能繼續。
假設本例中的 hdfs://names 下有四個文件塊,那么 HadoopRDD 中 partitions 就會有四個分區對應這四個塊數據,同時 preferedLocations 會指明這四個塊的最佳位置。現在,就可以創建出四個任務,並調度到合適的集群結點上。
2、Spark在不同集群中的運行架構
Spark注重建立良好的生態系統,它不僅支持多種外部文件存儲系統,提供了多種多樣的集群運行模式。部署在單台機器上時,既可以用本地(Local)模式運行,也可以使用偽分布式模式來運行;當以分布式集群部署的時候,可以根據自己集群的實際情況選擇Standalone模式(Spark自帶的模式)、YARN-Client模式或者YARN-Cluster模式。Spark的各種運行模式雖然在啟動方式、運行位置、調度策略上各有不同,但它們的目的基本都是一致的,就是在合適的位置安全可靠的根據用戶的配置和Job的需要運行和管理Task。
2.1 Spark on Standalone運行過程
Standalone模式是Spark實現的資源調度框架,其主要的節點有Client節點、Master節點和Worker節點。其中Driver既可以運行在Master節點上中,也可以運行在本地Client端。當用spark-shell交互式工具提交Spark的Job時,Driver在Master節點上運行;當使用spark-submit工具提交Job或者在Eclips、IDEA等開發平台上使用”new SparkConf.setManager(“spark://master:7077”)”方式運行Spark任務時,Driver是運行在本地Client端上的。
其運行過程如下:
1.SparkContext連接到Master,向Master注冊並申請資源(CPU Core 和Memory);
2.Master根據SparkContext的資源申請要求和Worker心跳周期內報告的信息決定在哪個Worker上分配資源,然后在該Worker上獲取資源,然后啟動StandaloneExecutorBackend;
3.StandaloneExecutorBackend向SparkContext注冊;
4.SparkContext將Applicaiton代碼發送給StandaloneExecutorBackend;並且SparkContext解析Applicaiton代碼,構建DAG圖,並提交給DAG Scheduler分解成Stage(當碰到Action操作時,就會催生Job;每個Job中含有1個或多個Stage,Stage一般在獲取外部數據和shuffle之前產生),然后以Stage(或者稱為TaskSet)提交給Task Scheduler,Task Scheduler負責將Task分配到相應的Worker,最后提交給StandaloneExecutorBackend執行;
5.StandaloneExecutorBackend會建立Executor線程池,開始執行Task,並向SparkContext報告,直至Task完成。
6.所有Task完成后,SparkContext向Master注銷,釋放資源。
2.2 Spark on YARN運行過程
YARN是一種統一資源管理機制,在其上面可以運行多套計算框架。目前的大數據技術世界,大多數公司除了使用Spark來進行數據計算,由於歷史原因或者單方面業務處理的性能考慮而使用着其他的計算框架,比如MapReduce、Storm等計算框架。Spark基於此種情況開發了Spark on YARN的運行模式,由於借助了YARN良好的彈性資源管理機制,不僅部署Application更加方便,而且用戶在YARN集群中運行的服務和Application的資源也完全隔離,更具實踐應用價值的是YARN可以通過隊列的方式,管理同時運行在集群中的多個服務。
Spark on YARN模式根據Driver在集群中的位置分為兩種模式:一種是YARN-Client模式,另一種是YARN-Cluster(或稱為YARN-Standalone模式)。
2.2.1 YARN框架流程
任何框架與YARN的結合,都必須遵循YARN的開發模式。在分析Spark on YARN的實現細節之前,有必要先分析一下YARN框架的一些基本原理。
Yarn框架的基本運行流程圖為:
其中,ResourceManager負責將集群的資源分配給各個應用使用,而資源分配和調度的基本單位是Container,其中封裝了機器資源,如內存、CPU、磁盤和網絡等,每個任務會被分配一個Container,該任務只能在該Container中執行,並使用該Container封裝的資源。NodeManager是一個個的計算節點,主要負責啟動Application所需的Container,監控資源(內存、CPU、磁盤和網絡等)的使用情況並將之匯報給ResourceManager。ResourceManager與NodeManagers共同組成整個數據計算框架,ApplicationMaster與具體的Application相關,主要負責同ResourceManager協商以獲取合適的Container,並跟蹤這些Container的狀態和監控其進度。
2.2.2 YARN-Client
Yarn-Client模式中,Driver在客戶端本地運行,這種模式可以使得Spark Application和客戶端進行交互,因為Driver在客戶端,所以可以通過webUI訪問Driver的狀態,默認是http://hadoop1:4040訪問,而YARN通過http:// hadoop1:8088訪問。
YARN-client的工作流程分為以下幾個步驟:
1.Spark Yarn Client向YARN的ResourceManager申請啟動Application Master。同時在SparkContent初始化中將創建DAGScheduler和TASKScheduler等,由於我們選擇的是Yarn-Client模式,程序會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend;
2.ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的ApplicationMaster,與YARN-Cluster區別的是在該ApplicationMaster不運行SparkContext,只與SparkContext進行聯系進行資源的分派;
3.Client中的SparkContext初始化完畢后,與ApplicationMaster建立通訊,向ResourceManager注冊,根據任務信息向ResourceManager申請資源(Container);
4.一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向Client中的SparkContext注冊並申請Task;
5.Client中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向Driver匯報運行的狀態和進度,以讓Client隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務;
6.應用程序運行完成后,Client的SparkContext向ResourceManager申請注銷並關閉自己。
2.2.3 YARN-Cluster
在YARN-Cluster模式中,當用戶向YARN中提交一個應用程序后,YARN將分兩個階段運行該應用程序:第一個階段是把Spark的Driver作為一個ApplicationMaster在YARN集群中先啟動;第二個階段是由ApplicationMaster創建應用程序,然后為它向ResourceManager申請資源,並啟動Executor來運行Task,同時監控它的整個運行過程,直到運行完成。
YARN-cluster的工作流程分為以下幾個步驟:
1. Spark Yarn Client向YARN中提交應用程序,包括ApplicationMaster程序、啟動ApplicationMaster的命令、需要在Executor中運行的程序等;
2. ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的ApplicationMaster,其中ApplicationMaster進行SparkContext等的初始化;
3. ApplicationMaster向ResourceManager注冊,這樣用戶可以直接通過ResourceManage查看應用程序的運行狀態,然后它將采用輪詢的方式通過RPC協議為各個任務申請資源,並監控它們的運行狀態直到運行結束;
4. 一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向ApplicationMaster中的SparkContext注冊並申請Task。這一點和Standalone模式一樣,只不過SparkContext在Spark Application中初始化時,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler進行任務的調度,其中YarnClusterScheduler只是對TaskSchedulerImpl的一個簡單包裝,增加了對Executor的等待邏輯等;
5. ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向ApplicationMaster匯報運行的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務;
6. 應用程序運行完成后,ApplicationMaster向ResourceManager申請注銷並關閉自己。
2.2.4 YARN-Client 與 YARN-Cluster 區別
理解YARN-Client和YARN-Cluster深層次的區別之前先清楚一個概念:Application Master。在YARN中,每個Application實例都有一個ApplicationMaster進程,它是Application啟動的第一個容器。它負責和ResourceManager打交道並請求資源,獲取資源之后告訴NodeManager為其啟動Container。從深層次的含義講YARN-Cluster和YARN-Client模式的區別其實就是ApplicationMaster進程的區別。
l YARN-Cluster模式下,Driver運行在AM(Application Master)中,它負責向YARN申請資源,並監督作業的運行狀況。當用戶提交了作業之后,就可以關掉Client,作業會繼續在YARN上運行,因而YARN-Cluster模式不適合運行交互類型的作業;
l YARN-Client模式下,Application Master僅僅向YARN請求Executor,Client會和請求的Container通信來調度他們工作,也就是說Client不能離開。
3、Spark在不同集群中的運行演示
在以下運行演示過程中需要啟動Hadoop和Spark集群,其中Hadoop需要啟動HDFS和YARN,啟動過程可以參見第三節《Spark編程模型(上)--概念及Shell試驗》。
3.1 Standalone運行過程演示
在Spark集群的節點中,40%的數據用於計算,60%的內存用於保存結果,為了能夠直觀感受數據在內存和非內存速度的區別,在該演示中將使用大小為1G的Sogou3.txt數據文件(參見第三節《Spark編程模型(上)--概念及Shell試驗》的3.2測試數據文件上傳),通過對比得到差距。
3.1.1 查看測試文件存放位置
使用HDFS命令觀察Sogou3.txt數據存放節點的位置
$cd /app/hadoop/hadoop-2.2.0/bin
$hdfs fsck /sogou/SogouQ3.txt -files -blocks -locations
通過可以看到該文件被分隔為9個塊放在集群中
3.1.2啟動Spark-Shell
通過如下命令啟動Spark-Shell,在演示當中每個Executor分配1G內存
$cd /app/hadoop/spark-1.1.0/bin
$./spark-shell --master spark://hadoop1:7077 --executor-memory 1g
通過Spark的監控界面查看Executors的情況,可以觀察到有1個Driver 和3個Executor,其中hadoop2和hadoop3啟動一個Executor,而hadoop1啟動一個Executor和Driver。在該模式下Driver中運行SparkContect,也就是DAGSheduler和TaskSheduler等進程是運行在節點上,進行Stage和Task的分配和管理。
3.1.3運行過程及結果分析
第一步 讀取文件后計算數據集條數,並計算過程中使用cache()方法對數據集進行緩存
val sogou=sc.textFile("hdfs://hadoop1:9000/sogou/SogouQ3.txt")
sogou.cache()
sogou.count()
通過頁面監控可以看到該作業分為8個任務,其中一個任務的數據來源於兩個數據分片,其他的任務各對應一個數據分片,即顯示7個任務獲取數據的類型為(NODE_LOCAL),1個任務獲取數據的類型為任何位置(ANY)。
在存儲監控界面中,我們可以看到緩存份數為3,大小為907.1M,緩存率為38%
運行結果得到數據集的數量為1000萬筆數據,總共花費了352.17秒
第二步 再次讀取文件后計算數據集條數,此次計算使用緩存的數據,對比前后
sogou.count()
通過頁面監控可以看到該作業還是分為8個任務,其中3個任務數據來自內存(PROCESS_LOCAL),3個任務數據來自本機(NODE_LOCAL),其他2個任務數據來自任何位置(ANY)。任務所耗費的時間多少排序為:ANY> NODE_LOCAL> PROCESS_LOCAL,對比看出使用內存的數據比使用本機或任何位置的速度至少會快2個數量級。
整個作業的運行速度為34.14秒,比沒有緩存提高了一個數量級。由於剛才例子中數據只是部分緩存(緩存率38%),如果完全緩存速度能夠得到進一步提升,從這體驗到Spark非常耗內存,不過也夠快、夠鋒利!
3.2 YARN-Client運行過程演示
3.2.1 啟動Spark-Shell
通過如下命令啟動Spark-Shell,在演示當中分配3個Executor、每個Executor為1G內存
$cd /app/hadoop/spark-1.1.0/bin
$./spark-shell --master YARN-client --num-executors 3 --executor-memory 1g
第一步 把相關的運行JAR包上傳到HDFS中
通過HDFS查看界面可以看到在 /user/hadoop/.sparkStaging/應用編號,查看到這些文件:
第二步 啟動Application Master,注冊Executor
應用程序向ResourceManager申請啟動Application Master,在啟動完成后會分配Cotainer並把這些信息反饋給SparkContext,SparkContext和相關的NM通訊,在獲得的Container上啟動Executor,從下圖可以看到在hadoop1、hadoop2和hadoop3分別啟動了Executor
第三步 查看啟動結果
YARN-Client模式中,Driver在客戶端本地運行,這種模式可以使得Spark Application和客戶端進行交互,因為Driver在客戶端所以可以通過webUI訪問Driver的狀態,默認是http://hadoop1:4040訪問,而YARN通過http:// hadoop1:8088訪問。
3.2.2 運行過程及結果分析
第一步 讀取文件后計算數據集條數,並計算過程中使用cache()方法對數據集進行緩存
val sogou=sc.textFile("hdfs://hadoop1:9000/sogou/SogouQ3.txt")
sogou.cache()
sogou.count()
通過頁面監控可以看到該作業分為8個任務,其中一個任務的數據來源於兩個數據分片,其他的任務各對應一個數據分片,即顯示7個任務獲取數據的類型為(NODE_LOCAL),1個任務獲取數據的類型為任何位置(RACK_LOCAL)。
通過運行日志可以觀察到在所有任務結束的時候,由 YARNClientScheduler通知YARN集群任務運行完畢,回收資源,最終關閉SparkContext,整個過程耗費108.6秒。
第二步 查看數據緩存情況
通過監控界面可以看到,和Standalone一樣38%的數據已經緩存在內存中
第三步 再次讀取文件后計算數據集條數,此次計算使用緩存的數據,對比前后
sogou.count()
通過頁面監控可以看到該作業還是分為8個任務,其中3個任務數據來自內存(PROCESS_LOCAL),4個任務數據來自本機(NODE_LOCAL),1個任務數據來自機架(RACK_LOCAL)。對比在內存中的運行速度最快,速度比在本機要快至少1個數量級。
YARNClientClusterScheduler替代了Standalone模式下得TaskScheduler進行任務管理,在任務結束后通知YARN集群進行資源的回收,最后關閉SparkContect。部分緩存數據運行過程耗費了29.77秒,比沒有緩存速度提升不少。
3.3 YARN-Cluster運行過程演示
3.3.1 運行程序
通過如下命令啟動Spark-Shell,在演示當中分配3個Executor、每個Executor為512M內存
$cd /app/hadoop/spark-1.1.0
$./bin/spark-submit --master YARN-cluster --class class3.SogouResult --executor-memory 512m LearnSpark.jar hdfs://hadoop1:9000/sogou/SogouQ3.txt hdfs://hadoop1:9000/class3/output2
第一步 把相關的資源上傳到HDFS中,相對於YARN-Client多了LearnSpark.jar文件
這些文件可以在HDFS中找到,具體路徑為 http://hadoop1:9000/user/hadoop/.sparkStaging/應用編號 :
第二步 YARN集群接管運行
首先YARN集群中由ResourceManager分配Container啟動SparkContext,並分配運行節點,由SparkConext和NM進行通訊,獲取Container啟動Executor,然后由SparkContext的YarnClusterScheduler進行任務的分發和監控,最終在任務執行完畢時由YarnClusterScheduler通知ResourceManager進行資源的回收。
3.3.2 運行結果
在YARN-Cluster模式中命令界面只負責應用的提交,SparkContext和作業運行均在YARN集群中,可以從http:// hadoop1:8088查看到具體運行過程,運行結果輸出到HDFS中,如下圖所示:
4、問題解決
4.1 YARN-Client啟動報錯
在進行Hadoop2.X 64bit編譯安裝中由於使用到64位虛擬機,安裝過程中出現下圖錯誤:
[hadoop@hadoop1 spark-1.1.0]$ bin/spark-shell --master YARN-client --executor-memory 1g --num-executors 3
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Exception in thread "main" java.lang.Exception: When running with master 'YARN-client' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
at org.apache.spark.deploy.SparkSubmitArguments.checkRequiredArguments(SparkSubmitArguments.scala:182)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:62)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:70)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
參考資料:
(1)《Spark1.0.0 運行架構基本概念》 http://blog.csdn.net/book_mmicky/article/details/25714419
(2)《Spark架構與作業執行流程簡介》 http://www.cnblogs.com/shenh062326/p/3658543.html
(3)《Spark1.0.0 運行架構基本概念》 http://shiyanjun.cn/archives/744.html