問題導讀:
1.spark是如何提交作業的?
2.Akka框架是如何實現的?
3.如何實現調度的?
前言
折騰了很久,終於開始學習Spark的源碼了,第一篇我打算講一下Spark作業的提交過程。

這個是Spark的App運行圖,它通過一個Driver來和集群通信,集群負責作業的分配。今天我要講的是如何創建這個Driver Program的過程。
1、作業提交方法以及參數
我們先看一下用Spark Submit提交的方法吧,下面是從官方上面摘抄的內容。
# Run on a Spark standalone cluster ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ --executor-memory 20G \ --total-executor-cores 100 \ /path/to/examples.jar \ 1000
這個是提交到standalone集群的方式,打開spark-submit這文件,我們會發現它最后是調用了org.apache.spark.deploy.SparkSubmit這個類。
我們直接進去看就行了,main函數就幾行代碼,太節省了。
def main(args: Array[String]) { val appArgs = new SparkSubmitArguments(args) val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs) launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose) }
我們主要看看createLaunchEnv方法就可以了,launch是反射調用mainClass,精華全在createLaunchEnv里面了。
在里面我發現一些有用的信息,可能在官方文檔上面都沒有的,發出來大家瞅瞅。前面不帶--的可以在spark-defaults.conf里面設置,帶--的直接在提交的時候指定,具體含義大家一看就懂。
val options = List[OptionAssigner]( OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"), OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"), OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"), OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true, sysProp = "spark.driver.extraClassPath"), OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true, sysProp = "spark.driver.extraJavaOptions"), OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true, sysProp = "spark.driver.extraLibraryPath"), OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"), OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"), OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"), OptionAssigner(args.queue, YARN, true, clOption = "--queue"), OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"), OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"), OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false, sysProp = "spark.executor.memory"), OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"), OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false, sysProp = "spark.cores.max"), OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"), OptionAssigner(args.files, YARN, true, clOption = "--files"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"), OptionAssigner(args.archives, YARN, true, clOption = "--archives"), OptionAssigner(args.jars, YARN, true, clOption = "--addJars"), OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars") )
Driver程序的部署模式有兩種,client和cluster,默認是client。client的話默認就是直接在本地運行了Driver程序了,cluster模式還會兜一圈把作業發到集群上面去運行。
指定部署模式需要用參數--deploy-mode來指定,或者在環境變量當中添加DEPLOY_MODE變量來指定。
下面講的是cluster的部署方式,兜一圈的這種情況。
yarn模式的話mainClass是org.apache.spark.deploy.yarn.Client,standalone的mainClass是org.apache.spark.deploy.Client。
這次我們講org.apache.spark.deploy.Client,yarn的話單獨找一章出來單獨講,目前超哥還是推薦使用standalone的方式部署spark,具體原因不詳,據說是因為資源調度方面的問題。
說個快捷鍵吧,Ctrl+Shift+N,然后輸入Client就能找到這個類,這是IDEA的快捷鍵,相當好使。
我們直接找到它的main函數,發現了它居然使用了Akka框架,我百度了一下,被它震驚了。
2、Akka
在main函數里面,主要代碼就這么三行。
//創建一個ActorSystem val (actorSystem, _) = AkkaUtils.createActorSystem("driverClient",Utils.localHostName(),0, conf, new SecurityManager(conf)) //執行ClientActor的preStart方法和receive方法 actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) //等待運行結束 actorSystem.awaitTermination()
看了這里真的有點兒懵啊,這是啥玩意兒,不懂的朋友們,請點擊這里Akka。下面是它官方放出來的例子:
//定義一個case class用來傳遞參數 case class Greeting(who: String) //定義Actor,比較重要的一個方法是receive方法,用來接收信息的 class GreetingActor extends Actor with ActorLogging { def receive = { case Greeting(who) ⇒ log.info("Hello " + who) } } //創建一個ActorSystem val system = ActorSystem("MySystem") //給ActorSystem設置Actor val greeter = system.actorOf(Props[GreetingActor], name = "greeter") //向greeter發送信息,用Greeting來傳遞 greeter ! Greeting("Charlie Parker")
簡直是無比強大啊,就這么幾行代碼就搞定了,接下來看你會更加震驚的。
我們回到Client類當中,找到ClientActor,它有兩個方法,是之前說的preStart和receive方法,preStart方法用於連接master提交作業請求,receive方法用於接收從master返回的反饋信息。
我們先看preStart方法吧。
override def preStart() = { // 這里需要把master的地址轉換成akka的地址,然后通過這個akka地址獲得指定的actor // 它的格式是"akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName) masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master)) // 把自身設置成遠程生命周期的事件 context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) driverArgs.cmd match { case "launch" => // 此處省略100個字 val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" // 此處省略100個字 // 向master發送提交Driver的請求,把driverDescription傳過去,RequestSubmitDriver前面說過了,是個case class masterActor ! RequestSubmitDriver(driverDescription) case "kill" => val driverId = driverArgs.driverId val killFuture = masterActor ! RequestKillDriver(driverId) } }
從上面的代碼看得出來,它需要設置master的連接地址,最后提交了一個RequestSubmitDriver的信息。在receive方法里面,就是等待接受回應了,有兩個Response分別對應着這里的launch和kill。
線索貌似到這里就斷了,那下一步在哪里了呢?當然是在Master里面啦,怎么知道的,猜的,哈哈。
Master也是繼承了Actor,在它的main函數里面找到了以下代碼:
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr) val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr), actorName) val timeout = AkkaUtils.askTimeout(conf) val respFuture = actor.ask(RequestWebUIPort)(timeout) val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
和前面的actor基本一致,多了actor.ask這句話,查了一下官網的文檔,這句話的意思的發送消息,並且接受一個Future作為response,和前面的actor ! message的區別就是它還接受返回值。
具體的Akka的用法,大家還是參照官網吧,Akka確實如它官網所言的那樣子,是一個簡單、強大、並行的分布式框架。
小結:
Akka的使用確實簡單,短短的幾行代碼即刻完成一個通信功能,比Socket簡單很多。但是它也逃不脫我們常說的那些東西,請求、接收請求、傳遞的消息、注冊的地址和端口這些概念。
3、調度schedule
我們接下來查找Master的receive方法吧,Master是作為接收方的,而不是主動請求,這點和hadoop是一致的。
case RequestSubmitDriver(description) => { val driver = createDriver(description) persistenceEngine.addDriver(driver) waitingDrivers += driver drivers.add(driver) // 調度 schedule() // 告訴client,提交成功了,把driver.id告訴它 sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}") }
這里我們主要看schedule方法就可以了,它是執行調度的方法。
private def schedule() { if (state != RecoveryState.ALIVE) { return } // 首先調度Driver程序,從workers里面隨機抽一些出來 val shuffledWorkers = Random.shuffle(workers) for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { for (driver <- waitingDrivers) { // 判斷內存和cpu夠不夠,夠的就執行了哈 if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver } } } // 這里是按照先進先出的,spreadOutApps是由spark.deploy.spreadOut參數來決定的,默認是true if (spreadOutApps) { // 遍歷一下app for (app <- waitingApps if app.coresLeft > 0) { // canUse里面判斷了worker的內存是否夠用,並且該worker是否已經包含了該app的Executor val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(canUse(app, _)).sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // 記錄每個節點的核心數 var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) var pos = 0 // 遍歷直到分配結束 while (toAssign > 0) { // 從0開始遍歷可用的work,如果可用的cpu減去已經分配的>0,就可以分配給它 if (usableWorkers(pos).coresFree - assigned(pos) > 0) { toAssign -= 1 // 這個位置的work的可分配的cpu數+1 assigned(pos) += 1 } pos = (pos + 1) % numUsable } // 給剛才標記的worker分配任務 for (pos <- 0 until numUsable) { if (assigned(pos) > 0) { val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) launchExecutor(usableWorkers(pos), exec) app.state = ApplicationState.RUNNING } } } } else { // 這種方式和上面的方式的區別是,這種方式盡可能用少量的節點來完成這個任務 for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { for (app <- waitingApps if app.coresLeft > 0) { // 判斷條件是worker的內存比app需要的內存多 if (canUse(app, worker)) { val coresToUse = math.min(worker.coresFree, app.coresLeft) if (coresToUse > 0) { val exec = app.addExecutor(worker, coresToUse) launchExecutor(worker, exec) app.state = ApplicationState.RUNNING } } } } } }
它的調度器是這樣的,先調度Driver程序,然后再調度App,調度App的方式是從各個worker的里面和App進行匹配,看需要分配多少個cpu。
那我們接下來看兩個方法launchDriver和launchExecutor即可。
def launchDriver(worker: WorkerInfo, driver: DriverInfo) { logInfo("Launching driver " + driver.id + " on worker " + worker.id) worker.addDriver(driver) driver.worker = Some(worker) worker.actor ! LaunchDriver(driver.id, driver.desc) driver.state = DriverState.RUNNING }
給worker發送了一個LaunchDriver的消息,下面在看launchExecutor的方法。
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory) exec.application.driver ! ExecutorAdded( exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) }
它要做的事情多一點,除了給worker發送LaunchExecutor指令外,還需要給driver發送ExecutorAdded的消息,說你的任務已經有人干了。
在繼續Worker講之前,我們先看看它是怎么注冊進來的,每個Worker啟動之后,會自動去請求Master去注冊自己,具體我們可以看receive的方法里面的RegisterWorker這一段,它需要上報自己的內存、Cpu、地址、端口等信息,注冊成功之后返回RegisteredWorker信息給它,說已經注冊成功了。
4、Worker執行
同樣的,我們到Worker里面在receive方法找LaunchDriver和LaunchExecutor就可以找到我們要的東西。
case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl) drivers(driverId) = driver driver.start() coresUsed += driverDesc.cores memoryUsed += driverDesc.mem }
看一下start方法吧,start方法里面,其實是new Thread().start(),run方法里面是通過傳過來的DriverDescription構造的一個命令,丟給ProcessBuilder去執行命令,結束之后調用。
worker !DriverStateChanged通知worker,worker再通過master ! DriverStateChanged通知master,釋放掉worker的cpu和內存。
同理,LaunchExecutor執行完畢了,通過worker ! ExecutorStateChanged通知worker,然后worker通過master ! ExecutorStateChanged通知master,釋放掉worker的cpu和內存。
下面我們再梳理一下這個過程,只包括Driver注冊,Driver運行之后的過程在之后的文章再說,比較復雜。
1、Client通過獲得Url地址獲得ActorSelection(master的actor引用),然后通過ActorSelection給Master發送注冊Driver請求(RequestSubmitDriver)
2、Master接收到請求之后就開始調度了,從workers列表里面找出可以用的Worker
3、通過Worker的actor引用ActorRef給可用的Worker發送啟動Driver請求(LaunchDriver)
4、調度完畢之后,給Client回復注冊成功消息(SubmitDriverResponse)
5、Worker接收到LaunchDriver請求之后,通過傳過來的DriverDescription的信息構造出命令來,通過ProcessBuilder執行
6、ProcessBuilder執行完命令之后,通過DriverStateChanged通過Worker
7、Worker最后把DriverStateChanged匯報給Master