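Built-in watermark generators
1. In-order (ascending) generation
When events arrive in ascending timestamp order, the event-time timestamp of each element can simply be used as the watermark.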
java
DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            // the event's creation time is used as its event-time timestamp
            return element.getCreationTime();
        }
    });
scala
val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
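2. Bounded out-of-orderness generation
Set an upper bound on the delay. Every event arrives after some delay, and these delays vary considerably, so some events are delayed more than others. A simple approach is to assume that the delay never exceeds a fixed maximum.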
java
DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(
        // assume events arrive at most 10 seconds out of order
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {

            @Override
            public long extractTimestamp(MyEvent element) {
                return element.getCreationTime();
            }
        });
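scala
val stream: DataStream[MyEvent] = ...

// BoundedOutOfOrdernessTimestampExtractor is abstract, so extractTimestamp
// is implemented in an anonymous subclass
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
    override def extractTimestamp(element: MyEvent): Long = element.getCreationTime
  })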
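To make the bounded out-of-orderness idea concrete, here is a minimal plain-Java sketch with no Flink dependency. It assumes the watermark is simply the largest timestamp seen so far minus the configured bound. The class name `BoundedOutOfOrdernessSketch` and its methods are hypothetical and are not part of the Flink API.

```java
/** Plain-Java sketch (not Flink code) of the bounded out-of-orderness rule. */
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    // largest event timestamp observed so far
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp and returns the watermark implied so far. */
    long onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        return currentWatermark();
    }

    /** Watermark = max timestamp seen minus the assumed maximum delay. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events yet; avoid arithmetic underflow
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** An event counts as late if its timestamp is not above the watermark. */
    boolean isLate(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }
}
```

With a 10-second bound, an event stamped 20 000 ms moves the watermark to 10 000 ms. A subsequent event stamped 15 000 ms is out of order but still within the bound, whereas one stamped 9 000 ms would be considered late.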
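Custom watermark generators
1. Periodic watermarks
AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).
The interval at which watermarks are generated (every n milliseconds) is defined via ExecutionConfig.setAutoWatermarkInterval(...). At each interval, the assigner's getCurrentWatermark() method is called, and a new watermark is emitted if the returned watermark is non-null and larger than the previous watermark.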
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {

  // allowed delay: 1 minute, in milliseconds
  val bound: Long = 60 * 1000
  // the maximum observed timestamp; starts at Long.MinValue + bound so that
  // maxTs - bound cannot underflow before the first record arrives
  var maxTs: Long = Long.MinValue + bound

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // update maximum timestamp
    maxTs = maxTs.max(r.timestamp)
    // return record timestamp
    r.timestamp
  }
}
The timestamp method is invoked each time an element is processed:
@Override
public void processElement(StreamRecord<T> element) throws Exception {
    // pass the element's existing timestamp, or Long.MIN_VALUE if it has none
    final long newTimestamp = userFunction.extractTimestamp(
        element.getValue(),
        element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

    // forward the element with its newly assigned timestamp
    output.collect(element.replace(element.getValue(), newTimestamp));
}
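The emission rule for periodic watermarks described above (a candidate is emitted only if it is non-null and larger than the previously emitted watermark) can be sketched in plain Java. This is an illustration under stated assumptions, not Flink's operator code: the class `PeriodicEmissionSketch` and its `onTimer` method are hypothetical names, and `onTimer` stands in for the timer that fires once per auto-watermark interval.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (not Flink code) of the periodic emission rule. */
class PeriodicEmissionSketch {
    // last watermark that was actually emitted downstream
    private long lastEmitted = Long.MIN_VALUE;
    // record of emitted watermarks, kept only for illustration
    private final List<Long> emitted = new ArrayList<>();

    /**
     * Called once per interval with the candidate watermark, i.e. the value
     * a getCurrentWatermark()-style method returned (may be null).
     * Returns true if the candidate was emitted.
     */
    boolean onTimer(Long candidate) {
        if (candidate != null && candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
            return true;
        }
        return false; // null, equal, or smaller candidates are dropped
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

Because equal or smaller candidates are dropped, the emitted watermarks are strictly increasing even when the assigner returns the same value on several consecutive ticks.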
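2. Punctuated watermarks
AssignerWithPunctuatedWatermarks generates new watermarks based on specific markers carried by the elements. For this class, Flink first calls the extractTimestamp(...) method to assign a timestamp to the element, and then immediately calls the checkAndGetNextWatermark(...) method on that element.
The timestamp assigned in extractTimestamp(...) is passed to checkAndGetNextWatermark(...), which decides whether a watermark should be generated. Whenever checkAndGetNextWatermark(...) returns a non-null watermark that is larger than the latest previous watermark, the new watermark is emitted.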
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {

  // 1 min in ms
  val bound: Long = 60 * 1000

  override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
    if (r.id == "sensor_1") {
      // emit watermark if reading is from sensor_1
      new Watermark(extractedTS - bound)
    } else {
      // do not emit a watermark
      null
    }
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // assign record timestamp
    r.timestamp
  }
}
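The per-element flow of a punctuated assigner can be traced with a small plain-Java sketch that has no Flink dependency. It mirrors the rule from the Scala class above: only readings from "sensor_1" carry the marker, and a candidate watermark is emitted only when it is non-null and larger than the last emitted one. The class `PunctuatedWatermarkSketch` and the method `onElement` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (not Flink code) of punctuated watermark generation. */
class PunctuatedWatermarkSketch {
    static final long BOUND_MS = 60_000L; // 1 minute, as in the Scala example

    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    /** Returns a candidate watermark for marker elements, otherwise null. */
    static Long checkAndGetNextWatermark(String sensorId, long extractedTs) {
        return "sensor_1".equals(sensorId)
            ? Long.valueOf(extractedTs - BOUND_MS)
            : null;
    }

    /** Processes one element: the timestamp is extracted first, then checked. */
    void onElement(String sensorId, long timestampMs) {
        Long candidate = checkAndGetNextWatermark(sensorId, timestampMs);
        // emit only non-null candidates that advance the watermark
        if (candidate != null && candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
        }
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

Feeding in readings from other sensors never produces a watermark here, so in this sketch event time advances only as fast as "sensor_1" reports.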
References
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamp_extractors.html
http://wuchong.me/blog/2018/11/18/flink-tips-watermarks-in-apache-flink-made-easy/
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamps_watermarks.html