Spark Analysis: The SparkContext Startup Process


SparkContext is the entry point to all of Spark: whether you use Spark core, Spark Streaming, or Spark SQL, you must first create a SparkContext object and then perform all subsequent RDD operations through it. It is therefore well worth understanding what SparkContext does during initialization.

 

SparkContext initialization mainly does the following:

1. Create a SparkEnv from the SparkConf passed to the SparkContext constructor;

2. Initialize the SparkUI;

3. Create the TaskScheduler;

4. Create the DAGScheduler;

5. Start the TaskScheduler.

 

The following walks through SparkContext initialization in the source code.

1. Creating SparkEnv

private[spark] val env = SparkEnv.create(
    conf, "<driver>", conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt,
    isDriver = true, isLocal = isLocal, listenerBus = listenerBus)
SparkEnv.set(env)
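
The driver's host and port are read from the SparkConf passed in. As a minimal, self-contained sketch of that lookup pattern (a toy `MiniConf` class standing in for the real SparkConf, which has far more machinery):

```scala
// Toy stand-in for SparkConf: a string-to-string map with the same
// get/getOption semantics used by SparkContext when building SparkEnv.
class MiniConf(settings: Map[String, String]) {
  def get(key: String): String =
    settings.getOrElse(key, throw new NoSuchElementException(key))
  def getOption(key: String): Option[String] = settings.get(key)
}

val conf = new MiniConf(Map(
  "spark.driver.host" -> "127.0.0.1",
  "spark.driver.port" -> "7077"
))

// Mirrors the conf.get(...) / .toInt calls in the snippet above.
val host = conf.get("spark.driver.host")
val port = conf.get("spark.driver.port").toInt
```

A missing key throws NoSuchElementException, which is why the driver host and port must be set before SparkEnv is created.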

 

2. Initializing SparkUI

private[spark] val ui = new SparkUI(this)
ui.bind()

 

3. Creating the TaskScheduler: a different SchedulerBackend is created depending on Spark's deployment mode

private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)

private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
    val SPARK_REGEX = """spark://(.*)""".r

    master match {
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend) // initialize the backend field of TaskSchedulerImpl
        scheduler
   }
}
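
The SPARK_REGEX case above pulls the comma-separated host list out of a `spark://` master URL and rebuilds one URL per Master. That parsing can be tried in isolation (the regex is the one from the source; the wrapper function is just for illustration):

```scala
// The same regex createTaskScheduler uses to recognize standalone-mode masters.
val SPARK_REGEX = """spark://(.*)""".r

// Illustrative wrapper: extract the master list as createTaskScheduler does.
def parseMasterUrls(master: String): Array[String] = master match {
  case SPARK_REGEX(sparkUrl) => sparkUrl.split(",").map("spark://" + _)
  case _                     => Array.empty
}

val urls = parseMasterUrls("spark://host1:7077,host2:7077")
```

Passing several Masters this way is how standalone HA setups give SparkDeploySchedulerBackend a list of candidates to register with.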

private[spark] class TaskSchedulerImpl(val sc: SparkContext) extends TaskScheduler {
    var backend: SchedulerBackend = null

    def initialize(backend: SchedulerBackend) {
        this.backend = backend   // store the SparkDeploySchedulerBackend in the backend field
        rootPool = new Pool("", schedulingMode, 0, 0)
        schedulableBuilder = {
            schedulingMode match {
                case SchedulingMode.FIFO =>  // first-in, first-out scheduling
                    new FIFOSchedulableBuilder(rootPool)
                case SchedulingMode.FAIR =>  // fair scheduling
                    new FairSchedulableBuilder(rootPool, conf)
            }
        }
        schedulableBuilder.buildPools()
    }
}
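
The schedulingMode that drives the FIFO/FAIR choice comes from the `spark.scheduler.mode` setting (FIFO by default). A self-contained sketch of that selection, using a toy Enumeration in place of Spark's SchedulingMode:

```scala
// Toy stand-in for org.apache.spark.scheduler.SchedulingMode.
object SchedulingMode extends Enumeration {
  val FIFO, FAIR = Value
}

// Hypothetical helper: TaskSchedulerImpl does the equivalent with
// SchedulingMode.withName on the "spark.scheduler.mode" config value.
def modeFor(configValue: String): SchedulingMode.Value =
  SchedulingMode.withName(configValue.toUpperCase)
```

An unrecognized name makes `withName` throw NoSuchElementException, mirroring how a bad `spark.scheduler.mode` value fails fast at startup.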

private[spark] class SparkDeploySchedulerBackend(scheduler: TaskSchedulerImpl,sc: SparkContext,masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with AppClientListener with Logging {
    
}

 

4. Creating the DAGScheduler: the DAGScheduler is built on top of the TaskScheduler and receives submitted jobs

 

 

//Create the DAGScheduler from the TaskScheduler; this spawns eventProcessActor (the DAGScheduler's communication endpoint, which sends and receives many kinds of messages)
@volatile private[spark] var dagScheduler: DAGScheduler = new DAGScheduler(this)
class DAGScheduler{
    
    def this(sc: SparkContext) = this(sc, sc.taskScheduler)

    private def initializeEventProcessActor() {
        implicit val timeout = Timeout(30 seconds)
        val initEventActorReply =  dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
        eventProcessActor = Await.result(initEventActorReply, timeout.duration).
        asInstanceOf[ActorRef]
    }

    initializeEventProcessActor()
}

//Detailed analysis in the DAGScheduler article
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

    override def preStart() {
        dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
    }

    def receive = {
        case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
            dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)
        ......
    }
}

 

5. Starting the TaskScheduler

Starting the TaskScheduler mainly starts the corresponding SchedulerBackend and decides whether tasks should be executed speculatively.

During TaskScheduler startup, the Application is created and a registration request is sent to the Master.

taskScheduler.start()

private[spark] class TaskSchedulerImpl(val sc: SparkContext) extends TaskScheduler {
    var backend: SchedulerBackend = null

    override def start() {
        backend.start()
        //spark.speculation...
    }
}

private[spark] class SparkDeploySchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext, masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with AppClientListener with Logging {

    var client: AppClient = null
    val maxCores = conf.getOption("spark.cores.max").map(_.toInt)

    override def start() {
        super.start()  // call CoarseGrainedSchedulerBackend's start() method
        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
            conf.get("spark.driver.host"), conf.get("spark.driver.port"),
            CoarseGrainedSchedulerBackend.ACTOR_NAME)
        val command = Command(
            "org.apache.spark.executor.CoarseGrainedExecutorBackend",
            args, sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts)
        val sparkHome = sc.getSparkHome()
        val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
            command, sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
        client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
        client.start()
    }
}

class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
  extends SchedulerBackend with Logging {

    var driverActor: ActorRef = null

    override def start() {
        driverActor = actorSystem.actorOf(
            Props(new DriverActor(properties)),
            name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
    }
}

class ClientActor extends Actor with Logging {
    override def preStart() {
        registerWithMaster()  // register the Application with the Master
    }
}

 

Communication between CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend

private[spark] class CoarseGrainedExecutorBackend(driverUrl: String, executorId: String, hostPort: String, cores: Int)
  extends Actor with ExecutorBackend with Logging {
    var executor: Executor = null
    var driver: ActorSelection = null

    override def preStart() {
        logInfo("Connecting to driver: " + driverUrl)
        driver = context.actorSelection(driverUrl)
        driver ! RegisterExecutor(executorId, hostPort, cores)  // register the Executor; the receiver is CoarseGrainedSchedulerBackend
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    }

    override def receive = {
        case RegisteredExecutor(sparkProperties) => ...
        case LaunchTask(taskDesc) => ...
        case KillTask(taskId, _, interruptThread) => ...
        case StopExecutor => ...
    }
}
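
The registration exchange above is plain asynchronous message passing: the executor sends RegisterExecutor, and the driver answers with RegisteredExecutor. A minimal, self-contained model of that protocol (toy case classes, not Spark's actual message types):

```scala
// Toy versions of the registration messages exchanged between
// CoarseGrainedExecutorBackend and CoarseGrainedSchedulerBackend.
sealed trait CoarseGrainedMessage
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
  extends CoarseGrainedMessage
case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
  extends CoarseGrainedMessage

// The receiving side pattern-matches on the incoming message, just as the
// actors' receive blocks do above (the handler body here is illustrative).
def handle(msg: CoarseGrainedMessage): String = msg match {
  case RegisterExecutor(id, hostPort, cores) =>
    s"registered executor $id at $hostPort with $cores cores"
  case RegisteredExecutor(_) =>
    "ack received on the executor side"
}
```

Modeling the protocol as a sealed trait lets the compiler warn when a receive block forgets a message type, which is also why Spark keeps these messages in one file.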
