摘要:
1.RDD的五大屬性
1.1 partitions(分區)
1.2 partitioner(分區方法)
1.3 dependencies(依賴關系)
1.4 compute(獲取分區迭代列表)
1.5 preferedLocations(優先分配節點列表)
2.RDD實現類舉例
2.1 MapPartitionsRDD
2.2 ShuffledRDD
2.3 ReliableCheckpointRDD
3.RDD可以嵌套嗎?
內容:
1.RDD的五大屬性
1.1partitions(分區)
partitions : 分區屬性: 每個RDD包括多個分區, 這既是RDD的數據單位, 也是計算粒度, 每個分區是由一個Task線程處理. 在RDD創建的時候可以指定分區的個數, 如果沒有指定, 那么默認分區個數由參數spark.default.parallelism指定(如果未設置這個參數 ,則在yarn或者standalone模式下有如下推導:spark.default.parallelism = max(所有executor使用的core總數, 2)).
每一分區對應一個內存block, 由BlockManager分配.
子類可以通過調用下面的方法來獲取分區列表,當處於檢查點時,分區信息會被重寫
Partition實現:
partition 與 iterator 方法
RDD 的 iterator(split: Partition, context: TaskContext): Iterator[T] 方法用來獲取 split 指定的 Partition 對應的數據的迭代器,有了這個迭代器就能一條一條取出數據來按 compute chain 來執行一個個transform 操作。iterator 的實現如下:
其先判斷 RDD 的 storageLevel 是否為 NONE,若不是,則嘗試從緩存中讀取,讀取不到則通過計算來獲取該Partition對應的數據的迭代器;若是,嘗試從 checkpoint 中獲取 Partition 對應數據的迭代器,若 checkpoint 不存在則通過計算(compute屬性)
1.2partitioner(分區方法)
RDD的分區方式, 這個屬性指的是RDD的partitioner函數(分片函數), 分區函數就是將數據分配到指定的分區, 這個目前實現了HashPartitioner和RangePartitioner, 只有key-value的RDD才會有分片函數, 否則為none. 分片函數不僅決定了當前分片的個數, 同時決定parent shuffle RDD的輸出的分區個數.
HashPartitioner如何決定一個鍵對應的分區的:
其中nonNegativeMod方法考慮到了key的符號,如果key是負數,就返回key%numPartitions +numPartitions(補數);
HashPartitioner是基於Object的hashcode來分區的,所以不應該對集合類型進行哈希分區
RangePartitioner如何決定一個鍵對應的分區的:
其中rangeBounds是各個分區的上邊界的Array。而rangeBounds的具體計算是通過抽樣進行估計的,具體代碼可以參照RangePartitioner 實現簡記
RangePartitioner是根據key值大小進行分區的,所以支持RDD的排序類算子
1.3dependencies(依賴關系)
Spark的運行過程就是RDD之間的轉換, 因此, 必須記錄RDD之間的生成關系(新RDD是由哪個或哪幾個父RDD生成), 這就是所謂的依賴關系, 這樣既有助於階段和任務的划分, 也有助於在某個分區出錯的時候, 只需要重新計算與當前出錯的分區有關的分區,而不需要計算所有的分區.
dependencies_是一個記錄Dependency關系的序列(Seq):
RDD是如何記錄依賴關系的:
、
依賴類型:
-
- 窄依賴:父 RDD 的 partition 至多被一個子 RDD partition 依賴(OneToOneDependency,RangeDependency)
- 寬依賴:父 RDD 的 partition 被多個子 RDD partitions 依賴(ShuffleDependency)
圖示:
窄依賴是一對一的關系,所以可以直接從父分區中獲取;寬依賴則不行。以下是寬依賴(實現是ShuffleDependency)的幾個重要屬性。更加具體的shuffle原理可以查看Spark Shuffle原理、Shuffle操作問題解決和參數調優
1.4compute(獲取分區迭代列表)
計算屬性: 當調用 RDD#iterator 方法無法從緩存或 checkpoint 中獲取指定 partition 的迭代器時,就需要調用 compute 方法來獲取
RDD不僅包含有數據, 還有在數據上的計算, 每個RDD以分區為計算粒度, 每個RDD會實現compute函數, compute函數會和迭代器(RDD之間轉換的迭代器)進行復合, 這樣就不需要保存每次compute運行的結果.
代碼:
下面舉幾個算子操作:
map
上面代碼中的 firstParent 是指本 RDD 的依賴 dependencies: Seq[Dependency[_]] 中的第一個,MapPartitionsRDD 的依賴中只有一個父 RDD。而 MapPartitionsRDD 的 partition 與其唯一的父 RDD partition 是一一對應的,所以其 compute 方法可以描述為:對父 RDD partition 中的每一個元素執行傳入 map (代碼中的f(context,split.index,iterator)函數)的方法得到自身的 partition 及迭代器
圖示:
groupByKey
與 map、union 不同,groupByKey 是一個會產生寬依賴(ShuffleDependency)的 transform,其最終生成的 RDD 是 ShuffledRDD,來看看其 compute 實現:
可以看到,ShuffledRDD 的 compute 使用 ShuffleManager 來獲取一個 reader,該 reader 將從本地或遠程 BlockManager 拉取 map output 的 file 數據
圖示:
1.5preferedLocations(優先分配節點列表)
對於分區而言返回數據本地化計算的節點列表
也就是說, 每個RDD會報出一個列表(Seq), 而這個列表保存着分片優先分配給哪個Worker節點計算, spark堅持移動計算而非移動數據的原則. 也就是盡量在存儲數據的節點上進行計算.
要注意的是,並不是每個 RDD 都有 preferedLocation,比如從 Scala 集合中創建的 RDD 就沒有,而從 HDFS 讀取的 RDD 就有
spark 本地化級別:PROCESS_LOCAL => NODE_LOCAL => NO_PREF => RACK_LOCAL => ANY
PROCESS_LOCAL 進程本地化:task要計算的數據在同一個Executor中
NODE_LOCAL 節點本地化:速度比 PROCESS_LOCAL 稍慢,因為數據需要在不同進程之間傳遞或從文件中讀取
NODE_PREF 沒有最佳位置這一說,數據從哪里訪問都一樣快,不需要位置優先。比如說SparkSQL讀取MySql中的數據
RACK_LOCAL 機架本地化,數據在同一機架的不同節點上。需要通過網絡傳輸數據及文件 IO,比 NODE_LOCAL 慢
ANY 跨機架,數據在非同一機架的網絡上,速度最慢
2.RDD實現類舉例
2.1 MapPartitionsRDD
2.2 ShuffledRDD
2.3 ReliableCheckpointRDD
ReliableCheckpointRDD將RDD寫入到HDFS中
checkpointRDD的功能就是切斷所有的之前RDD的依賴和迭代關系,所以compute方法只返回對應HDFS的文件反序列化流的一個迭代器就可以了
而且在ReliableCheckpointRDD中dependencies_屬性是空的,也就沒有實現getDependencies
另外多研究下RDD中的checkpoint方法:private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD)
2.4 CoGroupedRDD (待補充)
3.RDD可以嵌套嗎?
RDD嵌套是不被支持的,也即不能在一個RDD操作的內部再使用RDD。