窗口函數,就是在DStream流上,以一個可配置的長度為窗口,以一個可配置的速率向前移動窗口,根據窗口函數的具體內容,分別對當前窗口中的這一波數據采取某個對應的操作算子。
需要注意的是窗口長度,和窗口移動速率需要是batch time的整數倍。
1.window(windowLength, slideInterval)
該操作由一個DStream對象調用,傳入一個窗口長度參數,一個窗口移動速率參數,然后將當前時刻當前長度窗口中的元素取出形成一個新的DStream。
//input: ------- java scala ------- java scala ------- val spark = SparkSession.builder() .master("local[2]") .appName("UpdateStateByKeyDemo") .getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) // 注意:窗口長度,窗口移動速率需要是batch time的整數倍 ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream") .map((_, 1)) .window(Seconds(50), Seconds(10)) .print() ssc.start() ssc.awaitTermination() //output: //input: ------- java scala ------- java scala java scala -------
2. countByWindow(windowLength,slideInterval)
返回指定長度窗口中的元素個數。
注:需要設置checkpoint
//input: ------- java ------- java scala ------- val spark = SparkSession.builder() .master("local[2]") .appName("UpdateStateByKeyDemo") .getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) // 設置checkpoint ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint") // 注意:窗口長度,窗口移動速率需要是batch time的整數倍 ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream") .map((_, 1)) .countByWindow(Seconds(30), Seconds(10)) .print() ssc.start() ssc.awaitTermination() // output: ------ 1 ------ 3 ------
3. countByValueAndWindow(windowLength,slideInterval, [numTasks])
統計當前時間窗口中元素值相同的元素的個數
注:需要設置checkpoint
// input: ----------- java ----------- java scala ----------- val spark = SparkSession.builder() .master("local[2]") .appName("UpdateStateByKeyDemo") .getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) // 設置checkpoint ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint") // 注意:窗口長度,窗口移動速率需要是batch time的整數倍 ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream") .countByValueAndWindow(Seconds(30), Seconds(10)) .print() ssc.start() ssc.awaitTermination() // ouput: --------- (java,1) --------- (java,2) (scala,1) ---------
4. reduceByWindow(func, windowLength,slideInterval)
在調用DStream上首先取窗口函數的元素形成新的DStream,然后在窗口元素形成的DStream上進行reduce。
// input: ---------- java ---------- java spark ---------- val spark = SparkSession.builder() .master("local[2]") .appName("UpdateStateByKeyDemo") .getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) // 設置checkpoint // ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint") // 注意:窗口長度,窗口移動速率需要是batch time的整數倍 ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream") .reduceByWindow(_ + ":" + _, Seconds(30), Seconds(10)) .print() ssc.start() ssc.awaitTermination() // output: ---------- java ---------- java:java:spark ----------
5.reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow的數據源是基於該DStream的窗口長度中的所有數據進行計算。該操作有一個可選的並發數參數。
// input: ---------- java ----------- java scala ----------- val spark = SparkSession.builder() .master("local[2]") .appName("UpdateStateByKeyDemo") .getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) // 設置checkpoint // ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint") // 注意:窗口長度,窗口移動速率需要是batch time的整數倍 ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream") .map((_, 1)) .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10)) .print() ssc.start() ssc.awaitTermination() //output: ----------- (java,1) ----------- (java,2) (scala,1) -----------
6. reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])
這個窗口操作和上一個的區別是多傳入一個函數invFunc。前面的func作用和上一個reduceByKeyAndWindow相同,后面的invFunc是用於處理流出rdd的。
val spark = SparkSession.builder() .master("local[2]") .appName("UpdateStateByKeyDemo") .getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) // 設置checkpoint // ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint") // 注意:窗口長度,窗口移動速率需要是batch time的整數倍 ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream") .map((_, 1)) .reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(20), Seconds(10)) .print() ssc.start() ssc.awaitTermination()