FLINK-窗口操作符-觸發器Trigger


Trigger作用在WindowStream上,也就是說,trigger是跟隨在window()算子之后的。Trigger決定了窗口中的數據何時可以被window function處理, 每一個窗口分配器都有一個默認的觸發器,如果默認的觸發器不能滿足需要,你可以通過調用WindowedStream.trigger(...)來指定一個自定義的觸發器。
例如:TumblingEventTimeWindows(滾動窗口)默認觸發器為EventTimeTrigger,默認情況下在當前水印時間大於等於當前窗口最大時間(窗口結束時間-1)時觸發window function。

和onTimer的區別:

1.onTimer用在process function中,也就是說,onTimer是基於DataStream和KeyedStream的。

2.trigger是用在window func

Trigger有 5 個方法來允許觸發器處理不同的事件:

onElement()方法,每個元素被添加到窗口時調用

onEventTime()方法,當一個已注冊的事件時間計時器啟動時調用

onProcessingTime()方法,當一個已注冊的處理時間計時器啟動時調用

onMerge()方法,與狀態性觸發器相關,當使用會話窗口時,兩個觸發器對應的窗口合並時,合並兩個觸發器的狀態。

clear()方法,執行任何需要清除的相應窗口。

上面的方法中有兩個需要注意的地方:
1.前三個方法通過返回一個TriggerResult來決定如何操作調用他們的事件,這些操作可以是下面操作中的一個:
CONTINUE:什么也不做
FIRE:觸發計算
PURGE:清除窗口中的數據
FIRE_AND_PURGE:觸發計算並清除窗口中的數據
2.這些函數可以注冊 “處理時間定時器” 或者 “事件時間計時器”,被用來為后續的操作使用,比如:ctx.registerEventTimeTimer(window.maxTimestamp())
默認情況下,內置的觸發器只返回 FIRE,不會清除窗口狀態
注意:清除將僅刪除窗口的內容,並將保留有關該窗口的任何潛在元信息和任何觸發狀態。
觸發器可以訪問流的時間屬性以及定時器,還可以對state狀態編程。所以觸發器和process function一樣強大。例如我們可以實現一個觸發邏輯:當窗口接收到一定數量的元素時,觸發器觸發。再比如當窗口接收到一個特定元素時,觸發器觸發。還有就是當窗口接收到的元素里面包含特定模式(5秒鍾內接收到了兩個同樣類型的事件),觸發器也可以觸發。在一個事件時間的窗口中,一個自定義的觸發器可以提前(在水位線沒過窗口結束時間之前)計算和發射計算結果。這是一個常見的低延遲計算策略,盡管計算不完全,但不像默認的那樣需要等待水位線沒過窗口結束時間。
下面的例子展示了一個觸發器在窗口結束時間之前觸發。當第一個事件被分配到窗口時,這個觸發器注冊了一個定時器,定時時間為水位線之前一秒鍾。當定時事件執行,將會注冊一個新的定時事件,這樣,這個觸發器每秒鍾最多觸發一次。
自定義實現: 通過觸發器在15秒的窗口內每秒觸發一次計算。
import com.atguigu.StreamingJob.{SensorReading, SensorSource}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case  class SensorReading(id:String,timestamp :Long,temperature : Double)
object TriggersTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //設置並行度
    env.setParallelism(1)
    //設置事件時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //獲取事件源
    val stream = env.addSource(new SensorSource)
    val setTime = stream.assignAscendingTimestamps(_.timestamp)
    //設置事件時間的獲取方式
   val result = setTime.keyBy(_.id).timeWindow(Time.seconds(15)).trigger(new OneSecondIntervalTrigger ).process(new AllWindom)
   result.print()
   env.execute()
  }
  //創建窗口的全量函數 in out key windom
  class  AllWindom extends ProcessWindowFunction[SensorReading,(String,Double,Double,Long),String,TimeWindow]{
    override def process(key: String,
                         context: Context,
                         elements: Iterable[SensorReading],
                         out: Collector[(String, Double, Double,Long)]): Unit = {
      var doubles: Iterable[Double] = elements.map(_.temperature)
    out.collect( (key,doubles.max,doubles.min,context.window.getEnd))
    }
  }
}
//設置一秒鍾一次的觸發器
class OneSecondIntervalTrigger extends  Trigger[SensorReading , TimeWindow]{
  //回調函數
  override def onEventTime(l: Long, //觸發定時器的時間,即前文設置的定時時間,默認窗口結束時會調用一次
                           w: TimeWindow, //窗口
                           triggerContext: Trigger.TriggerContext): TriggerResult = {
//判斷l是否為窗口的結束時間
    if(l==w.getEnd){
      //觸發窗口的計算,並且清空數據
      print("=============================")
      TriggerResult.FIRE_AND_PURGE
    }else{
val t = triggerContext.getCurrentWatermark+(1000-(triggerContext.getCurrentWatermark%1000))
      if(t<w.getEnd){
        triggerContext.registerEventTimeTimer(t)
      }
    }
    //支觸發計算
    TriggerResult.FIRE
  }
//這是系統時間的 ,不執行業務邏輯
  override def onProcessingTime(l: Long,
                                w: TimeWindow,
                                triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
  //每個窗口的結束時調用
  override def clear(w: TimeWindow,
                     triggerContext: Trigger.TriggerContext): Unit = {
    val firstSeen: ValueState[Boolean] = triggerContext
      .getPartitionedState(
        new ValueStateDescriptor[Boolean](
          "firstSeen", classOf[Boolean]
        )
      )
    //注銷之前設置的事件
    firstSeen.clear()
  }
  //每個數據調用一次
  override def onElement(t: SensorReading,
                         l: Long,
                         w: TimeWindow,
                         triggerContext: Trigger.TriggerContext): TriggerResult = {
    //獲取分區的狀態變量
    var firstSeen: ValueState[Boolean] = triggerContext.getPartitionedState(new ValueStateDescriptor[Boolean]("firstSeen", Types.of[Boolean]))
    //每個窗口的第一個數據才會進入到里面設置回調事件的事件
    if(!firstSeen.value()){
      //獲取當前水位線
      val t = triggerContext.getCurrentWatermark+(1000-(triggerContext.getCurrentWatermark%1000))
      //注冊事件時間的回調事件,注冊下一秒的事件
      triggerContext.registerEventTimeTimer(t)
      //注冊窗口結束時的事件
      triggerContext.registerEventTimeTimer(w.getEnd)
      //關閉時間的注冊,保證每一秒內的事件不重復注冊
      firstSeen.update(true)
    }
   TriggerResult.CONTINUE
  }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM