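Built-in watermark generators
1. Ascending (in-order) timestamps
When events arrive with ascending timestamps, simply use each extracted event-time timestamp as the watermark.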
java
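import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            // the event's creation time is used as its event-time timestamp
            return element.getCreationTime();
        }
    });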
scala
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps(_.getCreationTime)
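To make the ascending rule concrete, here is a small Flink-free sketch in plain Java. The class names (AscendingWatermarks, AscendingWatermarksDemo) are hypothetical and not part of Flink. It assumes timestamps never decrease; an element that breaks this assumption is counted but never moves the watermark backwards. Flink's own AscendingTimestampExtractor may differ in details, such as how it reports such violations.

```java
// Hypothetical, Flink-free model of an ascending-timestamp watermark generator.
// Assumption: event timestamps never decrease, so the latest timestamp seen
// can serve directly as the watermark.
class AscendingWatermarks {
    private long currentTimestamp = Long.MIN_VALUE; // no element seen yet
    private long violations = 0;                     // elements that broke the ascending order

    /** Records an element's event-time timestamp and returns it unchanged. */
    long extractTimestamp(long eventTime) {
        if (eventTime >= currentTimestamp) {
            currentTimestamp = eventTime;
        } else {
            // Out-of-order element: count it, but never move the watermark back.
            violations++;
        }
        return eventTime;
    }

    /** The watermark tracks the latest (largest) timestamp seen so far. */
    long currentWatermark() {
        return currentTimestamp;
    }

    long violations() {
        return violations;
    }
}

class AscendingWatermarksDemo {
    public static void main(String[] args) {
        AscendingWatermarks gen = new AscendingWatermarks();
        for (long ts : new long[] {1_000L, 2_000L, 3_500L, 3_000L}) {
            gen.extractTimestamp(ts);
            System.out.println("event " + ts + " -> watermark " + gen.currentWatermark());
        }
        System.out.println("violations: " + gen.violations());
    }
}
```

In the demo the last event (3000) arrives after 3500, so the watermark stays at 3500 and one violation is counted. This is exactly the situation the next strategy is designed for.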
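2. Bounded out-of-orderness
Set an upper bound on the delay. Every event reaches the system after some delay, and these delays can vary widely, so some events are delayed much more than others. A simple approach is to assume the delay never exceeds some maximum; the watermark then trails the largest timestamp seen so far by that maximum (10 seconds in the example below).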
java
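import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
        @Override
        public long extractTimestamp(MyEvent element) {
            // events may arrive up to 10 s later than the newest timestamp seen
            return element.getCreationTime();
        }
    });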
scala
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
    override def extractTimestamp(element: MyEvent): Long = element.getCreationTime
  })
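A plain-Java sketch of the rule described above: keep the largest timestamp seen and subtract the bound. It is loosely modeled on the logic of Flink 1.10's BoundedOutOfOrdernessTimestampExtractor, but the classes BoundedOutOfOrderness and BoundedOutOfOrdernessDemo are hypothetical and have no Flink dependency. The demo uses the same 10 s bound as the examples above.

```java
// Hypothetical, Flink-free model of a bounded-out-of-orderness watermark generator.
class BoundedOutOfOrderness {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrderness(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start at MIN_VALUE + bound so that (max - bound) cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records the element's timestamp and returns it unchanged. */
    long extractTimestamp(long eventTime) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
        return eventTime;
    }

    /** Watermark = largest timestamp seen minus the bound, never decreasing. */
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (candidate >= lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }
}

class BoundedOutOfOrdernessDemo {
    public static void main(String[] args) {
        BoundedOutOfOrderness gen = new BoundedOutOfOrderness(10_000L);
        for (long ts : new long[] {12_000L, 15_000L, 11_000L, 30_000L}) {
            gen.extractTimestamp(ts);
            System.out.println("event " + ts + " -> watermark " + gen.currentWatermark());
        }
    }
}
```

The demo prints watermarks 2000, 5000, 5000 and 20000. The event at 11000 arrives out of order but is still ahead of the current watermark (5000), so it is not late.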
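Custom watermark generators
1. Periodic watermarks
AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (the watermark may depend on the stream elements, or purely on processing time).
The interval at which watermarks are generated (every n milliseconds) is set via ExecutionConfig.setAutoWatermarkInterval(...). At each interval the assigner's getCurrentWatermark() method is called, and if the returned watermark is non-null and greater than the previous watermark, a new watermark is emitted.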
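import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
  // 1 min delay, in ms
  val bound: Long = 60 * 1000
  // the maximum observed timestamp; starting at Long.MinValue + bound keeps
  // maxTs - bound from underflowing before the first element arrives
  var maxTs: Long = Long.MinValue + bound

  override def getCurrentWatermark: Watermark = {
    // the watermark trails the largest timestamp seen by `bound`
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // update maximum timestamp
    maxTs = maxTs.max(r.timestamp)
    // return record timestamp
    r.timestamp
  }
}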
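The emission rule for periodic watermarks (call getCurrentWatermark() once per interval, forward only non-null watermarks that are strictly larger than the last one) can be modeled without Flink. In this sketch MaxTsAssigner is a hypothetical Java port of the PeriodicAssigner logic, and PeriodicEmitter is a hypothetical stand-in for the timer that would fire every autoWatermarkInterval milliseconds; real timer scheduling is omitted, so onTimer() is called by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Java port of the PeriodicAssigner logic: the watermark trails the largest
// timestamp seen so far by a fixed bound.
class MaxTsAssigner {
    private final long bound;
    private long maxTs;

    MaxTsAssigner(long bound) {
        this.bound = bound;
        this.maxTs = Long.MIN_VALUE + bound; // avoids underflow in maxTs - bound
    }

    void observe(long timestamp) {
        maxTs = Math.max(maxTs, timestamp);
    }

    long currentWatermark() {
        return maxTs - bound;
    }
}

// Hypothetical stand-in for the periodic timer: onTimer() would run once every
// autoWatermarkInterval ms. A watermark is forwarded only if it is non-null
// and strictly greater than the last one forwarded.
class PeriodicEmitter {
    private final Supplier<Long> getCurrentWatermark; // null = "no watermark now"
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    PeriodicEmitter(Supplier<Long> getCurrentWatermark) {
        this.getCurrentWatermark = getCurrentWatermark;
    }

    void onTimer() {
        Long wm = getCurrentWatermark.get();
        if (wm != null && wm > lastEmitted) {
            lastEmitted = wm;
            emitted.add(wm);
        }
    }

    List<Long> emitted() {
        return emitted;
    }
}

class PeriodicDemo {
    public static void main(String[] args) {
        MaxTsAssigner assigner = new MaxTsAssigner(60_000L);
        PeriodicEmitter emitter = new PeriodicEmitter(assigner::currentWatermark);
        emitter.onTimer();            // no data yet: Long.MIN_VALUE, nothing emitted
        assigner.observe(100_000L);
        emitter.onTimer();            // emits 40000
        emitter.onTimer();            // unchanged, nothing emitted
        assigner.observe(130_000L);
        emitter.onTimer();            // emits 70000
        System.out.println(emitter.emitted()); // [40000, 70000]
    }
}
```

Note that calling the timer twice without new data emits nothing the second time: the watermark did not grow, so it is suppressed.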
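Inside Flink's periodic-watermark operator, extractTimestamp(...) is called once for every element as it is processed. The previous-timestamp argument is the record's existing timestamp, or Long.MIN_VALUE if it has none, and the record is then forwarded with the newly extracted timestamp:
@Override
public void processElement(StreamRecord<T> element) throws Exception {
    final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
            element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
    output.collect(element.replace(element.getValue(), newTimestamp));
}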
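The per-element timestamp handling shown above can be sketched without Flink. Here TimestampAssigner, Rec (a minimal stand-in for StreamRecord) and TimestampingOperator are hypothetical names. One difference from Flink: its operator reuses the record via replace(), whereas this sketch returns a new object.

```java
// Hypothetical, Flink-free model of the processElement() logic above.
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long previousTimestamp);
}

// Minimal stand-in for StreamRecord: a value plus an optional timestamp.
final class Rec<T> {
    final T value;
    final boolean hasTimestamp;
    final long timestamp;

    Rec(T value, boolean hasTimestamp, long timestamp) {
        this.value = value;
        this.hasTimestamp = hasTimestamp;
        this.timestamp = timestamp;
    }
}

class TimestampingOperator<T> {
    private final TimestampAssigner<T> userFunction;

    TimestampingOperator(TimestampAssigner<T> userFunction) {
        this.userFunction = userFunction;
    }

    Rec<T> processElement(Rec<T> element) {
        // Pass the existing timestamp, or Long.MIN_VALUE when the record has none.
        long previous = element.hasTimestamp ? element.timestamp : Long.MIN_VALUE;
        long newTimestamp = userFunction.extractTimestamp(element.value, previous);
        // Flink reuses the record via replace(); this sketch builds a new one.
        return new Rec<>(element.value, true, newTimestamp);
    }
}

class TimestampingDemo {
    public static void main(String[] args) {
        // Keep an existing timestamp; otherwise parse it from "id,timestamp".
        TimestampingOperator<String> op = new TimestampingOperator<>(
            (value, previous) -> previous != Long.MIN_VALUE
                ? previous
                : Long.parseLong(value.split(",")[1]));

        Rec<String> a = op.processElement(new Rec<>("sensor_1,5000", false, 0L));
        Rec<String> b = op.processElement(new Rec<>("sensor_2,9000", true, 7000L));
        System.out.println(a.timestamp + " " + b.timestamp); // 5000 7000
    }
}
```

The previousTimestamp argument lets an assigner decide whether to keep a timestamp that was already attached upstream, as the demo lambda does.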
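2. Punctuated watermarks
AssignerWithPunctuatedWatermarks generates a new watermark based on a specific marker carried by an element. For this class, Flink first calls the extractTimestamp(...) method to assign the element a timestamp, and then immediately calls the checkAndGetNextWatermark(...) method on that same element.
The timestamp assigned in extractTimestamp(...) is passed to checkAndGetNextWatermark(...), which decides whether to generate a watermark. Whenever checkAndGetNextWatermark(...) returns a non-null watermark that is greater than the latest previous watermark, a new watermark is emitted.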
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
  // 1 min in ms
  val bound: Long = 60 * 1000

  override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
    if (r.id == "sensor_1") {
      // emit watermark if reading is from sensor_1
      new Watermark(extractedTS - bound)
    } else {
      // do not emit a watermark
      null
    }
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // assign record timestamp
    r.timestamp
  }
}
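The punctuated call sequence (extractTimestamp first, then checkAndGetNextWatermark on the same element, emitting only non-null, strictly increasing watermarks) can be simulated in plain Java. Reading is a hypothetical stand-in for SensorReading, SensorOneAssigner ports the PunctuatedAssigner logic, and PunctuatedDriver is a hypothetical driver; none of these are Flink classes, and the driver assumes incoming records carry no timestamp yet.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for SensorReading (hypothetical, for this sketch only).
final class Reading {
    final String id;
    final long timestamp;

    Reading(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

// Java port of the PunctuatedAssigner logic above.
class SensorOneAssigner {
    private static final long BOUND_MS = 60_000L;

    long extractTimestamp(Reading r, long previousTs) {
        return r.timestamp;
    }

    /** Returns a watermark only for readings from sensor_1, otherwise null. */
    Long checkAndGetNextWatermark(Reading r, long extractedTs) {
        if ("sensor_1".equals(r.id)) {
            return extractedTs - BOUND_MS;
        }
        return null;
    }
}

// Hypothetical driver reproducing the call order: extractTimestamp first,
// then checkAndGetNextWatermark on the same element.
class PunctuatedDriver {
    private final SensorOneAssigner assigner = new SensorOneAssigner();
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    void processElement(Reading r) {
        // Assumes the incoming records carry no timestamp yet.
        long ts = assigner.extractTimestamp(r, Long.MIN_VALUE);
        Long wm = assigner.checkAndGetNextWatermark(r, ts);
        if (wm != null && wm > lastEmitted) {
            lastEmitted = wm;
            emitted.add(wm);
        }
    }

    List<Long> emitted() {
        return emitted;
    }
}

class PunctuatedDemo {
    public static void main(String[] args) {
        PunctuatedDriver driver = new PunctuatedDriver();
        driver.processElement(new Reading("sensor_2", 100_000L)); // no watermark
        driver.processElement(new Reading("sensor_1", 120_000L)); // emits 60000
        driver.processElement(new Reading("sensor_1", 90_000L));  // 30000 is not larger: dropped
        driver.processElement(new Reading("sensor_1", 200_000L)); // emits 140000
        System.out.println(driver.emitted()); // [60000, 140000]
    }
}
```

Unlike the periodic case, no timer is involved: whether a watermark appears depends entirely on which elements arrive.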
References
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamp_extractors.html
http://wuchong.me/blog/2018/11/18/flink-tips-watermarks-in-apache-flink-made-easy/
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamps_watermarks.html
