Flink| ProcessFunction API(底層API)


 

ProcessFunction API

        之前的轉換算子 是無法訪問事件的時間戳信息和 水位線 信息的。而這在一些應用場景下極為重要。例如 MapFunction 這樣的 map 轉換算子就無法訪問時間戳或者當前事件的事件時間。基於此,

DataStream API 提供了一系列的 Low Level 轉換算子。可以訪問時間戳、 watermark 以及注冊定時事件 。還可以輸出特定的一些事件 ,例如超時事件等。Process Function 用來構建事件驅動的應用以及實現

自定義的業務邏輯 使用之前的window 函數和轉換算子無法實現 。

例如, Flink SQL 就是使用 Process Function 實現的。

Flink提供了 8 個 Process Function

  •  ProcessFunction
  •  KeyedProcessFunction
  •  CoProcessFunction
  •  ProcessJoinFunction
  •  BroadcastProcessFunction
  •  KeyedBroadcastProcessFunction
  •  ProcessWindowFunction
  • ProcessAllWindowFunction
KeyedProcessFunction 用來操作KeyedStream 
KeyedProcessFunction會處理流的每一個元素(每條數據來了之后都可以處理、過程處理函數),輸出為
0個、1個或者多個元素。
所有的 Process Function 都繼承自RichFunction接口(富函數,它可以有各種生命周期、狀態的一些操作,獲取watermark、定義鬧鍾定義定時器等),
所以都有open()、close()和getRuntimeContext() 等方法。
而KeyedProcessFunction[KEY, IN, OUT] 還額外提供了兩個方法:  ①.processElement(I value, Context ctx, Collector<O> OUt), 流中的每一個元素都會調用這個方法,調用結果將會放在Collector數據類型中輸出。
    Context可以訪問元素的時間戳,元素的key,以及TimerService時間服務。Context還可以將結果輸出到別的流(side outputs) ②.onTimer( long timestamp, OnTimerContext ctx, Collector<O> OUT )是一個回調函數。當之前注冊的定時器觸發時調用(定時器觸發時候的操作)。
    參數timestamp為定時器所設定的觸發的時間戳。Collector為輸出結果的集合。OnTimerContext和processElement的Context 參數一樣,提供了上下文的一些信息,
    例如定時器觸發的時間信息: 事件時間或者處理時間 。

 

TimerService 和 定時器 Timers Context和OnTimerContext所持有的TimerService對象擁有以下方法:   long currentProcessingTime() 返回當前處理時間   long currentWatermark() 返回當前watermark的時間戳   void registerProcessingTimeTimer(long timestamp) 會注冊當前key的processing time的定時器。當processing time到達定時時間時,觸發timer。   void registerEventTimeTimer(long timestamp) 會注冊當前key的event time 定時器。當水位線大於等於定時器注冊的時間時,觸發定時器執行回調函數。   void deleteProcessingTimeTimer(long timestamp) 刪除之前注冊處理時間定時器。如果沒有這個時間戳的定時器,則不執行。   void deleteEventTimeTimer(long timestamp) 刪除之前注冊的事件時間定時器,如果沒有此時間戳的定時器,則不執行。
當定時器timer觸發時,會執行回調函數onTimer()。注意定時器timer只能在keyed streams上面使用。

 

 

1. KeyedProcessFunction 如何操作 KeyedStream 

  需求:監控溫度傳感器的溫度值,如果溫度值在一秒鍾之內(processing time)連續上升,則報警。

import com.xxx.fink.api.sourceapi.SensorReading
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic


/**
  * 1s之內溫度連續上升就報警
  */
object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream: DataStream[String] = env.socketTextStream("hadoop101", 7777)

    val dataStream: DataStream[SensorReading] = stream.map(data => {
      val dataArray: Array[String] = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
      })
    
    val processedStream: DataStream[String] = dataStream.keyBy(_.id)
      .process(new TempIncreAlert())

    dataStream.print("Input data:")
    processedStream.print("process data:")
    env.execute("Window test")

  }

}

class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String] {
  //溫度連續上升,跟上一條數據做對比; 把上一條數據保存成當前狀態
  //定義一個狀態,保存上一個數據的溫度值
  lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  //定義一個狀態,用來保存定時器的時間戳
  lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("currentTimer", classOf[Long]))

  override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
    //先取出上一個溫度值
    val preTemp = lastTemp.value()
    //更新溫度值
    lastTemp.update(value.temperature)

    val curTimerTs = currentTimer.value() //從當前定時器中取出來,肯定是有值的,默認值為0;

    //A. 如果溫度上升且沒有設過定時器,則注冊定時器
    if (value.temperature > preTemp && curTimerTs == 0) {
      val timerTs = ctx.timerService().currentProcessingTime() + 10000L //當前時間 + 10s
      ctx.timerService().registerProcessingTimeTimer(timerTs) //注冊定時器
      currentTimer.update(timerTs)

      //B. 如果溫度下降,或是第一條數據(定時器默認為0),刪除定時器並清空狀態
    } else if (preTemp > value.temperature || preTemp == 0.0) { //否則就刪除定時器
      ctx.timerService().deleteProcessingTimeTimer(curTimerTs)
      currentTimer.clear()  //把對應的轉態清空; 不然它一直漲會撐爆內存

    }
  }
  //在回調函數中定義: 定時器要做的事情
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
    //輸出報警信息
    out.collect(ctx.getCurrentKey + "溫度連續上升")
    currentTimer.clear() //把狀態清空
  }

}

 

 

Input data:> SensorReading(sensor_1,1547718199,35.0)
Input data:> SensorReading(sensor_1,1547718199,36.0)
process data:> sensor_1溫度連續上升

 

2. 側輸出流(SideOutput)

     大部分的DataStream API 的大多數算子的輸出是單一輸出(從一條流出來還是一條流)。

除了split算子,可以將一條流分成多條流,這些流的數據類型也都相同。 

process function的side outputs功能可以產生多條流,並且這些流的數據類型可以不一樣。

一個 side output 可以定義為 Out putTag[X] 對象,X是輸出流的數據類型。 process function 可以通過Context對象發射一個事件到一個或者多個 side outputs。

 

 

 

監控傳感器溫度值,將溫度值低於30度的數據輸出到side output。

import com.xxx.fink.api.sourceapi.SensorReading import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.util.Collector import org.apache.flink.api.scala._ /** * 側輸出流 代替split; 如果溫度低於某個值就放到另外一個流或者低溫報警,否則就輸出到正常流中。 */ object SideOutputTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val stream = env.socketTextStream("hadoop101", 7777) val dataStream = stream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000 }) val processedStream: DataStream[SensorReading] = dataStream.process(new FreezingAlert()) processedStream.print("processed data: ") //打印的是主流  processedStream.getSideOutput(new OutputTag[String]("freezing alert")).print("alert data") //打印側輸出流  env.execute("Window test") } } //冰點報警,如果小於32F,輸出報警信息到側輸出流 class FreezingAlert() extends ProcessFunction[SensorReading, SensorReading] { //繼承ProcessFunction,沒有keyBy了;后一個為主輸出流  lazy val alertOutPut: OutputTag[String] = new OutputTag[String]("freezing alert") override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = { //如果小於32F,輸出報警信息到側輸出流 if (value.temperature < 32.0) { ctx.output(alertOutPut, "freezing alert for" + value.id) //用什么來標記側輸出流,OutputTag作為戳 //否則就輸出到正常流中 } else { out.collect(value) } } }

測試:

processed data: > SensorReading(sensor_1,1547718199,35.8) alert data> freezing alert forsensor_6 alert data> freezing alert forsensor_7 processed data: > SensorReading(sensor_10,1547718205,38.1)

 

3. CoProcessFunction

對於兩條輸入流,DataStream API提供了CoProcessFunction這樣的low-level操作。CoProcessFunction提供了操作每一個輸入流的方法: processElement1()和processElement2()。

類似於ProcessFunction,這兩種方法都通過Context對象來調用。這個Context對象可以訪問事件數據,定時器時間戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回調函數。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM