Trigger作用在WindowStream上,也就是說,trigger是跟隨在window()算子之后的。Trigger決定了窗口中的數據何時可以被window function處理, 每一個窗口分配器都有一個默認的觸發器,如果默認的觸發器不能滿足需要,你可以通過調用WindowedStream.trigger(...)來指定一個自定義的觸發器。
例如:TumblingEventTimeWindows(滾動窗口)默認觸發器為EventTimeTrigger,默認情況下在當前水印時間大於等於當前窗口最大時間(窗口結束時間-1)時觸發window function。
和onTimer的區別:
1.onTimer用在process function中,也就是說,onTimer是基於DataStream和KeyedStream的。
2.trigger是用在window func
Trigger有 5 個方法來允許觸發器處理不同的事件:
onElement()方法,每個元素被添加到窗口時調用
onEventTime()方法,當一個已注冊的事件時間計時器啟動時調用
onProcessingTime()方法,當一個已注冊的處理時間計時器啟動時調用
onMerge()方法,與狀態性觸發器相關,當使用會話窗口時,兩個觸發器對應的窗口合並時,合並兩個觸發器的狀態。
clear()方法,執行任何需要清除的相應窗口。
上面的方法中有兩個需要注意的地方:
1.前三個方法通過返回一個TriggerResult來決定如何操作調用他們的事件,這些操作可以是下面操作中的一個:
CONTINUE:什么也不做
FIRE:觸發計算
PURGE:清除窗口中的數據
FIRE_AND_PURGE:觸發計算並清除窗口中的數據
2.這些函數可以注冊 “處理時間定時器” 或者 “事件時間計時器”,被用來為后續的操作使用,比如:ctx.registerEventTimeTimer(window.maxTimestamp())
默認情況下,內置的觸發器只返回 FIRE,不會清除窗口狀態
注意:清除將僅刪除窗口的內容,並將保留有關該窗口的任何潛在元信息和任何觸發狀態。
觸發器可以訪問流的時間屬性以及定時器,還可以對state狀態編程。所以觸發器和process function一樣強大。例如我們可以實現一個觸發邏輯:當窗口接收到一定數量的元素時,觸發器觸發。再比如當窗口接收到一個特定元素時,觸發器觸發。還有就是當窗口接收到的元素里面包含特定模式(5秒鍾內接收到了兩個同樣類型的事件),觸發器也可以觸發。在一個事件時間的窗口中,一個自定義的觸發器可以提前(在水位線沒過窗口結束時間之前)計算和發射計算結果。這是一個常見的低延遲計算策略,盡管計算不完全,但不像默認的那樣需要等待水位線沒過窗口結束時間。
下面的例子展示了一個觸發器在窗口結束時間之前觸發。當第一個事件被分配到窗口時,這個觸發器注冊了一個定時器,定時時間為水位線之前一秒鍾。當定時事件執行,將會注冊一個新的定時事件,這樣,這個觸發器每秒鍾最多觸發一次。
自定義實現: 通過觸發器在15秒的窗口內每秒觸發一次計算。
import com.atguigu.StreamingJob.{SensorReading, SensorSource} import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector case class SensorReading(id:String,timestamp :Long,temperature : Double) object TriggersTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //設置並行度 env.setParallelism(1) //設置事件時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //獲取事件源 val stream = env.addSource(new SensorSource) val setTime = stream.assignAscendingTimestamps(_.timestamp) //設置事件時間的獲取方式 val result = setTime.keyBy(_.id).timeWindow(Time.seconds(15)).trigger(new OneSecondIntervalTrigger ).process(new AllWindom) result.print() env.execute() } //創建窗口的全量函數 in out key windom class AllWindom extends ProcessWindowFunction[SensorReading,(String,Double,Double,Long),String,TimeWindow]{ override def process(key: String, context: Context, elements: Iterable[SensorReading], out: Collector[(String, Double, Double,Long)]): Unit = { var doubles: Iterable[Double] = elements.map(_.temperature) out.collect( (key,doubles.max,doubles.min,context.window.getEnd)) } } } //設置一秒鍾一次的觸發器 class OneSecondIntervalTrigger extends Trigger[SensorReading , TimeWindow]{ //回調函數 override def onEventTime(l: Long, //觸發定時器的時間,即前文設置的定時時間,默認窗口結束時會調用一次 w: TimeWindow, //窗口 triggerContext: Trigger.TriggerContext): TriggerResult = { //判斷l是否為窗口的結束時間 if(l==w.getEnd){ //觸發窗口的計算,並且清空數據 print("=============================") TriggerResult.FIRE_AND_PURGE }else{ val t = triggerContext.getCurrentWatermark+(1000-(triggerContext.getCurrentWatermark%1000)) if(t<w.getEnd){ triggerContext.registerEventTimeTimer(t) } } //支觸發計算 TriggerResult.FIRE } //這是系統時間的 ,不執行業務邏輯 override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } //每個窗口的結束時調用 override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = { val firstSeen: ValueState[Boolean] = triggerContext .getPartitionedState( new ValueStateDescriptor[Boolean]( "firstSeen", classOf[Boolean] ) ) //注銷之前設置的事件 firstSeen.clear() } //每個數據調用一次 override def onElement(t: SensorReading, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { //獲取分區的狀態變量 var firstSeen: ValueState[Boolean] = triggerContext.getPartitionedState(new ValueStateDescriptor[Boolean]("firstSeen", Types.of[Boolean])) //每個窗口的第一個數據才會進入到里面設置回調事件的事件 if(!firstSeen.value()){ //獲取當前水位線 val t = triggerContext.getCurrentWatermark+(1000-(triggerContext.getCurrentWatermark%1000)) //注冊事件時間的回調事件,注冊下一秒的事件 triggerContext.registerEventTimeTimer(t) //注冊窗口結束時的事件 triggerContext.registerEventTimeTimer(w.getEnd) //關閉時間的注冊,保證每一秒內的事件不重復注冊 firstSeen.update(true) } TriggerResult.CONTINUE }