spark streaming 不同於sotm,是一種准實時處理系統。storm 中,把批處理看錯是時間教程的實時處理。而在spark streaming中,則反過來,把實時處理看作為時間極小的批處理。
1、三個時間參數
spark streaming 中有三個關於時間的參數,分別如下:
窗口時間windowDuration:當前窗口要統計多長時間的數據,是批量時間的整數倍
滑動時間slideDuration:要多長時間更新一次結果,是批量時間的整數倍
批量時間batchDuration:多長時間創建一個批次,與實際業務無關,只與數據量有關,數據量大則可以設置短一些,數據量小則設置長一些,但必須小於其他兩個時間,
2、該怎么設置?
為方便理解,就拿咱們最常見的日啟、日活、周啟、周活作為示例
注:1、實際中日啟、日活、周啟、周活更多是用批處理,此處只是拿來方便大家理解
2、此處不是嚴格意義上的日啟、周啟。此處的日:最近24小時,周:最近7天
案例1:每隔一小時,統計產品的日啟、日活,
窗口時間:1日,滑動時間:1小時,批量時間:1小時、半小時、15分鍾、10分鍾、5分鍾、2分鍾均可,視數據量大小而定
案例2:每天統計最近七天累計啟動、活躍
窗口時間:7日,滑動時間:1日 批量時間:一小時、半小時、10分鍾、5分鍾
3、實戰
為了理解上邊參數是怎么設置的,我們對假定現在有個需求,需要對輸入的字母進行計數。
使用nc -lk 9999 模擬生產者,發送數據,streaming 通過socket接收數據
實戰1:每10秒統計當前輸入的字符
適用:徹底非累加業務
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext} import org.apache.spark.SparkConf val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]") //10秒創建一個批次 val ssc = new StreamingContext(sparkConf, Seconds(10)) val lines = ssc.socketTextStream("localhost", 9999) val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1l)) val current_stream = wordCounts.reduceByKey(_ + _) current_stream.print() current_stream.repartition(1).saveAsTextFiles("/data/socket_wordcount_current.") ssc.start() ssc.awaitTermination()
啟動生產者 nc -lk 9999
在spark-shell中輸入上邊代碼
在nc 的終端下,
輸入字符操作1、第一個10秒,輸入a,第二個10秒輸入b,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件,期望結果 【 (a,1) (b,1) 】 原因:我們當前僅輸入了a、b
輸入字符操作2、第四個10秒,輸入c,第五個10秒輸入d,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件 期望 【 (c,1) (d,1)】 原因:我們當前輸入了c、d
輸入字符操作3、這時,不需要操作,等待30秒,在spark-shell中確認第三次計算完成后,查看新產生文件 期望 【 】 原因:當前我們沒有輸入, 所以沒有任何字符可以統計
實戰2、每10秒統計歷史所有輸入的字符。
適用范圍:計算歷史(包含窗口之外)累計數據,經常用於統計“總裝機量”之類
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.SparkConf
val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]") //10秒創建一個批次 val ssc = new StreamingContext(sparkConf, Seconds(10)) //累加所有經過的數據 val updateFunc = (values: Seq[Long], state: Option[Long]) => { val currentCount = values.foldLeft(0l)(_ + _) val previousCount = state.getOrElse(0l) Some(currentCount + previousCount) } ssc.checkpoint("socket_wordcount_history") val lines = ssc.socketTextStream("localhost", 9999) val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1l)) val history_stream = wordCounts.updateStateByKey[Long](updateFunc)//合並當前數據和歷史數據 history_stream.print() history_stream.repartition(1).saveAsTextFiles("/data/socket_wordcount_history.") ssc.start() ssc.awaitTermination()
啟動生產者 nc -lk 9999
在spark-shell中輸入上邊代碼
在nc 的終端下,
輸入字符操作1、第一個10秒,輸入a,第二個10秒輸入b,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件,期望結果 【 (a,1) (b,1) 】 原因:我們當前輸入了a、b
輸入字符操作2、第四個10秒,輸入c,第五個10秒輸入d,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件 期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:我們當前輸入了c、d,歷史輸入過 a、b
輸入字符操作3、這時,不需要操作,等待30秒,在spark-shell中確認第三次計算完成后,查看新產生文件 期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:當前我們沒有輸入,但是,歷史曾經輸入過a、b、c、d
輸入字符操作4、這時,仍不需要操作,等待30秒,在spark-shell中確認第四次計算完成后,查看新產生文件 期望 【 (a,1) (b,1) (c,1) (d,1)】原因: 當前我們沒有輸入,但是,歷史曾經輸入過a、b、c、d
之后,即使沒有輸入abcd,統計結果仍包含abcd這四個字符各1次
實戰3、每隔30秒,統計最近1分鍾輸入的字母。窗口內歷史累加
(適用范圍:非累加業務,這里的累加指的是超出window范圍)
sc.stop import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}import org.apache.spark.SparkConf val updateFunc = (values: Seq[Long], state: Option[Long]) => { val currentCount = values.foldLeft(0l)(_ + _) val previousCount = state.getOrElse(0l) Some(currentCount + previousCount) } val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]") val ssc = new StreamingContext(sparkConf, Seconds(10)) //10秒創建一個批次 ssc.checkpoint("socket-kafka-wordcount_recent") val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1l)) val stateDstream = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Minutes(1), Seconds(30)) //每30秒算一次,數據范圍為最近一分鍾內收到的數據 另外,使用window時,需要設置checkpoint stateDstream.print() stateDstream.repartition(1).saveAsTextFiles("/data/socket-streaming-wordcount.log") ssc.start() ssc.awaitTermination()
啟動生產者 nc -lk 9999
在spark-shell中輸入上邊代碼
在nc 的終端下,
輸入字符操作1、第一個10秒,輸入a,第二個10秒輸入b,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件,期望結果 【 (a,1) (b,1) 】 原因:最近1分鍾,我們只輸入了a、b
輸入字符操作2、第四個10秒,輸入c,第五個10秒輸入d,等待10秒,在spark-shell中確認第二次計算完成后,查看新產生文件 期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:最近1分鍾,我們只輸入了a、b、c、d
輸入字符操作3、這時,不需要操作,等待30秒,在spark-shell中確認第三次計算完成后,查看新產生文件 期望 【 (a,0) (b,0) (c,1) (d,1)】 原因:最近1分鍾,我們只輸入了c、d ,1分鍾之前輸入的a、b將不再在統計范圍之內
輸入字符操作4、這時,仍不需要操作,等待30秒,在spark-shell中確認第四次計算完成后,查看新產生文件 期望 【 (a,0) (b,0) (c,0) (d,0)】原因:最近1分鍾,我們沒有任何輸入,1 分鍾之前輸入的a、b、c、d將不再在統計范圍之內