Spark Source Code Analysis: the scheduler Module
That write-up is excellent and made reading the Spark source much easier for me.
Here I go through it again in my own words.
Let's start with a simple Spark program:
val sc = new SparkContext(……)
val textFile = sc.textFile("README.md")
textFile.filter(line => line.contains("Spark")).count()
1. SparkContext
This is the entry point to Spark; anything that wants to use Spark must first create a SparkContext.
The most important initialization work in SparkContext is starting the TaskScheduler and the DAGScheduler, which together form the core of Spark.
Spark's design is very clean: the whole DAG abstraction layer is decoupled from the actual task execution.
The DAGScheduler interprets the Spark operations, generates stages, forms the DAG, and finally splits it into tasks that are submitted to the TaskScheduler; it does only the static analysis.
The TaskScheduler is dedicated to task execution; it handles only resource management, task assignment, and reporting of execution status.
The benefit is that Spark can support different resource managers and execution platforms simply by providing different TaskSchedulers; today Spark supports local, standalone, Mesos, YARN, ...
class SparkContext(
    val master: String,
    val appName: String,
    val sparkHome: String = null,
    val jars: Seq[String] = Nil,
    val environment: Map[String, String] = Map(),
    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
    // This is typically generated from InputFormatInfo.computePreferredLocations
    // .. host, set of data-local splits on host
    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
      scala.collection.immutable.Map())
  extends Logging {

  // Create and start the scheduler
  private var taskScheduler: TaskScheduler = {
    //.......
  }
  taskScheduler.start()

  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
  dagScheduler.start()
}
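The scheduler creation elided above (//.......) essentially pattern-matches on the master URL to decide which TaskScheduler and backend to build, which is what makes the local / standalone / Mesos / YARN support pluggable. Below is a minimal, self-contained sketch of that dispatch, not the actual Spark code; the scheduler names in the strings are simplified stand-ins.

object SchedulerSelectionSketch {
  // Regexes mirroring the kind of master-URL dispatch SparkContext performs when it
  // builds taskScheduler; the backend descriptions are placeholders, not real classes.
  val LOCAL_N = """local\[([0-9]+)\]""".r
  val SPARK   = """spark://(.+)""".r
  val MESOS   = """mesos://(.+)""".r

  def describeScheduler(master: String): String = master match {
    case "local"          => "local scheduler, single thread"
    case LOCAL_N(threads) => "local scheduler, " + threads + " threads"
    case SPARK(url)       => "cluster scheduler + standalone deploy backend at " + url
    case MESOS(url)       => "cluster scheduler + Mesos backend at " + url
    case other            => "unsupported master URL: " + other
  }

  def main(args: Array[String]): Unit = {
    Seq("local", "local[4]", "spark://host:7077", "mesos://host:5050")
      .foreach(m => println(describeScheduler(m)))
  }
}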
2. sc.textFile
Next, of course, we need to load the data to be processed. The most commonly used call is textFile, which simply creates a HadoopRDD as the initial RDD.
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
.map(pair => pair._2.toString)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minSplits: Int = defaultMinSplits
    ): RDD[(K, V)] = {
  val conf = new JobConf(hadoopConfiguration)
  FileInputFormat.setInputPaths(conf, path)
  new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
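Putting the two together, textFile is nothing more than hadoopFile plus a map from the (LongWritable, Text) pairs to the line text. A small runnable sketch of the same lineage, assuming the spark package name of the 0.x-era codebase this post analyzes:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import spark.SparkContext   // package name assumed from the 0.x-era source quoted above

object TextFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "textFile-sketch")
    // The same lineage textFile builds: HadoopRDD[(LongWritable, Text)] -> map -> RDD[String]
    val lines = sc.hadoopFile("README.md", classOf[TextInputFormat],
                              classOf[LongWritable], classOf[Text])
                  .map(pair => pair._2.toString)   // keep the line text, drop the byte-offset key
    println(lines.count())
    sc.stop()
  }
}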
3. Transform and Action
The filter transformation invoked here is simple; see the earlier posts on RDDs.
The key call is the count action; what distinguishes an action is that it calls runJob.
So until an action is invoked, the job is never actually executed.
def count(): Long = {
  // runJob is only really invoked from inside an action, which is why transformations are lazy
  sc.runJob(this, (iter: Iterator[T]) => {
    // count uses the simplified runJob overload: only the rdd and func are passed in,
    // the remaining arguments are filled with defaults
    var result = 0L
    while (iter.hasNext) {
      result += 1L
      iter.next()
    }
    result
  }).sum
}
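To restate the laziness point with the example from the top of the post: neither textFile nor filter launches anything on the cluster; only count reaches sc.runJob.

val textFile = sc.textFile("README.md")                 // builds HadoopRDD + map, no job yet
val sparkLines = textFile.filter(_.contains("Spark"))   // records another RDD, still no job
val n = sparkLines.count()                              // action: sc.runJob is invoked here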
4. sc.runJob
The key point is that it calls dagScheduler.runJob.
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark. The allowLocal
 * flag specifies whether the scheduler can run the computation on the driver (the process
 * that created the SparkContext) rather than shipping it out to the cluster, for short
 * actions like first().
 */
def runJob[T, U: ClassManifest](
    rdd: RDD[T],                           // only the final RDD is needed; its ancestors can be derived from the dependencies
    func: (TaskContext, Iterator[T]) => U, // the action's logic, e.g. the counting logic of count
    partitions: Seq[Int],                  // the partition indices to run the job on
    allowLocal: Boolean,                   // whether a simple action may run locally on the driver
    resultHandler: (Int, U) => Unit) {     // used in JobWaiter.taskSucceeded to handle each task result
  val callSite = Utils.formatSparkCallSite
  logInfo("Starting job: " + callSite)
  val start = System.nanoTime
  val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
    localProperties.get)
  logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
  rdd.doCheckpoint()
  result
}
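For reference, the simplified overload that count goes through can be pictured roughly as below. This is a hedged reconstruction, not the literal source (rdd.partitions is used loosely; older versions call it splits): it runs func on every partition, lets resultHandler store each partition's result into an array, and returns that array, which count then sums.

def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  val results = new Array[U](rdd.partitions.size)
  runJob(rdd,
    (ctx: TaskContext, it: Iterator[T]) => func(it),   // adapt func to the (TaskContext, Iterator) shape
    0 until rdd.partitions.size,                       // run the job on every partition
    false,                                             // allowLocal = false
    (index: Int, res: U) => results(index) = res)      // resultHandler: store each partition's result
  results
}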