Spark RDD到底是個什么東西


前言

  用Spark有一段時間了,但是感覺還是停留在表面,對於Spark的RDD的理解還是停留在概念上,即只知道它是個彈性分布式數據集,其他的一概不知

有點略顯慚愧。下面記錄下我對RDD的新的理解。

 

官方介紹

   彈性分布式數據集。 RDD是只讀的、分區記錄的集合。RDD只能基於在穩定物理存儲中的數據集和其他已有的RDD上執行確定性操作來創建。

問題

      只要你敢問度娘RDD是什么,包你看到一大片一模一樣的答案,都是說這樣的概念性的東西,沒有任何的價值。

      我只想知道 RDD為什么是彈性 而不是 不彈性, RDD到底是怎么存數據,在執行任務的過程中是咋哪個階段讀取數據。

 

什么是彈性

    我的理解如下(若有誤或不足,煩請指出更正): 

            1. RDD可以在內存和磁盤之間手動或自動切換

            2. RDD可以通過轉換成其他的RDD,即血統

            3. RDD可以存儲任意類型的數據

 

存儲的內容是什么

     根據編寫Spark任務的代碼來看,很直觀的感覺是RDD就是一個只讀的數據,例如  rdd.foreach(println)

     但是不是, RDD其實不存儲真是的數據,只存儲數據的獲取的方法,以及分區的方法,還有就是數據的類型。

     百聞不如一見, 下面看看RDD的源碼:

//其他的代碼刪除了,主要保留了它的兩個抽象方法
abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging {
//計算某個分區數據的方法 ,將某個分區的數據讀成一個 Iterator
def compute(split: Partition, context: TaskContext): Iterator[T]
  //計算分區信息 只會被調用一次
  protected def getPartitions: Array[Partition]

}

  通過RDD的這兩個抽象方法,我們可以看出 :

                          RDD其實是不存儲真是數據的,存儲的的只是 真實數據的分區信息getPartitions,還有就是針對單個分區的讀取方法 compute

    到這里可能就有點疑惑,要是RDD只存儲這分區信息和讀取方法,那么RDD的依賴信息是怎么保存的?

     其實RDD是有保存的,只是我粘貼出的只是RDD頂層抽象類,還要一點需要注意 ,RDD只能向上依賴,而真正實現這兩個方法的RDD都是整個任務的輸入端,即處於RDD血統的頂層,初代RDD 

     舉個例子:val rdd = sc.textFile(...); val rdd1 = rdd.map(f)  .  這里的 rdd是初代RDD, 是沒有任何依賴的RDD的,所以沒就沒有保存依賴信息, 而 rdd1是子代RDD,那么它就必須得記錄下自己是來源於誰,也就是血統,

   下面展示的是HadoopRDD和  MapPartitionsRDD

 

 //負責記錄數據的分區信息  和 讀取方法 

class HadoopRDD[K, V](
  @transient sc: SparkContext,
  broadcastedConf: Broadcast[SerializableConfiguration],
  initLocalJobConfFuncOpt: Option[JobConf => Unit],
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int)
  extends RDD[(K, V)](sc, Nil) with Logging {

  override def getPartitions: Array[Partition] = { ***篇幅所限  自己查看**}

   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {***篇幅所限  自己查看**}

}

 

//子代RDD的作用起始很簡單  就是記錄初代RDD到底在干了什么才得到了自己

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](

    prev: RDD[T],  //上一代RDD
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)  //初代RDD生成自己的方法
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}

 

  到這里,我們就大概了解了RDD到底存儲了什么東西,

               初代RDD: 處於血統的頂層,存儲的是任務所需的數據的分區信息,還有單個分區數據讀取的方法,沒有依賴的RDD, 因為它就是依賴的開始。

              子代RDD: 處於血統的下層, 存儲的東西就是 初代RDD到底干了什么才會產生自己,還有就是初代RDD的引用

現在我們基本了解了RDD里面到底存儲了些什么東西,那么問題就來了,到底讀取數據發生在什么時候。

數據讀取發生在什么時候

   直接開門見山的說, 數據讀取是發生在運行的Task中,也就是說,數據是在任務分發的executor上運行的時候讀取的,上源碼:

private[spark] class ResultTask[T, U](
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient locs: Seq[TaskLocation],
    val outputId: Int,
    internalAccumulators: Seq[Accumulator[Long]])
  extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
  with Serializable {

  @transient private[this] val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.toSet.toSeq
  }

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))  //這里調用了 rdd.iterator , 下面看看RDD的這個方法
  }

  // This is only callable on the driver side.
  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")"
}


final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {

          //先判斷是否有緩存 ,有則直接從緩存中取 , 沒有就從磁盤中取出來, 然后再執行緩存操作 
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) 
  } else {

          //直接從磁盤中讀取 或 從 檢查點中讀取 
    computeOrReadCheckpoint(split, context)
  }
}

  在spark中的任務 最終是會被分解成多個TaskSet到executor上運行,TaskSet的划分是根據是否需要shuffle來的。

     在spark中就只有兩種Task,一種是ResultTask ,一種是ShuffleTask, 兩種Task都是以相同的方式讀取RDD的數據。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM