The relationship between batchDuration, slideDuration, and windowDuration in Spark Streaming


batchDuration is the interval at which Spark Streaming attempts to submit a job. Note that these are only attempts; whether an attempt actually produces a job is decided by isTimeValid:

  /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
  private[streaming] def isTimeValid(time: Time): Boolean = {
    if (!isInitialized) {
      throw new SparkException (this + " has not been initialized")
    } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
        " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
      false
    } else {
      logDebug("Time " + time + " is valid")
      true
    }
  }

Suppose slideDuration is N times batchDuration. Then the first N - 1 attempts fail to create a job; only the Nth attempt actually submits one.
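For instance, here is a minimal, self-contained sketch of this check with assumed values (plain Long milliseconds instead of Spark's Time/Duration types; batchDuration = 2s, slideDuration = 6s, so N = 3):

  val zeroMs  = 0L
  val slideMs = 6000L                          // slideDuration = 3 x batchDuration

  def isTimeValid(timeMs: Long): Boolean =
    timeMs > zeroMs && (timeMs - zeroMs) % slideMs == 0

  // The generator tries every batchDuration (2s); only every 3rd try passes.
  for (t <- 2000L to 12000L by 2000L)
    println(s"t=${t}ms valid=${isTimeValid(t)}")
  // t=2000ms false, t=4000ms false, t=6000ms true, t=8000ms false, ...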

By default, slideDuration and batchDuration are equal, so every attempt succeeds: InputDStream defines slideDuration as the graph's batchDuration, and derived DStreams such as MappedDStream inherit it from their parent.

InputDStream
  override def slideDuration: Duration = {
    if (ssc == null) throw new Exception("ssc is null")
    if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
    ssc.graph.batchDuration
  }

MappedDStream
  override def slideDuration: Duration = parent.slideDuration

There is one exception, however: things change when a window is involved.

  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
    new WindowedDStream(this, windowDuration, slideDuration)
  }

  /**
   * Return a new DStream in which each RDD has a single element generated by reducing all
   * elements in a sliding window over this DStream.
   * @param reduceFunc associative reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def reduceByWindow(
      reduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[T] = ssc.withScope {
    this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
  }

As you can see, the window-based methods let you supply a custom slideDuration, which can be any multiple of batchDuration. The change only affects the DStreams downstream of the window; the DStreams upstream of it keep a slideDuration equal to batchDuration.
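As a concrete usage, here is a hypothetical setup (the socket source on localhost:9999 and the 5-second batch interval are assumptions for illustration, not from the original text):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setAppName("WindowDemo").setMaster("local[2]")
  val ssc  = new StreamingContext(conf, Seconds(5))      // batchDuration = 5s

  val lines    = ssc.socketTextStream("localhost", 9999) // slideDuration = 5s (inherited)
  val windowed = lines.window(Seconds(30), Seconds(10))  // windowDuration = 30s, slideDuration = 10s

  // `windowed` emits an RDD every 10s covering the last 30s of data,
  // while `lines` upstream still produces one RDD per 5s batch.
  windowed.count().print()

  ssc.start()
  ssc.awaitTermination()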

 

So how is a job actually executed?

 

Every DStream has a compute method:

 override def compute(validTime: Time): Option[RDD[T]]

For example, MappedDStream implements it as:

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }

 

Very simple: it calls the parent DStream's getOrCompute and then applies the map function to the result. This repeats level by level up the lineage until a DStream with no parent is reached.
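A simplified, self-contained model of this recursion (illustrative only, not Spark's actual code; the real getOrCompute also consults isTimeValid, an RDD cache, and checkpointing):

  import scala.collection.mutable

  abstract class MiniDStream[T] {
    private val generated = mutable.Map[Long, Seq[T]]()

    def compute(time: Long): Option[Seq[T]]

    // Return the cached batch for `time`, or compute it (recursing into parents).
    final def getOrCompute(time: Long): Option[Seq[T]] =
      generated.get(time).orElse {
        val result = compute(time)
        result.foreach(r => generated(time) = r)
        result
      }
  }

  // A source with no parent: the recursion bottoms out here.
  class MiniSource(data: Long => Seq[Int]) extends MiniDStream[Int] {
    def compute(time: Long): Option[Seq[Int]] = Some(data(time))
  }

  // Mirrors MappedDStream: delegate to the parent, then map.
  class MiniMapped[T, U](parent: MiniDStream[T], f: T => U) extends MiniDStream[U] {
    def compute(time: Long): Option[Seq[U]] = parent.getOrCompute(time).map(_.map(f))
  }

  val mapped = new MiniMapped(new MiniSource(t => Seq(1, 2, 3)), (x: Int) => x * 10)
  println(mapped.getOrCompute(2000L))          // Some(List(10, 20, 30))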

 

We know that slideDuration is only changed at the WindowedDStream, so how does it implement compute?

 

  override def compute(validTime: Time): Option[RDD[T]] = {
    val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
    val rddsInWindow = parent.slice(currentWindow)
    val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
      logDebug("Using partition aware union for windowing at " + validTime)
      new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
    } else {
      logDebug("Using normal union for windowing at " + validTime)
      new UnionRDD(ssc.sc, rddsInWindow)
    }
    Some(windowRDD)
  }

As we can see, WindowedDStream first computes the time range this window covers:

val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)

and then calls the parent DStream's slice method:

val rddsInWindow = parent.slice(currentWindow)

A reminder: slice runs on the parent DStream, and the parent's slideDuration is still equal to batchDuration; the custom slideDuration applies only to the WindowedDStream itself.
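A worked example with assumed numbers (batchDuration = 2s, windowDuration = 6s, validTime = 12s):

  val batchMs  = 2000L                        // parent.slideDuration == batchDuration
  val windowMs = 6000L
  val validMs  = 12000L

  // currentWindow = [validTime - windowDuration + parent.slideDuration, validTime]
  val fromMs = validMs - windowMs + batchMs   // 8000 ms
  println((fromMs to validMs by batchMs).mkString(", "))
  // prints 8000, 10000, 12000: the window unions the batches generated at t = 8s, 10s and 12s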

The concrete implementation of slice is:

 

  /**
   * Return all the RDDs between 'fromTime' to 'toTime' (both included)
   */
  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = ssc.withScope {
    if (!isInitialized) {
      throw new SparkException(this + " has not been initialized")
    }

    val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) {
      toTime
    } else {
      logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
      toTime.floor(slideDuration, zeroTime)
    }

    val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) {
      fromTime
    } else {
      logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
        + slideDuration + ")")
      fromTime.floor(slideDuration, zeroTime)
    }

    logInfo("Slicing from " + fromTime + " to " + toTime +
      " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")

    alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
      if (time >= zeroTime) getOrCompute(time) else None
    })
  }
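Before stepping through the range, slice aligns both endpoints down to multiples of slideDuration when they do not line up exactly. A minimal sketch of that floor operation (simplified millisecond arithmetic; Spark's Time.floor performs the equivalent on Time objects):

  // Snap timeMs down to the nearest multiple of slideMs, measured from zeroMs.
  def floor(timeMs: Long, slideMs: Long, zeroMs: Long): Long =
    zeroMs + ((timeMs - zeroMs) / slideMs) * slideMs

  println(floor(7000L, 2000L, 0L)) // prints 6000: a toTime of 7s is aligned down to 6s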

For generating the RDDs themselves, we only need the last few lines:

  alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
    if (time >= zeroTime) getOrCompute(time) else None
  })

The times in the window range are stepped through with slideDuration as the stride, which on the parent equals batchDuration, so one batch RDD is fetched per step. For each of these times, getOrCompute runs the isTimeValid check again:

  /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
  private[streaming] def isTimeValid(time: Time): Boolean = {
    if (!isInitialized) {
      throw new SparkException (this + " has not been initialized")
    } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) {
      logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
        " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
      false
    } else {
      logDebug("Time " + time + " is valid")
      true
    }
  }

Because every aligned time is a multiple of the parent's slideDuration, this check returns true here.

Once all the RDDs in the window have been computed, WindowedDStream combines the results:

    val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
      logDebug("Using partition aware union for windowing at " + validTime)
      new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
    } else {
      logDebug("Using normal union for windowing at " + validTime)
      new UnionRDD(ssc.sc, rddsInWindow)
    }
    Some(windowRDD)
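A small illustration of the condition being tested, using hypothetical data (note that PartitionerAwareUnionRDD is private[spark], so user code cannot construct it directly):

  import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("UnionDemo").setMaster("local[2]"))
  val p  = new HashPartitioner(4)

  // Two "batch" RDDs sharing one partitioner, like the RDDs inside a window.
  val batch1 = sc.parallelize(Seq("a" -> 1, "b" -> 1)).partitionBy(p)
  val batch2 = sc.parallelize(Seq("a" -> 2, "c" -> 1)).partitionBy(p)

  // flatMap(_.partitioner).distinct.length == 1 holds, so WindowedDStream
  // would take the partition-aware branch and preserve the partitioning.
  println(Seq(batch1, batch2).flatMap(_.partitioner).distinct.length) // 1

  sc.stop()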

This finally gives us the RDD we want.

