Based on how an RDD depends on its parent RDDs, dependencies fall into two kinds: narrow and wide.
The main distinction is how many child RDD partitions depend on a given partition of the parent RDD: one means a narrow dependency, multiple means a wide one. A better definition is:
A narrow dependency means each partition of the child RDD depends on one or a small number of partitions of the parent RDD (never on all of its partitions).
The following five classes are related to dependencies:

Dependency
 ├── NarrowDependency (abstract)
 │    ├── OneToOneDependency
 │    └── RangeDependency
 └── ShuffleDependency

They all live in the same Scala file. Dependency is an abstract class; NarrowDependency (also an abstract class) and ShuffleDependency inherit directly from it, while OneToOneDependency and RangeDependency inherit from NarrowDependency, as the diagram above shows.
So there are three concrete implementations of Dependency: two narrow dependencies, OneToOneDependency and RangeDependency, and one wide dependency, ShuffleDependency.
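As a quick sanity check (a minimal sketch assuming a running SparkContext named sc; the variable names are illustrative), the dependency type of an RDD can be inspected through the public RDD.dependencies method: map stays narrow, while reduceByKey introduces a shuffle.

val nums = sc.parallelize(1 to 10, 4)

// map keeps the partitioning, so its dependency on nums is narrow.
val doubled = nums.map(_ * 2)
println(doubled.dependencies.head)  // org.apache.spark.OneToOneDependency@...

// reduceByKey must repartition by key, so its dependency is wide.
val counts = nums.map(n => (n % 2, 1)).reduceByKey(_ + _)
println(counts.dependencies.head)   // org.apache.spark.ShuffleDependency@...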
(1) Dependency
Dependency is an abstract class from which all dependency-related classes must inherit. It has a single member, rdd, which refers to the parent RDD.
/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}
(2) Narrow dependencies
1. NarrowDependency
The code documents NarrowDependency as follows:
Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
That is, a narrow dependency means each partition of the child RDD depends on one or a small number of partitions of the parent RDD (not on all of them).
/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
getParents takes a child RDD partition ID and returns the IDs of the parent RDD partitions it depends on.
The rdd in the primary constructor is the parent RDD; the same holds below.
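To make the getParents contract concrete, here is a hypothetical subclass (PairwiseDependency is made up for illustration, not part of Spark) in which child partition i reads parent partitions 2i and 2i+1:

import org.apache.spark.NarrowDependency
import org.apache.spark.rdd.RDD

// Hypothetical: each child partition depends on a fixed pair of parent partitions.
class PairwiseDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): Seq[Int] =
    Seq(2 * partitionId, 2 * partitionId + 1)
}

Such a dependency is still narrow: each child partition touches a small, statically known set of parent partitions, so execution can be pipelined.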
2. OneToOneDependency
A one-to-one dependency: each partition of the child RDD corresponds to exactly one partition of the parent RDD.
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
It overrides NarrowDependency's getParents method to return a single-element List containing the child partition's own ID. In other words, child and parent partition IDs correspond one to one and are equal.
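A small sketch (again assuming a SparkContext sc) confirming the identity mapping:

import org.apache.spark.OneToOneDependency

val base   = sc.parallelize(1 to 8, 4)
val mapped = base.map(_ + 1)
val dep    = mapped.dependencies.head.asInstanceOf[OneToOneDependency[Int]]
assert(dep.getParents(3) == List(3))  // child partition 3 reads exactly parent partition 3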
3. RangeDependency
RangeDependency represents a one-to-one dependency between a range of partitions in the child RDD and a range of partitions in the parent RDD: child partition outStart + i depends on exactly parent partition inStart + i. It is used only by UnionRDD.
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
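The arithmetic is easiest to see with union, the one place RangeDependency is used. A sketch assuming a SparkContext sc:

import org.apache.spark.RangeDependency

val a = sc.parallelize(1 to 6, 3)    // parent a: partitions 0..2
val b = sc.parallelize(7 to 10, 2)   // parent b: partitions 0..1
val u = a.union(b)                   // child: 5 partitions, a's first, then b's

// One RangeDependency per parent. For b: inStart = 0, outStart = 3, length = 2,
// so child partition 4 maps back to parent partition 4 - 3 + 0 = 1.
val deps = u.dependencies.map(_.asInstanceOf[RangeDependency[Int]])
assert(deps(1).getParents(4) == List(1))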
(3) Wide dependencies
There is only one wide dependency: ShuffleDependency. Here each partition of the child RDD may depend on all partitions of the parent RDD, and each partition of the parent RDD may be depended on by every partition of the child RDD.
/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
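These fields can be observed on a real shuffle. A sketch assuming a SparkContext sc:

import org.apache.spark.ShuffleDependency

val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
val reduced = pairs.reduceByKey(_ + _)
val shufDep = reduced.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, Int]]

println(shufDep.partitioner)     // the HashPartitioner used for the shuffle output
println(shufDep.mapSideCombine)  // true: reduceByKey pre-aggregates on the map side
println(shufDep.shuffleId)       // the id registered with the ShuffleManager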
(4) Stage division
The DAG is split into stages at wide dependencies: every wide (shuffle) dependency marks a stage boundary. The multiple operations within one stage are executed in a single task, because each child partition depends on only one (or a few) partitions of its parent, so those steps can be pipelined.
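RDD.toDebugString makes the stage boundary visible: it indents the lineage at each shuffle. A sketch assuming a SparkContext sc (the exact RDD names and ids in the output will vary):

val words  = sc.parallelize(Seq("a b", "b c"), 2)
val counts = words.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
println(counts.toDebugString)
// (2) ShuffledRDD[3] at reduceByKey ...          <- the stage after the shuffle
//  +-(2) MapPartitionsRDD[2] at map ...          <- flatMap and map pipelined in one stage
//     |  MapPartitionsRDD[1] at flatMap ...
//     |  ParallelCollectionRDD[0] at parallelize ...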
