Spark RDD深度解析-RDD計算流程


Spark RDD深度解析-RDD計算流程

摘要  RDDResilient Distributed Datasets)是Spark的核心數據結構,所有數據計算操作均基於該結構進行,包括Spark sql Spark Streaming。理解RDD有助於了解分布式計算引擎的基本架構,更好地使用Spark進行批處理與流計算。本文以Spark2.0源代碼為主,對RDD的生成、計算流程、加載順序等作深入的解析。

RDD印象

直觀上,RDD可理解為下圖所示結構,即RDD包含多個Partition(分區),每個Partition代表一部分數據並位於一個計算節點。

 

RDD本質上Spark中的一個抽象類,所有子RDDHadoopRDDMapPartitionRDDJdbcRDD等)都要繼承並實現其中的方法。

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

RDD包含以下成員方法或屬性

1、compute方法

提供在計算過程中Partition元素的獲取與計算方式

2、partition的列表

每一個partition代表一個並行的最小划分單元;

3、dependencies列表

描述RDD依賴哪些父RDD生成即RDD的血緣關系

4、partition的位置列表

定義如何最快速的獲取partition數據,加快計算,這個是可選的,可作為本地化計算的優化選項;

5、partitioner方法

定義如何對數據進行分區。

RDD生成方式

1、scala集合

Partition的默認值:defaultParallelism

defaultParallelismspark的部署模式相關:

    • Local 模式:本機 cpu cores 的數量
    • Mesos 模式:8
    • Yarnmax(2, 所有 executors cpu cores 個數總和)

2、物理數據載入

默認為min(defaultParallelism, 2)

3、其他RDD轉換

根據具體的轉換算子而定

Partition

Partiton不直接持有數據,僅僅代表了分區的位置(index的值)。

trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index

  override def equals(other: Any): Boolean = super.equals(other)
}

Dependency

從名字可以猜想,他描述了RDD之間的依賴關系。成員rdd就是父RDD,會在構造RDD時被賦值。

abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

由上述RDDDependcy關系可畫出下圖,通過這種方式,子RDD能輕易找到父RDD的位置等信息,從而構建出RDD的轉換路徑,為DAGScheduler的任務划分及任務執行時尋找依賴的數據提供依據。

 

到此應該能大致明白RDD中涉及的各個概念的含義及其之間的聯系。但是仔細思考,會發現存在很多問題,比如:

既然RDD不攜帶數據,那么數據是何時加載的?怎么加載的?怎么分布到不同計算節點的?

不同類型RDD是怎么完成轉換的?


RDD計算流程

以下面幾行代碼為例,解答上述問題。

var sc = new SparkContext();

var hdfs_rdd = sc.textFile(hdfs://master:9000/examples/people.txt);  //  加載數據

var rdd = hdfs_rdd.map(_.split(“,”));  //  對每行數據按逗號分隔

print(rdd.count());  //  打印數據的條數

 

RDD的轉換

 

首先從直觀上了解上述代碼執行過程中RDD的轉換,如下圖,Spark按照HDFS中文件的block將數據加載到內存,成為初始RDD1,經過每一步操作后轉換為相應RDD

 

首先分析textFile方法的作用,源碼如下:

 def textFile(

    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

着重看紅色語句,textFile方法實際上是調用了hadoopFile方法,再利用其返回值調用map方法,HadoopFile執行了什么,返回了什么呢?

def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()

  // This is a hack to enforce loading hdfs-site.xml.
  // See SPARK-11227 for details.
  FileSystem.getLocal(hadoopConfiguration)

  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}

很明顯,hadoopFile實際上是獲取了HADOOP的配置,然后構造並返回了HadoopRDD對象,HadoopRDDRDD的子類。因此textFile最后調用的是HadoopRDD對象的map方法,其實RDD接口中定義並實現了map方法,所有繼承了RDD的類調用的map方法都來自於此。

觀察RDDmap方法:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

map方法很簡單,首先包裝一下傳進來的函數,然后返回MapPartitionsRDD對象。至此,textFile結束,他最終只是返回了MapPartitionsRDD,並沒有執行數據讀取、計算操作。

 

 接着看下一語句:var rdd = hdfs_rdd.map(_.split(“,”));

由上面的分析可知hdfs_rdd是一個MapPartitionsRDD對象,於是其map方法內容與上文的一模一樣,也只是返回一個包含用戶函數的MapPartitionsRDD對象。

目前為止每個方法的調用只是返回不同類型的RDD對象,還未真正執行計算。

 

接着var cnt = rdd.count();

count是一種action類型的操作,會觸發RDD的計算,為什么說count會觸發RDD的計算呢?需要看看count的實現:

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

可以看到,count方法中調用了scsparkContext)的runJob方法,該操作將觸發DagScheduler去分解任務並提交到集群執行。count方法會返回Array[U]類型的結果,數組中每個值代表了當前RDD每個分區中包含的元素個數,因此sum的結果就是RDD中所有元素的個數,本例的結果就是HDFS文件中存在幾行數據。

 

RDD的計算

下面介紹任務提交后RDD是怎么計算出來的。

任務分解並提交后開始執行,task最后一個RDD上執行compute方法

以上述代碼為例,最后一個RDD的類型是MapPartitionsRDD,看其compute方法:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

其中splitRDD的分區,firstParent是父RDD;最外層的f其實是構造MapPartitionsRDD時傳入的一個參數,改參數是一個函數對象,接收三個參數並返回Iterator

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)

f是何時生成的呢?就看何時生成的MapPartitionsRDD,參考上文可知MapPartitionsRDD是在map方法里構造的第二個構造參數就是f的具體實現。

new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))

綜上可知,MapPartitionsRDDcomputef的作用就是就是對f的第三個參數iter執行iter.map(cleanF),其中cleanF就是用戶調用map時傳入的函數,而iter又firstParent[T].iterator(split, context)的返回值。

firstParent[T].iterator(split, context)又是什么呢?他是對父RDD執行iterator方法,該方法是RDD接口的final方法,因此所有子RDD調用的都是該方法。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

通過進一步查看可知,iterator先判斷 RDD storageLevel 是否為 NONE,若不是,則嘗試從緩存中讀取,讀取不到則通過計算來獲取該 Partition 對應的數據的迭代器;若是,嘗試從 checkpoint 中獲取 Partition 對應數據的迭代器,若 checkpoint 不存在則通過計算來獲取。

Iterator方法將返回一個迭代器,通過迭代器可以訪問父RDD個分區的每個元素,如果內存中不存在父RDD的數據,則調用父RDDcompute方法進行計算。

RDD真正的計算由RDDaction 操作觸發,對於action 操作之前的所有Transformation 操作,Spark只記錄Transformation的RDD生成軌跡,即各個RDD之間的相互依賴關系。

 

總結

Spark RDD的計算方式為:spark是從最后一個RDD開始計算(調用compute),計算時尋找父RDD,若父RDD在內存就直接使用,否則調用父RDDcompute計算得出,以此遞歸,過程可抽象為下圖:

 

從對象產生的順序看,先生成了HadoopRDD,調用兩次map方法后依次產生兩個MapPartitionsRDD;從執行的角度看,先執行最后一個RDDcompute方法,在計算過程中遞歸執行父RDDcompute,以生成對應RDD的數據數據加載角度看,第一個構造出來的RDD執行compute時才會將數據載入內存(本例中為HDFS讀入內存),然后在這些數據上執行用戶傳入的方法,依次生成子RDD的內存數據。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM