This article looks at Spark from two angles: a macro view of its components and a micro view of its core abstraction.
1. Spark Components
To analyze Spark's source code, we first need to understand how Spark works, starting with its components.
Understanding how Spark works begins with its basic concepts.
The official documentation lists the following terms:
Term | Meaning |
---|---|
Application | User program built on Spark. Consists of a driver program and executors on the cluster. |
Application jar | A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries, however, these will be added at runtime. |
Driver program | The process running the main() function of the application and creating the SparkContext |
Cluster manager | An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN) |
Deploy mode | Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster. |
Worker node | Any node that can run application code in the cluster |
Executor | A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors. |
Task | A unit of work that will be sent to one executor |
Job | A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. `save`, `collect`); you'll see this term used in the driver's logs. |
Stage | Each job gets divided into smaller sets of tasks called stages that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs. |
1.1 SparkContext
SparkContext: Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
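As a quick, hedged sketch of the points above (local mode and the app name are placeholders; the accumulator call uses the Spark 2.x API): the context is created once, used to build an RDD, an accumulator and a broadcast variable, and stopped before any new context is created.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// One active SparkContext per JVM; stop() it before creating a new one.
val conf = new SparkConf().setAppName("context-demo").setMaster("local[*]")
val sc = new SparkContext(conf)

val data = sc.parallelize(1 to 5)            // RDD created through the context
val acc = sc.longAccumulator("counter")      // accumulator
val lookup = sc.broadcast(Map(1 -> "one"))   // broadcast variable

data.foreach(x => acc.add(x))
println(acc.value)        // 15
println(lookup.value(1))  // "one"

sc.stop()   // release the cluster connection so another context can be created
```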
1.2 Task
/**
 * A unit of execution. We have two kinds of Task's in Spark:
 *
 *  - [[org.apache.spark.scheduler.ShuffleMapTask]]
 *  - [[org.apache.spark.scheduler.ResultTask]]
 *
 * A Spark job consists of one or more stages. The very last stage in a job consists of multiple
 * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
 * and sends the task output back to the driver application. A ShuffleMapTask executes the task
 * and divides the task output to multiple buckets (based on the task's partitioner).
 *
 * @param stageId id of the stage this task belongs to
 * @param partitionId index of the number in the RDD
 */
1.3 ActiveJob
/**
 * A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a
 * ResultStage to execute an action, or a map-stage job, which computes the map outputs for a
 * ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive
 * query planning, to look at map output statistics before submitting later stages. We distinguish
 * between these two types of jobs using the finalStage field of this class.
 *
 * Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's
 * submitJob or submitMapStage methods. However, either type of job may cause the execution of
 * other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of
 * these previous stages. These dependencies are managed inside DAGScheduler.
 *
 * @param jobId A unique ID for this job.
 * @param finalStage The stage that this job computes (either a ResultStage for an action or a
 *   ShuffleMapStage for submitMapStage).
 * @param callSite Where this job was initiated in the user's program (shown on UI).
 * @param listener A listener to notify if tasks in this job finish or the job fails.
 * @param properties Scheduling properties attached to the job, such as fair scheduler pool name.
 */
1.4 Stage
/**
 * A stage is a set of parallel tasks all computing the same function that need to run as part
 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
 * DAGScheduler runs these stages in topological order.
 *
 * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
 * other stage(s), or a result stage, in which case its tasks directly compute a Spark action
 * (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
 * track the nodes that each output partition is on.
 *
 * Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO
 * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
 * faster on failure.
 *
 * Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In that
 * case, the Stage object will track multiple StageInfo objects to pass to listeners or the web UI.
 * The latest one will be accessible through latestInfo.
 *
 * @param id Unique stage ID
 * @param rdd RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks
 *   on, while for a result stage, it's the target RDD that we ran an action on
 * @param numTasks Total number of tasks in stage; result stages in particular may not need to
 *   compute all partitions, e.g. for first(), lookup(), and take().
 * @param parents List of stages that this stage depends on (through shuffle dependencies).
 * @param firstJobId ID of the first job this stage was part of, for FIFO scheduling.
 * @param callSite Location in the user program associated with this stage: either where the target
 *   RDD was created, for a shuffle map stage, or where the action for a result stage was called.
 */
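To make the Job/Stage/Task vocabulary concrete, here is a minimal sketch (local mode assumed, names invented): the single action spawns one job, the shuffle introduced by `reduceByKey` splits it into a shuffle map stage and a result stage, and each stage runs one task per partition.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("stage-demo").setMaster("local[*]"))

val words  = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 2)
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)   // shuffle boundary

// Nothing has run yet: map/reduceByKey are transformations.
// The action below spawns one job with two stages:
//   earlier stage: ShuffleMapTasks (2 tasks, one per input partition)
//   final stage:   ResultTasks that send their output back to the driver
counts.collect().foreach(println)

// toDebugString prints the lineage; indentation changes mark shuffle (stage) boundaries.
println(counts.toDebugString)

sc.stop()
```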
1.5 Executor
/**
 * Spark executor, backed by a threadpool to run tasks.
 *
 * This can be used with Mesos, YARN, and the standalone scheduler.
 * An internal RPC interface (at the moment Akka) is used for communication with the driver,
 * except in the case of Mesos fine-grained mode.
 */
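Executor resources are usually requested through configuration rather than code. Below is a hedged sketch of commonly used settings; the values are placeholders, and `spark.executor.instances` applies to YARN-style deployments (or standalone mode with dynamic allocation disabled).

```scala
import org.apache.spark.SparkConf

// Per-executor resources; the cluster manager launches executor processes with these sizes.
val conf = new SparkConf()
  .setAppName("executor-demo")
  .set("spark.executor.memory", "2g")     // heap per executor
  .set("spark.executor.cores", "2")       // task slots per executor
  .set("spark.executor.instances", "4")   // number of executors requested (YARN-style)
```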
2. Spark Core
Spark is built on the unified abstraction of the RDD, which lets it handle very different big-data workloads in an essentially uniform way, including MapReduce, Streaming, SQL, Machine Learning, and Graph processing.
To understand Spark, you first need to understand the RDD.
2.1 What is an RDD?
Its key characteristics can be summarized as follows (a short sketch follows the list):
- It is an immutable data structure.
- It is a distributed data structure spread across the cluster.
- It can be partitioned by the keys of its data records.
- It provides coarse-grained operations, and these operations all work across partitions.
- It keeps data in memory, giving low-latency access.
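A small sketch of the first and last points (local mode assumed, names invented): transformations return new RDDs rather than modifying the original, and persisting keeps the computed partitions in memory for reuse.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("rdd-basics").setMaster("local[*]"))

val base = sc.parallelize(1 to 10)
val doubled = base.map(_ * 2)      // a new RDD; `base` itself is never modified

doubled.persist(StorageLevel.MEMORY_ONLY)   // keep the computed partitions in memory
println(doubled.sum())                      // first action computes and caches
println(doubled.filter(_ > 10).count())     // reuses the cached partitions

sc.stop()
```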
The official definition:
/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
* pairs, such as `groupByKey` and `join`;
* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
* Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
* through implicit conversions.
*
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
* reading data from a new storage system) by overriding these functions. Please refer to the
* [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
* on RDD internals.
*/
From the description above we can see:
1. An RDD is an immutable, partitioned collection of elements that can be operated on in parallel.
2. The RDD class defines the basic operations available on all RDDs, such as:
`map`, `filter`, and `persist`
Operations for specific element types live in PairRDDFunctions, DoubleRDDFunctions, and SequenceFileRDDFunctions.
3. An RDD is characterized by five main parts:
a list of partitions; a function for computing each partition; a list of dependencies on other RDDs; an optional Partitioner (for key-value RDDs); and an optional list of preferred locations for computing each partition (for an HDFS file, the locations of the blocks that hold each partition). These five hooks are illustrated in the sketch below.
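As a hedged illustration of those five hooks, the following minimal custom RDD (all names invented for this sketch) supplies its own partition list and compute function, declares no parent dependencies, and leaves the partitioner and preferred locations at their defaults.

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type carrying a range of integers.
class RangePartition(val index: Int, val start: Int, val end: Int) extends Partition

// A minimal custom RDD producing the integers [0, n) across numSlices partitions.
class SimpleRangeRDD(sc: SparkContext, n: Int, numSlices: Int)
  extends RDD[Int](sc, Nil) {   // Nil: no dependencies on parent RDDs

  // Hook 1: the list of partitions.
  override def getPartitions: Array[Partition] = {
    (0 until numSlices).map { i =>
      val start = i * n / numSlices
      val end = (i + 1) * n / numSlices
      new RangePartition(i, start, end): Partition
    }.toArray
  }

  // Hook 2: how to compute one partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.start until p.end).iterator
  }
}
```

Calling `new SimpleRangeRDD(sc, 10, 4).collect()` would then return the integers 0 through 9, with the scheduler driving everything through `getPartitions` and `compute`.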
Summary:
RDD, short for Resilient Distributed Dataset, is a fault-tolerant, parallel data structure that lets users explicitly persist data to disk or memory and control how the data is partitioned, and it offers a rich set of operations on that data. Transformations such as map, flatMap and filter follow the monad pattern and fit naturally with Scala's collection operations. Beyond these, RDD provides more convenient operations such as join, groupBy and reduceByKey (note that reduceByKey is a transformation, not an action, even though it introduces a shuffle) to support common data computations.
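A short word-count style sketch of these operations (local mode assumed): the transformations only build up the lineage, and nothing executes until the `collect` action is called.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("ops-demo").setMaster("local[*]"))

val lines = sc.parallelize(Seq("to be or not to be", "that is the question"))

val counts = lines
  .flatMap(_.split(" "))        // transformation: split lines into words
  .filter(_.nonEmpty)           // transformation: drop empty tokens
  .map(word => (word, 1))       // transformation: key-value pairs
  .reduceByKey(_ + _)           // transformation: per-key aggregation (introduces a shuffle)

counts.collect().foreach(println)   // action: triggers the actual computation

sc.stop()
```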
Broadly speaking, there are several common models for data processing: Iterative Algorithms, Relational Queries, MapReduce, and Stream Processing. Hadoop MapReduce, for example, uses the MapReduce model, while Storm uses the Stream Processing model. The RDD blends all four models, which is what allows Spark to be applied across such a wide range of big-data scenarios.
As a data structure, an RDD is essentially a read-only collection of partitioned records. An RDD can contain multiple partitions, each of which is a fragment of the dataset. RDDs can depend on one another: if each partition of the parent RDD is used by at most one partition of a child RDD, the dependency is called a narrow dependency; if multiple child partitions may depend on it, it is a wide dependency. Different operations produce different dependencies according to their semantics: map produces a narrow dependency, while join produces a wide dependency.
Spark distinguishes narrow from wide dependencies for two reasons.
First, narrow dependencies allow multiple operations to be pipelined on a single cluster node, for example running filter immediately after map. Wide dependencies, by contrast, require all parent partitions to be available and may involve a MapReduce-like shuffle to move data across nodes.
Second, there is failure recovery. Recovery is more efficient under narrow dependencies, because only the lost parent partitions need to be recomputed, and those recomputations can run in parallel on different nodes. Under wide dependencies, recovering a single lost partition may involve parent partitions across several ancestor RDDs. The figure below illustrates the difference between narrow and wide dependencies:
The figure is taken from Matei Zaharia's dissertation, An Architecture for Fast and General Data Processing on Large Clusters. Each box represents an RDD, and each shaded rectangle represents a partition.
2.2 Fault Tolerance in RDDs
Fault tolerance is usually achieved in one of two ways: data replication or logging. For data-centric systems, both are expensive, because they require copying large amounts of data across the cluster network, and network bandwidth is far lower than memory bandwidth.
RDDs are fault-tolerant by design. First, an RDD is itself an immutable dataset; second, it remembers the graph of operations (lineage) that built it. When a worker running a task fails, the lost partitions can be recomputed by replaying that operation graph. Since replication is not needed for fault tolerance, the cost of shipping data across the network is greatly reduced.
In some scenarios, however, Spark still relies on logging-style mechanisms. For example, in Spark Streaming, when performing update operations on state or using the window operations that Streaming provides, the intermediate state of the computation must be recoverable. In those cases Spark's checkpoint mechanism is used so that the computation can be restored from a checkpoint.
For RDDs with wide dependencies, the most effective fault-tolerance strategy is likewise checkpointing. That said, the latest Spark releases still do not appear to provide automatic checkpointing.
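A minimal sketch of manual checkpointing (paths and names are placeholders; in production the checkpoint directory would live on a reliable store such as HDFS):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo").setMaster("local[*]"))
sc.setCheckpointDir("/tmp/spark-checkpoints")   // placeholder path; use HDFS in production

val base = sc.parallelize(1 to 1000000, 8)
val derived = base.map(_ * 2).filter(_ % 3 == 0)

derived.cache()        // keep it in memory so the checkpoint write can reuse the computed data
derived.checkpoint()   // truncate the lineage; data is written out when the next job runs
derived.count()        // triggers both the computation and the checkpoint write

sc.stop()
```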
2.3 Partition
A Partition identifies a single slice of an RDD.
/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index
}
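A small sketch (local mode assumed) of how partitions surface in the user-facing API: `partitions` returns one `Partition` object per slice, and each task computes exactly one of them.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("partition-demo").setMaster("local[*]"))
val rdd = sc.parallelize(1 to 10, numSlices = 4)

println(rdd.partitions.length)                       // 4: one Partition object per slice
println(rdd.partitions.map(_.index).mkString(","))   // 0,1,2,3

// The partition index is also visible from inside each task.
rdd.mapPartitionsWithIndex { (idx, iter) =>
  Iterator(s"partition $idx holds ${iter.size} elements")
}.collect().foreach(println)

sc.stop()
```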
2.4 Partitioner
Partitioner: An object that defines how the elements in a key-value pair RDD are partitioned by key. It maps each key to a partition ID, from 0 to `numPartitions - 1`.
Kinds of partitioners:
The default partitioner, defaultPartitioner:
/**
 * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
 *
 * If any of the RDDs already has a partitioner, choose that one.
 *
 * Otherwise, we use a default HashPartitioner. For the number of partitions, if
 * spark.default.parallelism is set, then we'll use the value from SparkContext
 * defaultParallelism, otherwise we'll use the max number of upstream partitions.
 *
 * Unless spark.default.parallelism is set, the number of partitions will be the
 * same as the number of partitions in the largest upstream RDD, as this should
 * be least likely to cause out-of-memory errors.
 *
 * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
 */
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
    return r.partitioner.get
  }
  if (rdd.context.conf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(bySize.head.partitions.size)
  }
}
The hash partitioner, HashPartitioner: A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`. Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.
The range partitioner, RangePartitioner: A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges. The ranges are determined by sampling the content of the RDD passed in. Note that the actual number of partitions created by the RangePartitioner might not be the same as the `partitions` parameter, in the case where the number of sampled records is less than the value of `partitions`.
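A brief sketch (local mode assumed) of choosing partitioners explicitly: `partitionBy` installs a `HashPartitioner`, while `sortByKey` builds a `RangePartitioner` internally by sampling the keys.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("partitioner-demo").setMaster("local[*]"))
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)))

// Hash partitioning: each key is assigned to a bucket based on its hashCode.
val hashed = pairs.partitionBy(new HashPartitioner(4))
println(hashed.partitioner)   // typically Some(org.apache.spark.HashPartitioner@...)

// sortByKey samples the keys to build a RangePartitioner, so each output
// partition holds a contiguous, sorted key range.
val sorted = pairs.sortByKey()
println(sorted.partitioner)   // typically Some(org.apache.spark.RangePartitioner@...)

sc.stop()
```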
2.5 Dependency
Kinds of dependencies:
Narrow dependency, NarrowDependency: Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
Shuffle dependency, ShuffleDependency: Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle, the RDD is transient since we don't need it on the executor side.
One-to-one dependency, OneToOneDependency: Represents a one-to-one dependency between partitions of the parent and child RDDs.
Range dependency, RangeDependency: Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
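A minimal sketch (local mode assumed) of inspecting which dependency class an RDD reports: a map produces a narrow `OneToOneDependency`, while the shuffle behind `reduceByKey` produces a `ShuffleDependency`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("dependency-demo").setMaster("local[*]"))
val nums = sc.parallelize(1 to 100, 4)

// map is partition-to-partition, so the child reports a OneToOneDependency (narrow).
val mapped = nums.map(_ * 2)
println(mapped.dependencies.map(_.getClass.getSimpleName))   // typically List(OneToOneDependency)

// reduceByKey repartitions data by key, so the child reports a ShuffleDependency (wide).
val reduced = nums.map(n => (n % 3, n)).reduceByKey(_ + _)
println(reduced.dependencies.map(_.getClass.getSimpleName))  // typically List(ShuffleDependency)

sc.stop()
```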
References:
[1] http://spark.apache.org/docs/latest/cluster-overview.html
[2] http://www.infoq.com/cn/articles/spark-core-rdd/