spark streaming之 windowDuration、slideDuration、batchDuration​


spark streaming 不同於sotm,是一種准實時處理系統。storm 中,把批處理看錯是時間教程的實時處理。而在spark streaming中,則反過來,把實時處理看作為時間極小的批處理。

1、三個時間參數

spark streaming 中有三個關於時間的參數,分別如下:

窗口時間windowDuration​:當前窗口要統計多長時間的數據,是批量時間的整數倍

滑動時間slideDuration​:要多長時間更新一次結果,是批量時間的整數倍

批量時間batchDuration​:多長時間創建一個批次,與實際業務無關,只與數據量有關,數據量大則可以設置短一些,數據量小則設置長一些,但必須小於其他兩個時間,

2、該怎么設置?

為方便理解,就拿咱們最常見的日啟、日活、周啟、周活作為示例

注:1、實際中日啟、日活、周啟、周活更多是用批處理,此處只是拿來方便大家理解

      2、此處不是嚴格意義上的日啟、周啟。此處的日:最近24小時,周:最近7天

案例1:每隔一小時,統計產品的日啟、日活,

窗口時間:1日,滑動時間:1小時,批量時間:1小時、半小時、15分鍾、10分鍾、5分鍾、2分鍾均可,視數據量大小而定

案例2:每天統計最近七天累計啟動、活躍

窗口時間:7日,滑動時間:1日 批量時間:一小時、半小時、10分鍾、5分鍾

3、實戰

為了理解上邊參數是怎么設置的,我們對假定現在有個需求,需要對輸入的字母進行計數。

使用nc -lk 9999 模擬生產者,發送數據,streaming 通過socket接收數據

實戰1:每10秒統計當前輸入的字符

適用:徹底非累加業務

  import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
  import org.apache.spark.SparkConf

  val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]")
  //10秒創建一個批次
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  val lines = ssc.socketTextStream("localhost", 9999)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1l))
    val current_stream = wordCounts.reduceByKey(_ + _)
    current_stream.print()
    current_stream.repartition(1).saveAsTextFiles("/data/socket_wordcount_current.")

    ssc.start()
    ssc.awaitTermination()

啟動生產者 nc -lk 9999

在spark-shell中輸入上邊代碼

在nc 的終端下, 

輸入字符操作1、第一個10秒,輸入a,第二個10秒輸入b,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件,期望結果   【    (a,1) (b,1) 】  原因:我們當前僅輸入了a、b 

輸入字符操作2、第四個10秒,輸入c,第五個10秒輸入d,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件   期望 【 (c,1) (d,1)】 原因:我們當前輸入了c、d

輸入字符操作3、這時,不需要操作,等待30秒,在spark-shell中確認第三次計算完成后,查看新產生文件 期望 【 】 原因:當前我們沒有輸入, 所以沒有任何字符可以統計

實戰2、每10秒統計歷史所有輸入的字符。

適用范圍:計算歷史(包含窗口之外)累計數據,經常用於統計“總裝機量”之類 

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.SparkConf

val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]")
  //10秒創建一個批次
  val ssc = new StreamingContext(sparkConf, Seconds(10))
//累加所有經過的數據
    val updateFunc = (values: Seq[Long], state: Option[Long]) => {
      val currentCount = values.foldLeft(0l)(_ + _)
      val previousCount = state.getOrElse(0l)
      Some(currentCount + previousCount)
    }
    ssc.checkpoint("socket_wordcount_history")
    val lines = ssc.socketTextStream("localhost", 9999)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1l))
    val history_stream = wordCounts.updateStateByKey[Long](updateFunc)//合並當前數據和歷史數據

    history_stream.print()
    history_stream.repartition(1).saveAsTextFiles("/data/socket_wordcount_history.")

    ssc.start()
    ssc.awaitTermination()

 啟動生產者 nc -lk 9999

在spark-shell中輸入上邊代碼

在nc 的終端下, 

輸入字符操作1、第一個10秒,輸入a,第二個10秒輸入b,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件,期望結果   【    (a,1) (b,1) 】  原因:我們當前輸入了a、b 

輸入字符操作2、第四個10秒,輸入c,第五個10秒輸入d,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件   期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:我們當前輸入了c、d,歷史輸入過 a、b

輸入字符操作3、這時,不需要操作,等待30秒,在spark-shell中確認第三次計算完成后,查看新產生文件 期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:當前我們沒有輸入,但是,歷史曾經輸入過a、b、c、d  

輸入字符操作4、這時,仍不需要操作,等待30秒,在spark-shell中確認第四次計算完成后,查看新產生文件 期望 【 (a,1) (b,1) (c,1) (d,1)】原因: 當前我們沒有輸入,但是,歷史曾經輸入過a、b、c、d 

之后,即使沒有輸入abcd,統計結果仍包含abcd這四個字符各1次

 

實戰3、每隔30秒,統計最近1分鍾輸入的字母。窗口內歷史累加

(適用范圍:非累加業務,這里的累加指的是超出window范圍)

sc.stop

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}import org.apache.spark.SparkConf

val updateFunc = (values: Seq[Long], state: Option[Long]) => {
val currentCount = values.foldLeft(0l)(_ + _)
val previousCount = state.getOrElse(0l) 
Some(currentCount + previousCount)
}
val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))   //10秒創建一個批次
ssc.checkpoint("socket-kafka-wordcount_recent")
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l))
val stateDstream = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Minutes(1), Seconds(30))  //每30秒算一次,數據范圍為最近一分鍾內收到的數據  另外,使用window時,需要設置checkpoint

stateDstream.print()
stateDstream.repartition(1).saveAsTextFiles("/data/socket-streaming-wordcount.log")

ssc.start()
ssc.awaitTermination()

啟動生產者 nc -lk 9999

在spark-shell中輸入上邊代碼

在nc 的終端下,

輸入字符操作1、第一個10秒,輸入a,第二個10秒輸入b,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件,期望結果   【    (a,1) (b,1) 】  原因:最近1分鍾,我們只輸入了a、b

輸入字符操作2、第四個10秒,輸入c,第五個10秒輸入d,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件   期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:最近1分鍾,我們只輸入了a、b、c、d

輸入字符操作3、這時,不需要操作,等待30秒,在spark-shell中確認第三次計算完成后,查看新產生文件 期望 【 (a,0) (b,0) (c,1) (d,1)】 原因:最近1分鍾,我們只輸入了c、d ,1分鍾之前輸入的a、b將不再在統計范圍之內

輸入字符操作4、這時,仍不需要操作,等待30秒,在spark-shell中確認第四次計算完成后,查看新產生文件 期望 【 (a,0) (b,0) (c,0) (d,0)】原因:最近1分鍾,我們沒有任何輸入,1 分鍾之前輸入的a、b、c、d將不再在統計范圍之內


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM