flink 觸發器
觸發器確定窗口(由窗口分配程序形成)何時准備由窗口函數處理。每個WindowAssigner都帶有一個默認觸發器。
如果默認觸發器不適合需求,我們就需要自定義觸發器。
主要方法
觸發器接口有五種方法,允許觸發器對不同的事件作出反應
onElement()
添加到每個窗口的元素都會調用此方法。onEventTime()
當注冊的事件時間計時器觸發時,將調用此方法。onProcessingTime()
當注冊的處理時間計時器觸發時,將調用此方法。onMerge()
與有狀態觸發器相關,並在兩個觸發器對應的窗口合並時合並它們的狀態,例如在使用會話窗口時。(目前沒使用過,了解不多)clear()
執行刪除相應窗口時所需的任何操作。(一般是刪除定義的狀態、定時器等)
TriggerResult
onElement(),onEventTime(),onProcessingTime()
都要求返回一個TriggerResult
TriggerResult包含以下內容
- CONTINUE:表示啥都不做。
- FIRE:表示觸發計算,同時保留窗口中的數據
- PURGE:簡單地刪除窗口的內容,並保留關於窗口和任何觸發器狀態的任何潛在元信息。
- FIRE_AND_PURGE:觸發計算,然后清除窗口中的元素。(默認情況下,預先實現的觸發器只觸發而不清除窗口狀態。)
案例
- 需求
- 當窗口中的數據量達到一定數量的時候觸發計算
- 根據執行時間每隔一定時間且窗口中有數據觸發計算,如果沒有數據不觸發計算
- 窗口關閉的時候清除數據
實現過程
- 依賴
<properties>
<hadoop.version>3.1.1.3.1.0.0-78</hadoop.version>
<flink.version>1.9.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.7</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
- 實現代碼
//調用
dStream
.keyBy(_.event_id)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.trigger(new CustomTrigger(10, 1 * 60 * 1000L))
//-------------------------------------------------------------------------
package com.meda.demo
import java.text.SimpleDateFormat
import com.meda.utils.DatePattern
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
class CustomTrigger extends Trigger[eventInputDT, TimeWindow] {
//觸發計算的最大數量
private var maxCount: Long = _
//定時觸發間隔時長 (ms)
private var interval: Long = 60 * 1000
//記錄當前數量的狀態
private lazy val countStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])
//記錄執行時間定時觸發時間的狀態
private lazy val processTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("processTimer", new Update, classOf[Long])
//記錄時間時間定時器的狀態
private lazy val eventTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("eventTimer", new Update, classOf[Long])
def this(maxCount: Int) {
this()
this.maxCount = maxCount
}
def this(maxCount: Int, interval: Long) {
this(maxCount)
this.interval = interval
}
override def onElement(element: eventInputDT, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
val countState = ctx.getPartitionedState(countStateDescriptor)
//計數狀態加1
countState.add(1L)
//如果沒有設置事件時間定時器,需要設置一個窗口最大時間觸發器,這個目的是為了在窗口清除的時候 利用時間時間觸發計算,否則可能會缺少部分數據
if (ctx.getPartitionedState(eventTimerStateDescriptor).get() == 0L) {
ctx.getPartitionedState(eventTimerStateDescriptor).add(window.maxTimestamp())
ctx.registerEventTimeTimer(window.maxTimestamp())
}
if (countState.get() >= this.maxCount) {
//達到指定指定數量
//刪除事件時間定時觸發的狀態
ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
//清空計數狀態
countState.clear()
//觸發計算
TriggerResult.FIRE
} else if (ctx.getPartitionedState(processTimerStateDescriptor).get() == 0L) {
//未達到指定數量,且沒有指定定時器,需要指定定時器
//當前定時器狀態值加上間隔值
ctx.getPartitionedState(processTimerStateDescriptor).add(ctx.getCurrentProcessingTime + interval)
//注冊定執行時間定時器
ctx.registerProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
TriggerResult.CONTINUE
} else {
TriggerResult.CONTINUE
}
}
// 執行時間定時器觸發
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
if (ctx.getPartitionedState(countStateDescriptor).get() > 0 && (ctx.getPartitionedState(processTimerStateDescriptor).get() == time)) {
println(s"數據量未達到 $maxCount ,由執行時間觸發器 ctx.getPartitionedState(processTimerStateDescriptor).get()) 觸發計算")
ctx.getPartitionedState(processTimerStateDescriptor).clear()
ctx.getPartitionedState(countStateDescriptor).clear()
TriggerResult.FIRE
} else {
TriggerResult.CONTINUE
}
}
//事件時間定時器觸發
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() > 0L)) { //還有未觸發計算的數據
println(s"事件時間到達最大的窗口時間,並且窗口中還有未計算的數據:${ctx.getPartitionedState(countStateDescriptor).get()},觸發計算並清除窗口")
ctx.getPartitionedState(eventTimerStateDescriptor).clear()
TriggerResult.FIRE_AND_PURGE
} else if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() == 0L)) { //沒有未觸發計算的數據
println("事件時間到達最大的窗口時間,但是窗口中沒有有未計算的數據,清除窗口 但是不觸發計算")
TriggerResult.PURGE
} else {
TriggerResult.CONTINUE
}
}
//窗口結束時清空狀態
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
// println(s"清除窗口狀態,定時器")
ctx.deleteEventTimeTimer(ctx.getPartitionedState(eventTimerStateDescriptor).get())
ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
ctx.getPartitionedState(processTimerStateDescriptor).clear()
ctx.getPartitionedState(eventTimerStateDescriptor).clear()
ctx.getPartitionedState(countStateDescriptor).clear()
}
//更新狀態為累加值
class Sum extends ReduceFunction[Long] {
override def reduce(value1: Long, value2: Long): Long = value1 + value2
}
//更新狀態為取新的值
class Update extends ReduceFunction[Long] {
override def reduce(value1: Long, value2: Long): Long = value2
}
}
留下的疑問:
之前看資料的時候好像說定時器只能設置一個,你設置多個它也只會選擇一個執行。
但是我這里事件、執行時間定時器都設置,好像都生效了。這點還沒看懂。
后續研究下啥情況。
本文為個人原創文章,轉載請注明出處。!!!!