Summary:
1. Basic terminology
2. Runtime architecture
2.1 Basic architecture
2.2 Execution flow
2.3 Related UML class diagrams
2.4 Scheduling modules
2.4.1 Overview of job scheduling
2.4.2 Overview of task scheduling
3. Deploy modes
3.1 Standalone mode
4. RDD in practice
Conclusion:
- Basic terminology:
- Application: a user program built on Spark, consisting of a driver program and a set of executors on the cluster.
- Driver Program: the process that runs the application's main function and creates the SparkContext.
- Executor: a process launched on a worker node. An executor runs tasks and keeps data in memory or on disk across them; each application has its own executors.
- Worker Node: any node in the cluster that can run application code.
- Cluster Manager: an external service for acquiring resources on the cluster (e.g. Standalone, Mesos, YARN); also referred to as the resource manager.
- Job: a parallel computation consisting of multiple tasks, spawned by a Spark action (e.g. save, collect); an application usually produces several jobs (see the sketch after this list).
- Stage: each job is divided into smaller sets of tasks (TaskSets) called stages, which depend on one another.
- Task: a unit of work sent to one executor.
- DAGScheduler: the stage-oriented logical scheduling layer; it builds a DAG of stages for each job and submits them.
- TaskScheduler: the task-oriented scheduling layer; it tracks each task and reports execution status back to the DAGScheduler.
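To make these terms concrete, here is a minimal sketch (assuming the SparkContext sc that spark-shell provides) in which each action spawns one job:
// Assumes an existing SparkContext `sc`, e.g. the one created by spark-shell.
val words = sc.parallelize(Seq("spark", "scheduler", "stage", "task"))

// Action 1: count() triggers Job 0, a single stage of parallel tasks.
val n = words.count()

// Action 2: reduceByKey introduces a shuffle, so collect() triggers Job 1 with two
// stages (a ShuffleMapStage and a ResultStage), each of which runs a set of tasks.
val lengths = words.map(w => (w.length, 1)).reduceByKey(_ + _).collect()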
2. Runtime architecture
2.1 Basic architecture:
Diagram:

A Spark application runs as a set of independent processes on the cluster, coordinated by the SparkContext object in your driver program.
Specifically, the SparkContext can connect to several types of cluster managers (the standalone cluster manager, Mesos, or YARN), which allocate resources across applications. Once connected, Spark acquires executors on the cluster's nodes, then ships the application code to those executors. Finally, the SparkContext sends tasks to the executors to run.
Note: the driver program keeps listening for and accepting incoming connections from its executors (on spark.driver.port; see the networking part of the configuration docs), so the driver must be network addressable from the worker nodes. Data cannot be shared across applications (i.e. across SparkContext instances) without writing it to an external storage system.
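A minimal sketch of this handshake from the driver's side (ConnectSketch and the master URL spark://master:7077 are placeholders):
import org.apache.spark.{SparkConf, SparkContext}

object ConnectSketch {
  def main(args: Array[String]): Unit = {
    // The master URL selects the cluster manager: spark://host:port for the standalone
    // manager, mesos://... for Mesos, or yarn (usually passed via spark-submit --master).
    val conf = new SparkConf().setAppName("ConnectSketch").setMaster("spark://master:7077")
    val sc = new SparkContext(conf)   // registers with the cluster manager and acquires executors
    try {
      // The action below is compiled into tasks that the SparkContext ships to the executors.
      println(sc.parallelize(1 to 100).sum())
    } finally {
      sc.stop()
    }
  }
}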
2.1.1 The SparkContext class (represents a connection to a Spark cluster; currently only one active SparkContext is allowed per JVM, a restriction that may eventually be removed):
Several important fields (covering the DAGScheduler and TaskScheduler, the scheduler backend used to acquire executors, heartbeats, listeners, and so on):
Note: the underscore _ used below is Scala's default-value initializer; for example an Int field defaults to 0 and a reference type such as String defaults to null.
/* ------------------------------------------------------------------------------------- *
 | Private variables. These variables keep the internal state of the context, and are    |
 | not accessible by the outside world. They're mutable since we want to initialize all  |
 | of them to some neutral value ahead of time, so that calling "stop()" while the       |
 | constructor is still running is safe.                                                 |
 * ------------------------------------------------------------------------------------- */
private var _conf: SparkConf = _
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _env: SparkEnv = _
private var _jobProgressListener: JobProgressListener = _
private var _statusTracker: SparkStatusTracker = _
private var _progressBar: Option[ConsoleProgressBar] = None
private var _ui: Option[SparkUI] = None
private var _hadoopConfiguration: Configuration = _
private var _executorMemory: Int = _
private var _schedulerBackend: SchedulerBackend = _
private var _taskScheduler: TaskScheduler = _
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
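As an aside, a tiny self-contained illustration of what = _ does (my own example, not Spark code):
// `= _` is only legal for `var` members of a class or object, not for local variables.
class Defaults {
  var count: Int = _      // 0
  var ratio: Double = _   // 0.0
  var flag: Boolean = _   // false
  var name: String = _    // null (not None): reference types default to null
}

object DefaultsDemo extends App {
  val d = new Defaults
  println((d.count, d.ratio, d.flag, d.name))  // prints (0,0.0,false,null)
}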
2.1.2 Executor (a thread pool that runs tasks and talks to the driver over RPC):
Heartbeat reporting (the heartbeater thread tracks consecutive heartbeat failures and attaches the running tasks' metrics to each heartbeat):
Two settings matter here: spark.executor.heartbeat.maxFailures = 60 and spark.executor.heartbeatInterval = 10s, so a heartbeat is attempted every 10 seconds and the executor only exits after roughly 10 minutes (60 consecutive failures) of being unable to reach the driver.
/** Reports heartbeat and metrics for active tasks to the driver. */
private def reportHeartBeat(): Unit = {
  // list of (task id, accumUpdates) to send back to the driver
  val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
  val curGCTime = computeTotalGcTime()
  for (taskRunner <- runningTasks.values().asScala) {
    if (taskRunner.task != null) {
      taskRunner.task.metrics.mergeShuffleReadMetrics()
      taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
      accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
    }
  }
  val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
  try {
    val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
      message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
    if (response.reregisterBlockManager) {
      logInfo("Told to re-register on heartbeat")
      env.blockManager.reregister()
    }
    heartbeatFailures = 0
  } catch {
    case NonFatal(e) =>
      logWarning("Issue communicating with driver in heartbeater", e)
      heartbeatFailures += 1
      if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
        logError(s"Exit as unable to send heartbeats to driver " +
          s"more than $HEARTBEAT_MAX_FAILURES times")
        System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
      }
  }
}
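If the default does not fit a workload, the documented interval can be raised when the application is configured (the failure threshold is read internally by the executor). An illustrative snippet:
import org.apache.spark.SparkConf

// Example only: send heartbeats every 20 seconds instead of the default 10s.
val conf = new SparkConf()
  .setAppName("HeartbeatTuningSketch")
  .set("spark.executor.heartbeatInterval", "20s")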
Task management (launching and killing TaskRunner instances):
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
Below is TaskRunner's run method, reproduced here for later study:
override def run(): Unit = {
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
startGCTime = computeTotalGcTime()
try {
val (taskFiles, taskJars, taskProps, taskBytes) =
Task.deserializeWithDependencies(serializedTask)
// Must be set before updateDependencies() is called, in case fetching dependencies
// requires access to properties contained within (e.g. for access control).
Executor.taskDeserializationProps.set(taskProps)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
task.localProperties = taskProps
task.setTaskMemoryManager(taskMemoryManager)
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
if (killed) {
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw new TaskKilledException
}
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
var threwException = true
val value = try {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
} finally {
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
throw new SparkException(errMsg)
} else {
logError(errMsg)
}
}
if (releasedLocks.nonEmpty) {
val errMsg =
s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
releasedLocks.mkString("[", ", ", "]")
if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
throw new SparkException(errMsg)
} else {
logWarning(errMsg)
}
}
}
val taskFinish = System.currentTimeMillis()
// If the task has been killed, let's fail it.
if (task.killed) {
throw new TaskKilledException
}
val resultSer = env.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
// Deserialization happens in two parts: first, we deserialize a Task object, which
// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
task.metrics.setExecutorDeserializeTime(
(taskStart - deserializeStartTime) + task.executorDeserializeTime)
// We need to subtract Task.run()'s deserialization time to avoid double-counting
task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
if (maxResultSize > 0 && resultSize > maxResultSize) {
logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize > maxDirectResultSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId,
new ChunkedByteBuffer(serializedDirectResult.duplicate()),
StorageLevel.MEMORY_AND_DISK_SER)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
} else {
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
serializedDirectResult
}
}
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
case ffe: FetchFailedException =>
val reason = ffe.toTaskEndReason
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
case _: TaskKilledException | _: InterruptedException if task.killed =>
logInfo(s"Executor killed $taskName (TID $taskId)")
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
case CausedBy(cDE: CommitDeniedException) =>
val reason = cDE.toTaskEndReason
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
case t: Throwable =>
// Attempt to exit cleanly by informing the driver of our failure.
// If anything goes wrong (or this was a fatal exception), we will delegate to
// the default uncaught exception handler, which will terminate the Executor.
logError(s"Exception in $taskName (TID $taskId)", t)
// Collect latest accumulator values to report back to the driver
val accums: Seq[AccumulatorV2[_, _]] =
if (task != null) {
task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.collectAccumulatorUpdates(taskFailed = true)
} else {
Seq.empty
}
val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
val serializedTaskEndReason = {
try {
ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
} catch {
case _: NotSerializableException =>
// t is not serializable so just send the stacktrace
ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
}
}
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
// Don't forcibly exit unless the exception was inherently fatal, to avoid
// stopping other tasks unnecessarily.
if (Utils.isFatalError(t)) {
SparkUncaughtExceptionHandler.uncaughtException(t)
}
} finally {
runningTasks.remove(taskId)
}
}
2.2 Execution flow:
Diagram:

Note that StandaloneExecutorBackend in the diagram is only a conceptual name (it does not exist in the Spark code base); the class that actually plays this role in standalone mode is CoarseGrainedExecutorBackend.
1. Build the application's runtime environment (start the SparkContext); the SparkContext registers with the cluster manager (Standalone, Mesos or YARN) and requests resources for executors.
2. The cluster manager allocates the Executor resources and launches the StandaloneExecutorBackend processes; the executors' status is then reported to the resource manager along with the heartbeats.
3. The SparkContext builds the DAG, splits it into stages, and hands the resulting TaskSets to the TaskScheduler. Executors request tasks from the SparkContext, the TaskScheduler dispatches tasks to them, and the SparkContext also ships the application code to the executors.
4. Tasks run on the executors, and all resources are released when they finish. A minimal end-to-end sketch of this flow is given below.
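A sketch of a complete application that walks through these four steps (the object name and code are illustrative; the master URL is normally supplied by spark-submit):
import org.apache.spark.{SparkConf, SparkContext}

object WordLengths {
  def main(args: Array[String]): Unit = {
    // Step 1: creating the SparkContext registers with the cluster manager and requests executor resources.
    val conf = new SparkConf().setAppName("WordLengths")
    val sc = new SparkContext(conf)

    // Steps 2 and 3 happen behind the scenes: executors are launched, and the action below
    // is turned into a DAG, stages and TaskSets that the TaskScheduler dispatches to them.
    val result = sc.parallelize(Seq("spark", "runs", "tasks"))
      .map(w => (w, w.length))
      .collect()
    result.foreach(println)

    // Step 4: stopping the context releases the executors and their resources.
    sc.stop()
  }
}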
2.3 Related classes:
ExecutorBackend:
Trait signature (the pluggable hook an Executor uses to send status updates to the cluster scheduler):
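Since the diagram is not reproduced here, an abridged sketch of the trait as it appears in the Spark source of this era:
import java.nio.ByteBuffer

import org.apache.spark.TaskState.TaskState

// A pluggable interface the Executor uses to report task state changes back to the scheduler.
private[spark] trait ExecutorBackend {
  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit
}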

Class diagrams for the different deploy modes:

In standalone mode the scheduling work is done by SparkDeploySchedulerBackend together with TaskSchedulerImpl; the corresponding class diagram should look roughly like this:

The SchedulerBackend trait (core method: reviveOffers()):
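For reference, an abridged, from-memory sketch of the trait's core members (non-essential members omitted):
// Abridged sketch; the real trait has additional members (killTask, isReady, applicationId, ...).
private[spark] trait SchedulerBackend {
  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit      // ask the backend to offer free executor resources to the TaskScheduler
  def defaultParallelism(): Int
}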
CoarseGrainedExecutorBackend (its receive method is a series of pattern matches, much like a switch/case, dispatching on the incoming message: registering the executor, launching tasks, killing tasks, shutting down, and so on):
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor(hostname) =>
    logInfo("Successfully registered with driver")
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  case RegisterExecutorFailed(message) =>
    logError("Slave registration failed: " + message)
    exitExecutor(1)
  case LaunchTask(data) =>
    if (executor == null) {
      logError("Received LaunchTask command but executor was null")
      exitExecutor(1)
    } else {
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
        taskDesc.name, taskDesc.serializedTask)
    }
  case KillTask(taskId, _, interruptThread) =>
    if (executor == null) {
      logError("Received KillTask command but executor was null")
      exitExecutor(1)
    } else {
      executor.killTask(taskId, interruptThread)
    }
  case StopExecutor =>
    stopping.set(true)
    logInfo("Driver commanded a shutdown")
    // Cannot shutdown here because an ack may need to be sent back to the caller. So send
    // a message to self to actually do the shutdown.
    self.send(Shutdown)
  case Shutdown =>
    stopping.set(true)
    new Thread("CoarseGrainedExecutorBackend-stop-executor") {
      override def run(): Unit = {
        // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
        // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
        // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
        // Therefore, we put this line in a new thread.
        executor.stop()
      }
    }.start()
}
Finally, SparkDeploySchedulerBackend and its start() path (the snippet below, which collects the spark.* properties and registers the driver endpoint, actually lives in its parent class CoarseGrainedSchedulerBackend and runs via super.start()):
var driverEndpoint: RpcEndpointRef = null

protected def minRegisteredRatio: Double = _minRegisteredRatio

override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }
  // TODO (prashant) send conf instead of properties
  driverEndpoint = createDriverEndpointRef(properties)
}

protected def createDriverEndpointRef(
    properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  new DriverEndpoint(rpcEnv, properties)
}
2.4 Scheduling modules:
2.4.1 Overview of job scheduling
DAGScheduler: builds a stage-based DAG (directed acyclic graph) for each job and submits the stages to the TaskScheduler. Stages are cut along the dependencies between RDDs (a new stage begins at each shuffle dependency), which is how the scheduler finds the lowest-cost way to run the job, as shown in the figure below.

Note: the scheduler works backwards from the final stage; if a stage still has parent stages, those parents are submitted first, otherwise the stage can run right away. A small experiment on this is written up separately: "An analysis of how Spark's DAGScheduler generates stages".
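The split point is easy to see from an RDD's lineage; a minimal sketch (again assuming a SparkContext sc):
// Assumes an existing SparkContext `sc`.
val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i))   // narrow dependency: stays in the same stage
val summed = pairs.reduceByKey(_ + _)                        // shuffle dependency: starts a new stage

// toDebugString prints the lineage; the indentation marks the shuffle boundary,
// so running an action on `summed` yields a job with two stages.
println(summed.toDebugString)
summed.collect()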
2.4.2 Overview of task scheduling:
TaskScheduler: submits TaskSets to the workers for execution; which tasks each executor runs is decided here. The TaskScheduler keeps track of all TaskSets, and when an executor sends a heartbeat to the driver it hands out tasks according to the resources still available.
The TaskScheduler also tracks the running state of every task and retries the ones that fail (a configuration sketch follows); the figure after it illustrates the TaskScheduler's role.
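Retry behaviour is governed by spark.task.maxFailures (4 by default); an illustrative way to change it:
import org.apache.spark.SparkConf

// Example only: tolerate a few more failures per task before the job is failed.
val conf = new SparkConf()
  .setAppName("TaskRetrySketch")
  .set("spark.task.maxFailures", "8")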

The concrete task scheduler class differs across deploy modes:
- Spark standalone mode: TaskSchedulerImpl
- YARN-client mode: YarnClientClusterScheduler
- YARN-cluster mode: YarnClusterScheduler
3. Deploy modes
3.1 Standalone mode
- Standalone mode uses the resource scheduling framework that ships with Spark.
- It follows the classic Master/Slaves architecture and uses ZooKeeper for Master high availability (HA).
- The structure of the framework is shown below:

- The main roles in this mode are the Client node, the Master node and the Worker nodes. The Driver can run either on the Master node or in the local Client process: when a job is submitted from the interactive spark-shell, the Driver runs inside that shell process (started on the Master node in this setup), whereas when the job is submitted with spark-submit, or launched from an IDE such as Eclipse or IDEA with "new SparkConf().setMaster("spark://master:7077")", the Driver runs on the local Client.
- The execution flow is shown in the figure below (adapted from the references listed at the end):

- The SparkContext connects to the Master, registers with it and requests resources (CPU cores and memory).
- The Master decides, from the SparkContext's resource request and the information reported in the Workers' periodic heartbeats, which Worker(s) should provide the resources, allocates them there, and starts a StandaloneExecutorBackend on each of those Workers.
- Each StandaloneExecutorBackend registers with the SparkContext.
- The SparkContext ships the application code to the StandaloneExecutorBackends; it also parses the application code, builds the DAG, and submits it to the DAGScheduler, which breaks it into stages (each action spawns a job, and every job contains one or more stages; stage boundaries typically appear where external data is read and before each shuffle). The stages, as TaskSets, are handed to the TaskScheduler, which assigns each task to a Worker, where it is finally executed by the StandaloneExecutorBackend.
- The StandaloneExecutorBackend maintains an executor thread pool that runs the tasks and reports progress back to the SparkContext until all tasks complete.
- Once all tasks have finished, the SparkContext deregisters from the Master and releases the resources. A small runnable example of this mode follows.
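A hedged, runnable sketch of submitting work to a standalone cluster (the host name and the resource settings are placeholders):
import org.apache.spark.{SparkConf, SparkContext}

object StandaloneDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StandaloneDemo")
      .setMaster("spark://master:7077")     // the standalone Master
      .set("spark.executor.memory", "1g")   // memory requested for each executor
      .set("spark.cores.max", "2")          // total CPU cores requested from the Master

    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 1000).filter(_ % 3 == 0).count())
    } finally {
      sc.stop()
    }
  }
}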
4. RDD in practice
sc.makeRDD(Seq("arachis", "tony", "lily", "tom")).map {
  name => (name.charAt(0), name)
}.groupByKey().mapValues {
  names => names.toSet.size // unique and count
}.collect()
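For reference, with these four names the collected result should contain (ordering may differ between runs):
// 'a' -> Set(arachis), 'l' -> Set(lily), 't' -> Set(tony, tom)
// res: Array[(Char, Int)] = Array((a,1), (l,1), (t,2))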
Submitting the job (collect)

Splitting into stages

Submitting the stages and starting task scheduling
DAG of Stage 0: makeRDD => map; this creates two RDDs: ParallelCollectionRDD and MapPartitionsRDD

DAG of Stage 1: groupByKey => mapValues; this creates two RDDs: ShuffledRDD and MapPartitionsRDD

- The runtime hierarchy that ties all of these terms together is shown below:

- A Job consists of multiple stages; a Stage consists of multiple tasks of the same kind; tasks come in two flavours, ShuffleMapTask and ResultTask; dependencies come in two flavours, ShuffleDependency and NarrowDependency.
Links:
Spark official docs: http://spark.apache.org/docs/latest/cluster-overview.html
http://www.cnblogs.com/tgzhu/p/5818374.html
