Spark之RDD的定義及五大特性


  RDD是分布式內存的一個抽象概念,是一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合,能橫跨集群所有節點並行計算,是一種基於工作集的應用抽象。

  RDD底層存儲原理:其數據分布存儲於多台機器上,事實上,每個RDD的數據都以Block的形式存儲於多台機器上,每個Executor會啟動一個BlockManagerSlave,並管理一部分Block;而Block的元數據由Driver節點上的BlockManagerMaster保存,BlockManagerSlave生成Block后向BlockManagerMaster注冊該Block,BlockManagerMaster管理RDD與Block的關系,當RDD不再需要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。

  BlockManager管理RDD的物理分區,每個Block就是節點上對應的一個數據塊,可以存儲在內存或者磁盤上。而RDD中的Partition是一個邏輯數據塊,對應相應的物理塊Block。本質上,一個RDD在代碼中相當於數據的一個元數據結構,存儲着數據分區及其邏輯結構映射關系,存儲着RDD之前的依賴轉換關系。

  BlockManager在每個節點上運行管理Block(Driver和Executors),它提供一個接口檢索本地和遠程的存儲變量,如memory、disk、off-heap。使用BlockManager前必須先初始化。BlockManager.scala的部分源碼如下所示:

private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {

  BlockManagerMaster會持有整個Application的Block的位置、Block所占用的存儲空間等元數據信息,在Spark的Driver的DAGScheduler中,就是通過這些信息來確認數據運行的本地性的。Spark支持重分區,數據通過Spark默認的或者用戶自定義的分區器決定數據塊分布在哪些節點。RDD的物理分區是由Block-Manager管理的,每個Block就是節點上對應的一個數據塊,可以存儲在內存或者磁盤。而RDD中的partition是一個邏輯數據塊,對應相應的物理塊Block。本質上,一個RDD在代碼中相當於數據的一個元數據結構(一個RDD就是一組分區),存儲着數據分區及Block、Node等的映射關系,以及其他元數據信息,存儲着RDD之前的依賴轉換關系。分區是一個邏輯概念,Transformation前后的新舊分區在物理上可能是同一塊內存存儲。  

  Spark通過讀取外部數據創建RDD,或通過其他RDD執行確定的轉換Transformation操作(如map、union和groubByKey)而創建,從而構成了線性依賴關系,或者說血統關系(Lineage),在數據分片丟失時可以從依賴關系中恢復自己獨立的數據分片,對其他數據分片或計算機沒有影響,基本沒有檢查點開銷,使得實現容錯的開銷很低,失效時只需要重新計算RDD分區,就可以在不同節點上並行執行,而不需要回滾(Roll Back)整個程序。落后任務(即運行很慢的節點)是通過任務備份,重新調用執行進行處理的。

  因為RDD本身支持基於工作集的運用,所以可以使Spark的RDD持久化(persist)到內存中,在並行計算中高效重用。多個查詢時,我們就可以顯性地將工作集中的數據緩存到內存中,為后續查詢提供復用,這極大地提升了查詢的速度。在Spark中,一個RDD就是一個分布式對象集合,每個RDD可分為多個片(Partitions),而分片可以在集群環境的不同節點上計算。

  RDD作為泛型的抽象的數據結構,支持兩種計算操作算子:Transformation(變換)與Action(行動)。且RDD的寫操作是粗粒度的,讀操作既可以是粗粒度的,也可以是細粒度的。RDD.scala的源碼如下: 

/**
 * Internally, each RDD is characterized by five main properties:
 * 每個RDD都有5個主要特性
 *  - A list of partitions    分區列表
 *  - A function for computing each split    每個分區都有一個計算函數
 *  - A list of dependencies on other RDDs    依賴於其他RDD的列表
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)    數據類型(key-value)的RDD分區器
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for    每個分區都有一個分區位置列表
 */
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  其中,SparkContext是Spark功能的主要入口點,一個SparkContext代表一個集群連接,可以用其在集群中創建RDD、累加變量、廣播變量等,在每一個可用的JVM中只有一個SparkContext,在創建一個新的SparkContext之前,必須先停止該JVM中可用的SparkContext,這種限制可能最終會被修改。SparkContext被實例化時需要一個SparkConf對象去描述應用的配置信息,在這個配置對象中設置的信息,會覆蓋系統默認的配置。

  RDD五大特性:

  (1)分區列表(a list of partitions)。Spark RDD是被分區的,每一個分區都會被一個計算任務(Task)處理,分區數決定並行計算數量,RDD的並行度默認從父RDD傳給子RDD。默認情況下,一個HDFS上的數據分片就是一個Partition,RDD分片數決定了並行計算的力度,可以在創建RDD時指定RDD分片個數,如果不指定分區數量,當RDD從集合創建時,則默認分區數量為該程序所分配到的資源的CPU核數(每個Core可以承載2~4個Partition),如果是從HDFS文件創建,默認為文件的Block數。

  (2)每一個分區都有一個計算函數(a function for computing each split)。每個分區都會有計算函數,Spark的RDD的計算函數是以分片為基本單位的,每個RDD都會實現compute函數,對具體的分片進行計算,RDD中的分片是並行的,所以是分布式並行計算。有一點非常重要,就是由於RDD有前后依賴關系,遇到寬依賴關系,例如,遇到reduceBykey等寬依賴操作的算子,Spark將根據寬依賴划分Stage,Stage內部通過Pipeline操作,通過Block Manager獲取相關的數據,因為具體的split要從外界讀數據,也要把具體的計算結果寫入外界,所以用了一個管理器,具體的split都會映射成BlockManager的Block,而具體split會被函數處理,函數處理的具體形式是以任務的形式進行的。

  (3)依賴於其他RDD的列表(a list of dependencies on other RDDs)。RDD的依賴關系,由於RDD每次轉換都會生成新的RDD,所以RDD會形成類似流水線的前后依賴關系,當然,寬依賴就不類似於流水線了,寬依賴后面的RDD具體的數據分片會依賴前面所有的RDD的所有的數據分片,這時數據分片就不進行內存中的Pipeline,這時一般是跨機器的。因為有前后的依賴關系,所以當有分區數據丟失的時候,Spark會通過依賴關系重新計算,算出丟失的數據,而不是對RDD所有的分區進行重新計算。RDD之間的依賴有兩種:窄依賴(Narrow Dependency)、寬依賴(Wide Dependency)。RDD是Spark的核心數據結構,通過RDD的依賴關系形成調度關系。通過對RDD的操作形成整個Spark程序。

    RDD有Narrow Dependency和Wide Dependency兩種不同類型的依賴,其中的Narrow Dependency指的是每一個parent RDD的Partition最多被child RDD的一個Partition所使用,而Wide Dependency指的是多個child RDD的Partition會依賴於同一個parent RDD的Partition。可以從兩個方面來理解RDD之間的依賴關系:一方面是該RDD的parent RDD是什么;另一方面是依賴於parent RDD的哪些Partitions;根據依賴於parent RDD的Partitions的不同情況,Spark將Dependency分為寬依賴和窄依賴兩種。Spark中寬依賴指的是生成的RDD的每一個partition都依賴於父RDD的所有partition,寬依賴典型的操作有groupByKey、sortByKey等,寬依賴意味着shuffle操作,這是Spark划分Stage邊界的依據,Spark中寬依賴支持兩種Shuffle Manager,即HashShuffleManager和SortShuffleManager,前者是基於Hash的Shuffle機制,后者是基於排序的Shuffle機制。Spark 2.2現在的版本中已經沒有Hash Shuffle的方式。

  (4)key-value數據類型的RDD分區器(-Optionally,a Partitioner for key-value RDDS),控制分區策略和分區數。每個key-value形式的RDD都有Partitioner屬性,它決定了RDD如何分區。當然,Partition的個數還決定每個Stage的Task個數。RDD的分片函數,想控制RDD的分片函數的時候可以分區(Partitioner)傳入相關的參數,如HashPartitioner、RangePartitioner,它本身針對key-value的形式,如果不是key-value的形式,它就不會有具體的Partitioner。Partitioner本身決定了下一步會產生多少並行的分片,同時,它本身也決定了當前並行(parallelize)Shuffle輸出的並行數據,從而使Spark具有能夠控制數據在不同節點上分區的特性,用戶可以自定義分區策略,如Hash分區等。Spark提供了“partitionBy”運算符,能通過集群對RDD進行數據再分配來創建一個新的RDD。

  (5)每個分區都有一個優先位置列表(-Optionally,a list of preferred locations to compute each split on)。它會存儲每個Partition的優先位置,對於一個HDFS文件來說,就是每個Partition塊的位置。觀察運行spark集群的控制台會發現Spark的具體計算,具體分片前,它已經清楚地知道任務發生在什么節點上,也就是說,任務本身是計算層面的、代碼層面的,代碼發生運算之前已經知道它要運算的數據在什么地方,有具體節點的信息。這就符合大數據中數據不動代碼動的特點。數據不動代碼動的最高境界是數據就在當前節點的內存中。這時有可能是memory級別或Alluxio級別的,Spark本身在進行任務調度時候,會盡可能將任務分配到處理數據的數據塊所在的具體位置。據Spark的RDD.Scala源碼函數getPreferredLocations可知,每次計算都符合完美的數據本地性。
RDD類源碼文件中的4個方法和一個屬性對應上述闡述的RDD的5大特性。RDD.scala的源碼如下:

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition. 通過子類實現給定分區的計算
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   * 通過子類實現,返回一個RDD分區列表,這個方法只被調用一次,它是安全的執行一次耗時計算
   * 
   * 數組中的分區必須符合以下屬性設置
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]

  /**
   * 返回對父RDD的依賴列表,這個方法僅只被調用一次,它是安全的執行一次耗時計算
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /**
   * 可選的,指定優先位置,輸入參數是spilt分片,輸出結果是一組優先的節點位置
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** 
   * Optionally overridden by subclasses to specify how they are partitioned. 
   * 可選的,通過子類實現,指定如何分區
   */
  @transient val partitioner: Option[Partitioner] = None

  其中,TaskContext是讀取或改變執行任務的環境,用org.apache.spark.TaskContext.get()可返回當前可用的TaskContext,可以調用內部的函數訪問正在運行任務的環境信息。Partitioner是一個對象,定義了如何在key-Value類型的RDD元素中用Key分區,從0到numPartitions-1區間內映射每一個Key到Partition ID。Partition是一個RDD的分區標識符。Partition.scala的源碼如下。  

/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index

  override def equals(other: Any): Boolean = super.equals(other)
}

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM