1.RDD的官網定義
- A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
- Represents an immutable,partitioned collection of elements that can be operated on in parallel.
- 翻譯:
彈性分布式數據集(RDD),Spark中的基本抽象。表示不可變的,分區的可以並行操作的元素集合。
解釋:
- RDD是Resilient Distributed Dataset(彈性分布式數據集)的簡稱。RDD的彈性體現在計算方面,當Spark進行計算時,某一階段出現數據丟失或者故障,可以通過RDD的血緣關系就行修復。
1、內存的彈性:內存與磁盤的自動切換
2、容錯的彈性:數據丟失可以自動恢復
3、計算的彈性:計算出錯重試機制
4、分片的彈性:根據需要重新分片 - RDD是不可變(immutable)的,一旦創建就不可改變。RDDA-->RDDB,RDDA經過轉換操作變成RDDB,這兩個RDD具有血緣關系,但是是兩個不同的RDD,體現了RDD一旦創建就不可變的性質。
RDD源碼
abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging {
...
}
解讀: 1)抽象類(abstract):不能直接使用,需要借助於子類實現,使用時直接使用其子類即可
2)序列化:在分布式計算框架里,序列化框架性能的好壞直接影響整個框架性能的優劣
3)logging:日志記錄,2.0版本后不自帶,需要自己寫一個
4)T:泛型 支持各種數據類型
5)sparkcontext
6)@transient
RDD的5大特點(扒源碼)
* - A list of partitions * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file)
解釋
1)RDD由很多partition構成,在spark中,計算式,有多少partition就對應有多少個task來執行,一個分區在一台機器上,一個分區其實就是放在一台機器的內存上,
一台機器上可以有多個分區 2)A function for computing each split 對RDD做計算,相當於對RDD的每個split或partition做計算 3)A list of dependencies on other RDDs RDD之間有依賴關系,可溯源
eg:rdd1.map(_*10).flatMap(..).map(..).reduceByKey(...)
構建成為DAG,這個DAG會構造成很多個階段,這些階段叫做stage,RDDstage之間會有依賴關系,后面根據前面的依賴關系來構建,如果前面的數據丟了,
它會記住前面的依賴,從前面進行重新恢復。每一個算子都會產生新的RDD.
textFile 與flatMap會產生兩個RDD.
spark中的DAG就是rdd內部的轉換關系,這些轉換關系會被轉換成依賴關系,進而被划分成不同階段,從而描繪出任務的先后順序。
4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 如果RDD里面存的數據是key -value形式,則可以傳遞一個自定義的Partitioner進行重新分區,比如可以按key的hash值分區 5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)最優的位置去計算,也就是數據的本地性 計算每個split時,在split所在機器的本地上運行task是最好的,避免了數據的移動;split有多個副本,所以preferred location不止一個 數據在哪里,應優先把作業調度到數據所在機器上,減少數據的IO和網絡傳輸,這樣才能更好地減少作業運行時間
(木桶原理:作業運行時間取決於運行最慢的task所需的時間),提高性能,不過數據進行最后匯總的時候就要走網絡。(hdfs file的block塊)
RDD5大特性在源碼中的體現
特性1:getPartitions返回的必然是一系列Partition類型的數據組成的數組
/** * Implemented by subclasses to return the set of partitions in this RDD. This method will only * be called once, so it is safe to implement a time-consuming computation in it. * * The partitions in this array must satisfy the following property: * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }` */ protected def getPartitions: Array[Partition]
特性2:compute函數的入參必然是partition,因為對RDD做計算相當於對每個partition做計算
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi def compute(split: Partition, context: TaskContext): Iterator[T]
特性3:RDD之間有依賴關系
/** * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only * be called once, so it is safe to implement a time-consuming computation in it. */ protected def getDependencies: Seq[Dependency[_]] = deps
- 特性4: Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
/** * Optionally overridden by subclasses to specify placement preferences. */ protected def getPreferredLocations(split: Partition): Seq[String] = Nil
- 特性5:Optionally, a list of preferred locations to compute each split on (e.g. block locations for
an HDFS file)
/** Optionally overridden by subclasses to specify how they are partitioned. */ @transient val partitioner: Option[Partitioner] = None