Flink之ProcessFunction的使用(1):定時器和狀態管理的使用


相關文章鏈接

Flink之ProcessFunction的使用(1):定時器和狀態管理的使用

Flink之ProcessFunction的使用(2):側輸出流的使用

具體實現代碼如下所示:

main函數中代碼如下:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999)

val sensorStream: DataStream[SensorReading] = socketStream.map(new MyMapToSensorReading)

// 檢測每一個傳感器的溫度是否在10秒內連續上升(連續10秒上升后報警,並之后每一秒都報警,當溫度下降時重新計時)
sensorStream.keyBy(_.id).process(new TempIncreWarning(10 * 1000))


env.execute("TimerAndStateDemo")

自定義類實現KeyedProcessFunction接口(KeyedProcessFunction接口為ProcessFunction接口的子接口):

/**
 * 自定義的溫度持續上升警報類
 * @param duration 溫度持續上升時間
 */
class TempIncreWarning(duration:Long) extends KeyedProcessFunction[String, SensorReading, String] {

    /**
     * 知識點:
     *  1、ProcessFunction中的狀態跟類的成員變量類似,但需要從環境上下文中獲取,即運行了才有狀態
     *  2、在方法中使用狀態跟使用成員變量一樣,但狀態相對成員變量來說,安全性高、不會跟其他key沖突,一般使用狀態
     */
    // 因為需要跟上一個溫度做對比,將上一個溫度封裝到狀態中(狀態可以通過運行時上下文獲取,也因為狀態是在運行時上下文中,所以定義成lazy狀態,防止編譯時報錯)
    lazy val lastTempState: ValueState[Double] =getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
    // 為了方便刪除定時器,還需要保存定時器的時間戳
    lazy val curTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("cur-timer-ts", classOf[Long]))

    /**
     * 出來流中元素的方法,流中每一條元素都會調用一次此方法
     * @param value 流中的元素
     * @param ctx 環境上下文
     * @param out 將結果輸出的類
     */
    override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
        // 1、先將狀態中的數據取出
        val lastTemp: Double = lastTempState.value()
        val curTimerTs: Long = curTimerTsState.value()

        // 2、更新溫度
        lastTempState.update(value.temperature)

        // 3、判斷溫度變化,並采取對應操作
        if (value.temperature > lastTemp && curTimerTs == 0){
            // 3.1、當現在溫度比上一個溫度高,並且沒有定時器時,注冊一個定時器
            val ts: Long = ctx.timerService().currentProcessingTime() + duration
            ctx.timerService().registerProcessingTimeTimer(ts)
            curTimerTsState.update(ts)
        } else if (value.temperature < lastTemp){
            // 3.2、如果溫度下降,刪除定時器
            ctx.timerService().deleteProcessingTimeTimer(curTimerTs)
            curTimerTsState.clear()
        }
        // 3.3、當溫度沒變化時,不做操作
    }

    /**
     * 定時器觸發時執行的操作(說明10s內沒有溫度值下降,報警)
     * @param timestamp 時間戳
     * @param ctx 環境上下文
     * @param out 輸出類
     */
    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
        val key: String = ctx.getCurrentKey
        out.collect(s"傳感器:$key, 已連續" + duration/1000 + "秒溫度持續上升")
        curTimerTsState.clear()
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM