batchDuration: the interval at which Spark Streaming *attempts* to submit a job. Note the word "attempts". The relevant code is:
/** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
private[streaming] def isTimeValid(time: Time): Boolean = {
  if (!isInitialized) {
    throw new SparkException(this + " has not been initialized")
  } else if (time <= zeroTime || !(time - zeroTime).isMultipleOf(slideDuration)) {
    logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
      " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
    false
  } else {
    logDebug("Time " + time + " is valid")
    true
  }
}
Suppose slideDuration is N times batchDuration. Then the first N-1 attempts fail this check and no job is created; only on the Nth attempt is a job actually submitted. By default, slideDuration equals batchDuration, so every attempt succeeds.
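The effect of this check is plain interval arithmetic. Here is a minimal, self-contained sketch of it, modelling times as millisecond Longs rather than the real Time/Duration classes; the durations chosen are illustrative:

```scala
// Minimal model of isTimeValid: a time is valid only when its offset from
// zeroTime is an exact multiple of slideDuration. Times are in milliseconds.
object IsTimeValidDemo {
  val zeroTime = 0L
  val batchDuration = 1000L   // a job submission is *attempted* every second
  val slideDuration = 3000L   // this DStream slides only every third batch

  def isTimeValid(time: Long): Boolean =
    time > zeroTime && (time - zeroTime) % slideDuration == 0

  def main(args: Array[String]): Unit = {
    // Only every third attempt passes: 1000 -> false, 2000 -> false, 3000 -> true, ...
    (1 to 6).map(_ * batchDuration).foreach(t => println(s"$t ms valid: ${isTimeValid(t)}"))
  }
}
```

With slideDuration three times batchDuration, attempts at 1000 ms and 2000 ms are rejected and only the 3000 ms attempt produces a job, matching the N-1 failed attempts described above.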
In InputDStream:

override def slideDuration: Duration = {
  if (ssc == null) throw new Exception("ssc is null")
  if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null")
  ssc.graph.batchDuration
}

In MappedDStream:

override def slideDuration: Duration = parent.slideDuration
There is one exception, though: when a window is involved, the situation changes.
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
  new WindowedDStream(this, windowDuration, slideDuration)
}

/**
 * Return a new DStream in which each RDD has a single element generated by reducing all
 * elements in a sliding window over this DStream.
 * @param reduceFunc associative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
 *                      the new DStream will generate RDDs); must be a multiple of this
 *                      DStream's batching interval
 */
def reduceByWindow(
    reduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T] = ssc.withScope {
  this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
}
As you can see, the window-based methods let you pass a custom slideDuration, which can be any multiple of batchDuration. This change only affects the DStreams downstream of the window; the DStreams upstream of it keep a slideDuration equal to batchDuration.
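How slideDuration propagates along the chain can be sketched with a toy class hierarchy. The names mirror Spark's but this is a simplified model, not the real API; durations are milliseconds:

```scala
// Toy model of slideDuration propagation along a DStream chain.
abstract class Stream { def slideDuration: Long }

class Input(batchDuration: Long) extends Stream {
  def slideDuration: Long = batchDuration        // input streams slide once per batch
}
class Mapped(parent: Stream) extends Stream {
  def slideDuration: Long = parent.slideDuration // inherited unchanged from the parent
}
class Windowed(parent: Stream, window: Long, slide: Long) extends Stream {
  // Both durations must be multiples of the parent's slide, as the docs above require.
  require(window % parent.slideDuration == 0 && slide % parent.slideDuration == 0)
  def slideDuration: Long = slide                // overridden: downstream slides at `slide`
}
```

A chain Input(1000) -> Mapped -> Windowed(window = 5000, slide = 3000) leaves the upstream Mapped at a 1000 ms slide, while everything after the window slides every 3000 ms.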
So how is the job actually executed?
Every DStream has the method
override def compute(validTime: Time): Option[RDD[T]]
For example, MappedDStream implements it as
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
It is straightforward: it calls the parent DStream's getOrCompute and then applies the map function. The calls chain upward level by level until a DStream with no parent is reached.
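The recursion can be sketched as follows. This is a simplified model in which Seq[Int] stands in for RDD and a plain map stands in for generatedRDDs; the real getOrCompute also handles time validation, checkpointing and remembering:

```scala
import scala.collection.mutable

// Sketch of the compute/getOrCompute chain: each stream caches what it generated
// for a given batch time, and compute() pulls from its parent.
abstract class ToyDStream {
  private val generated = mutable.Map[Long, Seq[Int]]()
  def compute(time: Long): Option[Seq[Int]]
  final def getOrCompute(time: Long): Option[Seq[Int]] =
    generated.get(time).orElse {
      val result = compute(time)
      result.foreach(r => generated(time) = r)
      result
    }
}
// The root of the chain: has no parent, produces data directly.
class ToySource(data: Map[Long, Seq[Int]]) extends ToyDStream {
  def compute(time: Long): Option[Seq[Int]] = data.get(time)
}
// Mirrors the shape of MappedDStream.compute: delegate to the parent, then map.
class ToyMapped(parent: ToyDStream, f: Int => Int) extends ToyDStream {
  def compute(time: Long): Option[Seq[Int]] = parent.getOrCompute(time).map(_.map(f))
}
```

Asking the last stream in the chain for a batch time walks up through every parent, and a time with no input simply yields None all the way down.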
We know that slideDuration is only changed at the WindowedDStream, so what does its implementation look like?
override def compute(validTime: Time): Option[RDD[T]] = {
  val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
  val rddsInWindow = parent.slice(currentWindow)
  val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
    logDebug("Using partition aware union for windowing at " + validTime)
    new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
  } else {
    logDebug("Using normal union for windowing at " + validTime)
    new UnionRDD(ssc.sc, rddsInWindow)
  }
  Some(windowRDD)
}
As shown above, WindowedDStream first computes the time range covered by the window:
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
and then calls the parent DStream's slice method:
val rddsInWindow = parent.slice(currentWindow)
A reminder: at this point we are back in the parent DStream, where slideDuration is once again equal to batchDuration.
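A worked example of the interval arithmetic, using illustrative values in milliseconds:

```scala
// With a 3 s window over 1 s batches, the window ending at validTime = 5000 starts at
// 5000 - 3000 + 1000 = 3000, covering the batches that end at 3000, 4000 and 5000 ms.
// Adding the parent's slide makes the start an inclusive batch *end* time rather than
// letting the interval overlap into a fourth batch.
def windowStart(validTime: Long, windowDuration: Long, parentSlide: Long): Long =
  validTime - windowDuration + parentSlide
```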
The implementation of slice is:
/**
 * Return all the RDDs between 'fromTime' to 'toTime' (both included)
 */
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = ssc.withScope {
  if (!isInitialized) {
    throw new SparkException(this + " has not been initialized")
  }
  val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) {
    toTime
  } else {
    logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
    toTime.floor(slideDuration, zeroTime)
  }
  val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) {
    fromTime
  } else {
    logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")")
    fromTime.floor(slideDuration, zeroTime)
  }
  logInfo("Slicing from " + fromTime + " to " + toTime +
    " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
  alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
    if (time >= zeroTime) getOrCompute(time) else None
  })
}
We only need to look at the last part:
alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
  if (time >= zeroTime) getOrCompute(time) else None
})
It walks the window's time range in steps of slideDuration, here equal to batchDuration, and triggers one batch computation (getOrCompute) per step.
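The alignment and stepping can be modelled like this (times in milliseconds; floorTo is a stand-in for Time.floor, and the returned Seq[Long] stands in for the per-batch getOrCompute calls):

```scala
// Model of the parent's slice: align both endpoints down to a multiple of
// slideDuration (== batchDuration here), then emit one batch time per slide step.
val zeroTime = 0L
val slideDuration = 1000L

def floorTo(t: Long): Long = zeroTime + ((t - zeroTime) / slideDuration) * slideDuration

def sliceTimes(fromTime: Long, toTime: Long): Seq[Long] =
  (floorTo(fromTime) to floorTo(toTime) by slideDuration).filter(_ >= zeroTime)
```

For the window [3000, 5000] over 1000 ms batches, sliceTimes yields the three batch times 3000, 4000 and 5000, i.e. one getOrCompute per batch in the window.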
/** Checks whether the 'time' is valid wrt slideDuration for generating RDD */
private[streaming] def isTimeValid(time: Time): Boolean = {
  if (!isInitialized) {
    throw new SparkException(this + " has not been initialized")
  } else if (time <= zeroTime || !(time - zeroTime).isMultipleOf(slideDuration)) {
    logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime +
      " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime))
    false
  } else {
    logDebug("Time " + time + " is valid")
    true
  }
}
For these batch times the check returns true, because each one is a multiple of the parent's slideDuration, i.e. of batchDuration.
Once all the per-batch results are back, WindowedDStream combines them:
val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
  logDebug("Using partition aware union for windowing at " + validTime)
  new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
} else {
  logDebug("Using normal union for windowing at " + validTime)
  new UnionRDD(ssc.sc, rddsInWindow)
}
Some(windowRDD)
and we finally obtain the RDD we want.
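The union choice boils down to one condition: do all RDDs in the window share one identical partitioner? A minimal model of just that condition, with Option[String] standing in for Option[Partitioner]:

```scala
// Mirrors the condition rddsInWindow.flatMap(_.partitioner).distinct.length == 1:
// RDDs without a partitioner are dropped by flatMap, so the partition-aware union
// is chosen when every partitioner that *is* present is the same one.
def chooseUnion(partitioners: Seq[Option[String]]): String =
  if (partitioners.flatten.distinct.length == 1) "PartitionerAwareUnionRDD"
  else "UnionRDD"
```

The partition-aware union can co-locate matching partitions of the window's RDDs; with mixed or absent partitioners a plain UnionRDD is used instead.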