問:數據工程師最期望數據怎么來?
答:按順序來。
MapReduce當初能用起來,就是因為Map階段對所有數據都進行排序了,后面的Reduce階段就可以直接用排序好的數據了。
批處理的時候因為數據已經落地了,咱可以慢慢排序。但是流式數據都是一條一條過來的,這個時候數據到達的時間和出發時的順序不一致會導致非常多的問題,這該咋整呢?
Sparkstreaming對亂序支持很差,因為它其實是“微批”,不是真正的流。加州伯克利大學AMP實驗室設計Spark的時候,想的就是弄一個更快的計算引擎,壓根就沒打算做成來一條處理一條的流式數據處理。所以對於一些亂序數據根本就不太關心,所以導致Sparkstreaming不能或者不太能支持亂序數據的處理。
但是Flink不行啊,數據一條一條的過來,然后進行窗口處理,亂序會導致各種統計問題,這就得必須解決了。
什么是亂序
一條數據在Flink里,有三個時間:
-
Event Time:事件產生的時間;
-
Ingestion Time:事件進入Flink的時間;
-
Window Processing Time:事件被處理的時間。
當數據一條一條規規矩矩的按流程發送,MQ傳輸,Flink接受然后處理,這個時候,就是有序的數據。
當出現各種異常,有些數據延遲了,排在后面的數據跑前面去了,這就出現了亂序。
請思考一下,我們應該以哪個時間戳判定亂序呢?
Flink的WaterMark機制
亂序會導致各種統計上的問題。比如一個Time Window本應該計算1、2、3,結果3遲到了,那這個窗口統計就丟數據了。這可太坑了。
為了解決這個問題,Flink設置了一個三個機制來解決這個問題:
-
WaterMark--水位線,;
-
allowLateNess--數據遲到時間;
-
sideOutPut--超長遲到數據收集;
水位線的設置很簡單(系統時間為准):
override def getCurrentWatermark(): Watermark = { new Watermark(System.currentTimeMillis - 5000)
設置Watermark為-5秒。但是怎么理解這個-5秒的水位線呢?
經常戶外徒步的同學應該知道一個徒步小隊通常會有一正兩副領隊,隊首隊尾各一個副隊,正隊長在隊伍中穿插協調。
隊尾的領隊叫后隊領隊,后隊領隊要保證所有隊員都在前面,也就是說后隊領隊是整個隊伍的隊尾,當收隊的時候,看見后隊領隊,那就說明整個隊伍都已經完全到達了。
這個Watermark就相當於給整個數據流設置一個后隊領隊。但是窗口是不知道具體要來幾個數的,所以只能設置一個時間上的限制,以此來推測當前窗口最后一條數據是否已經到達。假設窗口大小為10秒,Watermark為-5秒,那么他會做以下事情:
-
每來一條數據,取當前窗口內所有數據的最大時間戳;
-
用最大時間戳扣減Watermark后看看是不是符合窗口關閉條件;
-
如果不符合,則繼續進數據;
-
如果符合,則關閉窗口開始計算。
你看,多像戶外徒步?
-
每來一個人,就問問出發時是幾號,然后確認所有已到隊員最大的號碼;
-
用最大的號碼對比一下后隊領隊的號碼;
-
如果比后隊領隊的號碼小,就不收隊;
-
如果號碼大於等於后隊領隊號碼,就收隊。
遲到的數據
當然啊,即便是用了Watermark機制,依然還會存在遲到的數據。就像戶外徒步一樣,有人走錯路然后又趕上來。后隊領隊分明沒超過任何一個隊員,但是還是有隊員落在后面了。
所以Flink還增設了三種應對方式:
-
allowLateNess--對於遲到一小會的數據,設置一個允許遲到時間;
-
sideOutPut--對於超過允許遲到時間的數據,全部收集起來,后續再處理;
-
如果都不處理,Flink就默認自動丟棄。
也就是說,在watermark機制下,窗口雖然到了關閉時間,但是如果你設置了allowLateNess=10秒,那這個窗口還會再等10秒,看看是否還有他那個小隊的數據,10秒后窗口關閉,開始計算。
如果等了10秒還沒等到,11秒的時候,原本屬於該窗口的數據才姍姍來遲,那么sideOutPut會把數據收集起來,放到側輸出流,等待后續處理。這個數據肯定就不會在當前窗口計算進去了。