流處理 —— Spark Streaming中的Window操作


窗口函數,就是在DStream流上,以一個可配置的長度為窗口,以一個可配置的速率向前移動窗口,根據窗口函數的具體內容,分別對當前窗口中的這一波數據采取某個對應的操作算子。

需要注意的是窗口長度,和窗口移動速率需要是batch time的整數倍

 

 

1.window(windowLength, slideInterval)

該操作由一個DStream對象調用,傳入一個窗口長度參數,一個窗口移動速率參數,然后將當前時刻當前長度窗口中的元素取出形成一個新的DStream。 

//input:
-------
java
scala
-------
 java
scala
-------   
val spark = SparkSession.builder()
      .master("local[2]")
      .appName("UpdateStateByKeyDemo")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // 注意:窗口長度,窗口移動速率需要是batch time的整數倍
    ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
      .map((_, 1))
      .window(Seconds(50), Seconds(10))
      .print()

    ssc.start()
    ssc.awaitTermination()

//output:
//input:
-------
java
scala
-------
java
scala
java
scala
-------   

2.  countByWindow(windowLength,slideInterval)

返回指定長度窗口中的元素個數。 

注:需要設置checkpoint

//input:
-------
java
-------
java
scala
------- 
val spark = SparkSession.builder()
      .master("local[2]")
      .appName("UpdateStateByKeyDemo")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // 設置checkpoint
    ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint")

    // 注意:窗口長度,窗口移動速率需要是batch time的整數倍
    ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
      .map((_, 1))
      .countByWindow(Seconds(30), Seconds(10))
      .print()

    ssc.start()
    ssc.awaitTermination()

// output:
------
1
------
3
------

3. countByValueAndWindow(windowLength,slideInterval, [numTasks])

統計當前時間窗口中元素值相同的元素的個數

注:需要設置checkpoint

// input:
-----------
java
-----------
java
scala
-----------

 val spark = SparkSession.builder()
      .master("local[2]")
      .appName("UpdateStateByKeyDemo")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // 設置checkpoint
    ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint")

    // 注意:窗口長度,窗口移動速率需要是batch time的整數倍
    ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
      .countByValueAndWindow(Seconds(30), Seconds(10))
      .print()

    ssc.start()
    ssc.awaitTermination()

// ouput:
---------
(java,1)
---------
(java,2)
(scala,1)
---------

4. reduceByWindow(func, windowLength,slideInterval)

在調用DStream上首先取窗口函數的元素形成新的DStream,然后在窗口元素形成的DStream上進行reduce。 

// input:
----------
java
----------
java
spark
----------
val spark = SparkSession.builder()
      .master("local[2]")
      .appName("UpdateStateByKeyDemo")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // 設置checkpoint
    //    ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint")

    // 注意:窗口長度,窗口移動速率需要是batch time的整數倍
    ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
      .reduceByWindow(_ + ":" + _, Seconds(30), Seconds(10))
      .print()

    ssc.start()
    ssc.awaitTermination()

// output:
----------
java
----------
java:java:spark
----------

5.reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])

reduceByKeyAndWindow的數據源是基於該DStream的窗口長度中的所有數據進行計算。該操作有一個可選的並發數參數。 

// input:
----------
java
-----------
java
scala
----------- 
val spark = SparkSession.builder()
      .master("local[2]")
      .appName("UpdateStateByKeyDemo")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // 設置checkpoint
    //    ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint")

    // 注意:窗口長度,窗口移動速率需要是batch time的整數倍
    ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
      .map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
      .print()

    ssc.start()
    ssc.awaitTermination()

//output:
-----------
(java,1)
-----------
(java,2)
(scala,1)
-----------

6. reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])

 這個窗口操作和上一個的區別是多傳入一個函數invFunc。前面的func作用和上一個reduceByKeyAndWindow相同,后面的invFunc是用於處理流出rdd的。 

val spark = SparkSession.builder()
      .master("local[2]")
      .appName("UpdateStateByKeyDemo")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // 設置checkpoint
    //    ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint")

    // 注意:窗口長度,窗口移動速率需要是batch time的整數倍
    ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
      .map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b,
        Seconds(20), Seconds(10))
      .print()

    ssc.start()
    ssc.awaitTermination()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM