Spark Streaming Sliding Window Operations


I. Spark Streaming window function concepts:

1. reduceByKeyAndWindow(_ + _, Seconds(3), Seconds(2))

    Here the window duration we defined is Seconds(3): the window slides every 2 s, and on each slide it aggregates all data from the preceding 3 s.
 
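This behavior can be sketched in plain Java without Spark. The batch contents and the names `WindowSketch`, `merge`, and `windows` below are made up for illustration; a 1 s batch interval is assumed, so a Seconds(3) window spans 3 batches and a Seconds(2) slide advances 2 batches at a time:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowSketch {
    // Per-key addition, i.e. the effect of the _ + _ reduce function.
    static Map<String, Integer> merge(Map<String, Integer> x, Map<String, Integer> y) {
        Map<String, Integer> out = new HashMap<>(x);
        y.forEach((k, v) -> out.merge(k, v, Integer::sum));
        return out;
    }

    // A window of windowLen batches sliding every slide batches; with a 1 s batch
    // interval this mimics reduceByKeyAndWindow(_ + _, Seconds(windowLen), Seconds(slide)).
    static List<Map<String, Integer>> windows(List<Map<String, Integer>> batches,
                                              int windowLen, int slide) {
        List<Map<String, Integer>> result = new ArrayList<>();
        for (int end = slide; end <= batches.size(); end += slide) {
            Map<String, Integer> win = new HashMap<>();
            for (int i = Math.max(0, end - windowLen); i < end; i++) {
                win = merge(win, batches.get(i));
            }
            result.add(win);
        }
        return result;
    }

    public static void main(String[] args) {
        // Toy data: one 1 s micro-batch of (key, count) pairs per list element.
        List<Map<String, Integer>> batches = List.of(
            Map.of("a", 1),          // t = 0..1 s
            Map.of("a", 2, "b", 1),  // t = 1..2 s
            Map.of("b", 3),          // t = 2..3 s
            Map.of("a", 4)           // t = 3..4 s
        );
        System.out.println(windows(batches, 3, 2));
    }
}
```

Each emitted window re-aggregates the last 3 batches from scratch, which is exactly the recomputation the overloaded variant below avoids.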

2. The overloaded function reduceByKeyAndWindow(_ + _, _ - _, Seconds(3), Seconds(2))

     The design idea is this: when the slide interval Seconds(2) is shorter than the window duration Seconds(3), consecutive windows overlap. Rather than re-fetching and recomputing the overlapping data, we can derive the new window's result from the old one's, which saves both memory and computation and improves efficiency considerably.
     
     The part the two windows share is the data in time slice time3. For window1 and window2, the direct computation is:
     win1 = time1 + time2 + time3
     win2 = time3 + time4 + time5
     
     The incremental update is:
     win1 = time1 + time2 + time3
     win2 = win1 + time4 + time5 - time1 - time2
     
     In other words, _ + _ aggregates the newly arrived time slices (the RDDs in time4 and time5), while _ - _ removes the expired time slices from the previous window (time1 and time2).
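The incremental update can be verified with plain integers; the per-slice counts below are hypothetical, chosen only to check that both forms agree:

```java
public class IncrementalWindow {
    // Hypothetical counts for the five time slices time1 .. time5.
    static final int t1 = 10, t2 = 20, t3 = 30, t4 = 40, t5 = 50;

    static int win1() { return t1 + t2 + t3; }        // direct computation
    static int win2Direct() { return t3 + t4 + t5; }  // direct recomputation

    // Incremental form used by reduceByKeyAndWindow(_ + _, _ - _, ...):
    // add the new slices, subtract the expired ones.
    static int win2Incremental() { return win1() + t4 + t5 - t1 - t2; }

    public static void main(String[] args) {
        System.out.println(win2Direct() + " == " + win2Incremental());
    }
}
```

The incremental form touches only the slices that entered or left the window, so its cost depends on the slide interval, not the window length.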
 

 II. Use case:

  When a project's business logic needs to operate across batches, a sliding window function can be used. For example, if Spark Streaming is configured with a 5 s batch interval but a computation needs the last 5 minutes of data, a 5-minute window sliding every 5 s covers it.

https://www.jianshu.com/p/2f0d2cb1faf4

 

III. Code:

/**
 * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
 * The reduced value over a new window is calculated using the old window's reduced value:
 *  1. reduce the new values that entered the window (e.g., adding new counts)
 *
 *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 *
 * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 * @param reduceFunc associative reduce function
 * @param invReduceFunc inverse reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration  sliding interval of the window (i.e., the interval after which
 *                       the new DStream will generate RDDs); must be a multiple of this
 *                       DStream's batching interval
 * @param filterFunc     Optional function to filter expired key-value pairs;
 *                       only pairs that satisfy the function are retained
 */
def reduceByKeyAndWindow(
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    windowDuration: Duration,
    slideDuration: Duration = self.slideDuration,
    numPartitions: Int = ssc.sc.defaultParallelism,
    filterFunc: ((K, V)) => Boolean = null
  ): DStream[(K, V)] = ssc.withScope {
  reduceByKeyAndWindow(
    reduceFunc, invReduceFunc, windowDuration,
    slideDuration, defaultPartitioner(numPartitions), filterFunc
  )
}

 

Input data: <id, <value, 1>>
JavaPairDStream<String, Tuple2<Integer, Integer>> resultDStream =
        monitorId2SpeedDStream.reduceByKeyAndWindow(
            // reduceFunc: add in the values of a newly arrived batch.
            new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
                    return new Tuple2<Integer, Integer>(v1._1 + v2._1, v1._2 + v2._2);
                }
            },
            // invReduceFunc: subtract the values of an expired batch.
            new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
                    return new Tuple2<Integer, Integer>(v1._1 - v2._1, v1._2 - v2._2);
                }
            },
            Durations.minutes(5), Durations.seconds(5));

 

Reposted from: https://www.cnblogs.com/zDanica/p/5471592.html

https://blog.csdn.net/Thomson617/article/details/87780167

