RDD, Resilient Distributed Dataset,彈性分布式數據集, 是Spark的核心概念。
對於RDD的原理性的知識,可以參閱Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing 和 An Architecture for Fast and General Data Processing on Large Clusters 這兩篇論文。
這篇文章用來記錄一部分Spark對RDD實現的細節。
首先翻譯一下RDD這個虛類的注釋
RDD是一個分布式彈性數據集, RDD是Spark的基本抽象,代表了一個不可變的、分區的、可以用於並行計算的數據集。這個類包括了所有RDD共有的
基本操作,比如map, filter, persist。另外
- org.apache.spark.rdd.PairRDDFunctions包括了只能用於key-value對類型的RDD的操作,
比如groupByKey和join。- org.apache.spark.rdd.DoubleRDDFunctions包括了只能用於Double類型RDD的操作,
- org.apache.spark.rdd.SequenceFileRDDFunctions包括了能被保存為SequenceFile的RDD支持的操作。
通過隱式轉換,只要RDD的類型正確,相關的操作就自動可用。在內部,每個RDD都由五個主要屬性來表征:
- 分區表(A list of partitions)
- 一個用於計算每個split的函數
- 對其它RDD的依賴
- 可選: 用於鍵值對類型的RDD使用的Partitioner
- 可選: 計算每個split時優先使用的location(+ 數據本地化, preferred locations) (比如一個HDFS文件的block的位置)。
Spark里所有的調度和執行都是依據這些方法,以此來允許每個RDD實現自己的方式來計算自己。用戶可以覆蓋這些方法來實現自己的RDD(比如,從一個新的存儲系統中讀取數據)。
新參考Spark paper來查看關於RDD內部機制的更多細節。
RDD的5個主要屬性對應的代碼主要為:
-
- 分區
protected def getPartitions: Array[Partition]
以及final def partitions: Array[Partition]
- 計算每個partition
def compute(split: Partition, context: TaskContext): Iterator[T]
- 對其它RDD的依賴 構造函數中的
deps: Seq[Dependency[_]]
以及protected def getDependencies: Seq[Dependency[_]] = deps
以及final def dependencies: Seq[Dependency[T]]
- kv類型RDD的partitioner
@transient val partitioner: Option[Partitioner] = None
- preferred location
protected def getPreferredLoations(split: Partition): Seeq[String] = Nil
以及final def preferredLocations(split: Partition): Seq[String]
- 分區
其中的這些final方法: partitions, dependencies, preferedLocations都是考慮了checkpoint的結果。可見,checkpoint機制會對這些屬性有所改變。
以下是對於這個注釋的內容的思考:
1. RDD把定語去掉了,就是數據集;但是Spark作為一個分布式計算的框架,“數據集的轉換”與“數據集”都是不可缺少的。Spark並沒有把transformation這個概念抽象成一個基類,在我們寫rdd.filter(func1).map(func2)這樣的語句的時候,得到的最終結果是一個RDD,而scheduler使用的也只是這個RDD,因此,func1和func2這樣的轉換操作,作為一種元信息,肯定被RDD記錄,作為RDD的屬性。具體的講,轉換操作的信息會被記錄在RDD的第二個屬性“一個用於計算每個split的函數”中。所以,RDD不僅是彈性分布式數據集,也包括了數據集之間進行轉換所需要的函數。
2. RDD的第三個屬性“對其它RDD的依賴”,提供了以下信息:
a. 對這個RDD的父RDD的引用
b. 這個RDD的每個partition跟父RDD的partition的映射關系。
假設有RDD X和RDD Y, X可以轉換為Y, 即 X -> Y。這是一個鏈式的構造,要獲得Y,需要X和->。 ->即是轉換操作,被記錄於第二個屬性,那么X在何處呢?X即是Dependency, 是RDD的第三個屬性。也就是說第二和第三個屬性,使得RDD成為一個鏈式結構, X -> Y -> Z,知道Z,就可以上溯到作為源頭的X,就能從X計算出Z來。這個就是為什么我們在最后一個RDD上調用action, Spark就可以開始執行,而不再需要提供其它的RDD。
下面看一下Spark對於以上兩點具體的實現。
轉換邏輯的存儲
以常用的map操作為例 X -> Y, -> 在這里就是map。
/** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }
作為map參數的f和map的語義一起指明了從當前RDD到MapPartitionsRDD轉換的邏輯。而這個邏輯,作為參數被傳遞給MapPartitionsRDD,即 (context, pid, iter) => iter.map(cleanF))。下面看一下MapPartitionsRDD是如何儲存這個邏輯的。
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false) extends RDD[U](prev) { override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None override def getPartitions: Array[Partition] = firstParent[T].partitions override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context)) }
注意它的compute方法,首先,它調用了f, f就是我們在RDD的map方法中傳給MapPartitionsRDD構造器的函數。也就是說MapPartitionsRDD儲存了從父RDD轉換的邏輯, 即 ->
另外,注意compute方法中的 firstParent[T].iterator(split, context)。firstParent即是在map函數中傳進來的this, 也就是MapPartitionsRDD的父RDD, 即X。
-> 和 X就這樣被儲存在了Y, 即MapPartitionsRDD中。
關於Iterator
當compute方法被調用時,實際上會調用firstParent.iterator.map(cleanF)。那么此時,父RDD的迭代器會進行迭代和map計算嗎?
答案是否,而且,可以看出Spark的RDD間的轉換和Scala的迭代器間的轉換是類似的,它們都可以認為是惰性的,即在x -> y中,儲存了x和->,只有在需要計算時才會計算。
下面是scala.collection.Iterator的map方法的代碼
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] { def hasNext = self.hasNext def next() = f(self.next()) }
在這里被返回的Iterator相當於y, 而調用map的Iterator相當於x。y持有對x的引用"self", 也持有轉換的函數f,這就使得x -> y的鏈是完備的,因此Iterator上的map, filter等操作也構成了一個鏈式結構。
由於Iterator的這種特性,使得RDD的計算過程構成一個由函數組成的管道,在不對中間RDD進行persist的操作時,初始RDD的每個元素經過所有轉換函數的處理后,再開始處理第二個元素;而不是所有元素都經過第一個函數處理后,形成一個數據集,這個數據集再進行轉換。
比如,有三個RDD, X -> Y -> Z,都是使用的map進行轉換,所使用的函數依次為f和g。
那么Z的compute方法的調用過程就成為了X.iterator.map(f).map(g)。
依據Iterator的特點, Z的迭代器的hasNext方法會返回X.iterator.hasNext.hasNext, Z的迭代器的next方法會返回g(f(X.iterator.next))。
因此,在一系列轉過程中的中間的RDD如果沒有被persist, 是不會作為一個數據集存在的。
另外,需要注意
trait Iterator[+A] extends TraversableOnce[A]
注意這個TraversableOnce的含義。所以,在自己實現RDD時,需要確保compute方法被調用時,它所使用的父RDD的迭代器沒有在其它地方被使用過,不然一個已經被迭代過的迭代器再次被使用時,可能不會返回所有元素,或者干脆就不能繼續迭代了(俺就曾經在compute里加了條日志,記了下iteartor.size(), 就悲劇了)。
父子關系的存儲
先看下RDD的主構造器
abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging {
RDD的這個構造器展示了Dependency對於RDD定義的重要作用。 Dependency包含了這個RDD對其父RDD的依賴,這個依賴不僅包括其父RDD是什么,還包括子RDD的分區和父RDD的
分區之間的對應關系。
需要注意到,deps是一個Seq, 這說明單個的Dependency可能不足以描述父子RDD之間的依賴關系,得通過一系列的Dependency才能描述此關系。結合Dependency的定義,每個
Dependency只包含了一個父RDD的信息,但是一個RDD可能依賴多個RDD,所以這里用Seq[Dependency[_]]
是有必要的 。
如果使用
class NarrowDependency[T](parent: RDD[T], deps: List[List[Int]]) extends Dependency[T]{
override def _rdd: RDD[T] = parent
def getDependency(partition: Int) = deps(partition)
}
這種定義。在Dependency中提供子RDD的每個分區所依賴的父RDD的分區,那么NarrowDependency和ShuffleDependency就都可以用這一種方式來定義。
但是,Spark中卻把NarrowDependency和ShuffleDependency分開定義,是為了區分什么呢?
- 或許是在NarrowDependency的定義中是定義的每個父RDD的分區被哪一個子RDD的分區依賴。
- 或許是在ShuffleDependency中不僅要提供子RDD的每個分區的依賴,還要提供父RDD的每個分區被哪些子RDD的分區依賴,這樣進行shuffle時,才好由父RDD
的分區計算出對於不同子RDD分區的數據。
let us see see.
ShuffleDependency
之所以不像俺想的那樣,是因為ShuffleDependency包括了與shuffle有關的更多的信息,這些信息包括:
- partitioner 決定父RDD的每個record進入哪個子RDD分區。同時,它包含了reduce的個數的信息。
- aggeragator 可選,對value進行聚合
- mapSideCombine 是否要在map側調用aggeragator,這是一個布爾類型值
- keyOrdering 可選,決定key的順序,用來對key排序。
- serializer ?可選,或許是用來對key-value做序列化的,現在不能確定
以上是構造函數里的信息,此外ShuffleDependency的方法也提供了一些信息:
* shuffleId 還不確定有什么用
* shuffleHandle 提供與shuffle有關的信息。目前只看到它的一個實現: BaseShuffleHandler,構造器為(shuffleId, numTasks,
Dependency:[ShuffleDependency]) 不確定其具體作用
這些信息被shuffle過程使用,具體怎么用,得看shuffle的實現。
NarrowDependency
而NarrowDependency包括的情況更少,因為如果用List[List[Int]]來表示NarrowDependency的話,會把NarrowDependency的范圍括大,比如多對多的關系也能用這種形式來表示。
Spark的實現里,NarrowDependency是個abstract class ,由不同的子類來應對具體的NarrowDependency的情況,每種情況用不同的方法來表示窄依賴。在NarrowDependency同
一個文件里,有兩種NarrowDepdency的子類。在其它的RDD實現中,還有會其它的NarrowDependency,比如CoalescedRDD在一個匿名內部類里實現了自己的NarrowDependency。
- OneToOneDependency 這種情況父RDD的分區跟子RDD的分區是一致的,每個子RDD分區依賴於同樣索引號的父RDD的分區
- RangeDependency 子RDD的一個分區依賴於父RDD的某個連續的分區段,比如0-3, 4-5這種。
其實現為:
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
可見,父RDD的index為partitionId的分區被同樣index的子RDD的分區依賴,父子RDD的分區是一對一的關系
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
它述描了子RDD的一些分區對父RDD的一些分區依賴關系,在父子RDD對應的分區間是OneToOne的關系,但這種關系只對父子RDD的一個區間有效。比如,
子RDD從index為2開始的分區,以OneToOne的關系依賴於父RDD從index為8開始的分區,這種依賴關系對於連續的3個分區有效,即(子2依賴父8), (子3依賴父9),
(子4依賴父10)
在UnionRDD中會使用RangeDependency
總結:
RDD儲存了DAG Scheduler進行調度所需的信息(比如可以在RDD鏈中尋找ShuffleDependency來划分Stage),也儲存了生成目標RDD所需要的計算邏輯。也就是說RDD對於Spark這個框架,在某種程度上相當於元數據。可以看到,在driver往executor發送的作為task的字節數組中就包括了RDD。
在ShuffleMapTask中,反序列化后的taskBinary為:
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( //返回結果是(RDD, ShuffleDependency) ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
在ResultTask中,反序列化后的taskBinary為:
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
可以看到,RDD始終是作為計算邏輯的主要攜帶者被傳給executor。
而RDD能做到這些,就是因為它儲存了所需的信息在自己的定義中, 前邊分析了一部分其實現的細節。RDD這個類的實現有很長很長的代碼,也有更多有意思的細節需要進一步看一下。