spark streaming(2) DAG靜態定義及DStream,DStreamGraph


DAG

 

中文名有向無環圖。它不是spark獨有技術。它是一種編程思想 ,甚至於hadoop陣營里也有運用DAG的技術,比如Tez,Oozie。有意思的是,Tez是從MapReduce的基礎上深化而來的分布式計算框架。其核心思想是將Map和Reduce兩個階段分成更多的函數,各個函數之間可自由組合,形成DAG dependencies鏈,延遲計算。可見DAG思想適合多階段的分布式計算,如果是MapReduce,Map本身就是InputStream,Reduce本身就是OutputStream,根本就不需要dependencies了。如果使用DAG思想反而得不償失。

spark的算子分為兩大類。一類是Transformation,一類是action。Transformation會在邏輯上將batch時間內的RDD形成一個DAG,然后在action觸發后,在物理上通過dependencies回溯進行RDD的計算。

那么從RDD到DAG是怎樣生成的呢?

 

DStream

 


RDD首先第一步要先變成DStream。

一個spark streaming程序首先要從數據源講起,這里以kafka作為數據為例,通過以下代碼可以得到一個InputDStream。

val inputDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

這是Driect的方式,也可以通過Reciver的方式得到ReceiverInputDStream,但是它本身也是繼承自InputDStream。

val receiverInputDStream = KafkaUtils.createStream(ssc, kafkaParams, topics, storageLevel) abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext) extends InputDStream[T](_ssc)

通過源碼可以看到InputDStream繼承自DStream。

abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) extends DStream[T](_ssc)

由此RDD變成了DStream。下面我們來仔細研究一下DStream究竟是個什么樣子呢。

 我們知道一個spark streaming處理邏輯包括接收數據(InputStream),處理數據(transformation和action),輸出結果(OutputStream)。前面從RDD到DStream部份實際上就是接收數據部分。那么從DStream的角度來看看數據處理的部份。

從源碼可以看到每個transformation算子都有對應的DStream實現類。比如map->MappedDStream,flatMap->FlatMappedDStream,filter->FilteredDStream。

  // ======================================================================= // DStream operations // =======================================================================

  /** Return a new DStream by applying a function to all elements of this DStream. */ def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope { new MappedDStream(this, context.sparkContext.clean(mapFunc)) } /** * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) } /** Return a new DStream containing only the elements that satisfy a predicate. */ def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope { new FilteredDStream(this, context.sparkContext.clean(filterFunc)) }

我們以FilteredDStream為例,看看源碼。可以看到兩個關鍵屬性:parent和dependencies。parent是一個DStream,而dependencies是一個DStream的集合。parent相當於指針,指向當前DStream的父級DStream,從而形成DAG圖的一環。而dependencies則是當前DStream之前的所有DStream的集合。parent相當於在邏輯上表明各個DStream的關系,dependencies相當於在物理上表明整個DAG圖的所有RDD集合,以便回溯計算。

package org.apache.spark.streaming.dstream import scala.reflect.ClassTag import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, Time} private[streaming] class FilteredDStream[T: ClassTag]( parent: DStream[T], filterFunc: T => Boolean ) extends DStream[T](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { parent.getOrCompute(validTime).map(_.filter(filterFunc)) } }

 前面是transformation算子所形成的DStream,那么action算子所形成的DStream呢。事實上所有的action最終都只會形成ForEachDstream,因為不管是foreachRDD還是print還是saveAsObjectFiles,通過查看源碼,會發現最終調用的還是foreachRDD。所以我們以我們以foreachRDD為例。

def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsObjectFile(file) } this.foreachRDD(saveFunc, displayInnerRDDOps = false) } /** * Save each RDD in this DStream as at text file, using string representation * of elements. The file name at each batch interval is generated based on * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope { val saveFunc = (rdd: RDD[T], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsTextFile(file) } this.foreachRDD(saveFunc, displayInnerRDDOps = false) } def print(num: Int): Unit = ssc.withScope { def foreachFunc: (RDD[T], Time) => Unit = { (rdd: RDD[T], time: Time) => { val firstNum = rdd.take(num + 1) // scalastyle:off println
        println("-------------------------------------------") println(s"Time: $time") println("-------------------------------------------") firstNum.take(num).foreach(println) if (firstNum.length > num) println("...") println() // scalastyle:on println
 } } foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false) } /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. * @param foreachFunc foreachRDD function * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated * in the `foreachFunc` to be displayed in the UI. If `false`, then * only the scopes and callsites of `foreachRDD` will override those * of the RDDs on the display. */
  private def foreachRDD( foreachFunc: (RDD[T], Time) => Unit, displayInnerRDDOps: Boolean): Unit = { new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register() }
View Code

 ForEachDStream和所有transformation算子的DStream一樣,也有兩個關鍵屬性:parent和dependencies。同樣的,parent相當於在邏輯上表明各個DStream的關系,dependencies相當於在物理上表明整個DAG圖的所有RDD集合,以便回溯計算。但唯一不同的是,多了一個生成job的函數。這也不難理解,在action最后生成的ForEachDStream需要使用用戶自定義的函數對結果進行輸出。即是在這里進行。

package org.apache.spark.streaming.dstream import scala.reflect.ClassTag import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.streaming.scheduler.Job /** * An internal DStream used to represent output operations like DStream.foreachRDD. * @param parent Parent DStream * @param foreachFunc Function to apply on each RDD generated by the parent DStream * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated * by `foreachFunc` will be displayed in the UI; only the scope and * callsite of `DStream.foreachRDD` will be displayed. */
private[streaming] class ForEachDStream[T: ClassTag] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit, displayInnerRDDOps: Boolean ) extends DStream[Unit](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[Unit]] = None override def generateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match { case Some(rdd) => val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) { foreachFunc(rdd, time) } Some(new Job(time, jobFunc)) case None => None } } }

 那么現在就比較明了了,如下圖。

 

 實際上,我們整個這一部份都是在討論RDD怎樣變成DStream。但是,這句話在表達上是有問題的,RDD表示彈性分布式數據集,它是不可變的。RDD真的是變成了DStream了嗎?它們之間是什么關系呢?

 實際上DStream是包含了RDD的數據集加指針。RDD還是沒有變,它只是通過轉換算子改變了形狀。(Return a new DStream in which each RDD is generated by applying a function on each RDD of 'this' DStream.)。RDD僅包括數據。DStream得到對RDD數據集的引用,並反映了各個RDD所對應的DStream的邏輯順序指針,以及當前批次(batch)的信息。

RDD = DStream at batch T

或者

DStream = RDD + dependencies + slideDuration

RDD和DStream的關系如下圖

DStreamGraph 

 前面理解了RDD怎樣形成了DStream,那么在整個batch里所有的操作包括數據輸入,中間算子,結果輸出產生的所有DStream是怎樣串聯在一起的呢?答案就是DStreamGraph 。

首先我們來看數據輸入,即InputDstream。通過源碼可以發現,它首先即是通過 DStreamGraph的addInputStream函數把數據保存在一個InputDStream的集合里。之所以是集合,表示可以接收多個數據源。

abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
  extends DStream[T](_ssc)

ssc.graph.addInputStream(this) =================================
final private[streaming] class DStreamGraph extends Serializable with Logging
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
def addInputStream(inputStream: InputDStream[_]) {
    this.synchronized {
      inputStream.setGraph(this)
      inputStreams += inputStream
    }
  }

然后在輸出即OutputStram中,可以看到每個action算子對應的DStream,前面講過,所有的action最終都會foreachRDD函數並得到ForEachDStream。在foreachRDD里會調用register函數,將此OutputDStream加入DStreamGraph。

abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging
 
private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)     this
  }

==========================
final private[streaming] class DStreamGraph extends Serializable with Logging
def addOutputStream(outputStream: DStream[_]) {
    this.synchronized {
      outputStream.setGraph(this)
      outputStreams += outputStream
    }
  }

 然后是中間的算子的DStream,閱讀源碼發現DStreamGraph並沒有提供顯示的將FilteredDstream或者MappedDstream加入的函數,比如類似於addOutputStream或者addInputStream的addDStream或者addFilteredDstream。那么,DStreamGraph是怎樣將轉換算子產生的DStream與首尾相連的呢?其實它只是使用parent便可以將全部的RDD使用dependencies串接了起來。在action的時候通過addOutputStream把ForEachDStream加入dependencies的同時,把dependencies也傳遞給了DStreamGraph。這樣DStreamGraph就掌握了整個batch的DAG邏輯和物理圖。

 

 

小結

step1 對接數據源,得到InputDStream。

step2 通過ssc.graph.addInputStream(this),將InputDStream加入DStreamGraph,並通過 override def dependencies: List[DStream[_]] = List()初始化依賴鏈dependencies。此時DStreamGraph持久有InputDStream。                         此時DStream持有dependencies並為空。

step3 在Map算子中,通過override def dependencies: List[DStream[_]] = List(parent),將MappedDStream通過parent傳入依賴鏈dependencies。此時DStream持有dependencies。dependencies = MappedDStream。

step4 在filter算子中,通過override def dependencies: List[DStream[_]] = List(parent),將FilteredDStream通過parent傳入依賴鏈dependencies。此時DStream持有dependencies。

          且dependencies =[ MappedDStream->FilteredDStream]

step5 在action動作foreachRdd中,通過override def dependencies: List[DStream[_]] = List(parent),將ForEachDStream通過parent傳入依賴鏈dependencies。此時DStream持有dependencies。

          且dependencies =[ MappedDStream->FilteredDStream->ForEachDStream]

step6 在ForEachDStream實例化過后通過register函數中 ssc.graph.addOutputStream(this)通過當前對象將依賴鏈dependencies傳入DStreamGraph。此時DStreamGraph持久有InputDStream。  而且此時DStreamGraph擁有

          dependencies。 且dependencies =[ MappedDStream->FilteredDStream->ForEachDStream]

step7 DStreamGraph將InputDStream的dependencies整合。此時dependencies = [ InputDStream->MappedDStream->FilteredDStream->ForEachDStream]。即為DAG圖。

 格式原因,排版不是那么友好,圖片可能更為清楚。

 

現在完成了DAG的靜態鏈,形成了一個計算邏輯的模板。下一篇會探討spark streaming如何在每個batch根據DAG模板動態生成相應的DAG實例,並提交job,執行。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM