Spark 源碼分析 -- RDD


關於RDD, 詳細可以參考Spark的論文, 下面看下源碼
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Represents an immutable, partitioned collection of elements that can be operated on in parallel.

* Internally, each RDD is characterized by five main properties:
*  - A list of partitions
*  - A function for computing each split
*  - A list of dependencies on other RDDs
*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

RDD分為一下幾類,

basic(org.apache.spark.rdd.RDD): This class contains the basic operations available on all RDDs, such as `map`, `filter`, and `persist`.

org.apache.spark.rdd.PairRDDFunctions: contains operations available only on RDDs of key-value pairs, such as `groupByKey` and `join`

org.apache.spark.rdd.DoubleRDDFunctions: contains operations available only on RDDs of Doubles

org.apache.spark.rdd.SequenceFileRDDFunctions: contains operations available on RDDs that can be saved as SequenceFiles

 

RDD首先是泛型類, T表示存放數據的類型, 在處理數據是都是基於Iterator[T]
以SparkContext和依賴關系Seq deps為初始化參數
從RDD提供的這些接口大致就可以知道, 什么是RDD
1. RDD是一塊數據, 可能比較大的數據, 所以不能保證可以放在一個機器的memory中, 所以需要分成partitions, 分布在集群的機器的memory
所以自然需要getPartitions, partitioner如果分區, getPreferredLocations分區如何考慮locality

Partition的定義很簡單, 只有id, 不包含data

trait Partition extends Serializable {
  /**
   * Get the split's index within its parent RDD
   */
  def index: Int
  // A better default implementation of HashCode
  override def hashCode(): Int = index
}

2. RDD之間是有關聯的, 一個RDD可以通過compute邏輯把父RDD的數據轉化成當前RDD的數據, 所以RDD之間有因果關系
並且通過getDependencies, 可以取到所有的dependencies

3. RDD是可以被persisit的, 常用的是cache, 即StorageLevel.MEMORY_ONLY

4. RDD是可以被checkpoint的, 以提高failover的效率, 當有很長的RDD鏈時, 單純的依賴replay會比較低效

5. RDD.iterator可以產生用於迭代真正數據的Iterator[T]

6. 在RDD上可以做各種transforms和actions

abstract class RDD[T: ClassManifest](
    @transient private var sc: SparkContext, //@transient, 不需要序列化
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  
  /**輔助構造函數, 專門用於初始化1對1依賴關系的RDD,這種還是很多的, filter, map... 

Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) = this(oneParent.context , List(new OneToOneDependency(oneParent))) // 不同於一般的RDD, 這種情況因為只有一個parent, 所以直接傳入parent RDD對象即可

  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================
  /** Implemented by subclasses to compute a given partition. */
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /** Optionally overridden by subclasses to specify placement preferences. */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  var name: String = null

  /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet..
   */
  def persist(newLevel: StorageLevel): RDD[T] = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    storageLevel = newLevel
    // Register the RDD with the SparkContext
    sc.persistentRdds(id) = this
    this
  }

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): RDD[T] = persist()

  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
  def getStorageLevel = storageLevel

  // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
  // be overwritten when we're checkpointed
  private var dependencies_ : Seq[Dependency[_]] = null  
  @transient private var partitions_ : Array[Partition] = null 

  /** An Option holding our checkpoint RDD, if we are checkpointed 
* checkpoint就是把RDD存到磁盤文件中, 以提高failover的效率, 雖然也可以選擇replay
* 並且在RDD的實現中, 如果存在checkpointRDD, 則可以直接從中讀到RDD數據, 而不需要compute */
private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
  
  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  /** 這是RDD訪問數據的核心, 在RDD中的Partition中只包含id而沒有真正數據
* 那么如果獲取RDD的數據? 參考storage模塊
* 在cacheManager.getOrCompute中, 會將RDD和Partition id對應到相應的block, 並從中讀出數據*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) {//StorageLevel不為None,說明這個RDD persist過, 可以直接讀出來 SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) } else { computeOrReadCheckpoint(split, context) //如果沒有persisit過, 只有從新計算出, 或從checkpoint中讀出 } }

  // Transformations (return a new RDD)
  //...... 各種transformations的接口,map, union...
  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
  // Actions (launch a job to return a value to the user program)
  //......各種actions的接口,count, collect...
  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = {// 只有在action中才會真正調用runJob, 所以transform都是lazy的
    sc.runJob(this, (iter: Iterator[T]) => {
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next()
      }
      result
    }).sum
  }
  // =======================================================================
  // Other internal methods and fields
  // =======================================================================  
  /** Returns the first parent RDD 
返回第一個parent RDD*/
protected[spark] def firstParent[U: ClassManifest] = { dependencies.head.rdd.asInstanceOf[RDD[U]] }
  //................
}

 

這里先只討論一些basic的RDD, pairRDD會單獨討論

FilteredRDD

One-to-one Dependency, FilteredRDD

使用FilteredRDD, 將當前RDD作為第一個參數, f函數作為第二個參數, 返回值是filter過后的RDD

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

在compute中, 對parent RDD的Iterator[T]進行filter操作

private[spark] class FilteredRDD[T: ClassManifest]( //filter是典型的one-to-one dependency, 使用輔助構造函數 
    prev: RDD[T],   //parent RDD
    f: T => Boolean) //f,過濾函數
  extends RDD[T](prev) {
  //firstParent會從deps中取出第一個RDD對象, 就是傳入的prev RDD, 在One-to-one Dependency中,parent和child的partition信息相同
  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override val partitioner = prev.partitioner    // Since filter cannot change a partition's keys

  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).filter(f) //compute就是真正產生RDD的邏輯
}

 

UnionRDD

Range Dependency, 仍然是narrow的

先看看如果使用union的, 第二個參數是, 兩個RDD的array, 返回值就是把這兩個RDD union后產生的新的RDD

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))

 

先定義UnionPartition, Union操作的特點是, 只是把多個RDD的partition合並到一個RDD中, 而partition本身沒有變化, 所以可以直接重用parent partition

3個參數
idx, partition id, 在當前UnionRDD中的序號
rdd, parent RDD
splitIndex, parent partition的id

private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int)
  extends Partition {

  var split: Partition = rdd.partitions(splitIndex)//從parent RDD中取出相應的partition, 重用

  def iterator(context: TaskContext) = rdd.iterator(split, context)//Iterator也可以重用

  def preferredLocations() = rdd.preferredLocations(split)

  override val index: Int = idx//partition id是新的, 因為多個合並后, 序號肯定會發生變化
}

定義UnionRDD

class UnionRDD[T: ClassManifest](
    sc: SparkContext,
    @transient var rdds: Seq[RDD[T]])  //parent RDD Seq
  extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies

  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](rdds.map(_.partitions.size).sum) //UnionRDD的partition數,是所有parent RDD中的partition數目的和
    var pos = 0
    for (rdd <- rdds; split <- rdd.partitions) {
      array(pos) = new UnionPartition(pos, rdd, split.index) //創建所有的UnionPartition
      pos += 1
    }
    array
  }

  override def getDependencies: Seq[Dependency[_]] = {
    val deps = new ArrayBuffer[Dependency[_]]
    var pos = 0
    for (rdd <- rdds) { 
      deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)//創建RangeDependency
      pos += rdd.partitions.size)//由於是RangeDependency, 所以pos的遞增是加上整個區間size
    }
    deps
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] =
    s.asInstanceOf[UnionPartition[T]].iterator(context)//Union的compute非常簡單,什么都不需要做

  override def getPreferredLocations(s: Partition): Seq[String] =
    s.asInstanceOf[UnionPartition[T]].preferredLocations()
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM