關於Spark中RDD的設計的一些分析


RDD, Resilient Distributed Dataset,彈性分布式數據集, 是Spark的核心概念。

對於RDD的原理性的知識,可以參閱Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster ComputingAn Architecture for Fast and General Data Processing on Large Clusters 這兩篇論文。

這篇文章用來記錄一部分Spark對RDD實現的細節。

首先翻譯一下RDD這個虛類的注釋

 RDD是一個分布式彈性數據集, RDD是Spark的基本抽象,代表了一個不可變的、分區的、可以用於並行計算的數據集。這個類包括了所有RDD共有的
基本操作,比如map, filter, persist。另外

  • org.apache.spark.rdd.PairRDDFunctions包括了只能用於key-value對類型的RDD的操作,
    比如groupByKey和join。
  • org.apache.spark.rdd.DoubleRDDFunctions包括了只能用於Double類型RDD的操作,
  • org.apache.spark.rdd.SequenceFileRDDFunctions包括了能被保存為SequenceFile的RDD支持的操作。
    通過隱式轉換,只要RDD的類型正確,相關的操作就自動可用。

在內部,每個RDD都由五個主要屬性來表征:

  • 分區表(A list of partitions)
  • 一個用於計算每個split的函數
  • 對其它RDD的依賴
  • 可選: 用於鍵值對類型的RDD使用的Partitioner
  • 可選: 計算每個split時優先使用的location(+ 數據本地化, preferred locations) (比如一個HDFS文件的block的位置)。

Spark里所有的調度和執行都是依據這些方法,以此來允許每個RDD實現自己的方式來計算自己。用戶可以覆蓋這些方法來實現自己的RDD(比如,從一個新的存儲系統中讀取數據)。
新參考Spark paper來查看關於RDD內部機制的更多細節。     

       RDD的5個主要屬性對應的代碼主要為:

    • 分區   protected def getPartitions: Array[Partition] 以及 final def partitions: Array[Partition]
    • 計算每個partition   def compute(split: Partition, context: TaskContext): Iterator[T]
    • 對其它RDD的依賴   構造函數中的 deps: Seq[Dependency[_]] 以及 protected def getDependencies: Seq[Dependency[_]] = deps 以及 final def dependencies: Seq[Dependency[T]]
    • kv類型RDD的partitioner   @transient val partitioner: Option[Partitioner] = None
    • preferred location   protected def getPreferredLoations(split: Partition): Seeq[String] = Nil 以及 final def preferredLocations(split: Partition): Seq[String]

其中的這些final方法: partitions, dependencies, preferedLocations都是考慮了checkpoint的結果。可見,checkpoint機制會對這些屬性有所改變。     


 

以下是對於這個注釋的內容的思考:

1. RDD把定語去掉了,就是數據集;但是Spark作為一個分布式計算的框架,“數據集的轉換”與“數據集”都是不可缺少的。Spark並沒有把transformation這個概念抽象成一個基類,在我們寫rdd.filter(func1).map(func2)這樣的語句的時候,得到的最終結果是一個RDD,而scheduler使用的也只是這個RDD,因此,func1和func2這樣的轉換操作,作為一種元信息,肯定被RDD記錄,作為RDD的屬性。具體的講,轉換操作的信息會被記錄在RDD的第二個屬性“一個用於計算每個split的函數”中。所以,RDD不僅是彈性分布式數據集,也包括了數據集之間進行轉換所需要的函數。

2. RDD的第三個屬性“對其它RDD的依賴”,提供了以下信息:

    a. 對這個RDD的父RDD的引用

    b. 這個RDD的每個partition跟父RDD的partition的映射關系。

    假設有RDD X和RDD YX可以轉換為Y, 即 X -> Y。這是一個鏈式的構造,要獲得Y,需要X->->即是轉換操作,被記錄於第二個屬性,那么X在何處呢?X即是Dependency, 是RDD的第三個屬性。也就是說第二和第三個屬性,使得RDD成為一個鏈式結構, X -> Y -> Z,知道Z,就可以上溯到作為源頭的X,就能從X計算出Z來。這個就是為什么我們在最后一個RDD上調用action, Spark就可以開始執行,而不再需要提供其它的RDD。

 

下面看一下Spark對於以上兩點具體的實現。

 轉換邏輯的存儲

以常用的map操作為例  X -> Y, -> 在這里就是map。

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

作為map參數的f和map的語義一起指明了從當前RDD到MapPartitionsRDD轉換的邏輯。而這個邏輯,作為參數被傳遞給MapPartitionsRDD,即 (context, pid, iter) => iter.map(cleanF))。下面看一下MapPartitionsRDD是如何儲存這個邏輯的。

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}

注意它的compute方法,首先,它調用了f, f就是我們在RDD的map方法中傳給MapPartitionsRDD構造器的函數。也就是說MapPartitionsRDD儲存了從父RDD轉換的邏輯, 即 ->

另外,注意compute方法中的 firstParent[T].iterator(split, context)。firstParent即是在map函數中傳進來的this, 也就是MapPartitionsRDD的父RDD, 即X

->X就這樣被儲存在了Y, 即MapPartitionsRDD中。

關於Iterator

當compute方法被調用時,實際上會調用firstParent.iterator.map(cleanF)。那么此時,父RDD的迭代器會進行迭代和map計算嗎?

答案是否,而且,可以看出Spark的RDD間的轉換和Scala的迭代器間的轉換是類似的,它們都可以認為是惰性的,即在x -> y中,儲存了x和->,只有在需要計算時才會計算。

 下面是scala.collection.Iterator的map方法的代碼

  def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
  }

在這里被返回的Iterator相當於y, 而調用map的Iterator相當於x。y持有對x的引用"self", 也持有轉換的函數f,這就使得x -> y的鏈是完備的,因此Iterator上的map, filter等操作也構成了一個鏈式結構。

由於Iterator的這種特性,使得RDD的計算過程構成一個由函數組成的管道,在不對中間RDD進行persist的操作時,初始RDD的每個元素經過所有轉換函數的處理后,再開始處理第二個元素;而不是所有元素都經過第一個函數處理后,形成一個數據集,這個數據集再進行轉換。

比如,有三個RDD, X -> Y -> Z,都是使用的map進行轉換,所使用的函數依次為f和g。

那么Z的compute方法的調用過程就成為了X.iterator.map(f).map(g)。

依據Iterator的特點, Z的迭代器的hasNext方法會返回X.iterator.hasNext.hasNext, Z的迭代器的next方法會返回g(f(X.iterator.next))。

因此,在一系列轉過程中的中間的RDD如果沒有被persist, 是不會作為一個數據集存在的。

另外,需要注意

trait Iterator[+A] extends TraversableOnce[A]

注意這個TraversableOnce的含義。所以,在自己實現RDD時,需要確保compute方法被調用時,它所使用的父RDD的迭代器沒有在其它地方被使用過,不然一個已經被迭代過的迭代器再次被使用時,可能不會返回所有元素,或者干脆就不能繼續迭代了(俺就曾經在compute里加了條日志,記了下iteartor.size(), 就悲劇了)。

父子關系的存儲

先看下RDD的主構造器

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

RDD的這個構造器展示了Dependency對於RDD定義的重要作用。 Dependency包含了這個RDD對其父RDD的依賴,這個依賴不僅包括其父RDD是什么,還包括子RDD的分區和父RDD的
分區之間的對應關系。

需要注意到,deps是一個Seq, 這說明單個的Dependency可能不足以描述父子RDD之間的依賴關系,得通過一系列的Dependency才能描述此關系。結合Dependency的定義,每個
Dependency只包含了一個父RDD的信息,但是一個RDD可能依賴多個RDD,所以這里用Seq[Dependency[_]]是有必要的 。

 

 如果使用

class NarrowDependency[T](parent: RDD[T], deps: List[List[Int]]) extends Dependency[T]{
  override def _rdd: RDD[T] = parent

  def getDependency(partition: Int) = deps(partition)
}

這種定義。在Dependency中提供子RDD的每個分區所依賴的父RDD的分區,那么NarrowDependency和ShuffleDependency就都可以用這一種方式來定義。
但是,Spark中卻把NarrowDependency和ShuffleDependency分開定義,是為了區分什么呢?

  • 或許是在NarrowDependency的定義中是定義的每個父RDD的分區被哪一個子RDD的分區依賴。
  • 或許是在ShuffleDependency中不僅要提供子RDD的每個分區的依賴,還要提供父RDD的每個分區被哪些子RDD的分區依賴,這樣進行shuffle時,才好由父RDD
    的分區計算出對於不同子RDD分區的數據。

let us see see.

ShuffleDependency

之所以不像俺想的那樣,是因為ShuffleDependency包括了與shuffle有關的更多的信息,這些信息包括:

  1. partitioner 決定父RDD的每個record進入哪個子RDD分區。同時,它包含了reduce的個數的信息。
  2. aggeragator 可選,對value進行聚合
  3. mapSideCombine 是否要在map側調用aggeragator,這是一個布爾類型值
  4. keyOrdering 可選,決定key的順序,用來對key排序。
  5. serializer ?可選,或許是用來對key-value做序列化的,現在不能確定

以上是構造函數里的信息,此外ShuffleDependency的方法也提供了一些信息:
* shuffleId 還不確定有什么用
* shuffleHandle 提供與shuffle有關的信息。目前只看到它的一個實現: BaseShuffleHandler,構造器為(shuffleId, numTasks,
Dependency:[ShuffleDependency]) 不確定其具體作用

這些信息被shuffle過程使用,具體怎么用,得看shuffle的實現。

 

NarrowDependency

而NarrowDependency包括的情況更少,因為如果用List[List[Int]]來表示NarrowDependency的話,會把NarrowDependency的范圍括大,比如多對多的關系也能用這種形式來表示。
Spark的實現里,NarrowDependency是個abstract class ,由不同的子類來應對具體的NarrowDependency的情況,每種情況用不同的方法來表示窄依賴。在NarrowDependency同
一個文件里,有兩種NarrowDepdency的子類。在其它的RDD實現中,還有會其它的NarrowDependency,比如CoalescedRDD在一個匿名內部類里實現了自己的NarrowDependency。

  • OneToOneDependency 這種情況父RDD的分區跟子RDD的分區是一致的,每個子RDD分區依賴於同樣索引號的父RDD的分區
  • RangeDependency 子RDD的一個分區依賴於父RDD的某個連續的分區段,比如0-3, 4-5這種。

其實現為:

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

可見,父RDD的index為partitionId的分區被同樣index的子RDD的分區依賴,父子RDD的分區是一對一的關系

class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

它述描了子RDD的一些分區對父RDD的一些分區依賴關系,在父子RDD對應的分區間是OneToOne的關系,但這種關系只對父子RDD的一個區間有效。比如,
子RDD從index為2開始的分區,以OneToOne的關系依賴於父RDD從index為8開始的分區,這種依賴關系對於連續的3個分區有效,即(子2依賴父8), (子3依賴父9),
(子4依賴父10)

在UnionRDD中會使用RangeDependency     

總結:

RDD儲存了DAG Scheduler進行調度所需的信息(比如可以在RDD鏈中尋找ShuffleDependency來划分Stage),也儲存了生成目標RDD所需要的計算邏輯。也就是說RDD對於Spark這個框架,在某種程度上相當於元數據。可以看到,在driver往executor發送的作為task的字節數組中就包括了RDD。

在ShuffleMapTask中,反序列化后的taskBinary為:

    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](  //返回結果是(RDD, ShuffleDependency)
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

在ResultTask中,反序列化后的taskBinary為:

    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

可以看到,RDD始終是作為計算邏輯的主要攜帶者被傳給executor。

而RDD能做到這些,就是因為它儲存了所需的信息在自己的定義中, 前邊分析了一部分其實現的細節。RDD這個類的實現有很長很長的代碼,也有更多有意思的細節需要進一步看一下。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM