窄依賴與寬依賴&stage的划分依據


RDD根據對父RDD的依賴關系,可分為窄依賴與寬依賴2種。 
主要的區分之處在於父RDD的分區被多少個子RDD分區所依賴,如果一個就為窄依賴,多個則為寬依賴。更好的定義應該是: 
窄依賴的定義是子RDD的每一個分區都依賴於父RDD的一個或者少量幾個分區(不依賴於全部分區)

與依賴相關的以下5個類:

Dependency
<--NarrowDependency
    <--OneToOneDependency
    <--RangeDependency
<--ShuffleDependency

它們全部在同一個Scala文件中,Dependency是一個abstract class, NarrowDependency(abstract class)與ShuffleDependency直接繼承與它,OneToOneDependency與RangeDependency繼承自NarrowDependency,大致如上圖所示。

因此,關於Dependency的真正實現有三個,2個窄依賴:OneToOneDependency與RangeDependency,一個寬依賴:ShuffleDependency。

(一)Dependency

Dependency是一個抽象類,所有的依賴相關的類都必須繼承自它。Dependency只有一個成員變量,表示的是父RDD。

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

(一)窄依賴

1、NarrowDependency

看看代碼中對NarrowDependency的說明:

Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution。 
即窄依賴的定義應該是子RDD的每一個分區都依賴於父RDD的一個或者少量幾個分區(不依賴於全部分區)。

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

getParents根據子RDD的分區ID返回父RDD的分區ID。

主構建函數中的rdd是父RDD,下同。

2、OneToOneDependency

一對一依賴,即每個子RDD的分區的與父RDD的分區一一對應。

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

重寫了NarrowDependency的getParents方法,返回一個List,這個List只有一個元素,且與子RDD的分區ID相同。即子分區的ID與父分區的ID一一對應且相等。

3、RangeDependency

子RDD中的每個分區依賴於父RDD的幾個分區,而父RDD的每個分區僅補一個子RDD分區所依賴,即多對一的關系。它僅僅被UnionRDD所使用。

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

(二)寬依賴

寬依賴只有一種:shuffleDependency,即子RDD依賴於父RDD的所有分區,父RDD的分每個區被所有子RDD的分區所依賴。

/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

(三)stage的划分

DAG根據寬依賴來划分stage,每個寬依賴的處理均會是一個stage的划分點。同一個stage中的多個操作會在一個task中完成。因為子RDD的分區僅依賴於父RDD的一個分區,因此這些步驟可以串行執行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM