Spark RDD基本概念、寬窄依賴、轉換行為操作


本文介紹一下rdd的基本屬性概念、rdd的轉換/行動操作、rdd的寬/窄依賴。

RDD:Resilient Distributed Dataset 彈性分布式數據集,是Spark中的基本抽象。

RDD表示可以並行操作的元素的不變分區集合

RDD提供了許多基本的函數(map、filter、reduce等)供我們進行數據處理。

RDD概述

通常來說,每個RDD有5個主要的屬性組成:

  • 分區列表

    RDD是由多個分區組成的,分區是邏輯上的概念。RDD的計算是以分區為單位進行的。

  • 用於計算每個分區的函數

    作用於每個分區數據的計算函數。

  • 對其他RDD的依賴關系列表

    RDD中保存了對於父RDD的依賴,根據依賴關系組成了Spark的DAG(有向無環圖),實現了spark巧妙、容錯的編程模型

  • 針對鍵值型RDD的分區器

    分區器針對鍵值型RDD而言的,將key傳入分區器獲取唯一的分區id。在shuffle中,分區器有很重要的體現。

  • 對每個分區進行計算的首選位置列表

    根據數據本地性的特性,獲取計算的首選位置列表,盡可能的把計算分配到靠近數據的位置,減少數據的網絡傳輸。

RDD的內部代碼

先看看基本概念的代碼:
//創建此RDD的SparkContext
def sparkContext: SparkContext = sc
// 唯一的id
val id: Int = sc.newRddId()
// rdd友善的名字
@transient var name: String = _
// 分區器
val partitioner: Option[Partitioner] = None
// 獲取依賴列表
// dependencies和partitions中都用到了checkpointRDD,如果進行了checkpoint,checkpointRDD表示進行checkpoint后的rdd
final def dependencies: Seq[Dependency[_]] = {
    // 一對一的窄依賴
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
        if (dependencies_ == null) {
            dependencies_ = getDependencies
        }
        dependencies_
    }
}
// 獲取分區列表
final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
        if (partitions_ == null) {
            partitions_ = getPartitions
            partitions_.zipWithIndex.foreach { case (partition, index) =>
                require(partition.index == index,
                        s"partitions($index).partition == ${partition.index}, but it should equal $index")
            }
        }
        partitions_
    }
}
// 獲取分區的首選位置
final def preferredLocations(split: Partition): Seq[String] = {
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
        getPreferredLocations(split)
    }
}
// 對應到每個分區的計算函數
def compute(split: Partition, context: TaskContext): Iterator[T]

主要就是圍繞上面5個重要屬性的一些操作

常用的函數/算子
// 返回僅包含滿足過濾條件的元素的新RDD。
def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
}
// 通過將函數應用於此RDD的所有元素來返回新的RDD。
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
// 首先向該RDD的所有元素應用函數,然后將結果展平,以返回新的RDD。
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

我們可以發現幾乎每個算子都會以當前RDD和對應的計算函數創建新的RDD,每個子RDD都持有父RDD的引用。

這就印證了RDD的不變性,也表明了RDD的計算是通過對RDD進行轉換實現的。

案例

val words = Seq("hello spark", "hello scala", "hello java")
val rdd = sc.makeRDD(words)
rdd
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .foreach(println(_))

上面是一個簡單的RDD的操作,我們先調用makeRDD創建了一個RDD,之后對rdd進行一頓算子調用。

首先調用flatMap,flatMap內部會以當前rdd和我們傳入的_.split(" ")構建新的MapPartitionsRDD;

之后map,map以上步生成的MapPartitionsRDD和我們傳入的(_, 1)構造新的MapPartitionsRDD;

之后reduceByKey,reduceByKey構造新的RDD;

走到foreach,foreach是行動操作,觸發計算,輸出。

小總結

  • RDD內部的計算除action算子以外,其他算子都是懶執行,不會觸發計算,只是進行RDD的轉換。
  • RDD的計算是基於分區為單位計算的,我們傳進去的函數,作用於分區進行計算

轉換、行動算子

從上面知道RDD是懶執行的,只有遇到行動算子才執行計算。

轉換操作:在內部對根據父RDD創建新的RDD,不執行計算

行動操作:內部會調用sc.runJob,提交作業、划分階段、執行作業。

一些常見的行動操作

foreach、foreachPartition、collect、reduce、count

除行動操作外,都是轉換操作

寬、窄依賴

寬窄依賴是shuffle划分調度的重要依據。

先看看spark中與依賴有關的幾個類(一層一層繼承關系):

Dependency依賴的頂級父類
	NarrowDependency 窄依賴
		OneToOneDependency 表示父RDD和子RDD分區之間的一對一依賴關系的窄依賴
		RangeDependency 表示父RDD和子RDD中分區范圍之間的一對一依賴關系的窄依賴
	ShuffleDependency 寬依賴

先說寬窄依賴的概念:

窄依賴:父RDD的每個分區只被一個子RDD分區使用

寬依賴:父RDD的每個分區都有可能被多個子RDD分區使用

其實就是父RDD的一個分區會被傳到幾個子RDD分區的區別。如果被傳到一個子RDD分區,就可以不需要移動數據(移動計算);如果被傳到多個子RDD分區,就需要進行數據的傳輸。

接下來看看Dependency內部的一些屬性及方法:

// 依賴對應的rdd,其實就是當前rdd的父rdd。寬依賴和窄依賴都有這個屬性
def rdd: RDD[T]
// 獲取子分區對應的父分區(窄依賴的方法)
def getParents(partitionId: Int): Seq[Int]

// 以下是寬依賴的屬性及方法
// 對應鍵值RDD的分區器
val partitioner: Partitioner
// 在數據傳輸時的序列化方法
val serializer: Serializer = SparkEnv.get.serializer
// 鍵的排序方式
val keyOrdering: Option[Ordering[K]] = None
// 一組用於聚合數據的功能
val aggregator: Option[Aggregator[K, V, C]] = None
// 是否需要map端預聚合
val mapSideCombine: Boolean = false
// 當前寬依賴的id
val shuffleId: Int = _rdd.context.newShuffleId()
// 向管理員注冊一個shuffle,並獲取一個句柄,以將其傳遞給任務
val shuffleHandle: ShuffleHandle =  _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)

一些常見的寬窄依賴

窄依賴:map、filter、union、mapPartitions、join(當分區器是HashPartitioner)

寬依賴:sortByKey、join(分區器不是HashPartitioner時)


最后說一下reduceByKey,順便說一下為什么當分區器HashPartitioner時就是窄依賴。

reduceByKey是用來將key分組后,執行我們傳入的函數。

它是窄依賴,它內部默認會使用HashPartitioner分區。

同一個key進去HashPartitioner得到的分區id是一樣的,這樣進行計算前后同一個key得到的分區都一樣,父RDD的分區就只被子RDD的一個分區依賴,就不需要移動數據。

所以join、reduceByKey在分區器是HashPartitioner時是窄依賴。


end. 個人理解,如有偏差,歡迎交流指正。

Reference

扶我起來,我還能學。



個人公眾號:碼農峰,定時推送行業資訊,持續發布原創技術文章,歡迎大家關注。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM