Spark 並行計算模型:RDD


Spark 允許用戶為driver(或主節點)編寫運行在計算集群上,並行處理數據的程序。在Spark中,它使用RDDs代表大型的數據集,RDDs是一組不可變的分布式的對象的集合,存儲在executors中(或從節點)。組成RDDs的對象稱為partitions,並可能(但是也不是必須的)在分布式系統中不同的節點上進行計算。Spark cluster manager根據Spark application設置的參數配置,處理在集群中啟動與分布Spark executors,用於計算,如下圖:

 

Spark 並不會立即執行driver 程序中的每個RDD 變換,而是懶惰執行:僅在最后的RDD數據需要被計算時(一般是在寫出到存儲系統,或是收集一個聚合數據給driver時)才觸發計算RDD變換。Spark可以將一個RDD加載到executor節點的內存中(在整個Spark 應用的生命周期),以在進行迭代計算時,達到更快的訪問速度。因為RDDs是不可變的,由Spark實現,所以在轉換一個RDD時,返回的是一個新的RDD,而不是已經存在的那個RDD。Spark的這些性質(惰性計算,內存存儲,以及RDD不可變性)提供了它易於使用、容錯、可擴展、以及高效運行的特點。

 

惰性計算

許多其他系統,對in-memory 存儲的支持,基於的是:對可變(mutable)對象的細粒度更新。例如:對內存中存儲的某個條目的更新。而在Spark中,RDDs是完全惰性的。直到一個action被調用之前,Spark不會開始計算partition。這里的action是一個Spark操作,除了返回一個RDD以外,還會觸發對分區的計算,或是可能返回一些輸出到非Spark系統中(如outside of the Spark executors)。例如,將數據發送回driver(使用類似count或collect 操作),或是將數據寫入到外部存儲系統(例如copyToHadoop)。Actions會觸發scheduler,scheduler基於RDD transformations之間的依賴關系,構建一個有向無環圖(DAG)。換句話說,Spark在執行一個action時,是從后向前定義的執行步驟,以產生最終分布式數據集(每個分區)中的對象。通過這些步驟(稱為 execution plan),scheduler對每個stage 計算它的missing partitions,直到它計算出最終的結果。

這里需要注意的是:所有的RDD變換都是100% 惰性的。sortByKey 需要計算RDD以決定數據的范圍,所以它同時包含了一個變換與一個action。

 

惰性計算的性能與可用性優勢

惰性計算允許Spark結合多個不需要與driver進行交互的操作(稱為1對1依賴變換),以避免多次數據傳輸。例如,假設一個Spark 程序在同樣的RDD上調用一個map和filter函數。Spark可以將這兩個指令發送給每個executor。然后Spark可以在每個partition上執行map與filter,這些操作僅需要訪問數據僅一次即可,而不是需要發送兩次指令(map與filter),也不需要訪問兩次partition數據。這個理論上可以減少一半的計算復雜度。

Spark的惰性執行不僅更高效。對比一個不同的計算框架(例如MapReduce),Spark上可以更簡單的實現同樣的計算邏輯。在MapReduce框架中,開發者需要做一些開發工作以合並他們的mapping 操作。但是在Spark中,它的惰性執行策略可以讓我們以更少的代碼實現相同的邏輯:我們可以將窄依賴鏈(chain)起來,並讓Spark執行引擎完成合並它們的工作。

考慮最經典的wordcount例子,在官方提供的例子中,即使最簡單的實現都包含了50行Java代碼。而在Spark的實現中,僅需要15行Java代碼,或是5行Scala 代碼:

def simpleWordCount(rdd: RDD[String]):RDD[(String, Int)]={
  val words = rdd.flatMap(_.split(" "))
  val wordPairs = words.map((_, 1))
  val wordCounts = wordPairs.reduceByKey(_ + _)
  wordCounts
}

 

使用Spark實現 word count的另一個優點是:它易於修改更新。假設我們需要修改函數,將一些“stop words”與標點符號從每個文檔中剔除,然后在進行word count 計算。在MapReduce中,這需要增加一個filter的邏輯到mapper中,以避免傳輸兩次數據並處理。而在Spark中,僅需要簡單地加一個filter步驟在map步驟前面即可。例如:

def withStopWordsFiltered(rdd : RDD[String], illegalTokens : Array[Char],
                          stopWords : Set[String]): RDD[(String, Int)] = {
  val seperator = illegalTokens ++ Array[Char](' ')
  val tokens: RDD[String] = rdd.flatMap(_.split(seperator).map(_.trim.toLowerCase))
  val words = tokens.filter(token => !stopWords.contains(token) && (token.length > 0))
  val wordPairs = words.map((_, 1))
  val wordCounts = wordPairs.reduceByKey(_ + _)
  wordCounts
}

 

惰性執行與容錯

Spark是有容錯性的,也就是說,在遇到主機或是網絡故障時,Spark不會失敗、丟失數據、或是返回錯誤的結果。Spark這個獨特的容錯方法的實現,得益於:數據的每個partition都包含了重新計算此partition需要的所有信息。大部分分布式計算中,提供容錯性的方式是:對可變的(mutable)對象(RDD為immutable 對象),日志記錄下更新操作,或是在機器之間創建數據副本。而在Spark中,它並不需要維護對每個RDD的更新日志,或是日志記錄實際發生的中間過程。因為RDD它自身包含了用於復制它每個partition所需的所有信息。所以,如果一個partition丟失,RDD有足夠的有關它血統的信息,用於重新計算。並且計算過程可以被並行執行,以快速恢復。當某個Worker節點上的Task失敗時,可以利用DAG重新調度計算這些失敗的Task(執行成功的Task可以從CheckPoint(檢查點)中讀取,而不用重新計算)。

 

惰性計算與DEBUGGING

由於惰性計算,所以Spark 程序僅會在執行action時才報錯,即使程序邏輯在RDD變換時就有問題了。並且此時Stack trace也僅會提示在action時報的錯。所以此時debug 程序時會稍有困難。

 

Immutability 與 RDD 接口

Spark定義了每個RDD類型都需要實現的RDD接口與其屬性。在一個RDD上執行變換時,不會修改原有RDD,而是返回一個新的RDD,新的RDD中的屬性被重新定義。RDDs可由三種方式創建:(1)從一個已存在的RDD變換得到;(2)從一個SparkContext,它是應用到Spark的一個API gateway;(3)轉換一個DataFrame或Dataset(從SparkSession創建)

SparkContext表示的是一個Spark集群與一個正在運行的Spark application之間的連接。
在Spark內部,RDD有5個主要屬性:

  1. 一組組成RDD的partitions
  2. 計算每個split的函數
  3. 依賴的其他RDDs
  4. (可選)對key-value RDDs的Partitioner(例如,某個RDD是哈希分區的)
  5. (可選)一組計算每個split的最佳位置(例如,一個HDFS文件的各個數據塊位置)

對於一個客戶端用戶來說,很少會用到這些屬性,不過掌握它們可以對Spark機制有一個更好的理解。這些屬性對應於下面五個提供給用戶的方法:

1. partitions:

final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
      partitions_.zipWithIndex.foreach { case (partition, index) =>
        require(partition.index == index,
          s"partitions($index).partition == ${partition.index}, but it should equal $index")
      }
    }
    partitions_
 
}
}

 

返回這個RDD的partitions數組,會考慮到RDD是否有被做檢查點(checkpoint)。partitions方法查找分區數組的優先級為:從CheckPoint查找 -> 讀取partitions_ 屬性 -> 調用getPartitions 方法獲取。getPartitions 由子類實現,且此方法僅會被調用一次,所以實現時若是有較為消耗時間的計算,也是可以被接受的。

 

2. iterator:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

 

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

 

RDD的內部方法,用於對RDD的分區進行計算。如果有cache,先讀cache,否則執行計算。一般不被用戶直接調用。而是在Spark計算actions時被調用。

 

3. dependencies:

final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
 
}
}

 

獲取此RDD的依賴列表,會將RDD是否有checkpoint(檢查點)考慮在內。RDD的依賴列表可以讓scheduler知道當前RDD如何依賴於其他RDDs。從代碼來看,dependencies方法的執行步驟為:(1)從checkpoint獲取RDD信息,並將這些信息封裝為OneToOneDependency列表。如果從checkpoint中獲取到了依賴,則返回RDD依賴。否則進入第二步;(2)如果dependencies_ 為null,則調用getDependencies獲取當前RDD的依賴,並賦值給dependencies_,最后返回dependencies_。

在依賴關系中,主要有兩種依賴關系:寬依賴與窄依賴。會在之后討論。

 

4. partitioner:

/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None

 

返回一個Scala使用的partitioner 對象。此對象定義一個key-value pair 的RDD中的元素如何根據key做partition,用於將每個key映射到一個partition ID,從 0 到 numPartitions - 1。對於所有不是元組類型(非key/value數據)的RDD來說,此方法永遠返回None。

 

5. preferredLocations:

final def preferredLocations(split: Partition): Seq[String] = {
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
}

 

返回一個partition的位置信息(用於data locality)。具體地講,這個函數返回一系列String,表示的是split(Partition)存儲在些節點中。若是一個RDD表示的是一個HDFS文件,則preferredLocations 的結果中,每個String對應的是一個存儲partition的一個datanode節點名。

 

RDD上的函數:Transformations 與 Actions

在RDDs中定義了兩種函數類型:actions與transformations。Actions返回的不是一個RDD,而是執行一個操作(例如寫入外部存儲);transformations 返回的是一個新的RDD。

每個Spark 程序必須包含一個action,因為它會觸發Spark程序的計算,將結果信息返回給driver或是向外部存儲寫入數據。Persist 調用也會觸發程序執行,但是一般不會被標注為Spark job 的結束。向driver返回數據的actions包括:collect,count,collectAsMap,sample,reduce以及take。

這里需要注意的是,盡量使用take,count以及reduce等操作,以免返回給driver的數據過多,造成內存溢出(例如使用collect,sample)。

向外部存儲寫入數據的actions包括saveAsTextFile,saveAsSequenceFile,以及saveAsObjectFile。大部分寫入Hadoop 的actions僅適用於有key/value 對的 RDDs中,它們定義在PairRDDFunctions類(通過隱式轉換為元組類型的RDDs提供方法)以及NewHadoopRDD 類(它是從Hadoop中創建RDD的實現)中。一些saving 函數,例如saveAsTextFile 與 saveAsObjectFile,在所有RDDs中都可以使用,它們在實現時,都是隱式地添加了一個Null key到每個record 中(在saving 階段會被忽略掉),例如 saveAsTextFile 代碼:

def saveAsTextFile(path: String): Unit = withScope {
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

 

從代碼可以看出,在保存文件時,為每條記錄增加了一個Null key,OutputFormat使用的是Hadoop中的TextOutputFormat。

 

寬依賴與窄依賴

窄依賴,簡單的說就是:子RDD的所依賴的父RDD之間是一對一或是一對多的。

窄依賴需要滿足的條件:

  1. 父子RDD之間的依賴關系是可以在設計階段即確定的
  2. 與父RDD中的records的值無關
  3. 每個父RDD至多僅有一個子RDD

明確的說,在窄變換中的partition,要么是僅基於一個父partition(如map操作),要么是基於父partitions的一個特定子集(在design階段即可知道依賴關系,如coalesce操作)。所以窄變換可以在數據的一個子集上執行,而不需要依賴其他partition的信息。常見的窄依賴操作有:map,filter,mapPartitions,flatMap等,如下圖所示:

 

 

右邊的圖是一個coalesce的例子,它也是一個窄依賴。所以就算一個子partition依賴於多個父partition,它也可以是一個窄依賴,只要依賴的父RDD是明確的,且與partition中數據的值無關。

與之相反的是寬依賴,寬依賴無法僅在任意行上執行,而是需要將數據以特定的方式進行分區(例如根據key的值將數據分區)。例如sort方法,records需要被分區,同樣范圍的key被分區到同一個partition中。寬依賴的變換包括sort,reduceByKey,groupByKey,join,以及任何調用rePartition的函數。下面是寬依賴的一個示例圖:

 

 

寬依賴中的依賴關系,直到數據被計算前,都是未知的。相對於coalesce操作,數據需要根據key-value的值決定分到哪個區中。任何觸發shuffle的操作(如groupByKey,reduceByKey,sort,以及sortByKey)均符合此模式。但是join操作會有些復雜,因為根據兩個父RDDs被分區的方式,它們可以是窄依賴或是寬依賴。

在某些特定例子中,例如,當Spark已經知道了數據以某種方式分區,寬依賴的操作不會產生一個shuffle。如果一個操作需要執行一個shuffle,Spark會加入一個ShuffledDependency 對象到RDD的dependency 列表中。一般來說,shuffle操作是昂貴的,特別是在大量數據被移動到一個新的partition時。這點也是可以用於在程序中進行優化的,通過減少shuffle數量以及shuflle數據的傳輸,可以提升Spark程序的性能。

 

Spark Job

由於Spark使用的是惰性計算,所以直到driver程序調用一個action之前,Spark 應用基本上不會做任何事情。對每個action,Spark Scheduler會構造一個execution graph 並啟動一個Spark job。每個Spark job 包含一個或多個 stages ,stages即為計算出最終RDD時數據需要的transformation步驟。每個stage包含一組tasks,它們代表每個並行計算,並執行在executors上。

下圖是Spark應用的一個組成部分示意圖,其中每個stage對應一個寬依賴:

 

DAG

Spark的high-level調度層,使用RDD的依賴關系,為每個Spark job 構造一個stages的有向無環圖。在Spark API 中,它被稱為DAG Scheduler。你可能有注意到,在很多情況下的報錯,如連接集群、配置參數、或是launch一個Spark job,最終都會顯示為DAG Scheduler 錯誤。因為Spark job的執行是由DAG處理的。DAG為每個job構建一個stage圖,決定每個task執行的位置,並將信息傳遞給TaskScheduler。TaskScheduler負責在集群上執行tasks。TaskScheduler在partition之間創建一個依賴關系圖。

 

Jobs

Job是Spark執行的的層次關系圖中的最高元素。每個Spark job對應一個action,而每個action由driver程序調用。spark 執行圖(execution graph)的邊界基於的是RDD變換中partitions之間的依賴。所以,如果一個操作返回的不是一個RDD,而是另外的返回(如寫入外部存儲等),則此RDD不會有子RDD。也就是說,在圖論中,這個RDD就是一個DAG中的一個葉子節點。若是調用了一個action,則action不會生成子RDD,也就是說,不會有新的RDD加入到DAG圖中。所以此時application會launch一個job,包含了所有計算出最后一個RDD所需的所有transformation信息,開始執行計算。

這里需要區分的是 job 與stages的概念。一個job是由action觸發的,如collect,take,foreach等。並不是由寬依賴區分的,寬依賴區分的是stage,一個job包含多個stage。

 

Stages

一個job是由調用一個action后定義的。這個action可能包含一個或多個transformations,寬依賴的transformation將job划分為不同的stages。

每個stage對應於一個shuffle dependency,shuffle dependency 由寬依賴創建。從更高的視角來看,一個stage可以認為是一組計算(tasks)組成,每個計算都可以在一個executor上運行,且不需要與其他executors或是driver通信。也就是說,當workers之間需要做網絡通信時(例如shuffle),即標志着一個新的stage開始。

這些創建了stage邊界的dependencies(依賴)稱為ShuffleDependencies。Shuffle是由寬依賴產生的,例如sort或groupByKey,它們需要將數據在partition中重新分布。多個窄依賴的transformations可以被組合到一個stage中。

在我們之前介紹過的word count 例子中(使用stop words 做filter,並做單詞計數),Spark可以將flatMap,map以及filter 步驟(steps)結合到一個stage中,因為它們中沒有需要shuffle的transformation。所以每個executor都可以連續地應用flatMap,map以及filter 步驟在一個數據分區中。一般來說,設計程序時,盡量使用更少的shuffles。

 

Tasks

一個stage由多個task組成。Task是執行任務的最小單元,每個task代表一個本地計算。一個stage中的所有task都是在對應的每個數據分片上執行相同的代碼。一個task不能在多個executor上執行,而一個executor上可以執行多個tasks。每個stage中的tasks數目,對應於那個stage輸出的RDD的partition數。

下面是一個展示stage邊界的例子:

def simpleSparkProgram(rdd : RDD[Double]): Long ={
  //stage1
 
rdd.filter(_< 1000.0)
    .map(x => (x, x) )
    //stage2
   
.groupByKey()
    .map{ case(value, groups) => (groups.sum, value)}
    //stage 3
   
.sortByKey()
    .count()
}

 

在driver中執行此程序時,對應的流程圖如下:

 

藍色框代表的是shuffle 操作(groupByKey與sortByKey)定義的邊界。每個stage包含多個並行執行的tasks,每個task對應於RDD transformation結果(紅色的長方形框)中的每個partition。

在task並行中,如果任務的partitions數目(也就是需要並行的tasks數據)超出了當前可用的executor slots數目,則不會一次並行就執行完一個stage的所有tasks。所以可能需要兩輪或是多輪運行,才能跑完一個stage的所有tasks。但是,在開始下一個stage的計算之前,前一個stage所有tasks必須先全部執行完成。這些tasks的分發與執行由TaskScheduler完成,它根據scheduler使用的策略(如FIFO或fair scheduler)執行相應的調度。

 

References:

Vasiliki Kalavri, Fabian Hueske. Stream Processing With Apache Flink. 2019

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM