RDD是分布式內存的一個抽象概念,是一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合,能橫跨集群所有節點並行計算,是一種基於工作集的應用抽象。
RDD底層存儲原理:其數據分布存儲於多台機器上,事實上,每個RDD的數據都以Block的形式存儲於多台機器上,每個Executor會啟動一個BlockManagerSlave,並管理一部分Block;而Block的元數據由Driver節點上的BlockManagerMaster保存,BlockManagerSlave生成Block后向BlockManagerMaster注冊該Block,BlockManagerMaster管理RDD與Block的關系,當RDD不再需要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。
BlockManager管理RDD的物理分區,每個Block就是節點上對應的一個數據塊,可以存儲在內存或者磁盤上。而RDD中的Partition是一個邏輯數據塊,對應相應的物理塊Block。本質上,一個RDD在代碼中相當於數據的一個元數據結構,存儲着數據分區及其邏輯結構映射關系,存儲着RDD之前的依賴轉換關系。
BlockManager在每個節點上運行管理Block(Driver和Executors),它提供一個接口檢索本地和遠程的存儲變量,如memory、disk、off-heap。使用BlockManager前必須先初始化。BlockManager.scala的部分源碼如下所示:
private[spark] class BlockManager( executorId: String, rpcEnv: RpcEnv, val master: BlockManagerMaster, serializerManager: SerializerManager, val conf: SparkConf, memoryManager: MemoryManager, mapOutputTracker: MapOutputTracker, shuffleManager: ShuffleManager, val blockTransferService: BlockTransferService, securityManager: SecurityManager, numUsableCores: Int) extends BlockDataManager with BlockEvictionHandler with Logging {
BlockManagerMaster會持有整個Application的Block的位置、Block所占用的存儲空間等元數據信息,在Spark的Driver的DAGScheduler中,就是通過這些信息來確認數據運行的本地性的。Spark支持重分區,數據通過Spark默認的或者用戶自定義的分區器決定數據塊分布在哪些節點。RDD的物理分區是由Block-Manager管理的,每個Block就是節點上對應的一個數據塊,可以存儲在內存或者磁盤。而RDD中的partition是一個邏輯數據塊,對應相應的物理塊Block。本質上,一個RDD在代碼中相當於數據的一個元數據結構(一個RDD就是一組分區),存儲着數據分區及Block、Node等的映射關系,以及其他元數據信息,存儲着RDD之前的依賴轉換關系。分區是一個邏輯概念,Transformation前后的新舊分區在物理上可能是同一塊內存存儲。
Spark通過讀取外部數據創建RDD,或通過其他RDD執行確定的轉換Transformation操作(如map、union和groubByKey)而創建,從而構成了線性依賴關系,或者說血統關系(Lineage),在數據分片丟失時可以從依賴關系中恢復自己獨立的數據分片,對其他數據分片或計算機沒有影響,基本沒有檢查點開銷,使得實現容錯的開銷很低,失效時只需要重新計算RDD分區,就可以在不同節點上並行執行,而不需要回滾(Roll Back)整個程序。落后任務(即運行很慢的節點)是通過任務備份,重新調用執行進行處理的。
因為RDD本身支持基於工作集的運用,所以可以使Spark的RDD持久化(persist)到內存中,在並行計算中高效重用。多個查詢時,我們就可以顯性地將工作集中的數據緩存到內存中,為后續查詢提供復用,這極大地提升了查詢的速度。在Spark中,一個RDD就是一個分布式對象集合,每個RDD可分為多個片(Partitions),而分片可以在集群環境的不同節點上計算。
RDD作為泛型的抽象的數據結構,支持兩種計算操作算子:Transformation(變換)與Action(行動)。且RDD的寫操作是粗粒度的,讀操作既可以是粗粒度的,也可以是細粒度的。RDD.scala的源碼如下:
/** * Internally, each RDD is characterized by five main properties: * 每個RDD都有5個主要特性 * - A list of partitions 分區列表 * - A function for computing each split 每個分區都有一個計算函數 * - A list of dependencies on other RDDs 依賴於其他RDD的列表 * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 數據類型(key-value)的RDD分區器 * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for 每個分區都有一個分區位置列表 */ abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging {
其中,SparkContext是Spark功能的主要入口點,一個SparkContext代表一個集群連接,可以用其在集群中創建RDD、累加變量、廣播變量等,在每一個可用的JVM中只有一個SparkContext,在創建一個新的SparkContext之前,必須先停止該JVM中可用的SparkContext,這種限制可能最終會被修改。SparkContext被實例化時需要一個SparkConf對象去描述應用的配置信息,在這個配置對象中設置的信息,會覆蓋系統默認的配置。
RDD五大特性:
(1)分區列表(a list of partitions)。Spark RDD是被分區的,每一個分區都會被一個計算任務(Task)處理,分區數決定並行計算數量,RDD的並行度默認從父RDD傳給子RDD。默認情況下,一個HDFS上的數據分片就是一個Partition,RDD分片數決定了並行計算的力度,可以在創建RDD時指定RDD分片個數,如果不指定分區數量,當RDD從集合創建時,則默認分區數量為該程序所分配到的資源的CPU核數(每個Core可以承載2~4個Partition),如果是從HDFS文件創建,默認為文件的Block數。
(2)每一個分區都有一個計算函數(a function for computing each split)。每個分區都會有計算函數,Spark的RDD的計算函數是以分片為基本單位的,每個RDD都會實現compute函數,對具體的分片進行計算,RDD中的分片是並行的,所以是分布式並行計算。有一點非常重要,就是由於RDD有前后依賴關系,遇到寬依賴關系,例如,遇到reduceBykey等寬依賴操作的算子,Spark將根據寬依賴划分Stage,Stage內部通過Pipeline操作,通過Block Manager獲取相關的數據,因為具體的split要從外界讀數據,也要把具體的計算結果寫入外界,所以用了一個管理器,具體的split都會映射成BlockManager的Block,而具體split會被函數處理,函數處理的具體形式是以任務的形式進行的。
(3)依賴於其他RDD的列表(a list of dependencies on other RDDs)。RDD的依賴關系,由於RDD每次轉換都會生成新的RDD,所以RDD會形成類似流水線的前后依賴關系,當然,寬依賴就不類似於流水線了,寬依賴后面的RDD具體的數據分片會依賴前面所有的RDD的所有的數據分片,這時數據分片就不進行內存中的Pipeline,這時一般是跨機器的。因為有前后的依賴關系,所以當有分區數據丟失的時候,Spark會通過依賴關系重新計算,算出丟失的數據,而不是對RDD所有的分區進行重新計算。RDD之間的依賴有兩種:窄依賴(Narrow Dependency)、寬依賴(Wide Dependency)。RDD是Spark的核心數據結構,通過RDD的依賴關系形成調度關系。通過對RDD的操作形成整個Spark程序。
RDD有Narrow Dependency和Wide Dependency兩種不同類型的依賴,其中的Narrow Dependency指的是每一個parent RDD的Partition最多被child RDD的一個Partition所使用,而Wide Dependency指的是多個child RDD的Partition會依賴於同一個parent RDD的Partition。可以從兩個方面來理解RDD之間的依賴關系:一方面是該RDD的parent RDD是什么;另一方面是依賴於parent RDD的哪些Partitions;根據依賴於parent RDD的Partitions的不同情況,Spark將Dependency分為寬依賴和窄依賴兩種。Spark中寬依賴指的是生成的RDD的每一個partition都依賴於父RDD的所有partition,寬依賴典型的操作有groupByKey、sortByKey等,寬依賴意味着shuffle操作,這是Spark划分Stage邊界的依據,Spark中寬依賴支持兩種Shuffle Manager,即HashShuffleManager和SortShuffleManager,前者是基於Hash的Shuffle機制,后者是基於排序的Shuffle機制。Spark 2.2現在的版本中已經沒有Hash Shuffle的方式。
(4)key-value數據類型的RDD分區器(-Optionally,a Partitioner for key-value RDDS),控制分區策略和分區數。每個key-value形式的RDD都有Partitioner屬性,它決定了RDD如何分區。當然,Partition的個數還決定每個Stage的Task個數。RDD的分片函數,想控制RDD的分片函數的時候可以分區(Partitioner)傳入相關的參數,如HashPartitioner、RangePartitioner,它本身針對key-value的形式,如果不是key-value的形式,它就不會有具體的Partitioner。Partitioner本身決定了下一步會產生多少並行的分片,同時,它本身也決定了當前並行(parallelize)Shuffle輸出的並行數據,從而使Spark具有能夠控制數據在不同節點上分區的特性,用戶可以自定義分區策略,如Hash分區等。Spark提供了“partitionBy”運算符,能通過集群對RDD進行數據再分配來創建一個新的RDD。
(5)每個分區都有一個優先位置列表(-Optionally,a list of preferred locations to compute each split on)。它會存儲每個Partition的優先位置,對於一個HDFS文件來說,就是每個Partition塊的位置。觀察運行spark集群的控制台會發現Spark的具體計算,具體分片前,它已經清楚地知道任務發生在什么節點上,也就是說,任務本身是計算層面的、代碼層面的,代碼發生運算之前已經知道它要運算的數據在什么地方,有具體節點的信息。這就符合大數據中數據不動代碼動的特點。數據不動代碼動的最高境界是數據就在當前節點的內存中。這時有可能是memory級別或Alluxio級別的,Spark本身在進行任務調度時候,會盡可能將任務分配到處理數據的數據塊所在的具體位置。據Spark的RDD.Scala源碼函數getPreferredLocations可知,每次計算都符合完美的數據本地性。
RDD類源碼文件中的4個方法和一個屬性對應上述闡述的RDD的5大特性。RDD.scala的源碼如下:
/** * :: DeveloperApi :: * Implemented by subclasses to compute a given partition. 通過子類實現給定分區的計算 */ @DeveloperApi def compute(split: Partition, context: TaskContext): Iterator[T] /** * Implemented by subclasses to return the set of partitions in this RDD. This method will only * be called once, so it is safe to implement a time-consuming computation in it. * 通過子類實現,返回一個RDD分區列表,這個方法只被調用一次,它是安全的執行一次耗時計算 * * 數組中的分區必須符合以下屬性設置 * The partitions in this array must satisfy the following property: * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }` */ protected def getPartitions: Array[Partition] /** * 返回對父RDD的依賴列表,這個方法僅只被調用一次,它是安全的執行一次耗時計算 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only * be called once, so it is safe to implement a time-consuming computation in it. */ protected def getDependencies: Seq[Dependency[_]] = deps /** * 可選的,指定優先位置,輸入參數是spilt分片,輸出結果是一組優先的節點位置 * Optionally overridden by subclasses to specify placement preferences. */ protected def getPreferredLocations(split: Partition): Seq[String] = Nil /** * Optionally overridden by subclasses to specify how they are partitioned. * 可選的,通過子類實現,指定如何分區 */ @transient val partitioner: Option[Partitioner] = None
其中,TaskContext是讀取或改變執行任務的環境,用org.apache.spark.TaskContext.get()可返回當前可用的TaskContext,可以調用內部的函數訪問正在運行任務的環境信息。Partitioner是一個對象,定義了如何在key-Value類型的RDD元素中用Key分區,從0到numPartitions-1區間內映射每一個Key到Partition ID。Partition是一個RDD的分區標識符。Partition.scala的源碼如下。
/** * An identifier for a partition in an RDD. */ trait Partition extends Serializable { /** * Get the partition's index within its parent RDD */ def index: Int // A better default implementation of HashCode override def hashCode(): Int = index override def equals(other: Any): Boolean = super.equals(other) }