Flink Program Guide (4) -- 時間戳和Watermark生成(DataStream API編程指導 -- For Java)


時間戳和Watermark生成

本文翻譯自Generating Timestamp / Watermarks

------------------------------------------------------------------

本文是Flink在使用事件時間(Event Time)時相關內容,有關事件時間、處理時間和提取時間的介紹,請見event time introduction

 

流程序需要設置時間特征Event time,才能在程序中使用事件時間。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.
setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 

一、時間戳賦值

為了使事件時間可以正常使用,Flink需要知道時間的時間戳,即流中的每個element都需要被賦予它自己的時間戳。Flink通常會從element的一些域訪問/提取時間戳(That happens usually by accessing/extracting the timestamp from some field in the element.)。

 

時間戳的賦值通常與Watermark的生成緊密相關,其中Watermark生成負責通知系統事件時間的增長情況。

 

時間戳賦值和Watermark生成的方式有兩種:

1.    直接在數據流源處進行

2.    通過一個Timestamp assigner / watermark generator:在Flink中,Timestamp assigner同樣會定義watermark的發送行為

 

注意:時間戳和Watermark都是使用從Java歷元(epoch “1970-01-01 T00.00.00Z”開始的毫秒數定義的

 

1.1 帶有時間戳和WatermarkSource方法

流的源可以在它們生產的element中直接賦值時間戳以及發送Watermark。在此情況下,我們不需要Timestamp Assigner

要在Source方法中向element直接賦值時間戳,Source方法必須在SourceContext上調用方法collectWithTimestamp(…)。要在Source中生成WatermarkSource必須調用emitWatermark(Watermark)方法。

 

在下例的(非檢查點的)Source方法中,方法直接向element賦值時間戳,並且根據特殊事件生成Watermark

@Override
public
void run(SourceContext<MyType> ctx) throws Exception {
  while (
/* condition */) {
    MyType next = getNext();
     ctx.
collectWithTimestamp(next, next.getEventTimestamp());

    if (next.hasWatermarkTime()) {
      ctx.
emitWatermark(new Watermark(next.getWatermarkTime()));
    }
  }
}

 

注意:如果流程序在已經擁有時間戳的流上繼續使用TimestampAssigner,流中element的原有時間戳將被TimestampAssigner重寫。類似地,Watermark也會同樣被重寫。

 

1.2 Timestamp Assigner / Watermark Generators

Timestamp Assigner接收一個流並且產生一個帶有時間戳賦值elementWatermark的新的流。如果原有的流已經擁有了時間戳或Watermark,則Timestamp Assigner將會重寫它們。

 

通常在緊接着數據源之后會定義Timestamp Assigner,但這並不是嚴格要求的。例如在通用的模式中,會在Timestamp Assigner之前進行parse(MapFunction)filter(FilterFunction)操作。不論在什么情況下,Timestamp Assigner都需要在第一個使用事件時間的Operation(如第一個窗口Operation)之前定義。而在流Job中使用Kafka作為數據源是一個特殊情況,Flink允許在數據源(或數據消費者consumer)內部定義Timestamp AssignerWatermark emitter,更多相關信息請見Kafka Connector documentation

 

注意:本節余下內容呈現了一個開發者創建自己的Timestamp Assigner watermark emitter所需要實現的主要接口。有關Flink自帶的預先實現的extractor,請見Pre-defined Timestamp Extractors / Watermark Emitters

 

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.
setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
  myFormat, myFilePath, FileProcessingMode.
PROCESS_CONTINUOUSLY, 100,
  FilePathFilter.
createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
  .
filter( event -> event.severity() == WARNING )
  .
assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
  .
keyBy( (event) -> event.getGroup() )
  .
timeWindow(Time.seconds(10))
  .
reduce( (a, b) -> a.add(b) )
  .
addSink(...);

 

周期性Watermark

AssignerWithPeriodicWatermark賦值時間戳並周期性生成(生成方式有可能是依靠流的element,或者純粹基於處理時間)。

 

生成Watermark的時間周期區間(每n毫秒)的大小可以通過ExecutionConfig.setAutoWatermarkInterval(…)設置。每一次生成時,都將會調用AssignergetCurrentWatermark()方法,如果返回的Watermark是非null且大於前一個Watermark,則會發送一個新的Watermark

 

下面是兩個生成周期性WatermarkTimestamp Assigner的例子

/**
* This generator generates watermarks assuming that elements come out of order to a certain degree only.
* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
* elements for timestamp t.
*/
public class
BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

private final long maxOutOfOrderness = 3500; // 3.5 seconds

private long currentMaxTimestamp;

@Override
public
long extractTimestamp(MyEvent element, long previousElementTimestamp) {
  
long timestamp = element.getCreationTime();
  currentMaxTimestamp = Math.
max(timestamp, currentMaxTimestamp);
  return timestamp;
}

@Override
public Watermark
getCurrentWatermark() {
  
// return the watermark as current highest timestamp minus the out-of-orderness bound
  return new
Watermark(currentMaxTimestamp - maxOutOfOrderness);
}

}

/**
* This generator generates watermarks that are lagging behind processing time by a certain amount.
* It assumes that elements arrive in Flink after at most a certain time.
*/
public class
TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

private final long maxTimeLag = 5000; // 5 seconds

@Override
public
long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.
getCreationTime();
}

@Override
public Watermark
getCurrentWatermark() {
        
// return the watermark as current time minus the maximum time lag
        return new
Watermark(System.currentTimeMillis() - maxTimeLag);
}

}

 

帶標點(punctuatedWatermark

為了在某事件下就產生Watermark,我們需要使用AssignerWithPunctuatedWatermarks。在該類中,Flink會先調用extractTimestamp(…)方法來給element賦值一個時間戳,然后針對該element即刻調用checkAndGetNextWatermark(…)方法來返回一個非nullWatermark

 

checkAndGetNextWatermark(…)方法將獲得在extractTimestamp(…)方法中獲得的時間戳,並決定是否生成Watermark。一旦checkAndGetNextWatermark(…)方法返回一個非nullWatermark,並且該Watermark大於最近的上一個Watermark,則發送該新的Watermark

 

public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {

@Override
public
long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.
getCreationTime();
}

@Override
public Watermark
checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return element.
hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}

}

 

注意:在每個事件上都生成一個Watermark是可能存在的,但是由於每個Watermark都會導致下游的計算開銷,過多的Watermark會降低程序的性能

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM