SparkContext作為整個Spark的入口,不管是Spark Core、Spark Streaming還是Spark SQL,都需要首先創建一個SparkContext對象,然后基於這個SparkContext進行后續RDD的操作;所以很有必要了解SparkContext在初始化時做了什麼事情。
SparkContext初始化過程主要干了如下幾件事情:
1、根據SparkContext的構造入參SparkConf創建SparkEnv;
2、初始化SparkUI;
3、創建TaskScheduler;
4、創建DAGScheduler;
5、啟動taskScheduler;
通過源代碼說明SparkContext初始化的過程
1、創建SparkEnv
// 1. Build the driver-side SparkEnv from the SparkConf handed to SparkContext:
//    host and port are read from spark.driver.host / spark.driver.port,
//    isDriver = true marks this env as the driver's, and the shared listenerBus
//    is passed through. SparkEnv.set(env) then publishes the env so other
//    components can look it up.
private[spark] val env = SparkEnv.create( conf, "<driver>", conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt, isDriver = true, isLocal = isLocal, listenerBus = listenerBus) SparkEnv.set(env)
2、初始化SparkUI
// 2. Create the web UI for this SparkContext. NOTE(review): bind() presumably
//    starts the underlying HTTP server so the application UI becomes reachable
//    — confirm in SparkUI.bind().
private[spark] val ui = new SparkUI(this) ui.bind()
3、創建TaskScheduler:根據spark的運行模式創建不同的SchedulerBackend
// 3. Pick and build a TaskScheduler according to the master URL. For a
//    standalone "spark://..." master this creates a TaskSchedulerImpl plus a
//    SparkDeploySchedulerBackend and wires them together via initialize().
//    (Excerpt of the Spark source; only the standalone case is shown.)
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)

private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
  val SPARK_REGEX = """spark://(.*)""".r
  master match {
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      // a comma-separated master list is expanded back into spark:// URLs
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend) // initializes the backend field inside TaskSchedulerImpl
      scheduler
  }
}

TaskSchedulerImpl extends TaskScheduler{
  var backend: SchedulerBackend = null
  def initialize(backend: SchedulerBackend) {
    this.backend = backend // stores the SparkDeploySchedulerBackend into the backend field
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO => // first-in-first-out scheduling
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR => // fair scheduling
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }
}

private[spark] class SparkDeploySchedulerBackend(scheduler: TaskSchedulerImpl,sc: SparkContext,masters: Array[String]) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with AppClientListener with Logging { }
4、創建DAGScheduler:根據TaskScheduler創建DAGScheduler,用於接收提交過來的job
// 4. Create the DAGScheduler on top of the TaskScheduler; this spawns
//    eventProcessActor, the DAGScheduler's communication endpoint, which can
//    receive and send many kinds of messages. (The original note's
//    "eventProcssActor" was a typo for eventProcessActor.)
@volatile private[spark] var dagScheduler: DAGScheduler = new DAGScheduler(this)

class DAGScheduler{
  def this(sc: SparkContext) = this(sc, sc.taskScheduler)
  private def initializeEventProcessActor() {
    implicit val timeout = Timeout(30 seconds)
    // ask the supervisor to create the event-processing actor, then block until
    // the ActorRef is returned
    val initEventActorReply = dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
    eventProcessActor = Await.result(initEventActorReply, timeout.duration). asInstanceOf[ActorRef]
  }
  initializeEventProcessActor()
}
// See the DAGScheduler chapter for a detailed analysis. On preStart the actor
// registers itself back into the TaskScheduler; JobSubmitted messages are
// forwarded to handleJobSubmitted. (The "......" marks cases elided from this
// excerpt of the original source.)
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)extends Actor with Logging {{
  override def preStart() {
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,listener, properties)
    ......
  }
}
5、啟動taskScheduler
啟動taskScheduler的主要目的是啟動相應的SchedulerBackend,並判斷是否進行推測式執行任務;
在啟動TaskScheduler的過程中會創建Application並向Master發起注冊請求;
// 5. Start the TaskScheduler; this in turn starts the SchedulerBackend (see
//    the expanded start() implementations quoted below in the article).
taskScheduler.start()
// The start() call chain: TaskSchedulerImpl.start() -> SchedulerBackend.start().
// SparkDeploySchedulerBackend builds the driver URL and the executor launch
// command, wraps them in an ApplicationDescription, and starts an AppClient
// whose ClientActor registers the Application with the Master.
// (Excerpt of the Spark source; speculation handling is elided.)
TaskSchedulerImpl extends TaskScheduler{
  var backend: SchedulerBackend = null
  override def start() {
    backend.start()
    //spark.speculation...
  }
}

private[spark] class SparkDeploySchedulerBackend(scheduler: TaskSchedulerImpl,sc: SparkContext,masters: Array[String]) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with AppClientListener with Logging {
  var client: AppClient = null
  val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
  override def start() {
    super.start() // invokes CoarseGrainedSchedulerBackend.start()
    // Akka URL of the driver actor; executors connect back to it
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME)
    // command used by workers to launch CoarseGrainedExecutorBackend processes
    val command = Command( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts)
    val sparkHome = sc.getSparkHome()
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()
  }
}

class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem) extends SchedulerBackend with Logging
  var driverActor: ActorRef = null
  override def start() {
    driverActor = actorSystem.actorOf( Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
  }
}

class ClientActor extends Actor with Logging{
  override def preStart() {
    registerWithMaster() // registers the Application with the Master
  }
}
CoarseGrainedSchedulerBackend與CoarseGrainedExecutorBackend通信
// Executor-side counterpart: on preStart it resolves the driver actor from
// driverUrl and sends RegisterExecutor — the receiver of that message is
// CoarseGrainedSchedulerBackend. The receive cases are listed here without
// bodies (excerpt of the original source).
private[spark] class CoarseGrainedExecutorBackend(driverUrl: String, executorId: String, hostPort: String, cores: Int) extends Actor with ExecutorBackend with Logging {
  var executor: Executor = null
  var driver: ActorSelection = null
  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    driver ! RegisterExecutor(executorId, hostPort, cores) // register the Executor; received by CoarseGrainedSchedulerBackend
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }
  override def receive = {
    case RegisteredExecutor(sparkProperties)
    case LaunchTask(taskDesc)
    case KillTask(taskId, _, interruptThread)
    case StopExecutor
  }
}