SparkStreaming--reduceByKeyAndWindow


1、reduceByKeyAndWindow(_+_,Seconds(3),  Seconds (2))
    可以看到我們定義的window窗口大小Seconds(3s) ,是指每2s滑動時,需要統計前3s內所有的數據。

2、對於他的重載函數 reduceByKeyAndWindow(_+_,_-_,Seconds(3s),seconds(2))
     設計理念是,當 滑動窗口的時間Seconds(2) < Seconds(3)(窗口大小)時,兩個統計的部分會有重復,那么我們就可以
     不用 重新獲取或者計算,而是通過獲取舊信息來更新新的信息,這樣即節省了空間又節省了內容,並且效率也大幅提升。
    
     如上圖所示,2次統計重復的部分為time3對用的時間片內的數據,這樣對於window1,和window2的計算可以如下所示
     win1 = time1 + time2 + time3
      win2 = time3 + time4 + time5
      
      更新為
     win1 = time1 + time2 + time3
      win2 = win1+ time4 + time5 - time2 - time3
      
      這樣就理解了吧,  _+_是對新產生的時間分片(time4,time5內RDD)進行統計,而_-_是對上一個窗口中,過時的時間分片
     (time1,time2) 進行統計    

3、注意事項
/**
* Return a new DStream by applying incremental `reduceByKey` over a sliding window.
* The reduced value of over a new window is calculated using the old window's reduced value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
*
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
*
* This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
* However, it is applicable to only "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param invReduceFunc inverse reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param filterFunc Optional function to filter expired key-value pairs;
* only pairs that satisfy the function are retained
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration = self.slideDuration,
numPartitions: Int = ssc.sc.defaultParallelism,
filterFunc: ((K, V)) => Boolean = null
): DStream[(K, V)] = ssc.withScope {
reduceByKeyAndWindow(
reduceFunc, invReduceFunc, windowDuration,
slideDuration, defaultPartitioner(numPartitions), filterFunc
)
}

                                                                                                                                                                                          


      





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM