背景: 前幾天了解了spark了運行架構,spark代碼提交給driver時候會根據rdd生成DAG,那么實際DAG在代碼中是如何生成的呢?
首先了解,spark任務中的幾個划分點:
1.job:job是由rdd的action來划分,每一個action操作是在spark任務執行時是一個job。(action的區分:rdd分為行動操作和轉化操作,因為我們知道rdd是惰性加載的,除非遇到行動操作,前面的所有的轉化操作才會執行,這也就是為什么spark任務由job來划分執行了,區分行動操作和轉化操作最簡單的方法就是看,rdd放回的值,如果返回的是一個rdd則是轉化操作,例如map,如果返回的是一個其他的數據類型則是行動操作,例如count)
2.stage:根據rdd的寬窄依賴來划分(shuffle來區分),遇到shuffle,則將shuffle之前的窄依賴歸來一個stage;
3.task:task是由最后的executor執行的最小任務,它最終落到各個executor上,實現分布式執行;
簡單的歸納一下他們的關系:job -> stage -> task (job中有多個stage,stage中有多個task);
spark運行時,一個任務由client提交,再由driver划分邏輯實現圖DAG,最后分配給各個executor上執行task;
思考:任務是如何分配監聽的?hash分配,隨機分配?
spark在任務拆分的時候,參考下圖:

1.先由sparkcontext初始化,創建一個DAGshcheduler,啟動一個監聽器,監聽spark任務,spark拆分的所有任務都會發給這個監聽器;
2.客戶端這邊,當我們調用action時,則action會向sparkcontext啟動一個runjob,即是將action任務(一個job)提交給DAGshcheduler的監聽器;
3.接到job的DAGscheduler 會將任務交給handleJobSubmitted 來處理;
4. 每個job會生成一個resultstage,其余的都是shufflestage,shufflestage是根據rdd的寬依賴來生成的,根據廣度優先遍歷rdd,遇到shufflestage就創建一個新的stage;
5.形成DAG圖之后,遍歷執行stage列表,根據父子stage順序執行,如果上層未執行完,下層會一直等待;
6.每個stage會拆分成多個task,交由taskshcheduler來分配,等待executor來執行完一個task后交給下一個task;
1. spark2.0中, 初始化sparksession builder 中的sparkcontext在初始化的時候會創建一個dagscheduler的變量,
sparkcontext:
_dagScheduler = new DAGScheduler(this)
DAGscheduler的構造方法中,會自己創建一個DAGschedulereventprocessloop,並自啟動一個監聽器
DAGscheduler:
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
……
eventProcessLoop.start()
}
DAGschedulereventprocessloop的父類 EventLoop 中有個線程類Thread 會起一個線程監聽
EventLoop:
override def run(): Unit = {
try {
while (!stopped.get) {
val event = eventQueue.take()
try {
onReceive(event)
} catch {
case NonFatal(e) => {
try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
}
} catch {
case ie: InterruptedException => // exit even if eventQueue is not empty
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
其中核心就是一個線程只做onReceive操作,父類只是一個抽象類,子類實現這個方法,調用doOnReceive
/**
* The main event loop of the DAG scheduler.
*/
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}
其最終單線程監聽循環執行的就是:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
// 處理job提交任務
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
// 處理map提交的stage任務
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
// 處理map stage 取消的任務
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
// 處理job 取消的任務
case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)
// 處理job 組取消的任務
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
// 處理所有job 取消的任務
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
// 處理executort完成分配的事件
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
// 處理executor對視事件
case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
dagScheduler.handleTaskCompletion(completion)
// 處理task丟失的事件
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
// 處理重新提交失敗Stage事件
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
假如執行一個rdd 執行action操作,即是將rdd中由sparkcontext 調用 runjob方法,sparkcontext中的初始化的DAGscheduler來調用 submitjob將這一個event事件加入到
DAGscheduler的執行隊列中,等待線程順序執行
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
........
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
至此,一個rdd的執行操作已經進入DAG監聽器的隊列了,下一步由監聽器取按順序取出來doOreceive 按照event的實際類型來執行相應的操作:
如果調用JobSubmitted方法,則調用相對應的handleJobSubmitted。
