@
前言
這個伴生類主要功能是創建/配置ApplicationMaster的應用程序,,准備相關的環境與資源。
Github源碼地址:https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
主要方法
submitApplication
將運行ApplicationMaster的應用程序提交到ResourceManager。
- 穩定的Yarn API提供了一種方便的方法(YarnClient#createApplication)
- 創建應用程序並設置應用程序提交上下文
主要邏輯有:
- 從RM獲取新的應用程序
- 設置應用程序的staging目錄:如果配置STAGING_DIR,則使用其值作為staging目錄。否則使用用戶的home目錄。
- 驗證群集是否有足夠的資源用於AM
- 設置適當的上下文以啟動我們的AM
- 最后,提交並監控應用程序
createApplicationSubmissionContext
設置提交ApplicationMaster的上下文。
主要邏輯如下:
- 如果是cluster模式,則獲取所有與
spark.yarn.driver.resource
相關的配置。否則使用spark.yarn.am.resource
相關的配置 - 獲取YarnClientApplication的默認上下文,並在此基礎上設置
ApplicationName
QUEUE_NAME
containerContext
ApplicationType
- 所有
APPLICATION_TAGS
MAX_APP_ATTEMPTS
- 其他配置
- 設置資源
capability.setMemory(amMemory + amMemoryOverhead)
capability.setVirtualCores(amCores)
- 其他配置
setupSecurityToken
設置安全令牌以啟動我們的ApplicationMaster容器。
在客戶端模式下,調度程序已獲取一組憑據,因此將它們復制並發送到AM。 在群集模式下,獲取新憑據,然后將其與當前用戶已有的任何憑據一起發送到AM。
getApplicationReport
從ResourceManager獲取我們提交的應用程序的應用程序報告。
getClientToken
返回此客戶端使用的安全令牌以與ApplicationMaster通信。 如果未啟用安全性,則報告返回的標記為空。
verifyClusterResources
檢查分配的資源是否合理,如果我們請求每個容器的資源多於群集中可用的資源,則會失敗。
主要邏輯:
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
//compare if executorMem > maxMem
//...
val amMem = amMemory + amMemoryOverhead
//compare if amMem > maxMem
//...
copyFileToRemote
如果需要,將給定資源文件復制到遠程文件系統(例如HDFS)。僅當源和目標文件系統不同或源方案為“file”時,才會復制該文件。 用於准備啟動ApplicationMaster容器的資源,例如用戶其他的其他輔助文件。
prepareLocalResources
如果需要,將任何資源上載到分布式緩存。 如果要在本地使用資源,請為下游代碼設置適當的配置以正確處理它。 這用於為ApplicationMaster設置容器啟動上下文。
遠程目錄地址
stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
distribute
將文件分發到群集。
如果文件的路徑是“local:”URI,它實際上不是分發的,而其他文件將被復制到HDFS(如果還沒有)並添加到應用程序的分布式緩存中。
主要邏輯:
-
Keytab文件
- 如果我們傳入keytab,請確保將keytab表復制到HDFS上的登台目錄,並設置相關的環境變量,以便AM可以再次登錄。
-
配置文件以及jar包
有兩個設置可控制要添加到緩存的文件:- 如果定義了Spark歸檔文件,請使用歸檔文件。 該存檔應在其根目錄中包含jar文件。
- 如果提供了jar列表,則過濾非本地jar,解析globs,並將找到的文件添加到緩存中。
請注意,存檔不能是“本地”URI。 如果未找到上述任何設置,則上傳$ SPARK_HOME/jars中找到的所有文件。
-
其他資源
對通過ClientArguments傳遞的任何其他資源執行相同操作。
每個資源類別由3元組表示:
(1)此類別中逗號分隔的資源列表,
(2)資源類型,
(3)是否將這些資源添加到類路徑中 -
python文件
需要特別處理python文件列表。 所有非歸檔文件都需要放在將添加到PYTHONPATH的子目錄中。 -
更新配置里面的所有分布式文件的列表更新配置(conf存檔除外)。 conf存檔將由AM以不同方式處理。
-
手動將conf存檔上傳到HDFS,並在配置中記錄其位置。這將允許AM知道conf存檔在HDFS中的位置,以便可以將其分發到容器中。
-
手動將配置存檔添加到緩存管理器,以便在設置正確文件的情況下啟動AM。
createConfArchive
使用配置文件創建存檔以進行分發。
這些將由AM和執行者使用。 這些文件被壓縮並作為存檔添加到作業中,因此YARN會在分發給AM和執行程序時進行解壓。 然后將此目錄添加到AM和執行程序進程的類路徑中,以確保每個人都使用相同的默認配置。
這遵循啟動腳本設置的優先順序,其中HADOOP_CONF_DIR在YARN_CONF_DIR之前的類路徑中顯示。
存檔還包含一些Spark配置。 即,它將SparkConf的內容保存在由AM進程加載的文件中。
setupLaunchEnv
設置啟動ApplicationMaster容器的環境。如DRIVER_CLASS_PATH
, PYTHONPATH
,PYSPARK_DRIVER_PYTHON
, PYSPARK_PYTHON
, PYTHONHASHSEED
等
createContainerLaunchContext
設置ContainerLaunchContext以啟動我們的ApplicationMaster容器。 這將設置啟動環境,java選項以及啟動AM的命令。
monitorApplication
報告應用程序的狀態,直到它已成功或由於某些故障退出,然后返回一對紗線應用狀態(FINISHED, FAILED, KILLED, or RUNNING)和最終應用狀態(FINISHED, FAILED, KILLED, or RUNNING)。
run
將應用程序提交到ResourceManager。
如果將spark.yarn.submit.waitAppCompletion設置為true,它將保持活動狀態,報告應用程序的狀態,直到應用程序因任何原因退出。 否則,客戶端進程將在提交后退出。
如果應用程序以失敗,終止或未定義狀態完成,則拋出適當的SparkException。