上一篇博客中,已經對股票預測的例子做了簡單的講解,下面對其中的幾個關鍵的技術點再作一些總結。
1、updateStateByKey
由於在1.6版本中有一個替代函數,據說效率比較高,所以作者就順便研究了一下該函數的用法。
def mapWithState[StateType, MappedType](spec :StateSpec[K, V, StateType, MappedType]) : MapWithStateDStream[K, V, StateType, MappedType] = { }
上面是函數的原型,接收一個StateSpec的對象,其實就是對updateStateByKey相關參數的一個封裝。該對象接收4個類型參數,KEY值的類型,VALUE的類型,State的類型,Mapped的類型。理解這個四個類型參數也比較關鍵,這個跟updateStateByKey有少許區別:K,V這兩個類型參數不需要太多解釋;State的類型可以是任意類型,Float,(Float,Int),OneObject等等;MappedType是映射結果的類型,也就是說返回的類型也可以是任意類型,這點與updateStateByKey有少許不同。下面是一個示例
/** mapWithState.function是用每個key的state對(k,v)進行map * 對輸入的每一個(stockMame,stockPrice)鍵值對,使用每個key的state進行映射,返回新的結果 * 此處的state是每個stockName的上一次的價格 * 用輸入的(stockName,stockPrice)中的stockPrice更行state中的上一次的價格(state.update函數) * 映射結果為(stockName,(stockPrice-上一次價格,1)) ,當然映射結果也可以是其他值,例如(stockName,最后一次價格變化的方向) * */ val updatePriceTrend = (key:String, newPrice: Option[Float],state:State[Float]) => { val lstPrice:Float = state.getOption().getOrElse(newPrice.getOrElse(0.0f)) state.update(newPrice.getOrElse(0.0f)) // println(new SimpleDateFormat("HH:mm:ss").format(new Date())+"-"+newPrice.getOrElse(0.0f)+","+lstPrice) (key,(newPrice.getOrElse(0.0f)-lstPrice,1)) }
2、reduceByKeyAndWindow
上一個例子中,雖然使用到了該函數,但其實是在官方例子的基礎上依葫蘆畫瓢寫的,並不能很好的理解該函數的具體用法。下面是此次優化后的代碼
val reduceFunc = (reduced: (Float,Int), newPair: (Float,Int)) => { if (newPair._1 > 0) (reduced._1 + newPair._1, reduced._2 + newPair._2) else (reduced._1 + newPair._1, reduced._2 - newPair._2) } val invReduceFunc = (reduced: (Float,Int), oldPair: (Float,Int)) => { if (oldPair._1 > 0) (reduced._1 + oldPair._1, reduced._2 - oldPair._2) else (reduced._1 + oldPair._1, reduced._2 + oldPair._2) } /** 每隔slideLen個BatchTime對過去windowLen個(不包含當前Batch)BatchTime的RDD進行計算 * */ val windowedPriceChanges = stockPrice.reduceByKeyAndWindow(reduceFunc,invReduceFunc, Seconds(3),//windowLen Seconds(1) //slideLen )
其中兩個函數很關鍵:reduceFunc、invReduceFunc。reduceFunc是對進入窗口的數據進行的計算,invReduceFunc是對離開窗口的數據進行的計算。那么怎么理解進入窗口和離開窗口呢?要首先理解窗口函數的基本意義,下圖展示了滑動窗口的概念 。
如上圖所示,一個滑動窗口時間段((sliding window length)內的所有RDD會進行合並以創建windowed DStream所對應的RDDD。每個窗口操作有兩個參數:
- window length - The duration of the window (3 in the figure),滑動窗口的時間跨度,指本次window操作所包含的過去的時間間隔(圖中包含3個batch interval,可以理解時間單位)
- sliding interval - The interval at which the window operation is performed (2 in the figure).(窗口操作執行的頻率,即每隔多少時間計算一次)
These two parameters must be multiples of the batch interval of the source DStream (1 in the figure). 這表示,sliding window length的時間長度以及sliding interval都要是batch interval的整數倍。batch interval是在構造StreamingContext時傳入的(1 in the figure)
那么上圖中,在time5的時候,reduceFunc處理的數據就是time4和time5;invReduceFunc處理的數據就是time1和time2。此處需要特別特別特別處理,這里的window at time 5要理解成time 5的最后一刻,如果這里的time是一秒的話,那么time 5其實就是第5秒最后一刻,也就是第6秒初。關於這一點在后面的博文中會具體講解。
關鍵點解釋的差不多了,reduceFunc的函數就好理解了,該函數的第一個參數reduced可以理解成在time 3的時候計算的最終結果,第二個參數其實也就分別是time 4和time 5的數據(該函數應該會被調用多次的);那么time 4和time 5的這兩批數據是怎么匯總的呢?仍然是調用reduceFunc,也即是對這兩批數據中的每一條具體的記錄按照時間的先后順序調用reduceFunc,其實也就是leftReduce。invReduceFunc同理。
好了,兩個關鍵函數就算解釋清楚了,如果還有不清楚的地方,可以留言評論,最后附上源碼的git地址:http://git.oschina.net/gabry_wu/BigDataPractice
PS:未經允許,禁止轉載,否則將追究法律責任!