本文介紹一下rdd的基本屬性概念、rdd的轉換/行動操作、rdd的寬/窄依賴。
RDD:Resilient Distributed Dataset 彈性分布式數據集,是Spark中的基本抽象。
RDD表示可以並行操作的元素的不變分區集合。
RDD提供了許多基本的函數(map、filter、reduce等)供我們進行數據處理。
RDD概述
通常來說,每個RDD有5個主要的屬性組成:
-
分區列表
RDD是由多個分區組成的,分區是邏輯上的概念。RDD的計算是以分區為單位進行的。
-
用於計算每個分區的函數
作用於每個分區數據的計算函數。
-
對其他RDD的依賴關系列表
RDD中保存了對於父RDD的依賴,根據依賴關系組成了Spark的DAG(有向無環圖),實現了spark巧妙、容錯的編程模型
-
針對鍵值型RDD的分區器
分區器針對鍵值型RDD而言的,將key傳入分區器獲取唯一的分區id。在shuffle中,分區器有很重要的體現。
-
對每個分區進行計算的首選位置列表
根據數據本地性的特性,獲取計算的首選位置列表,盡可能的把計算分配到靠近數據的位置,減少數據的網絡傳輸。
RDD的內部代碼
先看看基本概念的代碼:
//創建此RDD的SparkContext
def sparkContext: SparkContext = sc
// 唯一的id
val id: Int = sc.newRddId()
// rdd友善的名字
@transient var name: String = _
// 分區器
val partitioner: Option[Partitioner] = None
// 獲取依賴列表
// dependencies和partitions中都用到了checkpointRDD,如果進行了checkpoint,checkpointRDD表示進行checkpoint后的rdd
final def dependencies: Seq[Dependency[_]] = {
// 一對一的窄依賴
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) {
dependencies_ = getDependencies
}
dependencies_
}
}
// 獲取分區列表
final def partitions: Array[Partition] = {
checkpointRDD.map(_.partitions).getOrElse {
if (partitions_ == null) {
partitions_ = getPartitions
partitions_.zipWithIndex.foreach { case (partition, index) =>
require(partition.index == index,
s"partitions($index).partition == ${partition.index}, but it should equal $index")
}
}
partitions_
}
}
// 獲取分區的首選位置
final def preferredLocations(split: Partition): Seq[String] = {
checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
getPreferredLocations(split)
}
}
// 對應到每個分區的計算函數
def compute(split: Partition, context: TaskContext): Iterator[T]
主要就是圍繞上面5個重要屬性的一些操作
常用的函數/算子
// 返回僅包含滿足過濾條件的元素的新RDD。
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}
// 通過將函數應用於此RDD的所有元素來返回新的RDD。
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
// 首先向該RDD的所有元素應用函數,然后將結果展平,以返回新的RDD。
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
我們可以發現幾乎每個算子都會以當前RDD和對應的計算函數創建新的RDD,每個子RDD都持有父RDD的引用。
這就印證了RDD的不變性,也表明了RDD的計算是通過對RDD進行轉換實現的。
案例
val words = Seq("hello spark", "hello scala", "hello java")
val rdd = sc.makeRDD(words)
rdd
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.foreach(println(_))
上面是一個簡單的RDD的操作,我們先調用makeRDD創建了一個RDD,之后對rdd進行一頓算子調用。
首先調用flatMap,flatMap內部會以當前rdd和我們傳入的_.split(" ")構建新的MapPartitionsRDD;
之后map,map以上步生成的MapPartitionsRDD和我們傳入的(_, 1)構造新的MapPartitionsRDD;
之后reduceByKey,reduceByKey構造新的RDD;
走到foreach,foreach是行動操作,觸發計算,輸出。
小總結
- RDD內部的計算除action算子以外,其他算子都是懶執行,不會觸發計算,只是進行RDD的轉換。
- RDD的計算是基於分區為單位計算的,我們傳進去的函數,作用於分區進行計算
轉換、行動算子
從上面知道RDD是懶執行的,只有遇到行動算子才執行計算。
轉換操作:在內部對根據父RDD創建新的RDD,不執行計算
行動操作:內部會調用sc.runJob,提交作業、划分階段、執行作業。
一些常見的行動操作
foreach、foreachPartition、collect、reduce、count
除行動操作外,都是轉換操作
寬、窄依賴
寬窄依賴是shuffle和划分調度的重要依據。
先看看spark中與依賴有關的幾個類(一層一層繼承關系):
Dependency依賴的頂級父類
NarrowDependency 窄依賴
OneToOneDependency 表示父RDD和子RDD分區之間的一對一依賴關系的窄依賴
RangeDependency 表示父RDD和子RDD中分區范圍之間的一對一依賴關系的窄依賴
ShuffleDependency 寬依賴
先說寬窄依賴的概念:
窄依賴:父RDD的每個分區只被一個子RDD分區使用
寬依賴:父RDD的每個分區都有可能被多個子RDD分區使用
其實就是父RDD的一個分區會被傳到幾個子RDD分區的區別。如果被傳到一個子RDD分區,就可以不需要移動數據(移動計算);如果被傳到多個子RDD分區,就需要進行數據的傳輸。
接下來看看Dependency內部的一些屬性及方法:
// 依賴對應的rdd,其實就是當前rdd的父rdd。寬依賴和窄依賴都有這個屬性
def rdd: RDD[T]
// 獲取子分區對應的父分區(窄依賴的方法)
def getParents(partitionId: Int): Seq[Int]
// 以下是寬依賴的屬性及方法
// 對應鍵值RDD的分區器
val partitioner: Partitioner
// 在數據傳輸時的序列化方法
val serializer: Serializer = SparkEnv.get.serializer
// 鍵的排序方式
val keyOrdering: Option[Ordering[K]] = None
// 一組用於聚合數據的功能
val aggregator: Option[Aggregator[K, V, C]] = None
// 是否需要map端預聚合
val mapSideCombine: Boolean = false
// 當前寬依賴的id
val shuffleId: Int = _rdd.context.newShuffleId()
// 向管理員注冊一個shuffle,並獲取一個句柄,以將其傳遞給任務
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
一些常見的寬窄依賴
窄依賴:map、filter、union、mapPartitions、join(當分區器是HashPartitioner)
寬依賴:sortByKey、join(分區器不是HashPartitioner時)
最后說一下reduceByKey,順便說一下為什么當分區器HashPartitioner時就是窄依賴。
reduceByKey是用來將key分組后,執行我們傳入的函數。
它是窄依賴,它內部默認會使用HashPartitioner分區。
同一個key進去HashPartitioner得到的分區id是一樣的,這樣進行計算前后同一個key得到的分區都一樣,父RDD的分區就只被子RDD的一個分區依賴,就不需要移動數據。
所以join、reduceByKey在分區器是HashPartitioner時是窄依賴。
end. 個人理解,如有偏差,歡迎交流指正。
Reference
- 《圖解Spark核心技術與案例實戰》
- 寬窄依賴:https://www.jianshu.com/p/5c2301dfa360
扶我起來,我還能學。

個人公眾號:碼農峰,定時推送行業資訊,持續發布原創技術文章,歡迎大家關注。
