Flink State 狀態


常用 State

Flink 有兩種常見的 State類型,分別是:

Keyed State (鍵控狀態)

Operator State(算子狀態)

1) Keyed State(鍵控狀態)

Keyed State:顧名思義就是基於 KeyedStream 上的狀態,這個狀態是跟特定的Key綁定的。KeyedStrean 流上的每一個Key,都對應一個 State。Flink針對Keyed State提供了以下可以保存 State 的數據結構:

※ ValueState<T>:保存一個可以更新和檢索的值(如上所述,每個值都對應到當前的輸入數據的 Key,

因此算子接收到的每個Key都可能對應一個值)。這個值可以通過 update(T)進行更新,通過 T value() 進行檢索。

※ ListState<T>:保存一個元素的列表。可以往這個列表中追加數據,並在當前的列表上進行檢索。可以通過 add(T) 或者 addAll(List<T>) 進行添加元素,通過 Iterable<T> get() 獲取整個列表。還可以通過 update(List<T>) 覆蓋當前的列表。

※ ReducingState<T>:保存一個單值,表示添加到狀態的所有聚合。接口與ListState 類似,使用 add(T)增加元素,會使用提供的 ReduceFunction 進行聚合

※ AggregatingState<IN.OUT>:保留一個單值,表示添加到狀態的所以值得聚合。和ReducingState 相反得是,聚合類型可能與添加到狀態得元素得類型不同。接口與 ListState 類似,但使用 add(IN) 添加的元素會用指定的 AggregateFunction 進行聚合。

※ FoldingState<T,ACC>:保留一個單值,表示添加到狀態的所有值的聚合。與 ReducingState 相反,聚合類型可能與添加到狀態的元素類型不同。接口與ListState類型,但使用 add(T) 添加的元素會用指定的FoldFunction 折疊成聚合值。

※ MapState<UK,UV>:維護了一個添加映射列表。你可以添加鍵值對到狀態中,也可以獲得反映當前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 分別檢索映射、鍵和值的可迭代視圖。

2)Operator State(算子狀態)

Operator State 與 Key 無關,而是與 Operator 綁定,整個 Operator 只對應一個 State。比如:Flink 中的 Kafka Connector 就使用了 Operator State,它會在每個 Connector 實例中,保存該實例消費 Topic 的所有(partition,offset)映射。

 


 

station.log 內容:

station_0,18600003612,18900004575,barring,1577080453123,0 station_9,18600003686,18900002113,success,1577080453123,32 station_3,18600003794,18900009608,success,1577080453123,4 station_1,18600000005,18900007729,fail,1577080453123,0 station_8,18600007461,18900006987,barring,1577080453123,0 station_5,18600009356,18900006066,busy,1577080455129,0 station_4,18600001914,18900003949,busy,1577080455129,0

 

StationLog類:

/**
 *
 * @param sid      基站ID
 * @param callOut  主叫號碼
 * @param callIn   被叫號碼
 * @param callType 通話類型eg:呼叫失敗(fail),占線(busy),拒接(barring),接通(success):
 * @param callTime 呼叫時間戳,精確到毫秒
 * @param duration 通話時長 單位:秒
 */
case class StationLog(sid: String, callOut: String, callIn: String, callType: String, callTime: Long, duration: Long)

 

實現代碼:

package com.apple.flink.keyedstate

import com.apple.flink.source.StationLog
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

/**
 * 統計每個 手機呼叫間隔時間,並輸出
 */
object StateCallInterval {

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    streamEnv.setParallelism(1)

    //導入隱式轉換,建議寫在這里,可以防止IDEA代碼提示出錯的問題
    import org.apache.flink.streaming.api.scala._

    //讀取文件數據
    val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
      .map(line => {
        var arr = line.split(",")
        new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })

    //方法一
    data.keyBy(_.callIn) //按照呼叫手機號分組
      .flatMap(new CallIntervalFunction())
      .print()

    //方法二:調用 flatMapWithState算子
    data.keyBy(_.callIn) //按照呼叫手機號分組
      .flatMapWithState[(String, Long), StationLog] {
        case (in: StationLog, None) => (List.empty, Some(in)) //如果狀態中沒有,則存入
        case (in: StationLog, pre: Some[StationLog]) => { //如果狀態中有值則計算時間間隔
          var interval = in.callTime - pre.get.callTime
          (List((in.callIn, interval)), Some(in))
        }
      }.print()

    //方法三 mapWithState 算子也可以
    data.keyBy(_.callIn) //按照呼叫手機號分組
      .mapWithState[(String, Long), StationLog] {
        case (in: StationLog, None) => ((in.callIn, 0), Some(in))
        case (in: StationLog, pre: Some[StationLog]) => {
          var interval = in.callTime - pre.get.callTime
          ((in.callIn, interval), Some(in))
        }
      }.print()

    streamEnv.execute()

  }

  class CallIntervalFunction extends RichFlatMapFunction[StationLog, (String, Long)] {

    //定義一個保存前一條呼叫的數據的狀態對象
    private var preData: ValueState[StationLog] = _

    override def open(parameters: Configuration): Unit = {
      val stateDescriptor = new ValueStateDescriptor[StationLog]("pre", classOf[StationLog])
      preData = getRuntimeContext.getState(stateDescriptor)
    }

    override def flatMap(in: StationLog, collector: Collector[(String, Long)]): Unit = {
      var pre: StationLog = preData.value()
      if (pre == null) { //如果狀態中沒有,則存入
        preData.update(in)
      } else { //如果狀態有值則計算時間間隔var
        var interval = in.callTime - pre.callTime
        collector.collect((in.callIn, interval))
      }
    }
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM