Spark Streaming揭秘 Day14
State狀態管理
今天讓我們進入下SparkStreaming的一個非常好用的功能,也就State相關的操作。State是SparkStreaming中用來管理歷史數據的結構。目前主要提供了updateStateByKey和MapWithStateRDD兩個方法。
updateStateByKey
首先,讓我們先找一下這個方法的位置。
我們可以發現updateStateByKey這個方法並不在DStream中,而是在PairDStreamFunctions中。

為什么在不同類中的方法可以組合起來,這個就不得不提一下scala中非常厲害的隱式轉換特性,在如下部分,希望大家能深入研究下。

從方法的位置,我們可以很明確的知道這個方法必須是針對keyValue結構的。
進入到方法內部,我們首先看到其使用到了defaultPartitoner,默認是采用HashPartitioner,特點是效率高。

下面進入計算的關鍵代碼,也就是StateDStream中的compute方法。

從上述的計算邏輯中,我們會發現一個明顯的弱點:其核心邏輯是一個cogroup,具體來說是在每次計算時,都按照key對所有數據進行掃描和集合。好處是邏輯簡單,壞處是有性能問題,每次多要全部重新掃描下,隨着數據量越來越大,性能會越來越低,所以不能常使用。
所以這個方法主要針對小數據集的處理方法,關於這個效率問題有沒有解決方法,我們看看下一個方法。
mapWithState
這個方法在目前還是試驗狀態,有可能不穩定,但其設計理念讓人眼前一亮,讓我們先看一下方法說明。

可以看到,在方法中,對state提供了增刪改查等操作,也就是,可以把state與一個keyValue內存數據表等價。具體是如何實現的呢?
首先,進入方法定義,可以看到在操作時把存儲級別定為了內存存儲,這個和前面內存表的推斷一致。

進一步深入,發現,每個partition被一個MapWithStateRDDRecord代表,在計算時調用了如下框出的方法。

對於updateRecordWithData這個方法,主要分為了兩個步驟:
步驟一:內存表newStateMap建立,主要是采用copy方法,建立一張已納入歷史數據的內存表。

步驟二:根據當前Batch的數據進行計算,並更新newStateMap的數據,

從這個計算邏輯我們可以看,相比於第一個方法updateStateByKey,mapWithState的操作是增量的!!!這個效率會高很多。
進一步講,mapWithState方法給我們上了生動的一課。說明通過封裝,在partition不變的情況下,實現了對RDD內部數據的更新。
也就是說,對於Spark來說,不可變的RDD也可以處理變化的數據!!!
欲知后事如何,且聽下回分解
DT大數據每天晚上20:00YY頻道現場授課頻道68917580
