本文翻譯自Pre-defined Timestamp Extractors / Watermark Emitter
------------------------------------------------------------------------------------------
正如timestamps and watermark handling中所述,Flink提供了抽象類來讓開發者賦值自己的時間戳並發送他們自己的Watermark。更具體來說,開發者需要依照不同用例情況來實現接口AssignerWithPeriodicWatermarks或接口AssignerWithPunctuatedWatermarks。簡而言之,前一個接口將會周期性發送Watermark,而第二個接口根據一些到達數據的屬性,例如一旦在流中碰到一個特殊的element便發送Watermark。
為了進一步簡化開發者開發類似的task,Flink自帶了一些預先實現的timestamp assigner。本節提供了它們的一個列表。除過引用即用的函數,這些預先實現的assigner還可以作為自定義assigner的實現示例。
遞增時間戳的Assigner
最簡單的周期性Watermark生成的特例便是由一個給定的Source task所見的時間戳都以遞增順序發生的情況。在這種情況下,由於不會有比當前時間戳更早的時間戳到達,故總是可以將當前時間戳看作是一個Watermark。
注意上述情況僅在每個並行數據源task的時間戳都是以遞增順序到達時才是必要的(應當是必要條件?--翻譯不確定),例如,在某特定部署中,一個Kafka分區是由一個並行性數據源讀取的,那么上述情況僅在每個Kafka分區內的時間戳都是遞增順序出現時才是必要的。Flink的Watermark合並機制保證會在任何並行流在進行shuffle、 union、 connect或merge后都可以生成正確的Watermark。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
允許固定量的遲到數據的Assigner
另一個周期性Watermark生成的例子是Watermark落在流中的一個固定時間段內觀察到的最大(事件時間的)時間戳的后面。該情況同樣包括預先知道在流中將會遇到的最大遲到量(lateness)的情況,例如創建的一個測試用的自定義source中,它的element的時間戳會分布在一個固定的時間段內。Flink為這種情況提供了BoundedOutofOrdernessTimestampExtractor接口,該接口需要參數maxOutofOrderness,即在一個element被給定窗口在計算最終結果時忽略之前(即該element過期前),所允許該element遲到的最大lateness。lateness的值為"t-t_w",其中t是一個element的(事件時間的)時間戳,t_w是前一個watermark。如果lateness > 0,則我們就認為該element已經遲到,並且在job計算對應窗口的結果時忽略它。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});