Client-side execution
Below is a submit command for Spark on YARN in cluster mode. All analysis in this series is based on Spark on YARN cluster mode, with Spark version 2.4.0:
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 512M \
--num-executors 1 \
/opt/cloudera/parcels/CDH/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.3.2.jar
spark-submit is a shell script with the following contents:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
Every argument passed to spark-submit is ultimately forwarded, via the exec command, to org.apache.spark.deploy.SparkSubmit.
The SparkSubmit class
The main method
SparkSubmit's main method lives in its companion object; an abridged version of the source is shown below:
override def main(args: Array[String]): Unit = {
val submit = new SparkSubmit() {
self =>
override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
...
}
override protected def logInfo(msg: => String): Unit = printMessage(msg)
override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
override def doSubmit(args: Array[String]): Unit = {
...
super.doSubmit(args)
...
}
}
submit.doSubmit(args)
}
As you can see, main submits the job by calling the doSubmit method of the SparkSubmit class. doSubmit looks like this:
def doSubmit(args: Array[String]): Unit = {
...
val appArgs = parseArguments(args)
...
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
In doSubmit, the arguments passed on the spark-submit command line are parsed, and pattern matching on the resulting action routes each command to its own method (SUBMIT is the default action in SparkSubmitArguments unless --kill, --status, or --version is given). Our command is a plain submission, so execution continues in the submit method.
The submit method
An abridged version of the submit source:
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
// Parse the arguments and resolve childMainClass
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
// Invoke runMain
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
}
})
} catch {
...
}
} else {
// Invoke runMain
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
}
}
if (args.isStandaloneCluster && args.useRest) {
try {
doRunMain()
} catch {
...
}
} else {
doRunMain()
}
}
As the code shows, submit mainly prepares the submission environment from the user-supplied arguments and then runs doRunMain, which in turn calls runMain. The reason for the doRunMain indirection is the check performed before runMain executes: whether this is a standalone cluster submission. In standalone cluster mode there are two submission gateways: a legacy RPC gateway, in which the application is wrapped by org.apache.spark.deploy.Client, and a REST gateway introduced in Spark 1.3, which has been the default for standalone cluster mode ever since. If the master does not support REST, the REST submission fails, and the catch block falls back to resubmitting through the legacy RPC gateway; the sketch below illustrates that fallback pattern.
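A minimal, self-contained illustration of the try-REST-then-fall-back-to-RPC pattern (this is not the actual Spark code; the exception type and the gateway stubs are stand-ins):
object RestFallbackSketch {
  final class RestGatewayUnavailable(msg: String) extends Exception(msg)

  // Stand-in for the REST submission path; here it always fails.
  def submitViaRest(): Unit =
    throw new RestGatewayUnavailable("master is not a REST server")

  // Stand-in for the legacy org.apache.spark.deploy.Client RPC path.
  def submitViaLegacyRpc(): Unit =
    println("submitted through the legacy RPC gateway")

  def submit(useRest: Boolean): Unit =
    if (useRest) {
      try submitViaRest()
      catch {
        case e: RestGatewayUnavailable =>
          println(s"REST submission failed (${e.getMessage}); falling back")
          submit(useRest = false) // retry exactly once with the legacy gateway
      }
    } else {
      submitViaLegacyRpc()
    }

  def main(args: Array[String]): Unit = submit(useRest = true)
}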
Inside the argument-preparation method prepareSubmitEnvironment there is an important piece of code:
...
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.yarn.YarnClusterApplication"
...
private[deploy] def prepareSubmitEnvironment(...)
: (Seq[String], Seq[String], SparkConf, String) = {
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
...
}
...
(childArgs, childClasspath, sparkConf, childMainClass)
}
This code checks whether the current mode is YARN cluster mode; if it is, the string "org.apache.spark.deploy.yarn.YarnClusterApplication" is assigned to the variable childMainClass. Back in submit, outside prepareSubmitEnvironment, the childMainClass value is extracted from the returned tuple and passed on to runMain.
The runMain method
The runMain source:
private def runMain(...): Unit = {
...
var mainClass: Class[_] = null
try {
// Load the class named by childMainClass (YarnClusterApplication) via reflection
mainClass = Utils.classForName(childMainClass)
} catch {
...
}
// If mainClass is a subtype of SparkApplication, instantiate it directly; otherwise wrap it in a JavaMainApplication
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.newInstance().asInstanceOf[SparkApplication]
} else {
...
new JavaMainApplication(mainClass)
}
...
try {
app.start(childArgs.toArray, sparkConf)
} catch {
...
}
}
In runMain, the class named by childMainClass is loaded via reflection; childMainClass is exactly the "org.apache.spark.deploy.yarn.YarnClusterApplication" resolved earlier, and YarnClusterApplication implements the SparkApplication trait.
Once the YarnClusterApplication instance has been created, its start method is called. The sketch below shows the same load-check-instantiate technique in isolation.
The YarnClusterApplication class
The start method
YarnClusterApplication's start method is very simple:
override def start(args: Array[String], conf: SparkConf): Unit = {
new Client(new ClientArguments(args), conf).run()
}
The Client class
Client is the last class in the submission path that runs on the submitting user's machine; it communicates with Hadoop YARN and submits the ApplicationMaster.
The Client constructor instantiates the YARN client and resolves many parameters such as amMemory. The important initializations in the constructor are listed below, followed by a small worked example of the resulting memory request:
private val yarnClient = YarnClient.createYarnClient
private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
private val amMemory = if (isClusterMode) {
sparkConf.get(DRIVER_MEMORY).toInt
} else {
sparkConf.get(AM_MEMORY).toInt
}
private val amMemoryOverhead = {
val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
sparkConf.get(amMemoryOverheadEntry).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
}
private val amCores = if (isClusterMode) {
sparkConf.get(DRIVER_CORES)
} else {
sparkConf.get(AM_CORES)
}
private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
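As a worked example of these values (assuming the Spark 2.4 defaults MEMORY_OVERHEAD_FACTOR = 0.10 and MEMORY_OVERHEAD_MIN = 384 MB, and the --driver-memory 512M from our submit command):
object AmMemorySketch {
  def main(args: Array[String]): Unit = {
    val amMemory = 512L                 // MB, from --driver-memory in cluster mode
    val overheadFactor = 0.10           // assumed MEMORY_OVERHEAD_FACTOR
    val overheadMin = 384L              // assumed MEMORY_OVERHEAD_MIN, in MB
    val amMemoryOverhead = math.max((overheadFactor * amMemory).toLong, overheadMin)
    // The AM container requested from YARN is amMemory + amMemoryOverhead
    println(s"AM container request = ${amMemory + amMemoryOverhead} MB") // 896 MB
  }
}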
The run method
The run source:
def run(): Unit = {
this.appId = submitApplication()
...
}
run mainly calls Client's submitApplication method to obtain the application's appId; the actual submission work all happens inside submitApplication.
The submitApplication method
The submitApplication source:
def submitApplication(): ApplicationId = {
var appId: ApplicationId = null
try {
launcherBackend.connect()
// Initialize and start the YARN client
yarnClient.init(hadoopConf)
yarnClient.start()
// Ask YARN for a unique application id
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
...
// Prepare the container launch context (the commands to be submitted to YARN)
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)
...
// Submit the application
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)
appId
} catch {
...
}
The createContainerLaunchContext method
The key parts of createContainerLaunchContext:
private def createContainerLaunchContext(...) : ContainerLaunchContext = {
val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
...
val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
val securityManager = new SecurityManager(sparkConf)
amContainer.setApplicationACLs(
YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
setupSecurityToken(amContainer)
amContainer
}
From the source above, createContainerLaunchContext assembles the commands "/bin/java -server" ++ javaOpts ++ amArgs, wraps them into amContainer, and submits that to YARN.
As the amClass assignment shows, in cluster mode amArgs starts with org.apache.spark.deploy.yarn.ApplicationMaster, while in client mode it starts with org.apache.spark.deploy.yarn.ExecutorLauncher.
This article only covers cluster mode. At this point, all of the code that runs on the submitting user's machine in cluster mode has finished executing; everything that follows runs inside YARN containers.
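To make the assembled command concrete, here is a rough reconstruction of what the AM launch command might look like for our submission (a sketch only: the {{JAVA_HOME}}, {{PWD}} and <LOG_DIR> tokens are expanded by YARN at launch time, and the exact javaOpts and localized file names may differ on a real cluster):
object AmCommandSketch {
  def main(args: Array[String]): Unit = {
    val commands = Seq(
      "{{JAVA_HOME}}/bin/java", "-server",
      "-Xmx512m",                                        // from --driver-memory
      "org.apache.spark.deploy.yarn.ApplicationMaster",  // amClass in cluster mode
      "--class", "org.apache.spark.examples.SparkPi",    // userClass
      "--jar", "file:/opt/cloudera/parcels/CDH/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.3.2.jar",
      "--properties-file", "{{PWD}}/__spark_conf__/__spark_conf__.properties",
      "1>", "<LOG_DIR>/stdout",
      "2>", "<LOG_DIR>/stderr")
    println(commands.mkString(" "))
  }
}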
Method execution sequence diagram
Driver
The ApplicationMaster class
The main method
def main(args: Array[String]): Unit = {
SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
master = new ApplicationMaster(amArgs)
System.exit(master.run())
}
The run method
final def run(): Int = {
doAsUser {
runImpl()
}
exitCode
}
The runImpl method
The key parts of runImpl:
private def runImpl(): Unit = {
try {
val appAttemptId = client.getAttemptId()
var attemptID: Option[String] = None
...
if (isClusterMode) {
runDriver()
} else {
runExecutorLauncher()
}
} catch {
...
} finally {
...
}
}
The ApplicationMaster class is a chain of calls: main instantiates an ApplicationMaster object and then calls its run method, which is implemented by runImpl.
runImpl checks whether this is cluster mode: if so it calls runDriver, otherwise (client mode) it calls runExecutorLauncher.
This article covers cluster mode.
The runDriver method
private def runDriver(): Unit = {
// Run the user-submitted main method (in a separate thread)
userClassThread = startUserApplication()
...
try {
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
rpcEnv = sc.env.rpcEnv
...
val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(host, port),
YarnSchedulerBackend.ENDPOINT_NAME)
// Request containers from YARN and launch the Executors
createAllocator(driverRef, userConf)
} else {
...
}
resumeDriver()
userClassThread.join()
} catch {
...
} finally {
resumeDriver()
}
}
In runDriver, startUserApplication creates a thread that runs the user program's main method, while createAllocator requests resources from YARN and launches the Executors.
The startUserApplication method
private def startUserApplication(): Thread = {
var userArgs = args.userArgs
val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod("main", classOf[Array[String]])
val userThread = new Thread {
override def run() {
try {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
logError(s"Could not find static main method in object ${args.userClass}")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
} else {
mainMethod.invoke(null, userArgs.toArray)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
logDebug("Done running user class")
}
} catch {
...
} finally {
...
}
}
}
userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
userThread
}
In startUserApplication, the arguments are first resolved to obtain the class specified by --class in our submit command. The method then checks that this class has a static main method taking an Array[String], i.e. the entry point of a Scala/Java program, and finally invokes that main method via reflection inside a thread named "Driver". So the source tells us that what we usually call the "Driver" is in fact just a thread named "Driver" that runs the user program.
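For reference, a minimal user program that passes this check could look like the sketch below (a SparkPi-style skeleton for illustration only; the real org.apache.spark.examples.SparkPi differs in detail). The main method of a Scala object compiles to a static main(String[]) on the class, which is why the Modifier.isStatic check succeeds:
import org.apache.spark.sql.SparkSession

object MySparkPi {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("MySparkPi").getOrCreate()
    val n = 100000
    // Monte Carlo estimate of pi, distributed over the executors
    val count = spark.sparkContext.parallelize(1 to n).map { _ =>
      val x = math.random * 2 - 1
      val y = math.random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / n}")
    spark.stop()
  }
}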
When the user program's main method runs, it initializes a SparkContext. In the SparkContext constructor a TaskScheduler is created, and calling the TaskScheduler's postStartHook() hands the SparkContext back to the ApplicationMaster so it can be used later. The source is shown below, followed by a small sketch of the handshake it implements:
// SparkContext init
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
...
_taskScheduler.postStartHook()
// YarnClusterScheduler
override def postStartHook() {
ApplicationMaster.sparkContextInitialized(sc)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}
// ApplicationMaster
private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
master.sparkContextInitialized(sc)
}
private def sparkContextInitialized(sc: SparkContext) = {
sparkContextPromise.synchronized {
// Notify runDriver function that SparkContext is available
sparkContextPromise.success(sc)
// Pause the user class thread in order to make proper initialization in runDriver function.
sparkContextPromise.wait()
}
}
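The net effect is a two-thread handshake: the "Driver" thread publishes the SparkContext through sparkContextPromise and then blocks on the promise's monitor, while runDriver awaits the future, finishes its setup (createAllocator), and finally calls resumeDriver() to wake the user thread. The self-contained sketch below (plain Scala, not Spark code) reproduces just this pattern:
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object HandshakeSketch {
  private val promise = Promise[String]()

  def main(args: Array[String]): Unit = {
    val userThread = new Thread(new Runnable {
      override def run(): Unit = {
        promise.synchronized {
          promise.success("SparkContext stand-in") // like sparkContextPromise.success(sc)
          promise.wait()                           // like sparkContextPromise.wait()
        }
        println("user thread resumed, continuing user code")
      }
    })
    userThread.setName("Driver")
    userThread.start()

    // runDriver side: block until the value is published, do the remaining setup,
    // then wake the user thread (this is what resumeDriver() ultimately does).
    val sc = Await.result(promise.future, 10.seconds)
    println(s"got $sc, would now call createAllocator(...)")
    promise.synchronized { promise.notify() }
    userThread.join()
  }
}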
The createAllocator method
createAllocator is how the Driver talks to YARN to request resources and run Executors in the allocated containers. The call stack under createAllocator is deep, so the calls have been condensed; the abridged source is as follows:
private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
val appId = client.getAttemptId().getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
// Obtain a YarnAllocator, which handles the communication with YARN
allocator = client.createAllocator(
yarnConf,
_sparkConf,
driverUrl,
driverRef,
securityMgr,
localResources)
credentialRenewer.foreach(_.setDriverRef(driverRef))
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
// Request resources
allocator.allocateResources()
...
}
def allocateResources(): Unit = synchronized {
updateResourceRequests()
val progressIndicator = 0.1f
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
if (allocatedContainers.size > 0) {
...
handleAllocatedContainers(allocatedContainers.asScala)
}
...
}
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
// A series of container validations, omitted here
...
runAllocatedContainers(containersToUse)
}
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
for (container <- containersToUse) {
...
if (runningExecutors.size() < targetNumExecutors) {
numExecutorsStarting.incrementAndGet()
if (launchContainers) {
// Launch the Executor from a thread in the launcher pool
launcherPool.execute(new Runnable {
override def run(): Unit = {
try {
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
).run() // The call that actually starts the Executor
updateInternalState()
} catch {
...
}
}
})
} else {
updateInternalState()
}
} else {
...
}
}
}
From this chain of methods you can see that Spark breaks Executor launch into four steps: obtaining the allocator via the YARN client, talking to YARN to get the allocated containers, validating those containers, and launching Executors in them.
So how is the number of Executors decided? The YarnAllocator class holds a targetNumExecutors value that is fixed when YarnAllocator is initialized. In runAllocatedContainers, Spark checks whether the number of running Executors is below targetNumExecutors and keeps launching Executors until the target is reached. With our submit command, --num-executors 1 sets spark.executor.instances to 1, so targetNumExecutors is 1. The value is determined by the following code:
// YarnAllocator
@volatile private var targetNumExecutors =
SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
// SchedulerBackendUtils
val DEFAULT_NUMBER_EXECUTORS = 2
def getInitialTargetExecutorNumber(
conf: SparkConf,
numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
if (Utils.isDynamicAllocationEnabled(conf)) {
val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
s"initial executor number $initialNumExecutors must between min executor number " +
s"$minNumExecutors and max executor number $maxNumExecutors")
initialNumExecutors
} else {
conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
}
}
The ExecutorRunnable class
The run method
def run(): Unit = {
...
startContainer()
}
def startContainer(): java.util.Map[String, ByteBuffer] = {
...
val commands = prepareCommand()
ctx.setCommands(commands.asJava)
...
try {
nmClient.startContainer(container.get, ctx)
} catch {
...
}
}
In ExecutorRunnable's run method we again see the familiar command-preparation step; submitting work to YARN always comes down to submitting a list of commands.
The prepareCommand method
private def prepareCommand(): List[String] = {
...
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId) ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
commands.map(s => if (s == null) "null" else s).toList
}
As you can see, what the Executor container actually runs is the class "org.apache.spark.executor.CoarseGrainedExecutorBackend"; a rough reconstruction of the command follows.
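Analogous to the AM command earlier, the executor launch command for our submission would look roughly like this (a sketch only: the driver host and port, executor id, hostname, and application id are placeholders):
object ExecutorCommandSketch {
  def main(args: Array[String]): Unit = {
    val commands = Seq(
      "{{JAVA_HOME}}/bin/java", "-server",
      "-Xmx512m",                                                        // from --executor-memory
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      "--driver-url", "spark://CoarseGrainedScheduler@driver-host:port", // placeholder address
      "--executor-id", "1",
      "--hostname", "nodemanager-host",                                  // placeholder
      "--cores", "1",                                                    // spark.executor.cores defaults to 1 on YARN
      "--app-id", "application_1234567890123_0001",                      // placeholder
      "1><LOG_DIR>/stdout",
      "2><LOG_DIR>/stderr")
    println(commands.mkString(" "))
  }
}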
At this point, all of the Driver-side work for job submission is complete.
Method execution sequence diagram
Executor
In the Driver analysis above, we found that creating Executors actually means submitting to YARN a command that runs the "org.apache.spark.executor.CoarseGrainedExecutorBackend" class.
The CoarseGrainedExecutorBackend class
Executor startup methods
def main(args: Array[String]) {
...
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
}
private def run(...) {
...
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
}
}
From this startup sequence we can see that what we usually call the "Executor" is really an endpoint named "Executor". Endpoint is a concept from the actor-style model used in Spark's RPC layer, which the next article covers in detail; for now it is enough to know that once an endpoint has been registered, its onStart method is invoked.
The onStart method
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
In onStart, CoarseGrainedExecutorBackend sends a RegisterExecutor message to CoarseGrainedSchedulerBackend to register itself. After receiving it, CoarseGrainedSchedulerBackend replies with a RegisteredExecutor message, and on receiving that reply CoarseGrainedExecutorBackend starts initializing the Executor.
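A minimal, self-contained illustration of this register/registered handshake (not Spark code; the scheduler stub and message types are stand-ins, and the real exchange happens over Spark's RPC layer rather than a direct method call):
object RegistrationHandshakeSketch {
  sealed trait Message
  case class RegisterExecutor(executorId: String, cores: Int) extends Message
  case object RegisteredExecutor extends Message

  // Stand-in for CoarseGrainedSchedulerBackend on the driver side.
  object SchedulerBackendStub {
    private var registered = Map.empty[String, Int]
    def ask(msg: Message): Message = msg match {
      case RegisterExecutor(id, cores) =>
        registered += (id -> cores)
        RegisteredExecutor
      case other =>
        sys.error(s"unexpected message: $other")
    }
  }

  // Stand-in for the executor side: register first, create the Executor only
  // after the RegisteredExecutor reply comes back.
  def onStart(executorId: String, cores: Int): Unit =
    SchedulerBackendStub.ask(RegisterExecutor(executorId, cores)) match {
      case RegisteredExecutor =>
        println(s"executor $executorId registered, creating the Executor instance")
      case _ =>
        println("registration failed, exiting the executor process")
    }

  def main(args: Array[String]): Unit = onStart("1", cores = 1)
}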