在Spark中, RDD是有依賴關系的,這種依賴關系有兩種類型
- 窄依賴(Narrow Dependency)
- 寬依賴(Wide Dependency)
以下圖說明RDD的窄依賴和寬依賴

窄依賴
窄依賴指父RDD的每一個分區最多被一個子RDD的分區所用,表現為
- 一個父RDD的分區對應於一個子RDD的分區
- 兩個父RDD的分區對應於一個子RDD 的分區。
如上面的map,filter,union屬於第一類窄依賴,而join with inputs co-partitioned(對輸入進行協同划分的join操作,也就是說先按照key分組然后shuffle write的時候一個父分區對應一個子分區)則為第二類窄依賴
如果有多個父RDD的分區對應於同一個子RDD的分區不能稱之為窄依賴?
寬窄依賴與容錯性
Spark基於lineage的容錯性是指,如果一個RDD出錯,那么可以從它的所有父RDD重新計算所得,如果一個RDD僅有一個父RDD(即窄依賴),那么這種重新計算的代價會非常小。
Spark基於Checkpoint(物化)的容錯機制何解?在上圖中,寬依賴得到的結果(經歷過Shuffle過程)是很昂貴的,因此,Spark將此結果物化到磁盤上了,以備后面使用
寬依賴
寬依賴指子RDD的每個分區都要依賴於父RDD的所有分區,這是shuffle類操作,上圖中的groupByKey和對輸入未協同划分的join操作就是寬依賴。
窄依賴細說
窄依賴對優化很有利。邏輯上,每個RDD的算子都是一個fork/join(此join非上文的join算子,而是指同步多個並行任務的barrier): 把計算fork到每個分區,算完后join,然后fork/join下一個RDD的算子。如果直接翻譯到物理實現,是很不經濟的:一是每一個RDD(即使 是中間結果)都需要物化到內存或存儲中,費時費空間;二是join作為全局的barrier,是很昂貴的,會被最慢的那個節點拖死。如果子RDD的分區到 父RDD的分區是窄依賴,就可以實施經典的fusion優化,把兩個fork/join合為一個;如果連續的變換算子序列都是窄依賴,就可以把很多個 fork/join並為一個,不但減少了大量的全局barrier,而且無需物化很多中間結果RDD,這將極大地提升性能。Spark把這個叫做流水線 (pipeline)優化。
Spark流水線優化:

寬依賴細說
變換算子序列一碰上shuffle類操作,寬依賴就發生了,流水線優化終止。在具體實現 中,DAGScheduler從當前算子往前回溯依賴圖,一碰到寬依賴,就生成一個stage來容納已遍歷的算子序列。在這個stage里,可以安全地實施流水線優化。然后,又從那個寬依賴開始繼續回溯,生成下一個stage。
分區划分規則與首選位置
要深究兩個問題:一,分區如何划分;二,分區該放到集群內哪個節點。這正好對應於RDD結構中另外兩個域:分區划分器(partitioner)和首選位置(preferred locations)。
分區划分
分區划分對於shuffle類操作很關鍵,它決定了該操作的父RDD和子RDD之間的依賴類型。上文提到,同一個join算子,如果協同划分的話,兩個父 RDD之間、父RDD與子RDD之間能形成一致的分區安排,即同一個key保證被映射到同一個分區,這樣就能形成窄依賴。反之,如果沒有協同划分,導致寬依賴。
所謂協同划分,就是指定分區划分器以產生前后一致的分區安排。Pregel和HaLoop把這個作為系統內置的一部分;而Spark 默認提供兩種划分器:HashPartitioner和RangePartitioner,允許程序通過partitionBy算子指定。注意,HashPartitioner能夠發揮作用,要求key的hashCode是有效的,即同樣內容的key產生同樣的hashCode。這對 String是成立的,但對數組就不成立(因為數組的hashCode是由它的標識,而非內容,生成)。這種情況下,Spark允許用戶自定義 ArrayHashPartitioner。
首選位置
第二個問題是分區放置的節點,這關乎數據本地性:本地性好,網絡通信就少。有些RDD產生時就 有首選位置,如HadoopRDD分區的首選位置就是HDFS塊所在的節點。有些RDD或分區被緩存了,那計算就應該送到緩存分區所在的節點進行。再不然,就回溯RDD的lineage一直找到具有首選位置屬性的父RDD,並據此決定子RDD的放置。
寬/窄依賴的概念不止用在調度中,對容錯也很有用。如果一個節點宕機了,而且運算是窄依賴,那只要把丟失的父RDD分區重算即可,跟其他節點沒有依賴。而寬依賴需要父RDD的所有分區都存在, 重算就很昂貴了。所以如果使用checkpoint算子來做檢查點,不僅要考慮lineage是否足夠長,也要考慮是否有寬依賴,對寬依賴加檢查點是最物有所值的。
Spark中關於Dependency的源代碼
package org.apache.spark import scala.reflect.ClassTag import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleHandle /** * :: DeveloperApi :: * Base class for dependencies. */ @DeveloperApi abstract class Dependency[T] extends Serializable { def rdd: RDD[T] } /** * :: DeveloperApi :: * Base class for dependencies where each partition of the child RDD depends on a small number * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution. */
//這里是說,窄依賴是指子RDD的每個Partition只依賴於父RDD很少部分的的RDD,文檔明顯說的不對!窄依賴起碼需要父RDD的每個Partition只被一個子RDD的Partition依賴
@DeveloperApi abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { /** * Get the parent partitions for a child partition. * @param partitionId a partition of the child RDD * @return the partitions of the parent RDD that the child partition depends upon */ def getParents(partitionId: Int): Seq[Int] override def rdd: RDD[T] = _rdd } /** * :: DeveloperApi :: * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle, * the RDD is transient since we don't need it on the executor side. * * @param _rdd the parent RDD * @param partitioner partitioner used to partition the shuffle output * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None, * the default serializer, as specified by `spark.serializer` config option, will * be used. * @param keyOrdering key ordering for RDD's shuffles * @param aggregator map/reduce-side aggregator for RDD's shuffle * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine) */
//ShuffleDependency指的是,子RDD的partition部分依賴於父RDD的每個Partition 部分依賴被稱為 ShuffleDependency。
//其實 ShuffleDependency 跟 MapReduce 中 shuffle 的數據依賴相同(mapper 將其 output 進行 partition,然后每個 reducer 會將所有 mapper 輸出中屬於自己的 partition 通過 HTTP fetch 得到)。
@DeveloperApi class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Option[Serializer] = None, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false) extends Dependency[Product2[K, V]] { override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]] private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName // Note: It's possible that the combiner class tag is null, if the combineByKey // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag. private[spark] val combinerClassName: Option[String] = Option(reflect.classTag[C]).map(_.runtimeClass.getName) val shuffleId: Int = _rdd.context.newShuffleId() val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, _rdd.partitions.size, this) _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) } /** * :: DeveloperApi :: * Represents a one-to-one dependency between partitions of the parent and child RDDs. */ @DeveloperApi class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = List(partitionId) } /** * :: DeveloperApi :: * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs. * @param rdd the parent RDD * @param inStart the start of the range in the parent RDD * @param outStart the start of the range in the child RDD * @param length the length of the range */ @DeveloperApi class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = { if (partitionId >= outStart && partitionId < outStart + length) { List(partitionId - outStart + inStart) } else { Nil } } }
ShuffleDependency的參數說明:
1.RDD數據集合中的元素需要是(K,V)類型,因為一般需要依據key進行shuffle,所以數據結構往往是key-value;同時Shuffle需要根據K做shuffle output的partition。
2.Partitioner,按照K進行分區的算法,比如HashPartitioner
3.Serializer,因為Shuffle過程需要有數據的網絡傳輸,因此需要序列化,Serializer即是指定序列化算法
4.keyOrdering
5.aggregator,mapSideCombine用於map端的combine
6.shuffleId:每個shuffle操作都有一個唯一的ID
