狀態管理函數
Spark Streaming中狀態管理函數包括updateStateByKey和mapWithState,都是用來統計全局key的狀態的變化的。它們以DStream中的數據進行按key做reduce操作,然后對各個批次的數據進行累加,在有新的數據信息進入或更新時。能夠讓用戶保持想要的不論任何狀狀。
1. updateStateByKey
概念
updateStateByKey會統計全局的key的狀態,不管又沒有數據輸入,它會在每一個批次間隔返回之前的key的狀態。updateStateByKey會對已存在的key進行state的狀態更新,同時還會對每個新出現的key執行相同的更新函數操作。如果通過更新函數對state更新后返回來為none,此時刻key對應的state狀態會被刪除(state可以是任意類型的數據的結構)。
適用場景
updateStateByKey可以用來統計歷史數據,每次輸出所有的key值。例如統計不同時間段用戶平均消費金額,消費次數,消費總額,網站的不同時間段的訪問量等指標
使用實例
條件
1)首先會以DStream中的數據進行按key做reduce操作,然后再對各個批次的數據進行累加 。
2)updateStateBykey要求必須要設置checkpoint點。
3)updateStateByKey 方法中 updateFunc就要傳入的參數,。Seq[V]表示當前key對應的所有值,Option[S] 是當前key的歷史狀態,返回的是新的封裝的數據。
代碼
object SparkStreamingUpdateStateByKey { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) def main(args: Array[String]) { val spark = SparkSession.builder() .appName("UpdateStateByKey") .master("local[3]") .getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) // 必須設置checkpoint ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint") val wordCount: DStream[(String, Int)] = ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream") .flatMap(_.split(" ")) .map((_, 1)) .updateStateByKey((values: Seq[Int], state: Option[Int]) => { /** state.getOrElse(0) 得到歷史可以值,如果key值不存在,則為0 */ Option(values.foldLeft(state.getOrElse(0))(_ + _)) }) wordCount.print() ssc.start() ssc.awaitTermination() } }
2. mapWithState
概念
mapWithState也會統計全局的key的狀態,但是如果沒有數據輸入,便不會返回之前的key的狀態,只會返回batch中存在的key值統計,類似於增量的感覺。
適用場景
mapWithState可以用於一些實時性較高,延遲較少的一些場景,例如你在某寶上下單買了個東西,付款之后返回你賬戶里的余額信息。
使用實例
條件
1)如果有初始化的值得需要,可以使用initialState(RDD)來初始化key的值
2)還可以指定timeout函數,該函數的作用是,如果一個key超過timeout設定的時間沒有更新值,那么這個key將會失效。這個控制需要在func中實現,必須使用state.isTimingOut()來判斷失效的key值。如果在失效時間之后,這個key又有新的值了,則會重新計算。如果沒有使用isTimingOut,則會報錯。
3) checkpoint 不是必須的
object SparkStreamingMapWithState { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) def main(args: Array[String]) { val spark = SparkSession.builder() .master("local[2]") .appName("UpdateStateByKeyDemo") .getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]()) // 可以不設置checkpoint ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint") val wordCount: MapWithStateDStream[String, Int, Int, Any] = ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream") .map((_, 1)) .mapWithState(StateSpec.function(func).initialState(initialRDD).timeout(Seconds(30))) wordCount.print() ssc.start() ssc.awaitTermination() } /** * word : 代表統計的單詞 * option:代表的是歷史數據(使用option是因為歷史數據可能有,也可能沒有,如第一次進來的數據就沒有歷史記錄) * state:代表的是返回的狀態 */ val func = (word: String, option: Option[Int], state: State[Int]) => { if (state.isTimingOut()) { println(word + "is timeout") } else { // getOrElse(0)不存在賦初始值為零 val sum = option.getOrElse(0) + state.getOption().getOrElse(0) // 單詞和該單詞出現的頻率/ 獲取歷史數據,當前值加上上一個批次的該狀態的值 val wordFreq = (word, sum) state.update(sum) wordFreq } } }
updateStateByKey和mapWithState的區別
updateStateByKey可以在指定的批次間隔內返回之前的全部歷史數據,包括新增的,改變的和沒有改變的。由於updateStateByKey在使用的時候一定要做checkpoint,當數據量過大的時候,checkpoint會占據龐大的數據量,會影響性能,效率不高。
mapWithState只返回變化后的key的值,這樣做的好處是,我們可以只是關心那些已經發生的變化的key,對於沒有數據輸入,則不會返回那些沒有變化的key的數據。這樣的話,即使數據量很大,checkpoint也不會像updateStateByKey那樣,占用太多的存儲,效率比較高(再生產環境中建議使用這個)。