原文鏈接:http://blog.csdn.net/zisheng_wang_data/article/details/51712392
本講內容:
a. updateStateByKey解密
b. mapWithState解密
注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。
上節回顧
上一講中,我們從Spark Streaming源碼解讀Driver容錯安全性:那么什么是Driver容錯安全性呢?
a. 從數據層面:ReceivedBlockTracker為整個Spark Streaming應用程序記錄元數據信息
b. 從調度層面:DStreamGraph和JobGenerator是Spark Streaming調度的核心,記錄當前調度到哪一進度,和業務有關
c. 從運行角度: 作業生存層面,JobGenerator是Job調度層面
談Driver容錯性我們需要考慮Driver中有那些需要維持狀態的運行
a. ReceivedBlockTracker跟蹤了數據,因此需要容錯。通過WAL方式容錯
b. DStreamGraph表達了依賴關系,恢復狀態的時候需要根據DStream恢復計算邏輯級別的依賴關系。通過checkpoint方式容錯
c. JobGenerator表面是基於ReceiverBlockTracker中的數據,以及DStream構成的依賴關系不斷的產生Job的過程。也可以這么理解這個過程中消費了那些數據,並且跟蹤進行到了一個怎樣的程度
具體分析如下圖:
最后我們可以這樣總結道:
a. ReceivedBlockTracker是通過WAL方式來進行數據容錯的。
b. DStreamGraph和JobGenerator是通過checkpoint方式來進行數據容錯的。
開講
本講我們講Spark Streaming中一個非常重要的內容:State狀態管理
a. 為了說明state狀態管理,拿兩個非常具體非常有價值的方法updateStateByKey和mapWithState這兩個方法來說明 sparkstreaming是如何實現對state狀態管理的。Sparkstreaming是按照batchduration划分job的,但是有時 我們想算過去一個小時或者過去一天的數據,在大於batchduration的時候對數據進行符合業務邏輯的操作,這時候不可避免的要進行狀態維護。 Sparkstreaming每個batchduration都會產生一個job,job里面都是RDD,所以現在面臨的一個問題就是,他每個 batchduration產生RDD,怎么對他的狀態進行維護的問題(像updateStateByKey)。例如計算一天的商品的點擊量,這時候就需 要類似於updateStateByKey或者mapWithState這樣的方法幫助完成核心的步驟
b. Spark 的狀態管理其實有很多函數,比較典型的有類似的UpdateStateByKey、MapWithState方法來完成核心的步驟
updateStateByKey和mapWithState這兩個方法在DStream中並不能找到。因為updateStateByKey和 mapWithState這兩個方法都是針對key-value類型的數據進行操作,也就是pair類型的,和前邊講RDD是一樣的,RDD這個類本事並 不會對key-value類型的數據進行操作,所以這時候就需要借助scala的語法隱式轉換。隱式轉換一般放在類的伴生對象中,將DStream轉換成 PairDStreamFunctions。這是從地獄中召喚出來的功能,使用后又回到地獄。運行機制就是找不到DStream的 updateStateByKey和mapWithState,他們是PairDStreamFunctions的方法,就找隱式轉換,隱式轉換中發現 toPairDStreamFunctions這個功能,就使用了implicit功能
繼續跟蹤PairDStreamFunctions類中有次方法定義
updateStateByKey
在PairDStreamFunctions中updateStateByKey具體實現如下
在已有的歷史基礎上,updateFunc對歷史數據進行更新。該函數的返回值是DStream類型的。
現在解讀一下updateStateByKey方法,updateStateByKey是在已有狀態基礎上采用updateFunc對歷史數據進行更新,具體怎么更新就是updateFunc這樣的函數進行操作的,返回一個DStream
這里采用的是defaultPartitioner,不論是基於狀態的計算還是基於batchduration的計算都是基於RDD的,RDD這里 面肯定需要Partitioner,默認采用的是HashPartitioner,HashPartitioner其實就對應着Hash的shuffle 或者Hash的計算方式。Hash的特點就是效率高,不需要進行排序等
(spark sql如果操作hive數據倉庫中表的時候,假設自己設制了一個並行度,對spark sql on hive是否會生效?也就是分多少partition會不會受自定義分片的控制?:不會,這是spark sql特殊的一個地方,不會的時候有時候就會導致一種情況就是並行度太低,因為RDD的操作是后面的RDD繼承自前面的RDD的partition,前面 並行度太低就會影響后面的計算,影響GC等等。這時候就需要repartition而不是使用collase的方式)
rememberPartitioner前面傳的是true,所以Partitioner都會傳下去。new StateDStream
繼續跟蹤StateDStream,繼承了DStream,如果對狀態不斷的操作就會產生很多的StateDStream狀態對象
這里進行persist是在磁盤上,super.persist(StorageLevel.MEMORY_ONLY_SER),因為計算的是過去 很長時間的數據,數據可能會很多。核心的代碼是val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner),cogroup就是按照key對value進行聚合,按照key對所有數據進行掃描然后聚合,這樣做好處是對rdd的計算;
不好的地方就是性能,cogroup對所有數據進行掃描,隨着時間流逝數據規模越來越大性能越低,cogroup rdd和另一個cogroup rdd數據進行掃描合並
所以,如果數據很多的時候不建議使用updateStateByKey
updateStateByKey函數實現如下:
mapWithState
下面來看mapWithState,返回一個MapWithStateDStream,使用一個函數不斷作用於這個DStream中key- value的元素,基於key進行狀態維護和更新。mapWithState這個方法參數是StateSpec,這里邊封裝了對數據操作的函數。對 State可以理解為是一個內存數據表,這個表中記錄了所有的歷史狀態,可以對表中不同key的數據進行操作。更新的時候根據key在state的基礎上 更新對應的value。就相當於對一個表進行增刪改查
StateSpecImpl是一個case class,里面封裝了對數據操作的function
走進 MapWithStateDStream源碼
可以看到MapWithStateDStreamImpl這個類的計算交給了內部一個類InternalMapWithStateDStream。這個類是對數據進行更新
InternalMapWithStateDStream的compute方法創建了一個 MapWithStateRDD,MapWithStateRDD中包含了mapWithState的數據,以及對數據怎么操作。這個RDD的每一個 partition被一個MapWithStateRDDRecord代表的
MapWithStateRDDRecord有對數據的具體更新,這里邊有兩個關鍵的數據結構mappedData 和wrappedState 。newStateMap 是對以前RDD數據進行復制。每次更新數據的時候是對當前barchduration的數據進行遍歷,而不是像updateStateByKey一樣要遍 歷所有的數據
dataIterator.foreach dataIterator是當前barchduration的數據。每次遍歷的時候會先記錄以往數據中相同key的state,再根據指定的函數對遍歷的 數據進行操作,然后把操作后的狀態保存。沒有對歷史數據進行從新遍歷。這個效率會高很多。返回的還是MapWithStateRDDRecord ,從RDD的角度看
MapWithStateRDDRecord 並沒有變,但是它內部變了。相當於RDD原先指向一條數據,這條數據不能修改。現在RDD也是指向一條數據,只是這個數據中又封裝了數據。可以改變封裝的 數據,從RDD角度看,RDD指向的數據並沒有變。所以RDD可以處理變化的數據。只是要自定義一個RDD指向的數據結構
MapWithState實現如下:
最后我們附上代碼執行流程圖:
(來源:http://blog.csdn.net/hanburgud/article/details/51545414,感謝作者)
備注:
1、DT大數據夢工廠微信公眾號DT_Spark
2、Spark專家:王家林
3、新浪微博: http://www.weibo.com/ilovepains