Flink Operations: Watermarks


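Built-in Watermark Generators

1. Ascending timestamps

When events arrive in ascending timestamp order, no lateness handling is needed: extract the event-time timestamp of each element and use it as the watermark.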

java

DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});

scala

val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )

 

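2. Bounded out-of-orderness

Set an upper bound on lateness. Every event arrives with some delay, and these delays vary considerably, so some events are delayed more than others. A simple approach is to assume that the delay never exceeds a fixed maximum.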

java

DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {

        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});

scala

val stream: DataStream[MyEvent] = ...

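val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
    // the extractor is abstract, so extractTimestamp has to be overridden
    override def extractTimestamp(element: MyEvent): Long = element.getCreationTime
  })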
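To make the bounded out-of-orderness rule concrete, here is a minimal sketch in plain Java with no Flink dependency. It assumes the watermark is simply the largest timestamp seen so far minus the configured bound; the class name BoundedOutOfOrdernessSketch and the method watermarks are hypothetical and are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Flink-free sketch (hypothetical names): watermark = max timestamp seen so far - bound. */
class BoundedOutOfOrdernessSketch {

    /** Returns the watermark that holds after each event, given timestamps in arrival order. */
    static List<Long> watermarks(long[] timestampsMs, long maxOutOfOrdernessMs) {
        List<Long> result = new ArrayList<>();
        // Start high enough that subtracting the bound cannot underflow.
        long maxTs = Long.MIN_VALUE + maxOutOfOrdernessMs;
        for (long ts : timestampsMs) {
            // A late event never lowers the maximum, so the watermark never moves backwards.
            maxTs = Math.max(maxTs, ts);
            result.add(maxTs - maxOutOfOrdernessMs);
        }
        return result;
    }

    public static void main(String[] args) {
        // Events arrive out of order; the bound is 10 seconds.
        long[] arrivals = {15_000L, 12_000L, 21_000L, 18_000L};
        System.out.println(watermarks(arrivals, 10_000L)); // prints [5000, 5000, 11000, 11000]
    }
}
```

The late events (12000 and 18000) leave the watermark unchanged, which is why elements that are late by less than the bound can still be assigned to their windows.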

 

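Custom Watermark Generators

1. Periodic watermarks

AssignerWithPeriodicWatermarks assigns timestamps and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).
The interval at which watermarks are generated (every n milliseconds) is set with ExecutionConfig.setAutoWatermarkInterval(...). At each interval the assigner's getCurrentWatermark() method is called, and a new watermark is emitted if the returned watermark is non-null and larger than the previous one.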

class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {

  // 1 min delay, in ms
  val bound: Long = 60 * 1000
  // the maximum observed timestamp (starts at 0, the default value for Long)
  var maxTs: Long = _

  override def getCurrentWatermark: Watermark = {
    // the watermark trails the maximum timestamp by the bound
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // update maximum timestamp
    maxTs = maxTs.max(r.timestamp)
    // return record timestamp
    r.timestamp
  }
}

The extractTimestamp method is called once for each element as it is processed:

@Override
public void processElement(StreamRecord<T> element) throws Exception {
	final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
			element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

	output.collect(element.replace(element.getValue(), newTimestamp));
}
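The interplay between the per-element path (extractTimestamp) and the timer-driven path (getCurrentWatermark) can be sketched in plain Java without Flink. This is an illustration under assumptions, not Flink's implementation: the class PeriodicEmissionSketch and the method onPeriodicTick are hypothetical names, and the tick is triggered by hand rather than by a timer.

```java
/** Flink-free sketch (hypothetical names) of periodic watermark emission. */
class PeriodicEmissionSketch {
    private final long boundMs;
    private long maxTs;
    private long lastEmitted = Long.MIN_VALUE;

    PeriodicEmissionSketch(long boundMs) {
        this.boundMs = boundMs;
        // Start high enough that maxTs - boundMs cannot underflow.
        this.maxTs = Long.MIN_VALUE + boundMs;
    }

    /** Per-element path: remember the largest timestamp and return the element's own timestamp. */
    long extractTimestamp(long eventTs) {
        maxTs = Math.max(maxTs, eventTs);
        return eventTs;
    }

    /** Timer path: returns the watermark to emit, or null if it has not advanced. */
    Long onPeriodicTick() {
        long candidate = maxTs - boundMs;
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            return candidate;
        }
        return null;
    }

    public static void main(String[] args) {
        PeriodicEmissionSketch s = new PeriodicEmissionSketch(60_000L);
        System.out.println(s.onPeriodicTick()); // null: no element seen yet
        s.extractTimestamp(100_000L);
        System.out.println(s.onPeriodicTick()); // 40000
        s.extractTimestamp(90_000L);            // late element, maximum unchanged
        System.out.println(s.onPeriodicTick()); // null: watermark did not advance
    }
}
```

Separating the two paths means the cost per element is a single comparison, while the frequency of watermark emission is controlled only by the configured interval.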

 

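2. Punctuated watermarks

AssignerWithPunctuatedWatermarks generates a new watermark in response to specific marker elements. For this class, Flink first calls extractTimestamp(...) to assign a timestamp to the element and then immediately calls checkAndGetNextWatermark(...) on that element.
The timestamp assigned in extractTimestamp(...) is passed to checkAndGetNextWatermark(...), which decides whether to generate a watermark. A new watermark is emitted whenever checkAndGetNextWatermark(...) returns a non-null watermark that is larger than the most recent previous watermark.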

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {

  // 1 min in ms
  val bound: Long = 60 * 1000

  override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
    if (r.id == "sensor_1") {
      // emit watermark if reading is from sensor_1
      new Watermark(extractedTS - bound)
    } else {
      // do not emit a watermark
      null
    }
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // assign record timestamp
    r.timestamp
  }
}
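The punctuated rule, including the check that a watermark is only emitted when it is larger than the previous one, can be sketched in plain Java without Flink. The class PunctuatedEmissionSketch and the method onElement are hypothetical names; the marker condition (readings from "sensor_1") and the one-minute bound mirror the example above.

```java
/** Flink-free sketch (hypothetical names) of punctuated watermark emission. */
class PunctuatedEmissionSketch {
    static final long BOUND_MS = 60_000L; // 1 min in ms
    private long lastEmitted = Long.MIN_VALUE;

    /** Returns a watermark candidate for marker elements, or null for all others. */
    static Long checkAndGetNextWatermark(String sensorId, long extractedTs) {
        if ("sensor_1".equals(sensorId)) {
            return extractedTs - BOUND_MS;
        }
        return null;
    }

    /** Called once per element: returns the watermark to emit, or null if none is emitted. */
    Long onElement(String sensorId, long timestampMs) {
        Long candidate = checkAndGetNextWatermark(sensorId, timestampMs);
        // Emit only a non-null watermark that is larger than the previous one.
        if (candidate != null && candidate > lastEmitted) {
            lastEmitted = candidate;
            return candidate;
        }
        return null;
    }

    public static void main(String[] args) {
        PunctuatedEmissionSketch p = new PunctuatedEmissionSketch();
        System.out.println(p.onElement("sensor_2", 100_000L)); // null: not a marker element
        System.out.println(p.onElement("sensor_1", 120_000L)); // 60000
        System.out.println(p.onElement("sensor_1", 110_000L)); // null: 50000 would go backwards
    }
}
```

Because the decision is made on every element, this style suits streams where only some elements carry watermark information; if every element produced a watermark, the downstream overhead would grow accordingly.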

 

References


https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamp_extractors.html

http://wuchong.me/blog/2018/11/18/flink-tips-watermarks-in-apache-flink-made-easy/

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/event_timestamps_watermarks.html

