SparkStreaming使用mapWithState時,設置timeout()無法生效問題解決方案


前言

當我在測試SparkStreaming的狀態操作mapWithState算子時,當我們設置timeout(3s)的時候,3s過后數據還是不會過期,不對此key進行操作,等到30s左右才會清除過期的數據。

百度了很久,關於timeout的資料很少,更沒有解決這個問題的文章,所以說,百度也不是萬能的,有時候還是需要靠自己。

所以我就在周末研究了一下,然后將結果整理了出來,希望能幫助大家更全面的理解Spark狀態計算。

mapWithState

按理說Spark Streaming實時處理,數據就像流水,每個批次之間的數據都是獨立的,處理完就處理完了,不留下任何狀態。但是免不了一些有狀態的操作,例如統計從流啟動到現在,某個單詞出現了多少次,所以狀態操作就出現了。

狀態操作分為updateStateByKey和mapWithState,兩者有着很大的區別。簡單的來說,前者每次輸出的都是全量狀態,后者輸出的是增量狀態。

過期原理

過期這一塊估計很多人開始都理解錯了,我剛開始理解就是數據從出現,經過多少秒之后就會過期。其實不是,這里的過期指的是空閑時間。

注釋大概是這個意思:timeout()傳入一個時間間隔參數,如果一個key在大於此間隔沒有此key的數據流入,則被認為是空閑的,就會單獨調用一次mapWithState中的func來清除這些空閑數據狀態。

先寫結論

使用了timeout()之后,需要使用以下代碼來在間隔內清除失效key。

stream.checkpoint(Seconds(6))

checkpoint的時候,會開啟全面掃描,才會對state中的失效key進行清理。

測試

   val conf = new SparkConf().setMaster("local[2]").setAppName("state")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint("./tmp")
    val streams: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999)
      .map(x => (x, 1))

    val result = streams.mapWithState(StateSpec.function((k: String, v: Option[Int], state: State[Int]) => {
        val count = state.getOption().getOrElse(0)
        println(k)
        println(v)
        var sum = 0
        if (!state.isTimingOut()) {
          sum = count + v.get
          state.update(sum)
        } else {
          println("timeout")
        }
        Option(sum)
      })
      .timeout(Seconds(3))
    )
    // 這行代碼是觸發清除機制的關鍵
    // result.checkpoint(Seconds(6))
    result.print()
    ssc.start()
    ssc.awaitTermination()

使用上面的代碼進行測試,設置過期時間為3s。但是3s過后發現key並沒有過期,也不會被清除,大概30S之后被清除。

在9999端口輸入一個tom后,不再進行任何操作。測試結果如下:

tom
Some(1)
-------------------------------------------
Time: 1618228587000 ms
-------------------------------------------
Some(1)


tom
None
timeout
-------------------------------------------
Time: 1618228614000 ms
-------------------------------------------
Some(0)

從測試結果可以看出,從輸入到清除大概是27s。

我們現在將注釋的代碼放開,每6s進行checkpoint一次,輸入tom:

tom
Some(1)
-------------------------------------------
Time: 1618228497000 ms
-------------------------------------------
Some(1)

tom
None
timeout
-------------------------------------------
Time: 1618228506000 ms
-------------------------------------------
Some(0)

從生成到清除用了9秒,正好是過期時間 + 下一個窗口時間,觸發了checkpoint。

猜想

第一次學狀態操作的時候,就考慮如何去掉一些過期的key,通過timeout()的方法沒有完成自己想法,從網上也沒有找到解決方案,所以就暫且擱置在一邊了。后來又回過頭來考慮這個問題,然后根據自己的想法去猜想、去驗證。

1. 我先看的是mapWithState()的返回值

2. MapWithStateDStreamImpl

每個Dstream的計算邏輯都在compute()中,這里是調用了internalStream的getOrCompute(),根據繼承關系,調用的是父類Dstream的此方法:

getOrCompute()主要功能為:計算、緩存、checkpoint。這里只需要記住幾個地方:checkpointDuration,即checkpoint間隔,和調用了checkpoint()。其實真正的計算還是調用了compute(),接着去看compute()

3. InternalMapWithStateDStream

compute()里面也調用了getOrCompute()方法,其實和上面調用的一樣,都是Dstream的,這里主要看的是使用createFromRDD()生成的StateRDD。

4. MapWithStateRDD

這個StateRDD就是參與狀態計算的數據集合,首先看它是如何生成的:

再看看StateRDD的compute()是如何計算的:

從compute()看出,當doFullScan為true的時候,才會觸發過期key的清除,updateRecordWithData()負責全面掃描清除過期key

這不,思路就來了,我們只要找到開啟FullScan的方法,不就可以自行觸發清除機制了嗎!

那么,我們先看看doFullScan的默認值:

默認是沒開啟的,接着通過快捷鍵看看哪些地方使用了doFullScan:

從圖中看出,有兩處代碼修改了doFullScan,我們找到這兩處代碼:

第一個基本上排除,那么就剩下第二個:checkpoint(),我們要知道的是,狀態操作必須要checkpoint

還記得在2中的getOrCompute()嗎,當checkpointDuration不為null的時候,調用checkpoint()。
我們來看3中InternalMapWithStateDStream是如何定義這個duration的:

如圖,sideDuration是窗口時間,乘以系數10就是默認的checkpoint時長,所以當我設置窗口為3s時,checkpoint周期就是30s,30s才會清理一次過期key。

而通過checkpoint(interval)可以設置checkpoint的間隔,所以覆蓋了上面程序中默認的30s。

5.MapWithStateRDDRecord

最后提一提,FullScan是在這個類中開啟的,所以先看看這個Record的注釋介紹:

意思就是負責存儲StateRDD的狀態KV,updateRecordWithData()負責清除過期的Record,我們來看看這個方法的實現:

removeTimedoutData就是是否開啟全面掃描,即doFullScan的值。

結語

寫完看起來感覺真的是簡簡單單,邏輯看起來也比較清晰,但是自己去解決這個問題的時候也是花了一下午時間,過期key的清除與checkpoint有關也是我憑空弄猜想,然后分析了兩次,某一瞬間才找到他們之間的關系。所以說,猜想和運氣還是很重要的。

當然,找不到關於這塊的文章和資料可能是因為這個知識點太小了。所以這次過后,要開始系統閱讀Spark源碼了,也希望在某一天能結合着自己的理解,寫一下Spark的文章。



95后小程序員,寫的都是日常工作中的親身實踐,置身於初學者的角度從0寫到1,詳細且認真。

文章會在公眾號 [入門到放棄之路] 首發,期待你的關注。

感謝每一份關注


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM