一、Spark on YARN cluster 模式作業運行全過程分析
下面是分析Spark on YARN的Cluster模式,從用戶提交作業到作業運行結束整個運行期間的過程分析。
客戶端進行操作
1、根據yarnConf來初始化yarnClient,並啟動yarnClient
2、創建客戶端Application,並獲取Application的ID,進一步判斷集群中的資源是否滿足executor和ApplicationMaster申請的資源,如果不滿足則拋出IllegalArgumentException;
3、設置資源、環境變量:其中包括了設置Application的Staging目錄、准備本地資源(jar文件、log4j.properties)、設置Application其中的環境變量、創建Container啟動的Context等;
4、設置Application提交的Context,包括設置應用的名字、隊列、AM的申請的Container、標記該作業的類型為Spark;
5、申請Memory,並最終通過yarnClient.submitApplication向ResourceManager提交該Application。
當作業提交到YARN上之后,客戶端就沒事了,甚至在終端關掉那個進程也沒事,因為整個作業運行在YARN集群上進行,運行的結果將會保存到HDFS或者日志中。
提交到YARN集群,YARN操作
1、運行ApplicationMaster的run方法;
2、設置好相關的環境變量。
3、創建amClient,並啟動;
4、在Spark UI啟動之前設置Spark UI的AmIpFilter;
5、在startUserClass函數專門啟動了一個線程(名稱為Driver的線程)來啟動用戶提交的Application,也就是啟動了Driver。在Driver中將會初始化SparkContext;
6、等待SparkContext初始化完成,最多等待spark.yarn.applicationMaster.waitTries次數(默認為10),如果等待了的次數超過了配置的,程序將會退出;否則用SparkContext初始化yarnAllocator;
其實在5步驟中啟動Application的過程中會初始化SparkContext,在初始化SparkContext的時候將會創建YarnClusterScheduler,在SparkContext初始化完成的時候,會調用YarnClusterScheduler類中的postStartHook方法,而該方法會通知ApplicationMaster已經初始化好了SparkContext
7、當SparkContext、Driver初始化完成的時候,通過amClient向ResourceManager注冊ApplicationMaster
8、分配並啟動Executeors。在啟動Executeors之前,先要通過yarnAllocator獲取到numExecutors個Container,然后在Container中啟動Executeors。如果在啟動Executeors的過程中失敗的次數達到了maxNumExecutorFailures的次數,maxNumExecutorFailures的計算規則如下:
// Default to numExecutors * 2, with minimum of 3
private
val
maxNumExecutorFailures
=
sparkConf.getInt(
"spark.yarn.max.executor.failures"
,
sparkConf.getInt(
"spark.yarn.max.worker.failures"
, math.max(args.numExecutors *
2
,
3
)))
|
那么這個Application將失敗,將Application Status標明為FAILED,並將關閉SparkContext。其實,啟動Executeors是通過ExecutorRunnable實現的,而ExecutorRunnable內部是啟動CoarseGrainedExecutorBackend的。
9、最后,Task將在CoarseGrainedExecutorBackend里面運行,然后運行狀況會通過Akka通知CoarseGrainedScheduler,直到作業運行完成。
二、Spark on YARN client 模式作業運行全過程分析
在前篇文章中我介紹了Spark on YARN集群模式(yarn-cluster)作業從提交到運行整個過程的情況(詳情見《Spark on YARN集群模式作業運行全過程分析》),我們知道Spark on yarn有兩種模式:yarn-cluster和yarn-client。這兩種模式作業雖然都是在yarn上面運行,但是其中的運行方式很不一樣,今天我就來談談Spark on YARN yarn-client模式作業從提交到運行的過程剖析。
和yarn-cluster模式一樣,整個程序也是通過spark-submit腳本提交的。但是yarn-client作業程序的運行不需要通過Client類來封裝啟動,而是直接通過反射機制調用作業的main函數。下面就來分析:
1、通過SparkSubmit類的launch的函數直接調用作業的main函數(通過反射機制實現),如果是集群模式就會調用Client的main函數。
2、而應用程序的main函數一定都有個SparkContent,並對其進行初始化;
3、在SparkContent初始化中將會依次做如下的事情:設置相關的配置、注冊MapOutputTracker、BlockManagerMaster、BlockManager,創建taskScheduler和dagScheduler;其中比較重要的是創建taskScheduler和dagScheduler。在創建taskScheduler的時候會根據我們傳進來的master來選擇Scheduler和SchedulerBackend。由於我們選擇的是yarn-client模式,程序會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend,並將YarnClientSchedulerBackend的實例初始化YarnClientClusterScheduler,上面兩個實例的獲取都是通過反射機制實現的,YarnClientSchedulerBackend類是CoarseGrainedSchedulerBackend類的子類,YarnClientClusterScheduler是TaskSchedulerImpl的子類,僅僅重寫了TaskSchedulerImpl中的getRackForHost方法。
4、初始化完taskScheduler后,將創建dagScheduler,然后通過taskScheduler.start()啟動taskScheduler,而在taskScheduler啟動的過程中也會調用SchedulerBackend的start方法。在SchedulerBackend啟動的過程中將會初始化一些參數,封裝在ClientArguments中,並將封裝好的ClientArguments傳進Client類中,並client.runApp()方法獲取Application ID。
5、client.runApp里面的做是和前面客戶端進行操作那節類似,不同的是在里面啟動是ExecutorLauncher(yarn-cluster模式啟動的是ApplicationMaster)。
6、在ExecutorLauncher里面會初始化並啟動amClient,然后向ApplicationMaster注冊該Application。注冊完之后將會等待driver的啟動,當driver啟動完之后,會創建一個MonitorActor對象用於和CoarseGrainedSchedulerBackend進行通信(只有事件AddWebUIFilter他們之間才通信,Task的運行狀況不是通過它和CoarseGrainedSchedulerBackend通信的)。然后就是設置addAmIpFilter,當作業完成的時候,ExecutorLauncher將通過amClient設置Application的狀態為FinalApplicationStatus.SUCCEEDED。
7、分配Executors,這里面的分配邏輯和yarn-cluster里面類似,就不再說了。
8、最后,Task將在CoarseGrainedExecutorBackend里面運行,然后運行狀況會通過Akka通知CoarseGrainedScheduler,直到作業運行完成。
9、在作業運行的時候,YarnClientSchedulerBackend會每隔1秒通過client獲取到作業的運行狀況,並打印出相應的運行信息,當Application的狀態是FINISHED、FAILED和KILLED中的一種,那么程序將退出等待。
10、最后有個線程會再次確認Application的狀態,當Application的狀態是FINISHED、FAILED和KILLED中的一種,程序就運行完成,並停止SparkContext。整個過程就結束了。