場景描述
如果一個task在處理過程中掛掉了,那么它在內存中的狀態都會丟失,所有的數據都需要重新計算。那么我就需要一個東西保存歷史狀態State。
首先區分一下兩個概念,state一般指一個具體的task/operator的狀態。而checkpoint則表示了一個Job,在一個特定時刻的一份全局狀態快照,即包含了所有task/operator的狀態。我們在這里討論的是state。
Spark的狀態更新
updateStateByKey
updateStateByKey會統計全局的key的狀態,不管又沒有數據輸入,它會在每一個批次間隔返回之前的key的狀態。updateStateByKey會對已存在的key進行state的狀態更新,同時還會對每個新出現的key執行相同的更新函數操作。如果通過更新函數對state更新后返回來為none,此時刻key對應的state狀態會被刪除(state可以是任意類型的數據的結構)。
mapWithState
mapWithState也會統計全局的key的狀態,但是如果沒有數據輸入,便不會返回之前的key的狀態,類似於增量的感覺。
updateStateByKey和mapWithState的區別
updateStateByKey可以在指定的批次間隔內返回之前的全部歷史數據,包括新增的,改變的和沒有改變的。由於updateStateByKey在使用的時候一定要做checkpoint,當數據量過大的時候,checkpoint會占據龐大的數據量,會影響性能,效率不高。
mapWithState只返回變化后的key的值,這樣做的好處是,我們可以只是關心那些已經發生的變化的key,對於沒有數據輸入,則不會返回那些沒有變化的key的數據。這樣的話,即使數據量很大,checkpoint也不會像updateStateByKey那樣,占用太多的存儲,效率比較高(再生產環境中建議使用這個)。
updateStateByKey示例:
def updateFunction(currValues:Seq[Int],preValue:Option[Int]): Option[Int] = { val currValueSum = currValues.sum //上面的Int類型都可以用對象類型替換 Some(currValueSum + preValue.getOrElse(0)) //當前值的和加上歷史值 } kafkaStream.map(r => (r._2,1)).updateStateByKey(updateFunction _)
這里的updateFunction方法就是需要我們自己去實現的狀態跟新的邏輯,currValues就是當前批次的所有值,preValue是歷史維護的狀態,updateStateByKey返回的是包含歷史所有狀態信息的DStream。
mapWithState示例:
val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]()) //自定義mappingFunction,累加單詞出現的次數並更新狀態 val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => { val sum = count.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) output } //調用mapWithState進行管理流數據的狀態 kafkaStream.map(r => (r._2,1)).mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD)).print()
這里的initialRDD就是初始化狀態,updateStateByKey也有對應的API。這里的mappingFun也是需要我們自己實現的狀態跟新邏輯,調用state.update()就是對狀態的跟新,output就是通過mapWithState后返回的DStream中的數據形式。注意這里不是直接傳入的mappingFunc函數,而是一個StateSpec 的對象,其實也是對函數的一個包裝而已。
整理自: