Flink ProcessFunction API


我們之前學習的轉換算子是無法訪問事件的時間戳信息和水位線信息的。而這在一些應用場景下,極為重要。

例如MapFunction這樣的map轉換算子就無法訪問時間戳或者當前事件的事件時間。

基於此,DataStream API提供了一系列的Low-Level轉換算子。

可以訪問時間戳、watermark以及注冊定時事件。還可以輸出特定的一些事件,例如超時事件等。

Process Function用來構建事件驅動的應用以及實現自定義的業務邏輯(使用之前的window函數和轉換算子無法實現)。

例如,Flink SQL就是使用Process Function實現的。

Flink提供了8個Process Function:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

1、KeyedProcessFunction

KeyedProcessFunction用來操作KeyedStream。

KeyedProcessFunction會處理流的每一個元素,輸出為0個、1個或者多個元素。

所有的Process Function都繼承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法。

而KeyedProcessFunction[KEY, IN, OUT]還額外提供了兩個方法:

  • processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一個元素都會調用這個方法,調用結果將會放在Collector數據類型中輸出。Context可以訪問元素的時間戳,元素的key,以及TimerService時間服務。Context還可以將結果輸出到別的流(side outputs)。
  • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一個回調函數。當之前注冊的定時器觸發時調用。參數timestamp為定時器所設定的觸發的時間戳。Collector為輸出結果的集合。OnTimerContext和processElement的Context參數一樣,提供了上下文的一些信息,例如定時器觸發的時間信息(事件時間或者處理時間)。
 1 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
 2 env.setParallelism(1)
 3 
 4 val dataDS: DataStream[String] = env.readTextFile("input/data1.txt")
 5 
 6 val mapDS: DataStream[(String, Long, Int)] = dataDS.map(data => {
 7     val datas = data.split(",")
 8     (datas(0), datas(1).toLong, datas(2).toInt)
 9 })
10 
11 mapDS.keyBy(0)
12    .process(
13             new KeyedProcessFunction[Tuple,(String, Long, Int), String]{
14                 override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, Long, Int), String]#OnTimerContext, out: Collector[String]): Unit = super.onTimer(timestamp, ctx, out)
15                 
16 override def processElement(value: (String, Long, Int), ctx: KeyedProcessFunction[Tuple, (String, Long, Int), String]#Context, out: Collector[String]): Unit = {
17                     println(ctx.getCurrentKey)
18                     out.collect(value.toString())
19                 }
20             }
21         ).print("keyprocess:")

 

2、TimerService 和 定時器(Timers)

Context和OnTimerContext所持有的TimerService對象擁有以下方法:

  • currentProcessingTime(): Long 返回當前處理時間
  • currentWatermark(): Long 返回當前watermark的時間戳
  • registerProcessingTimeTimer(timestamp: Long): Unit 會注冊當前key的processing time的定時器。當processing time到達定時時間時,觸發timer。
  • registerEventTimeTimer(timestamp: Long): Unit 會注冊當前key的event time 定時器。當水位線大於等於定時器注冊的時間時,觸發定時器執行回調函數。
  • deleteProcessingTimeTimer(timestamp: Long): Unit 刪除之前注冊處理時間定時器。如果沒有這個時間戳的定時器,則不執行。
  • deleteEventTimeTimer(timestamp: Long): Unit 刪除之前注冊的事件時間定時器,如果沒有此時間戳的定時器,則不執行。

當定時器timer觸發時,會執行回調函數onTimer()。

注意定時器timer只能在keyed streams上面使用。

需求:監控水位傳感器的水位值,如果水位值在五秒之內(processing time)連續上升,則報警。

 1  def main(args: Array[String]): Unit = {
 2     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment;
 3     env.setParallelism(1)
 4     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 5 
 6     val dataDS: DataStream[String] = env.socketTextStream("linux1", 9999)
 7     val waterDS = dataDS.map(
 8         data=>{
 9             val datas = data.split(",")
10             WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
11         }
12     )
13 
14     // 設定數據的事件時間已經定義Watermark
15     val markDS: DataStream[WaterSensor] = waterDS.assignAscendingTimestamps(_.ts * 1000)
16 
17     //對分區后的數據進行處理
18     markDS.keyBy(_.id)
19         .process( new KeyedProcessFunction[String, WaterSensor, String] {
20 
21             private var lastWaterVal : Int = 0
22             private var alarmTS : Long = 0L
23             
24             // TODO 當連續五秒水位上升時,需要發出警報
25             override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext, out: Collector[String]): Unit = {
26                 out.collect("當前傳感器連續五秒水位上升")
27                 alarmTS = 0L
28             }
29 
30             // 處理每一條傳感器數據
31             override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
32 
33                 // 當前傳感器的值是否大於上一次的值
34                 if ( value.vc > lastWaterVal && alarmTS == 0 ) {
35                     // 如果中間過程不被打斷,那么需要5s后報警
36                     alarmTS = value.ts * 1000 + 5000
37                     ctx.timerService().registerEventTimeTimer(alarmTS)
38                 } else if ( value.vc < lastWaterVal || lastWaterVal == 0 ) {
39                     // TODO 如果水位下降
40                     ctx.timerService().deleteEventTimeTimer(alarmTS)
41                     alarmTS = 0L
42                 }
43 
44                 // 設定lastWaterVal的值等於當前傳感器的值
45                 lastWaterVal = value.vc
46             }
47         } ).print("alarm>>>>")
48     markDS.print("mark>>>>>>>")
49     env.execute()
50     
51 }

 

def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment;
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    

    val dataDS: DataStream[String] = env.socketTextStream("linux1", 9999)
    val waterDS = dataDS.map(
        data=>{
            val datas = data.split(",")
            WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
        }
    )

    // 設定數據的事件時間已經定義Watermark
    val markDS: DataStream[WaterSensor] = waterDS.assignAscendingTimestamps(_.ts * 1000)

    // TODO 對分區后的數據進行處理
    markDS.keyBy(_.id)
        .process( new KeyedProcessFunction[String, WaterSensor, String] {

            //private var lastWaterVal : Int = 0
            //private var alarmTS : Long = 0L
            private var lastWaterVal:ValueState[Int] =
                getRuntimeContext.getState(
                    new ValueStateDescriptor[Int]("lastWaterVal", classOf[Int])
                )

            private var alarmTS:ValueState[Long] =
                getRuntimeContext.getState(
                    new ValueStateDescriptor[Long]("alarmTS", classOf[Long])
                )

//                    private var lastWaterVal:ValueState[Int] = _
//                    private var alarmTS:ValueState[Long] = _

            // 初始化,一般完成狀態對象的初始化
            override def open(parameters: Configuration): Unit = {
//                        lastWaterVal =
//                            getRuntimeContext.getState(
//                                new ValueStateDescriptor[Int]("lastWaterVal", classOf[Int])
//                            )
//                        alarmTS =
//                            getRuntimeContext.getState(
//                                new ValueStateDescriptor[Long]("alarmTS", classOf[Long])
//                            )
            }

            // TODO 當連續五秒水位上升時,需要發出警報
            override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, WaterSensor, String]#OnTimerContext, out: Collector[String]): Unit = {
                out.collect("當前傳感器連續五秒水位上升")
                // 清空預警狀態
                alarmTS.clear()
            }

            // 處理每一條傳感器數據
            override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {

                val lastVal = lastWaterVal.value()
                val alarm = alarmTS.value()
                
                // TODO 當前傳感器的值是否大於上一次的值
                if ( value.vc > lastVal && alarm == 0 ) {
                    // 如果中間過程不被打斷,那么需要5s后報警
                    val newTS = value.ts * 1000 + 5000
                    // 更新預警狀態
                    alarmTS.update(newTS)
                    ctx.timerService().registerEventTimeTimer(newTS)
                } else if ( value.vc < lastVal || lastVal == 0 ) {
                    // TODO 如果水位下降
                    ctx.timerService().deleteEventTimeTimer(alarm)
                    alarmTS.clear()
                }

                // 設定lastWaterVal的值等於當前傳感器的值
                lastWaterVal.update(value.vc)
            }
        } ).print("alarm>>>>")
    markDS.print("mark>>>>>>>")
    env.execute()
    
}

 

3、側輸出流(SideOutput)

  • 大部分的DataStream API的算子的輸出是單一輸出,也就是某種數據類型的流。
  • 除了split算子,可以將一條流分成多條流,這些流的數據類型也都相同。
  • process function的side outputs功能可以產生多條流,並且這些流的數據類型可以不一樣。
  • 一個side output可以定義為OutputTag[X]對象,X是輸出流的數據類型。
  • process function可以通過Context對象發送一個事件到一個或者多個side outputs。

When using side outputs, you first need to define an OutputTag that will be used to identify a side output stream:

val outputTag = OutputTag[String]("side-output")

 

Emitting data to a side output is possible from the following functions:

You can use the Context parameter, which is exposed to users in the above functions, to emit data to a side output identified by an OutputTag. Here is an example of emitting side output data from a ProcessFunction:

val input: DataStream[Int] = ...
val outputTag = OutputTag[String]("side-output")

val mainDataStream = input
  .process(new ProcessFunction[Int, Int] {
    override def processElement(
        value: Int,
        ctx: ProcessFunction[Int, Int]#Context,
        out: Collector[Int]): Unit = {
      // emit data to regular output
      out.collect(value)

      // emit data to side output
      ctx.output(outputTag, "sideout-" + String.valueOf(value))
    }
  })

 

For retrieving the side output stream you use getSideOutput(OutputTag) on the result of the DataStream operation. This will give you a DataStream that is typed to the result of the side output stream:

val outputTag = OutputTag[String]("side-output")

val mainDataStream = ...

val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM