spark可以運行在standalone,yarn,mesos等多種模式下,當前我們用的最普遍的是yarn模式,在yarn模式下又分為client和cluster。本文接下來將分析yarn cluster下任務提交的過程。也就是回答,在yarn cluster模式下,任務是怎么提交的問題。在yarn cluster模式下,spark任務提交涉及四個角色(client, application, driver以及executor)之間的交互。接下來,將詳細分析這四個角色在任務提交過程中都做了那些事。
1,client流程
Step 1:我們知道:在我們寫完任務准備向集群提交spark任務時,一般是調用bin下的spark-submit腳本進行任務的提交。在完成一些環境變量和參數的准備后,最終調用spark代碼庫中的SparkSubmit類。
Step 2:在SparkSubmit的main函數中,通過submit,runMain然后通過YarnClusterApplication啟動org.apache.spark.deploy.yarn.Client.
Step 3:在Client中,通過main,run,然后在submitApplication中,利用yarnClient向ResourceManager提交新應用以啟動ApplicationMaster,其中在yarn cluster模式下啟動ApplicationMaster的類是
org.apache.spark.deploy.yarn.ApplicationMaster。 至此,client完成所有的工作。
2,ApplicationMaster流程
Step1:yarn分配container運行ApplicationMaster。通過main,run,runDriver,調用startUserApplication,新建線程,運行在spark-submit --class參數指定的應用類用戶代碼。
Step2:ApplicationMaster等待driver完成sparkContext的初始化后,獲取driver的一個ref。調用registerAM函數,利用YarnRMClient向yarn申請資源運行executor。一旦獲取到container資源,在yarnAllocator中,
launcherPool線程池會將container,driver等相關信息封裝成ExecutorRunnable對象,通過ExecutorRunnable啟動新的container以運行executor。在次過程中,指定啟動executor的類是
org.apache.spark.executor.CoarseGrainedExecutorBackend。
3,Driver流程
在ApplicationMaster的步驟1中,會新建線程運行用戶端代碼,並且在完成sparkcontext的初始化,其中包括dagScheduler完成job stage的切分,每個stage的任務轉成化一系列的task,封裝成taskset。交由taskScheduler去調用。由於這個過程比較復雜,而且非常的重要,准備稍后會單獨對這個部分進行詳細講解。
4,Executor流程
在ApplicationMaster的步驟2中提到,新的container將會運行executor。在executor啟動以后,會向driver發送RegisterExecutor消息告訴driver注冊當前運行的executor。在driver端的CoarseGrainedSchedulerBackend中,可以看到對該消息的處理過程。在driver段感知到該消息后,driver將向executor發送RegisteredExecutor消息。executor和driver更多的細節,在稍后spark任務計算解析中,會將進行更詳細的描述。
至此,client在完成使命后退出。其他三個部分也已啟動起來。接下來將以spark example中的sparkPi例子來看看平常我們寫的spark任務是怎么計算的。
首先把sparkPi中的代碼貼出來:
問題1:何時開始運行用戶的中main函數?
在前文中 ApplicationMaster流程中第一步提到:yarn分配container運行ApplicationMaster。通過main,run,runDriver,調用startUserApplication,新建線程,運行在spark-submit --class參數指定的應用類用戶代碼。也就是說,在這一步將運行用戶寫入的代碼。
問題2:上述代碼具體都做了些啥?
1,在SparkSession...getOrCreate函數中主要做的事情是完成sparkContext的初始化,這其中主要包括DAGScheduler,TaskSchedule的初始化等。(注:在調試過程中使用的standalone模式,並且加入extraJavaOption主要是為了便於調試executor的代碼)。
2,上述代碼的核心是sparkContext.parallelize(....).map(....).reduce。在parallelize函數中將新建ParallelCollectionRDD。在map中將新建MapPartitionsRDD。最后reduce是一個action(一個action對應一個Job),觸發實際的計算。
3,在reduce函數中,通過調用sc.runJob->dagScheduler.runJob→submitJob提交JobSubmitted事件到DAGScheduler自己。然后調用handleJobSubmitted來處理Job提交。在handleJobSubmitted函數中,將創建ResultStage,然后根據shuffle將Job划分為不同的stage。在本例中,由於沒有shuffle,將只有一個stage。最終通過submitMissingTasks將stage中的task封裝成taskset,交由taskschuduler(taskScheduler.submitTasks)進行task級別的調度。
4,在TaskSchdulerImpl的submitTasks中,可以看到taskset會被進一步封裝成TasksetManager,加入到schedulableBuilder中(默認使用FIFO隊列進行調度)。然后driver向自己發送ReviveOffers消息。driver接收到該信息后,如果發現有空閑的executor,將該Task序列后,發送LaunchTask消息給executor。讓executor去執行。
5,executor處理LaunchTask消息的代碼如下:
launchTask會將task信息TaskRunner,啟用線程池運行。
6,在TaskRunner的run方法中,將運行
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
然后調用runTask進行運行,有兩種類型的Task(ShuffleMapTask,ResultTask),本例中將運行ResultTask中的runTask方法,然后在該方法中,調用用戶傳入的函數代碼。
7,在TaskRunner的run方法中,在完成計算后,將調用execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult),該函數將向driver發送信息,告訴改Task已完成。
8,在driver端,如果任務正常結束,將調用taskResultGetter.enqueueSuccessfulTask。在該函數中,接着調用handleSuccessfulTask,最終DAGScheduler將向自己發送CompletionEvent事件,然后使用handleTaskCompletion來處理。如果任務正常結束,將通過
job.listener.taskSucceeded通知JobWaiter,JobWaiter完成任務結果的合並。在所有的JobWaiter中的Task都完成后,任務退出。