1 前言
在時間 Time
那一篇中,介紹了三種時間概念 Event
、Ingestin
和 Process
, 其中還簡單介紹了亂序 Event Time
事件和它的解決方案 Watermark
水位線
(看過多篇文章后,決定喊它水位線,因為窗口觸發條件是 Watermark
> Window_end_time
,有點像水流到達水位線后溢出,當然喊它水印也是可以的,全看個人愛好咯~)
前文請翻 時間 Time 和 Watermark,不過前面介紹比較淺,沒能很好領會水位線的概念,所以本篇是作為補充,來加深理解~
2 Watermark 理論
2.1 Watermark 的概念
Watermark
是一種衡量 Event Time
進展的機制,它是數據本身的隱藏屬性。通常基於 Event Time
事件的數據,數據自身有一個時間戳 timestamp
類型的屬性,例如 Message
對象中有個屬性 timestamp
= 1575090298299(2019-11-30 13:04:58),如果設定的可延時時間為 3s,那么從該事件提取到的水位線可能如下:
water(1575090298299) = 1575090298299 - 3000(2019-11-30 13:04:55)
如果該水位線被采納,那么表示全部事件中,timestamp
小於 1575090298299 - 3000(2019-11-30 13:04:55)的事件,都已經到達了 Flink
程序中。
Ideal
表示理想情況下的,事件是按序到達程序的,與處理時間相吻合,但是真實情況下,會有各種其他因素導致事件延遲,造成了 Reality
那條線,其中 Skew
就是它們傾斜,代表可能延遲到達的時間。
2.2 Watermark 的作用
前面提到事件 Event
有可能延遲,也就是事件亂序,Watermark
就是用來解決該問題的利器,通常搭配 window
一起出現。
watermark
本質上是一個時間戳,它是單調遞增,只要有源源不斷的事件到達程序,水位線就有可能被更新,比較替換成更大的值。
然后程序 根據 watermark
識別程序處理到什么位置、進度,比水位線小的數據表示都已經到達,后續接收的事件時間應該都要比它大。
默認情況下,后續比水位線小的事件會被認為是遲到數據,Flink
默認策略是舍棄它們,不進行計算(但還有其它機制去搜集這些被舍棄的遲到數據,詳細可去了解 SideOutputLateData)。
在一些情況下,我們想要延遲窗口幾秒才觸發計算,例如當前有個時間窗口 [00:01]-[00:04],但可能在 05s 時,出現了 03 的事件,我們想稍微等一下出現延遲的事件,將 03 這個事件加入到正確的窗口中一起計算
下面用消息隊列來說明下 watermark
(事件上的數字表示自身攜帶的 timestamp
)
上圖設定的時間窗口大小為 4s,時間屬性為 Event Time
事件,消息隊列中的數據是亂序到達程序,分割線 w(4)、w(9)
表示的是水印(結合下圖分析,延時時間大概率設置為 3s)
上圖例子中,數據 7 進入第二個窗口而水印未生成前,數據 3 進入了第一個窗口后,然后前面水印生成了,全局水印更新成了 w(4)
,由於 w(4) >= window1_end_time
,於是觸發了第一個窗口的計算。
后續數據 5、6 繼續進入第二個窗口,數據 9、12 進入第三個窗口。
接着數據 12 進入程序中,被分配到第三個窗口中,計算得到水印 w(9)
大於第二個窗口結束時間,又會觸發第二個窗口的計算,以此類推。
從上面看出,數據 3 在數據 7 后面才到,屬於遲到數據,但由於設定了 watermark
,允許了一定時間的遲到事件,所以設定 watermark
可以解決一定程度上的亂序事件。
2.3 算子對 Watermark 的處理
引用自 Louisvv 的博文
Watermark
是可以被算子處理,算子內部會有個時間記錄器,記錄各個 Window
的結束時間。
當算子收到一個 Watermark
時,算子會根據這個 Watermark
的時間戳更新內部的 Event Time Clock
,當前記錄的時間與 Watermark
進行比較,如果 Watermark
大於記錄的時間,則會更新該記錄為最新的 Watermark
值。
2.4 Watermark 的設定
各種情況導致的事件亂序,我們需要設置 Watermark
,允許一定時間段的延遲(不過不能無限等待下去,時效性和內存使用率還是很重要的)。
一般會在接收到 DataSource
的數據后,立刻生成 watermark
,也可以在 source
之后,進行簡單的 map
或者 filter
操作后,再生成 watermark
。
Watermark
設定方法有兩種:
在代碼中,可以通過調用 DataStream
中的兩個 API
來提取時間和分配水印,分別是 AssignerWithPunctuatedWatermarks
和 AssignerWithPeriodicWatermarks
。
Punctuated Watermark
:數據流中每一個遞增的EventTime
都會產生一個Watermark
在實際的生產環境中,在 TPS
很高的情況下會產生大量的 Watermark
,可能在一定程度上對下游算子造成一定的壓力,所以只有在實時性很高的場景才會選擇這種方式來進行生成水印。
PeriodicWatermarks
:周期性(一定時間間隔或者達到一定的記錄條數)生成水印
在實際的生產環境中,使用這種 PeriodicWatermarks
較多,它會周期性(通過 setAutoWatermarkInterval(...),設置間隔的毫秒數)生成水印,但是必須結合時間或者積累條數兩個維度,否則會在極端情況下會有很大的延時。
來看下它們在代碼中的位置和結構吧:
下面來結合代碼例子看下常用的 PeriodicWatermarks
3 PeriodicWatermarks Demo
先來交代下程序的邏輯:
1、往 9010 端口發送數據:nc -l 9010
2、寫程序,監聽 9010 端口,按行讀取,map 分詞處理成 tuple2
類型(key, time)
3、提取時間和生成水印
4、event time
窗口是 4s,允許延遲時間是 3s
其中,自定義的 Watermark
生成規則實現了 AssignerWithPeriodicWatermarks
,實現 getCurrentWatermark
:生成水印 和 extractTimestamp
:提取時間戳
3.1 程序主要邏輯
int port = 9010;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設置使用eventtime,默認是使用processtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<String> text = env.socketTextStream("127.0.0.1", port, "\n");
//解析輸入的數據
DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(",");
return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
}
});
DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new WordCountPeriodicWatermarks());
DataStream<String> window = waterMarkStream.keyBy(0)
//按照消息的EventTime分配窗口,和調用TimeWindow效果一樣
.window(TumblingEventTimeWindows.of(Time.seconds(4)))
.apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
String key = tuple.toString();
List<Long> arrarList = new ArrayList<>();
List<String> eventTimeList = new ArrayList<>();
Iterator<Tuple2<String, Long>> it = input.iterator();
while (it.hasNext()) {
Tuple2<String, Long> next = it.next();
arrarList.add(next.f1);
eventTimeList.add(String.valueOf(next.f1).substring(8,10));
}
Collections.sort(arrarList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String result = "\n 鍵值 : " + key + "\n " +
"觸發窗內數據個數 : " + arrarList.size() + "\n " +
"觸發窗起始數據: " + sdf.format(arrarList.get(0)) + "\n " +
"觸發窗最后(可能是延時)數據:" + sdf.format(arrarList.get(arrarList.size() - 1))
+ "\n " +
"窗口內的事件數據:" + Joiner.on(",").join(eventTimeList) + "\n" +
"實際窗起始和結束時間: " + sdf.format(window.getStart()) + "《----》" + sdf.format(window.getEnd()) + " \n \n ";
out.collect(result);
}
});
window.print();
env.execute("eventtime-watermark");
上面貼出來的代碼作用就是接收 9010 端口寫入的數據,然后進行 map
提取處理,接着進行水印處理,最后進入窗口運算符,進行窗口計算。
3.2 水印生成器
public class WordCountPeriodicWatermarks implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
private Long currentMaxTimestamp = 0L;
// 最大允許的亂序時間是 3 s
private final Long maxOutOfOrderness = 3000L;
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
//定義如何提取timestamp
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
long id = Thread.currentThread().getId();
System.out.println("線程 ID :"+ id +
" 鍵值 :" + element.f0 +
",事件事件:[ "+sdf.format(element.f1)+
" ],currentMaxTimestamp:[ "+
sdf.format(currentMaxTimestamp)+" ],水印時間:[ "+
sdf.format(getCurrentWatermark().getTimestamp())+" ]");
return timestamp;
}
}
extractTimestamp()
方法從數據本身屬性中提取 Event Time
,該方法返回的是 Math.max(timestamp, currentMaxTimestamp)
,比較了當前時間戳和事件時間戳,返回較大者。
getCurrentWatermark
是獲取當前的水印,這里定義的最大延遲時間為 3s,生成的水印會減去它,例如數據 7 進來后,水印計算規則:w(7 - 3) = w(4)
。后面如果窗口計算觸發后,超過水印時間的事件,默認情況下會被舍棄掉。
3.3 運行程序和測試數據
記住運行程序前,需要在終端中打開 9010 端口:nc -l 9010
測試數據:2 3 1 7 3 5 9 6 12 17 10 16 19 11 18
第一列為 key
,為了便於辨認,都是用 001,第二列是時間戳,按照上面測試數據,設定成對應的秒數
001,1575129602000
001,1575129603000
001,1575129601000
001,1575129607000
001,1575129603000
001,1575129605000
001,1575129609000
001,1575129606000
001,1575129612000
001,1575129617000
001,1575129610000
001,1575129616000
001,1575129619000
001,1575129611000
001,1575129618000
3.4 理想中的驗證結果
在第一次試驗中,我將這批數據全量拷貝到終端,程序進行了處理:
整理的輸出結果如下:
Event Time | CurrentMaxTimeStamp | Watermark | Window_Start_Time | Window_End_Time |
---|---|---|---|---|
00:00:02 | 00:00:02 | 23:59:59 | 00:00:00 | 00:00:04 |
00:00:03 | 00:00:03 | 00:00:00 | ||
00:00:01 | 00:00:01 | 00:00:00 | ||
00:00:07 | 00:00:07 | 00:00:04 | 00:00:04 | 00:00:08 |
00:00:03 | 00:00:03 | 00:00:04 | 00:00:00 | 00:00:04 |
00:00:05 | 00:00:05 | 00:00:04 | 00:00:04 | 00:00:08 |
00:00:09 | 00:00:09 | 00:00:06 | 00:00:08 | 00:00:12 |
00:00:06 | 00:00:06 | 00:00:06 | 00:00:04 | 00:00:08 |
00:00:12 | 00:00:12 | 00:00:09 | 00:00:12 | 00:00:16 |
00:00:17 | 00:00:17 | 00:00:14 | 00:00:16 | 00:00:20 |
00:00:10 | 00:00:17 | 00:00:14 | ||
00:00:16 | 00:00:17 | 00:00:14 | ||
00:00:19 | 00:00:19 | 00:00:16 | ||
00:00:11 | 00:00:19 | 00:00:16 | ||
00:00:18 | 00:00:19 | 00:00:16 |
我們關注一下第一個窗口的例子,數據 3 在數據 7 之后才進來,但還是正確進入到了第一個窗口中運算,表示設定了 Watermark
后,能夠解決亂序事件。
3.5 留下一個疑問
同樣的測試數據,前面的是批量拷貝過去的,程序和水印 Watermark
正常起了作用,但是在單條數據,一條一條發送的情況下,出現了另一種窗口觸發情況:
也是關注第一個窗口,前面例子中,輸出的結果是窗口數據有四個,本次只有三個,我們下一條數據 3 被舍棄了,這個情況我無法解釋清楚,也算留個坑,等之后再去處理。
3.6 Watermark 和 Window 結合
從上面的例子可以看出,使用水印時,需要設置延遲時間 maxOutOfOrderness
,如果設置過大的話,容易造成程序中出現多個窗口,一直在等待延遲數據,然后窗口一直不被觸發。
服務器內存使用過多容易導致 OOM
不說,而且窗口不被觸發計算,會造成統計數據的實時性變差,影響業務輸出。
按照 zhisheng
的建議,在之后的使用中,要注意以下兩點:
- 合理的設置
maxOutOfOrderness
,避免過大 - 不太依賴
Event Time
的場景就不要設置時間策略為EventTime
4 延遲數據如何處理
4.1 默認策略:丟棄
這個是默認處理方式,從前面的例子中也能看到,數據 10 在數據 17 后面進入,此時水印比 10 要大,所以數據 10 所處的窗口 [08 <--> 12] 已經被觸發,於是數據 10 被舍棄,不參與計算
4.2 剩下的內容
- allowedLateness 再次指定允許數據延遲的時間
- sideOutputLateData 搜集遲到的數據
前面兩個真正深入去學,估計也是一篇大學問,所以推薦給大家看別人寫的,之后我學到再跟大家分享吧:
5 總結
- Flink 如何處理亂序
使用 Watermark
+ Window
機制
- Flink 何時觸發 Window
普通設定,沒有設定 allowedLateness
更多延遲處理時間
Watermark >= Event Time
關於 allowedLateness
,請看參考資料 4
- Flink 使用 Watermark 的建議
1. 合理的設置
maxOutOfOrderness
,避免過大
2. 不太依賴Event Time
的場景就不要設置時間策略為EventTime
本篇開門見山,直接介紹了 Watermark
水印的概念,因為亂序事件的情況,Flink
設計了水印用來處理延遲事件。介紹了兩種生成水印的方法 Periodic
:周期性 和 Punctuated
:按次遞增生成,以常用的 Periodic
水印 作為例子,通過代碼和測試數據驗證了延遲事件被正確處理了。
在進一步學習 Watermark
時,參考了很多文章,大致了解了它的概念和使用,不過還存在一些疑問和坑,希望各位有了解的可與我分享。如有其它學習建議或文章不對之處,請與我討論吧~
6 項目地址
https://github.com/Vip-Augus/flink-learning-note
git clone https://github.com/Vip-Augus/flink-learning-note
7 參考資料
- Flink Window分析及Watermark解決亂序數據機制深入剖析-Flink牛刀小試
- Flink流計算編程--watermark(水位線)簡介
- Flink WaterMark簡介
- Flink流計算編程--Flink中allowedLateness詳細介紹及思考
歡迎關注 我們為您准備了 Java 生態知識體系/面試必看資料 回復領取