時間戳和Watermark生成
本文翻譯自Generating Timestamp / Watermarks
------------------------------------------------------------------
本文是Flink在使用事件時間(Event Time)時相關內容,有關事件時間、處理時間和提取時間的介紹,請見event time introduction。
流程序需要設置時間特征為Event time,才能在程序中使用事件時間。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
一、時間戳賦值
為了使事件時間可以正常使用,Flink需要知道時間的時間戳,即流中的每個element都需要被賦予它自己的時間戳。Flink通常會從element的一些域訪問/提取時間戳(That happens usually by accessing/extracting the timestamp from some field in the element.)。
時間戳的賦值通常與Watermark的生成緊密相關,其中Watermark生成負責通知系統事件時間的增長情況。
時間戳賦值和Watermark生成的方式有兩種:
1. 直接在數據流源處進行
2. 通過一個Timestamp assigner / watermark generator:在Flink中,Timestamp assigner同樣會定義watermark的發送行為
注意:時間戳和Watermark都是使用從Java歷元(epoch) “1970-01-01 T00.00.00Z”開始的毫秒數定義的
1.1 帶有時間戳和Watermark的Source方法
流的源可以在它們生產的element中直接賦值時間戳以及發送Watermark。在此情況下,我們不需要Timestamp Assigner。
要在Source方法中向element直接賦值時間戳,Source方法必須在SourceContext上調用方法collectWithTimestamp(…)。要在Source中生成Watermark,Source必須調用emitWatermark(Watermark)方法。
在下例的(非檢查點的)Source方法中,方法直接向element賦值時間戳,並且根據特殊事件生成Watermark:
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
注意:如果流程序在已經擁有時間戳的流上繼續使用TimestampAssigner,流中element的原有時間戳將被TimestampAssigner重寫。類似地,Watermark也會同樣被重寫。
1.2 Timestamp Assigner / Watermark Generators
Timestamp Assigner接收一個流並且產生一個帶有時間戳賦值element和Watermark的新的流。如果原有的流已經擁有了時間戳或Watermark,則Timestamp Assigner將會重寫它們。
通常在緊接着數據源之后會定義Timestamp Assigner,但這並不是嚴格要求的。例如在通用的模式中,會在Timestamp Assigner之前進行parse(MapFunction)和filter(FilterFunction)操作。不論在什么情況下,Timestamp Assigner都需要在第一個使用事件時間的Operation(如第一個窗口Operation)之前定義。而在流Job中使用Kafka作為數據源是一個特殊情況,Flink允許在數據源(或數據消費者consumer)內部定義Timestamp Assigner和Watermark emitter,更多相關信息請見Kafka Connector documentation。
注意:本節余下內容呈現了一個開發者創建自己的Timestamp Assigner 和 watermark emitter所需要實現的主要接口。有關Flink自帶的預先實現的extractor,請見Pre-defined Timestamp Extractors / Watermark Emitters
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
周期性Watermark
AssignerWithPeriodicWatermark賦值時間戳並周期性生成(生成方式有可能是依靠流的element,或者純粹基於處理時間)。
生成Watermark的時間周期區間(每n毫秒)的大小可以通過ExecutionConfig.setAutoWatermarkInterval(…)設置。每一次生成時,都將會調用Assigner的getCurrentWatermark()方法,如果返回的Watermark是非null且大於前一個Watermark,則會發送一個新的Watermark。
下面是兩個生成周期性Watermark和Timestamp Assigner的例子
/**
* This generator generates watermarks assuming that elements come out of order to a certain degree only.
* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
* elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a certain amount.
* It assumes that elements arrive in Flink after at most a certain time.
*/
public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current time minus the maximum time lag
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
}
帶標點(punctuated)Watermark
為了在某事件下就產生Watermark,我們需要使用AssignerWithPunctuatedWatermarks。在該類中,Flink會先調用extractTimestamp(…)方法來給element賦值一個時間戳,然后針對該element即刻調用checkAndGetNextWatermark(…)方法來返回一個非null的Watermark。
checkAndGetNextWatermark(…)方法將獲得在extractTimestamp(…)方法中獲得的時間戳,並決定是否生成Watermark。一旦checkAndGetNextWatermark(…)方法返回一個非null的Watermark,並且該Watermark大於最近的上一個Watermark,則發送該新的Watermark。
public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
return element.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}
}
注意:在每個事件上都生成一個Watermark是可能存在的,但是由於每個Watermark都會導致下游的計算開銷,過多的Watermark會降低程序的性能