流處理 —— Spark Streaming中的操作(狀態管理函數 updateStateByKey和mapWithState)


狀態管理函數
  Spark Streaming中狀態管理函數包括updateStateByKey和mapWithState,都是用來統計全局key的狀態的變化的。它們以DStream中的數據進行按key做reduce操作,然后對各個批次的數據進行累加,在有新的數據信息進入或更新時。能夠讓用戶保持想要的不論任何狀狀。

1. updateStateByKey

概念

updateStateByKey會統計全局的key的狀態,不管又沒有數據輸入,它會在每一個批次間隔返回之前的key的狀態。updateStateByKey會對已存在的key進行state的狀態更新,同時還會對每個新出現的key執行相同的更新函數操作。如果通過更新函數對state更新后返回來為none,此時刻key對應的state狀態會被刪除(state可以是任意類型的數據的結構)。

適用場景

updateStateByKey可以用來統計歷史數據,每次輸出所有的key值。例如統計不同時間段用戶平均消費金額,消費次數,消費總額,網站的不同時間段的訪問量等指標

使用實例

條件

1)首先會以DStream中的數據進行按key做reduce操作,然后再對各個批次的數據進行累加 。

2)updateStateBykey要求必須要設置checkpoint點

3)updateStateByKey 方法中 updateFunc就要傳入的參數,。Seq[V]表示當前key對應的所有值,Option[S] 是當前key的歷史狀態,返回的是新的封裝的數據。

代碼

object SparkStreamingUpdateStateByKey {

  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

  def main(args: Array[String]) {

    val spark = SparkSession.builder()
      .appName("UpdateStateByKey")
      .master("local[3]")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // 必須設置checkpoint
    ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint")

    val wordCount: DStream[(String, Int)] = ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        /** state.getOrElse(0) 得到歷史可以值,如果key值不存在,則為0 */
        Option(values.foldLeft(state.getOrElse(0))(_ + _))
      })

    wordCount.print()

    ssc.start()
    ssc.awaitTermination()

  }
}

 

2.  mapWithState

概念

mapWithState也會統計全局的key的狀態,但是如果沒有數據輸入,便不會返回之前的key的狀態,只會返回batch中存在的key值統計,類似於增量的感覺。

適用場景

mapWithState可以用於一些實時性較高,延遲較少的一些場景,例如你在某寶上下單買了個東西,付款之后返回你賬戶里的余額信息。

使用實例

條件

1)如果有初始化的值得需要,可以使用initialState(RDD)來初始化key的值

2)還可以指定timeout函數,該函數的作用是,如果一個key超過timeout設定的時間沒有更新值,那么這個key將會失效。這個控制需要在func中實現,必須使用state.isTimingOut()來判斷失效的key值。如果在失效時間之后,這個key又有新的值了,則會重新計算。如果沒有使用isTimingOut,則會報錯。

3) checkpoint 不是必須的

object SparkStreamingMapWithState {

  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

  def main(args: Array[String]) {

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("UpdateStateByKeyDemo")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())


    // 可以不設置checkpoint
    ssc.checkpoint("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\checkpoint")

    val wordCount: MapWithStateDStream[String, Int, Int, Any] =
      ssc.textFileStream("file:\\D:\\workspace\\idea\\silent\\src\\main\\resources\\stream")
        .map((_, 1))
        .mapWithState(StateSpec.function(func).initialState(initialRDD).timeout(Seconds(30)))

    wordCount.print()

    ssc.start()
    ssc.awaitTermination()

  }

  /**
   * word : 代表統計的單詞
   * option:代表的是歷史數據(使用option是因為歷史數據可能有,也可能沒有,如第一次進來的數據就沒有歷史記錄)
   * state:代表的是返回的狀態
   */
  val func = (word: String, option: Option[Int], state: State[Int]) => {

    if (state.isTimingOut()) {
      println(word + "is timeout")
    } else {
      // getOrElse(0)不存在賦初始值為零
      val sum = option.getOrElse(0) + state.getOption().getOrElse(0)
      // 單詞和該單詞出現的頻率/ 獲取歷史數據,當前值加上上一個批次的該狀態的值
      val wordFreq = (word, sum)
      state.update(sum)
      wordFreq
    }
  }

}

updateStateByKey和mapWithState的區別

  updateStateByKey可以在指定的批次間隔內返回之前的全部歷史數據,包括新增的,改變的和沒有改變的。由於updateStateByKey在使用的時候一定要做checkpoint,當數據量過大的時候,checkpoint會占據龐大的數據量,會影響性能,效率不高。

  mapWithState只返回變化后的key的值,這樣做的好處是,我們可以只是關心那些已經發生的變化的key,對於沒有數據輸入,則不會返回那些沒有變化的key的數據。這樣的話,即使數據量很大,checkpoint也不會像updateStateByKey那樣,占用太多的存儲,效率比較高(再生產環境中建議使用這個)。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM