主要介紹 Flink 中的時間和水印。
我們在之前的課時中反復提到過窗口和時間的概念,Flink 框架中支持事件時間、攝入時間和處理時間三種。而當我們在流式計算環境中數據從 Source 產生,再到轉換和輸出,這個過程由於網絡和反壓的原因會導致消息亂序。因此,需要有一個機制來解決這個問題,這個特別的機制就是“水印”。
Flink 的窗口和時間
我們在第 05 課時中講解過 Flink 窗口的實現,根據窗口數據划分的不同,目前 Flink 支持如下 3 種:
我們在第 05 課時中講解過 Flink 窗口的實現,根據窗口數據划分的不同,目前 Flink 支持如下 3 種:
滾動窗口,窗口數據有固定的大小,窗口中的數據不會疊加;
滑動窗口,窗口數據有固定的大小,並且有生成間隔;
會話窗口,窗口數據沒有固定的大小,根據用戶傳入的參數進行划分,窗口數據無疊加。
Flink 中的時間分為三種:
滑動窗口,窗口數據有固定的大小,並且有生成間隔;
會話窗口,窗口數據沒有固定的大小,根據用戶傳入的參數進行划分,窗口數據無疊加。
Flink 中的時間分為三種:
事件時間(Event Time),即事件實際發生的時間;
攝入時間(Ingestion Time),事件進入流處理框架的時間;
處理時間(Processing Time),事件被處理的時間。
下面的圖詳細說明了這三種時間的區別和聯系:
攝入時間(Ingestion Time),事件進入流處理框架的時間;
處理時間(Processing Time),事件被處理的時間。
下面的圖詳細說明了這三種時間的區別和聯系:
事件時間(Event Time)
事件時間(Event Time)指的是數據產生的時間,這個時間一般由數據生產方自身攜帶,比如 Kafka 消息,每個生成的消息中自帶一個時間戳代表每條數據的產生時間。Event Time 從消息的產生就誕生了,不會改變,也是我們使用最頻繁的時間。
事件時間(Event Time)指的是數據產生的時間,這個時間一般由數據生產方自身攜帶,比如 Kafka 消息,每個生成的消息中自帶一個時間戳代表每條數據的產生時間。Event Time 從消息的產生就誕生了,不會改變,也是我們使用最頻繁的時間。
利用 Event Time 需要指定如何生成事件時間的“水印”,並且一般和窗口配合使用,具體會在下面的“水印”內容中詳細講解。
我們可以在代碼中指定 Flink 系統使用的時間類型為 EventTime:
復制final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設置時間屬性為 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置時間屬性為 EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
Flink 注冊 EventTime 是通過 InternalTimerServiceImpl.registerEventTimeTimer 來實現的:
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
Flink 注冊 EventTime 是通過 InternalTimerServiceImpl.registerEventTimeTimer 來實現的:
可以看到,該方法有兩個入參:namespace 和 time,其中 time 是觸發定時器的時間,namespace 則被構造成為一個 TimerHeapInternalTimer 對象,然后將其放入 KeyGroupedInternalPriorityQueue 隊列中。
那么 Flink 什么時候會使用這些 timer 觸發計算呢?答案在這個方法里:
復制InternalTimeServiceImpl.advanceWatermark。
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
這個方法中的 while 循環部分會從 eventTimeTimersQueue 中依次取出觸發時間小於參數 time 的所有定時器,調用 triggerTarget.onEventTime() 方法進行觸發。
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
這個方法中的 while 循環部分會從 eventTimeTimersQueue 中依次取出觸發時間小於參數 time 的所有定時器,調用 triggerTarget.onEventTime() 方法進行觸發。
這就是 EventTime 從注冊到觸發的流程。
處理時間(Processing Time)
處理時間(Processing Time)指的是數據被 Flink 框架處理時機器的系統時間,Processing Time 是 Flink 的時間系統中最簡單的概念,但是這個時間存在一定的不確定性,比如消息到達處理節點延遲等影響。
處理時間(Processing Time)指的是數據被 Flink 框架處理時機器的系統時間,Processing Time 是 Flink 的時間系統中最簡單的概念,但是這個時間存在一定的不確定性,比如消息到達處理節點延遲等影響。
我們同樣可以在代碼中指定 Flink 系統使用的時間為 Processing Time:
復制final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
同樣,也可以在源碼中找到 Flink 是如何注冊和使用 Processing Time 的。
同樣,也可以在源碼中找到 Flink 是如何注冊和使用 Processing Time 的。
registerProcessingTimeTimer() 方法為我們展示了如何注冊一個 ProcessingTime 定時器:
每當一個新的定時器被加入到 processingTimeTimersQueue 這個優先級隊列中時,如果新來的 Timer 時間戳更小,那么更小的這個 Timer 會被重新注冊 ScheduledThreadPoolExecutor 定時執行器上。
每當一個新的定時器被加入到 processingTimeTimersQueue 這個優先級隊列中時,如果新來的 Timer 時間戳更小,那么更小的這個 Timer 會被重新注冊 ScheduledThreadPoolExecutor 定時執行器上。
Processing Time 被觸發是在 InternalTimeServiceImpl 的 onProcessingTime() 方法中:
一直循環獲取時間小於入參 time 的所有定時器,並運行 triggerTarget 的 onProcessingTime() 方法。
攝入時間(Ingestion Time)
攝入時間(Ingestion Time)是事件進入 Flink 系統的時間,在 Flink 的 Source 中,每個事件會把當前時間作為時間戳,后續做窗口處理都會基於這個時間。理論上 Ingestion Time 處於 Event Time 和 Processing Time之間。
攝入時間(Ingestion Time)是事件進入 Flink 系統的時間,在 Flink 的 Source 中,每個事件會把當前時間作為時間戳,后續做窗口處理都會基於這個時間。理論上 Ingestion Time 處於 Event Time 和 Processing Time之間。
與事件時間相比,攝入時間無法處理延時和無序的情況,但是不需要明確執行如何生成 watermark。在系統內部,攝入時間采用更類似於事件時間的處理方式進行處理,但是有自動生成的時間戳和自動的 watermark。
可以防止 Flink 內部處理數據是發生亂序的情況,但無法解決數據到達 Flink 之前發生的亂序問題。如果需要處理此類問題,建議使用 EventTime。
Ingestion Time 的時間類型生成相關的代碼在 AutomaticWatermarkContext 中:
我們可以看出,這里會設置一個 watermark 發送定時器,在 watermarkInterval 時間之后觸發。
處理數據的代碼在 processAndCollect() 方法中:
水印(WaterMark)
水印(WaterMark)是 Flink 框架中最晦澀難懂的概念之一,有很大一部分原因是因為翻譯的原因。
水印(WaterMark)是 Flink 框架中最晦澀難懂的概念之一,有很大一部分原因是因為翻譯的原因。
WaterMark 在正常的英文翻譯中是水位,但是在 Flink 框架中,翻譯為“水位線”更為合理,它在本質上是一個時間戳。
在上面的時間類型中我們知道,Flink 中的時間:
EventTime 每條數據都攜帶時間戳;
EventTime 每條數據都攜帶時間戳;
ProcessingTime 數據不攜帶任何時間戳的信息;
IngestionTime 和 EventTime 類似,不同的是 Flink 會使用系統時間作為時間戳綁定到每條數據,可以防止 Flink 內部處理數據是發生亂序的情況,但無法解決數據到達 Flink 之前發生的亂序問題。
所以,我們在處理消息亂序的情況時,會用 EventTime 和 WaterMark 進行配合使用。
IngestionTime 和 EventTime 類似,不同的是 Flink 會使用系統時間作為時間戳綁定到每條數據,可以防止 Flink 內部處理數據是發生亂序的情況,但無法解決數據到達 Flink 之前發生的亂序問題。
所以,我們在處理消息亂序的情況時,會用 EventTime 和 WaterMark 進行配合使用。
首先我們要明確幾個基本問題。
水印的本質是什么
水印的出現是為了解決實時計算中的數據亂序問題,它的本質是 DataStream 中一個帶有時間戳的元素。如果 Flink 系統中出現了一個 WaterMark T,那么就意味着 EventTime < T 的數據都已經到達,窗口的結束時間和 T 相同的那個窗口被觸發進行計算了。
水印的出現是為了解決實時計算中的數據亂序問題,它的本質是 DataStream 中一個帶有時間戳的元素。如果 Flink 系統中出現了一個 WaterMark T,那么就意味着 EventTime < T 的數據都已經到達,窗口的結束時間和 T 相同的那個窗口被觸發進行計算了。
也就是說:水印是 Flink 判斷遲到數據的標准,同時也是窗口觸發的標記。
在程序並行度大於 1 的情況下,會有多個流產生水印和窗口,這時候 Flink 會選取時間戳最小的水印。
水印是如何生成的
Flink 提供了 assignTimestampsAndWatermarks() 方法來實現水印的提取和指定,該方法接受的入參有 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 兩種。
Flink 提供了 assignTimestampsAndWatermarks() 方法來實現水印的提取和指定,該方法接受的入參有 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 兩種。
整體的類圖如下:
水印種類
周期性水印
周期性水印
我們在使用 AssignerWithPeriodicWatermarks 周期生成水印時,周期默認的時間是 200ms,這個時間的指定位置為:
復制@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}
是否還記得上面我們在講時間類型時會通過 env.setStreamTimeCharacteristic() 方法指定 Flink 系統的時間類型,這個 setStreamTimeCharacteristic() 方法中會做判斷,如果用戶傳入的是 TimeCharacteristic.eventTime 類型,那么 AutoWatermarkInterval 的值則為 200ms ,如上述代碼所示。當前我們也可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法來指定自動生成的時間間隔。
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}
是否還記得上面我們在講時間類型時會通過 env.setStreamTimeCharacteristic() 方法指定 Flink 系統的時間類型,這個 setStreamTimeCharacteristic() 方法中會做判斷,如果用戶傳入的是 TimeCharacteristic.eventTime 類型,那么 AutoWatermarkInterval 的值則為 200ms ,如上述代碼所示。當前我們也可以使用 ExecutionConfig.setAutoWatermarkInterval() 方法來指定自動生成的時間間隔。
在上述的類圖中可以看出,我們需要通過 TimestampAssigner 的 extractTimestamp() 方法來提取 EventTime。
Flink 在這里提供了 3 種提取 EventTime() 的方法,分別是:
AscendingTimestampExtractor
BoundedOutOfOrdernessTimestampExtractor
IngestionTimeExtractor
這三種方法中 BoundedOutOfOrdernessTimestampExtractor() 用的最多,需特別注意,在這個方法中的 maxOutOfOrderness 參數,該參數指的是允許數據亂序的時間范圍。簡單說,這種方式允許數據遲到 maxOutOfOrderness 這么長的時間。
BoundedOutOfOrdernessTimestampExtractor
IngestionTimeExtractor
這三種方法中 BoundedOutOfOrdernessTimestampExtractor() 用的最多,需特別注意,在這個方法中的 maxOutOfOrderness 參數,該參數指的是允許數據亂序的時間范圍。簡單說,這種方式允許數據遲到 maxOutOfOrderness 這么長的時間。
復制 public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " +
"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
}
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " +
"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
}
public abstract long extractTimestamp(T element);
@Override
public final Watermark getCurrentWatermark() {
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
public final Watermark getCurrentWatermark() {
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
PunctuatedWatermark 水印
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
PunctuatedWatermark 水印
這種水印的生成方式 Flink 沒有提供內置實現,它適用於根據接收到的消息判斷是否需要產生水印的情況,用這種水印生成的方式並不多見。
舉個簡單的例子,假如我們發現接收到的數據 MyData 中以字符串 watermark 開頭則產生一個水印:
復制data.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {
@Override
public Watermark checkAndGetNextWatermark(MyData data, long l) {
return data.getRecord.startsWith("watermark") ? new Watermark(l) : null;
}
public Watermark checkAndGetNextWatermark(MyData data, long l) {
return data.getRecord.startsWith("watermark") ? new Watermark(l) : null;
}
@Override
public long extractTimestamp(MyData data, long l) {
return data.getTimestamp();
}
});
class MyData{
private String record;
private Long timestamp;
public String getRecord() {
return record;
}
public void setRecord(String record) {
this.record = record;
}
public Timestamp getTimestamp() {
return timestamp;
}
public void setTimestamp(Timestamp timestamp) {
this.timestamp = timestamp;
}
}
案例
我們上面講解了 Flink 關於水印和時間的生成,以及使用,下面舉一個例子來講解。
public long extractTimestamp(MyData data, long l) {
return data.getTimestamp();
}
});
class MyData{
private String record;
private Long timestamp;
public String getRecord() {
return record;
}
public void setRecord(String record) {
this.record = record;
}
public Timestamp getTimestamp() {
return timestamp;
}
public void setTimestamp(Timestamp timestamp) {
this.timestamp = timestamp;
}
}
案例
我們上面講解了 Flink 關於水印和時間的生成,以及使用,下面舉一個例子來講解。
模擬一個實時接收 Socket 的 DataStream 程序,代碼中使用 AssignerWithPeriodicWatermarks 來設置水印,將接收到的數據進行轉換,分組並且在一個 5
秒的窗口內獲取該窗口中第二個元素最小的那條數據。
秒的窗口內獲取該窗口中第二個元素最小的那條數據。
復制public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
//設置為eventtime事件類型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置水印生成時間間隔100ms
env.getConfig().setAutoWatermarkInterval(100);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置水印生成時間間隔100ms
env.getConfig().setAutoWatermarkInterval(100);
DataStream<String> dataStream = env
.socketTextStream("127.0.0.1", 9000)
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
private Long currentTimeStamp = 0L;
//設置允許亂序時間
private Long maxOutOfOrderness = 5000L;
.socketTextStream("127.0.0.1", 9000)
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
private Long currentTimeStamp = 0L;
//設置允許亂序時間
private Long maxOutOfOrderness = 5000L;
@Override
public Watermark getCurrentWatermark() {
public Watermark getCurrentWatermark() {
return new Watermark(currentTimeStamp - maxOutOfOrderness);
}
}
@Override
public long extractTimestamp(String s, long l) {
String[] arr = s.split(",");
long timeStamp = Long.parseLong(arr[1]);
currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
System.err.println(s + ",EventTime:" + timeStamp + ",watermark:" + (currentTimeStamp - maxOutOfOrderness));
return timeStamp;
}
});
public long extractTimestamp(String s, long l) {
String[] arr = s.split(",");
long timeStamp = Long.parseLong(arr[1]);
currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
System.err.println(s + ",EventTime:" + timeStamp + ",watermark:" + (currentTimeStamp - maxOutOfOrderness));
return timeStamp;
}
});
dataStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String s) throws Exception {
@Override
public Tuple2<String, Long> map(String s) throws Exception {
String[] split = s.split(",");
return new Tuple2<String, Long>(split[0], Long.parseLong(split[1]));
}
})
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.minBy(1)
.print();
return new Tuple2<String, Long>(split[0], Long.parseLong(split[1]));
}
})
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.minBy(1)
.print();
env.execute("WaterMark Test Demo");
}
我們第一次試驗的數據如下:
我們第一次試驗的數據如下:
復制flink,1588659181000
flink,1588659182000
flink,1588659183000
flink,1588659184000
flink,1588659185000
可以做一個簡單的判斷,第一條數據的時間戳為 1588659181000,窗口的大小為 5 秒,那么應該會在 flink,1588659185000 這條數據出現時觸發窗口的計算。
flink,1588659182000
flink,1588659183000
flink,1588659184000
flink,1588659185000
可以做一個簡單的判斷,第一條數據的時間戳為 1588659181000,窗口的大小為 5 秒,那么應該會在 flink,1588659185000 這條數據出現時觸發窗口的計算。
我們用 nc -lk 9000 命令啟動端口,然后輸出上述試驗數據,看到控制台的輸出:
很明顯,可以看到當第五條數據出現后,窗口觸發了計算。
下面再模擬一下數據亂序的情況,假設我們的數據來源如下:
復制flink,1588659181000
flink,1588659182000
flink,1588659183000
flink,1588659184000
flink,1588659185000
flink,1588659180000
flink,1588659186000
flink,1588659187000
flink,1588659188000
flink,1588659189000
flink,1588659190000
其中的 flink,1588659180000 為亂序消息,來看看會發生什么?
flink,1588659182000
flink,1588659183000
flink,1588659184000
flink,1588659185000
flink,1588659180000
flink,1588659186000
flink,1588659187000
flink,1588659188000
flink,1588659189000
flink,1588659190000
其中的 flink,1588659180000 為亂序消息,來看看會發生什么?
可以看到,時間戳為 1588659180000 的這條消息並沒有被處理,因為此時代碼中的允許亂序時間 private Long maxOutOfOrderness = 0L 即不處理亂序消息。
下面修改 private Long maxOutOfOrderness = 5000L,即代表允許消息的亂序時間為 5 秒,然后把同樣的數據發往 socket 端口。
可以看到,我們把所有數據發送出去僅觸發了一次窗口計算,並且輸出的結果中 watermark 的時間往后順延了 5 秒鍾。所以,maxOutOfOrderness 的設置會影響窗口的計算時間和水印的時間,如下圖所示:
假如我們繼續向 socket 中發送數據:
復制flink,1588659191000
flink,1588659192000
flink,1588659193000
flink,1588659194000
flink,1588659195000
可以看到下一次窗口的觸發時間:
flink,1588659192000
flink,1588659193000
flink,1588659194000
flink,1588659195000
可以看到下一次窗口的觸發時間:
在這里要特別說明,Flink 在用時間 + 窗口 + 水印來解決實際生產中的數據亂序問題,有如下的觸發條件:
watermark 時間 >= window_end_time;
在 [window_start_time,window_end_time) 中有數據存在,這個窗口是左閉右開的。
此外,因為 WaterMark 的生成是以對象的形式發送到下游,同樣會消耗內存,因此水印的生成時間和頻率都要進行嚴格控制,否則會影響我們的正常作業。
在 [window_start_time,window_end_time) 中有數據存在,這個窗口是左閉右開的。
此外,因為 WaterMark 的生成是以對象的形式發送到下游,同樣會消耗內存,因此水印的生成時間和頻率都要進行嚴格控制,否則會影響我們的正常作業。
總結
這一課時我們學習了 Flink 的時間類型和水印生成,內容偏多並且水印部分理解起來需要時間,建議你結合源碼再進一步學習。
這一課時我們學習了 Flink 的時間類型和水印生成,內容偏多並且水印部分理解起來需要時間,建議你結合源碼再進一步學習。