相關文章鏈接
Flink之ProcessFunction的使用(1):定時器和狀態管理的使用
Flink之ProcessFunction的使用(2):側輸出流的使用
具體實現代碼如下所示:
main函數中代碼如下:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val socketStream: DataStream[String] = env.socketTextStream("localhost", 9999) val sensorStream: DataStream[SensorReading] = socketStream.map(new MyMapToSensorReading) // 檢測每一個傳感器的溫度是否在10秒內連續上升(連續10秒上升后報警,並之后每一秒都報警,當溫度下降時重新計時) sensorStream.keyBy(_.id).process(new TempIncreWarning(10 * 1000)) env.execute("TimerAndStateDemo")
自定義類實現KeyedProcessFunction接口(KeyedProcessFunction接口為ProcessFunction接口的子接口):
/** * 自定義的溫度持續上升警報類 * @param duration 溫度持續上升時間 */ class TempIncreWarning(duration:Long) extends KeyedProcessFunction[String, SensorReading, String] { /** * 知識點: * 1、ProcessFunction中的狀態跟類的成員變量類似,但需要從環境上下文中獲取,即運行了才有狀態 * 2、在方法中使用狀態跟使用成員變量一樣,但狀態相對成員變量來說,安全性高、不會跟其他key沖突,一般使用狀態 */ // 因為需要跟上一個溫度做對比,將上一個溫度封裝到狀態中(狀態可以通過運行時上下文獲取,也因為狀態是在運行時上下文中,所以定義成lazy狀態,防止編譯時報錯) lazy val lastTempState: ValueState[Double] =getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double])) // 為了方便刪除定時器,還需要保存定時器的時間戳 lazy val curTimerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("cur-timer-ts", classOf[Long])) /** * 出來流中元素的方法,流中每一條元素都會調用一次此方法 * @param value 流中的元素 * @param ctx 環境上下文 * @param out 將結果輸出的類 */ override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = { // 1、先將狀態中的數據取出 val lastTemp: Double = lastTempState.value() val curTimerTs: Long = curTimerTsState.value() // 2、更新溫度 lastTempState.update(value.temperature) // 3、判斷溫度變化,並采取對應操作 if (value.temperature > lastTemp && curTimerTs == 0){ // 3.1、當現在溫度比上一個溫度高,並且沒有定時器時,注冊一個定時器 val ts: Long = ctx.timerService().currentProcessingTime() + duration ctx.timerService().registerProcessingTimeTimer(ts) curTimerTsState.update(ts) } else if (value.temperature < lastTemp){ // 3.2、如果溫度下降,刪除定時器 ctx.timerService().deleteProcessingTimeTimer(curTimerTs) curTimerTsState.clear() } // 3.3、當溫度沒變化時,不做操作 } /** * 定時器觸發時執行的操作(說明10s內沒有溫度值下降,報警) * @param timestamp 時間戳 * @param ctx 環境上下文 * @param out 輸出類 */ override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = { val key: String = ctx.getCurrentKey out.collect(s"傳感器:$key, 已連續" + duration/1000 + "秒溫度持續上升") curTimerTsState.clear() } }
