

當我們在使用Flink的時候,避免不了要和時間(time)、水位線(watermarks)打交道,理解這些概念是開發分布式流處理應用的基礎。那么Flink支持哪些時間語義?Flink是如何處理亂序事件的?什么是水位線?水位線是如何生成的?水位線的傳播方式是什么?讓我們帶着這些問題來開始本文的內容。
時間語義
基本概念
時間是Flink等流處理中最重要的概念之一,在 Flink 中 Time 可以分為三種:Event-Time,Processing-Time 以及 Ingestion-Time,如下圖所示:
Event Time
事件時間,事件(Event)本身的時間,即數據流中事件實際發生的時間,通常使用事件發生時的時間戳來描述,這些事件的時間戳通常在進入流處理應用之前就已經存在了,事件時間反映了事件真實的發生時間。所以,基於事件時間的計算操作,其結果是具有確定性的,無論數據流的處理速度如何、事件到達算子的順序是否會亂,最終生成的結果都是一樣的。
Ingestion Time
攝入時間,事件進入Flink的時間,即將每一個事件在數據源算子的處理時間作為事件時間的時間戳,並自動生成水位線(watermarks,關於watermarks下文會詳細分析)。
Ingestion Time從概念上講介於Event Time和Processing Time之間。與Processing Time相比 ,它的性能消耗更多一些,但結果卻更可預測。由於 Ingestion Time使用穩定的時間戳(在數據源處分配了一次),因此對記錄的不同窗口操作將引用相同的時間戳,而在Processing Time中每個窗口算子都可以將記錄分配給不同的窗口。
與Event Time相比,Ingestion Time無法處理任何亂序事件或遲到的數據,即無法提供確定的結果,但是程序不必指定如何生成水位線。在內部,Ingestion Time與Event Time非常相似,但是可以實現自動分配時間戳和自動生成水位線的功能。
Processing Time
處理時間,根據處理機器的系統時鍾決定數據流當前的時間,即事件被處理時當前系統的時間。還以窗口算子為例(關於window,下文會詳細分析),基於處理時間的窗口操作是以機器時間來進行觸發的,由於數據到達窗口的速率不同,所以窗口算子中使用處理時間會導致不確定的結果。在使用處理時間時,無需等待水位線的到來后進行觸發窗口,所以可以提供較低的延遲。
對比
經過上面的分析,應該對Flink的時間語義有了大致的了解。不知道你會不會有這樣一個疑問:既然事件時間已經能夠解決所有的問題了,那為何還要用處理時間呢?其實處理時間有其特定的使用場景,處理時間由於不用考慮事件的延遲與亂序,所以其處理數據的延遲較低。因此如果一些應用比較重視處理速度而非准確性,那么就可以使用處理時間,比如要實時監控儀表盤。總之,雖然處理時間的延遲較低,但是其結果具有不確定性,事件時間雖然有延遲,但是能夠保證處理的結果具有准確性,並且可以處理延遲甚至無序的數據。
使用
上一小結講述了三種時間語義的基本概念,接下來將從代碼層面講解在程序中該如何配置這三種時間語義。首先來看一段代碼:
/** The time characteristic that is used if none other is set. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
//省略的代碼
/** The time characteristic used by the data streams. */
private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
上述兩行代碼摘自StreamExecutionEnvironment類,可以看出,Flink在流處理程序中默認的時間語義是Processing Time,那么該如何修改默認的時間語義呢?很簡單,再來看一段代碼,下面的代碼片段同樣來自於StreamExecutionEnvironment類:
/**
* 如果使用Processing Time或者Event Time,默認的水位線間隔時間是200毫秒
* 可以通過ExecutionConfig#setAutoWatermarkInterval(long)設置
* @param characteristic The time characteristic.
*/
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}
上述的方法可以配置不同的時間語義,參數TimeCharacteristic是一個枚舉類,包括ProcessingTime,IngestionTime,EventTime三個元素。具體使用方式如下:
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
watermarks
在解釋watermarks(水位線)之前,先看一個我們身邊發生的真實案例。高考,是大家非常熟悉的場景。如果把高考的考試安排簡單地看作是一個流處理應用,那么,每一個考試科目的開始時間到結束時間就是一個窗口,每個考生可以理解成一條記錄,考生到達考場的時間可以理解成記錄的時間戳,而考試可以理解成某種算子操作。大家都知道,高考考試在開考后15分鍾是不允許進場的,這個規定可以理解成一個水位線,比如,上午第一場語文考試,開考時間是9:30,允許在9:45之前進入考場,那么9:45這個時間可以理解成一個水位線。在開考之前,有的同學喜歡提前到考場,有的同學喜歡卡點到考場。假設有個同學叫考必勝,ta是卡着時間點到的考場,但是早上由於吃了不干凈的東西,突然感覺肚子不適,無奈之下在廁所里耽誤了16分鍾,那么按照規定,此時考必勝是不能夠進入考場的,因為此時已經默認所有考生都已經在考場了,此時考試也已經觸發,那么考必勝就可以理解為遲到的事件。以上就是對窗口、事件時間以及水位線的簡單理解,下面開始詳細解釋什么水位線。
基本概念
在上一節中,詳細講解了Flink提供的三種時間語義,在講解這三種時間語義的時候,提到了一個名詞---水位線,那么究竟什么是水位線呢?先來看一個例子,假如要每5分鍾統計一次過去1個小時內的熱門商品的topN,這是一個典型的滑動窗口操作,那么基於事件時間的窗口該在什么時候出發計算呢?換句話說,我們要等多久才能夠確定已經接收到了特定時間點之前的所有事件,另一方面,由於網絡延遲等原因,會產生亂序的數據,在進行窗口操作時,不能夠無限期的等待下去,需要一個機制來告訴窗口在某個特定時間來觸發window計算,即認為小於等於該時間點的數據都已經到來了。這個機制就是watermark(水位線),可以用來處理亂序事件。
水位線是一個全局的進度指標,表示可以確定不會再有延遲的事件到來的某個時間點。從本質上講,水位線提供了一個邏輯時鍾,用來通知系統當前的事件時間。比如,當一個算子接收到了W(T)時刻的水位線,就可以大膽的認為不會再接收到任何時間戳小於或等於W(T)的事件了。水位線對於基於事件時間的窗口和處理亂序數據是非常關鍵的,算子一旦接收到了某個水位線,就相當於接到一支穿雲箭的信號:所有特定時間區間的數據都已集結完畢,可以進行窗口觸發計算。
既然已經說了,事件是會存在亂序的,那這個亂序的程度究竟有多大呢,這個就不太好確定了,總之總會有些遲到的事件慢慢悠悠的到來。所以,水位線其實是一種在准確性與延遲之間的權衡,如果水位線設置的非常苛刻,即不允許有掉隊的數據出現,雖然准確性提高了,但這在無形之中增加了數據處理的延遲。反之,如果水位線設置的非常激進,即允許有遲到的數據發生,那么雖然降低了數據處理的延遲,但數據的准確性會較低。
所以,水位線是中庸之道,過猶不及。在很多現實應用中,系統無法獲取足夠多的信息來確定完美的水位線,那么該怎么辦呢?Flink提供了某些機制來處理那些可能晚於水位線的遲到時間,用戶可以根據應用的需求不同,可以將這些漏網之魚(遲到的數據)舍棄掉,或者寫入日志,或者利用他們修正之前的結果。
上面說到沒有完美的水位線,可能還是很抽象。接下來,我們再看一幅圖,從圖中可以很直觀地觀察真實的水位線與理想中的完美水位線之間的關系,如下圖:
上圖的淺灰色直虛線表示理想的水位線,深灰色的彎曲虛線表示現實中的水位線,黑色直線表示兩者之間的偏差。在理想狀態下,這種偏差為0,因為總是在時間發生時就會立即處理,即事件的真實時間與處理事件的時間是一致的。比如,12:01產生的事件剛好在12:01時被處理,12:02產生的事件剛好在12:02時被處理。但是現實總會有遲到的數據產生,比如網絡延遲的原因,所以真實的情況會像深灰色的彎曲虛線表示的那樣,即12:01產生的數據可能會在12:01之后被處理,12:02產生的數據在12:02時被處理,12:03時產生的數據會被在12:03之后處理。這種動態的偏差在分布式處理系統中是非常常見的。
水位線圖解
在上一小節,通過語言描述對水位線的概念進行了詳細解讀,在本小節會通過圖解的方式解析水位線的含義,這樣更能加深對水位線的理解。如下圖所示:
如上圖,矩形表示一條記錄,三角表示該條記錄的時間戳(真實發生時間),圓圈表示水位線。可以看到上面的數據是亂序的,比如當算子接收到為2的水位線時,就可以認為時間戳小於等於2的數據都已經到來了,此時可以觸發計算。同理,接收到為5的水位線時,就可以認為時間戳小於或等於5的數據都已經到來了,此時可以觸發計算。
可以看出水位線是單調遞增的,並且和記錄的時間戳存在聯系,一個時間戳為T的水位線表示接下來所有記錄的時間戳一定都會大於T。
水位線的傳播
現在,或許你已經對水位線是什么有了一個初步的認識,接下來將會介紹水位線是怎么在Flink內部傳播的。關於水位線的傳播策略可以歸納為3點:
首先,水位線是以廣播的形式在算子之間進行傳播
Long.MAX_VALUE表示事件時間的結束,即未來不會有數據到來了
單個分區的輸入取最大值,多個分區的輸入取最小值
關於Long.MAX_VALUE的解釋,先看一段代碼,如下:
/**
* 當一個source關閉時,會輸出一個Long.MAX_VALUE的水位線,當一個算子接收到該水位線時,
* 相當於接收到一個信號:未來不會再有數據輸入了
*/
@PublicEvolving
public final class Watermark extends StreamElement {
//表示事件時間的結束
public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
//省略的代碼
}
關於另外兩條策略的解釋,可以從下圖中得到:
如上圖,一個任務會為它的每個分區都維護一個分區水位線(partition watermark),當收到每個分區傳來的水位線時,任務首先會讓當前分區水位線的值與接收的水位線值相比較,如果新接收的水位線值大於當前分區水位線值,則會將對應的分區水位線值更新為較大的水位線值(如上圖中的2步驟),接着,任務會把事件時鍾調整為當前分區水位線值的最小值,如上圖步驟2 ,由於當前分區水位線的最小值為3,所以將事件時間時鍾更新為3,然后將值為3的水位線廣播到下游任務。步驟3與步驟4的處理邏輯同上。
同時我們可以注意到這種設計其實有一個局限,具體體現在沒有對分區(partition)是否來自於不同的流進行區分,比如對於兩條流或多條流的Union或Connect操作,同樣是按照全部分區水位線中最小值來更新事件時間時鍾,這就導致所有的輸入記錄都會按照基於同一個事件時間時鍾來處理,這種一刀切的做法對於同一個流的不同分區而言是無可厚非的,但是對於多條流而言,強制使用一個時鍾進行同步會對整個集群帶來較大的性能開銷,比如當兩個流的水位線相差很大是,其中的一個流要等待最慢的那條流,而較快的流的記錄會在狀態中緩存,直到事件時間時鍾到達允許處理它們的那個時間點。
水位線的生成方式
通常情況下,在接收到數據源之后應該馬上為其生成水位線,即越靠近數據源越好。Flink提供兩種方式生成水位線,其中一種方式為在數據源完成的,即利用SourceFunction在應用讀入數據流的時候分配時間戳與水位線。另一種方式是通過實現接口的自定義函數,該方式又包括兩種實現方式:一種為周期性生成水位線,即實現AssignerWithPeriodicWatermarks接口,另一種為定點生成水位線,即實AssignerWithPunctuatedWatermarks接口。具體如下圖所示:
數據源方式
該方式主要是實現自定義數據源,數據源分配時間戳和水位線主要是通過內部的SourceContext對象實現的,先看一下SourceFunction的源碼,如下:
public interface SourceFunction<T> extends Function, Serializable {
void cancel();
interface SourceContext<T> {
void collect(T element);
/**
* 用於輸出記錄並附屬一個與之關聯的時間戳
*/
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
/**
* 用於輸出傳入的水位線
*/
@PublicEvolving
void emitWatermark(Watermark mark);
/**
* 將自身標記為空閑狀態
* 某個某個分區不在產生數據,會阻礙全局水位線前進,
* 因為收不到新的記錄,意味着不會發出新的水位線,
* 根據水位線的傳播策略,會導致整個應用都停止工作
* Flink提供一種機制,將數據源函數暫時標記為空閑,
* 在空閑狀態下,Flink水位線的傳播機制會忽略掉空閑的數據流分區
*/
@PublicEvolving
void markAsTemporarilyIdle();
Object getCheckpointLock();
void close();
}
}
從上面對的代碼可以看出,通過SourceContext對象的方法可以實現時間戳與水位線的分配。
自定義函數的方式
使用自定義函數的方式分配時間戳,只需要調用assignTimestampsAndWatermarks()方法,傳入一個實現AssignerWithPeriodicWatermarks或者AssignerWithPunctuatedWatermarks接口的分配器即可,如下代碼所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<UserBehavior> userBehavior = env
.addSource(new MysqlSource())
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
周期分配器(AssignerWithPeriodicWatermarks)
該分配器是實現了一個AssignerWithPeriodicWatermarks的用戶自定義函數,通過重寫extractTimestamp()方法來提取時間戳,提取出來的時間戳會附加在各自的記錄上,查詢得到的水位線會注入到數據流中。
周期性的生成水位線是指以固定的時間間隔來發出水位線並推進事件時間的前進,關於默認的時間間隔在上文中也有提到,根據選擇的時間語義確定默認的時間間隔,如果使用Processing Time或者Event Time,默認的水位線間隔時間是200毫秒,當然用戶也可以自己設定時間間隔,關於如何設定,先看一段代碼,代碼來自於ExecutionConfig類:
/**
* 設置生成水位線的時間間隔
* 注:自動生成watermarks的時間間隔不能是負數
*/
@PublicEvolving
public ExecutionConfig setAutoWatermarkInterval(long interval) {
Preconditions.checkArgument(interval >= 0, "Auto watermark interval must not be negative.");
this.autoWatermarkInterval = interval;
return this;
}
所以,如果要調整默認的200毫秒的間隔,可以調用setAutoWatermarkInterval()方法,具體使用如下:
//每3秒生成一次水位線
env.getConfig().setAutoWatermarkInterval(3000);
上面指定了每隔3秒生成一次水位線,即每隔3秒會自動向流里注入一個水位線,在代碼層面,Flink會每隔3秒鍾調用一次AssignerWithPeriodicWatermarks的getCurrentWatermark()方法,每次調用該方法時,如果得到的值不為空並且大於上一個水位線的時間戳,那么就會向流中注入一個新的水位線。這項檢查可以有效地保證了事件時間的遞增的特性,一旦檢查失敗也就不會生成水位線。下面給出一個實現周期分配水位線的例子:
public class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<UserBehavior> {
// 定義1分鍾的容忍間隔時間,即允許數據的最大亂序時間
private long maxOutofOrderness = 60 * 1000;
// 觀察到的最大時間戳
private long currentMaxTs = Long.MIN_VALUE;
@Nullable
@Override
public Watermark getCurrentWatermark() {
// 生成具有1分鍾容忍度的水位線
return new Watermark(currentMaxTs - maxOutofOrderness);
}
@Override
public long extractTimestamp(UserBehavior element, long previousElementTimestamp) {
//獲取當前記錄的時間戳
long currentTs = element.timestamp;
// 更新最大的時間戳
currentMaxTs = Math.max(currentMaxTs, currentTs);
// 返回記錄的時間戳
return currentTs;
}
}
通過查看TimestampAssignerd 繼承關系可以發現(繼承關系如下圖),除此之外,Flink還提供了兩種內置的水位線分配器,分別為:AscendingTimestampExtractor和BoundedOutOfOrdernessTimestampExtractor兩個抽象類。
關於AscendingTimestampExtractor,一般是在數據集的時間戳是單調遞增的且沒有亂序時使用,該方法使用當前的時間戳生成水位線,使用方式如下:
SingleOutputStreamOperator<UserBehavior> userBehavior = env
.addSource(new MysqlSource())
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior element) {
return element.timestamp*1000;
}
});
關於BoundedOutOfOrdernessTimestampExtractor,是在數據集中存在亂序數據的情況下使用,即數據有延遲(任意新到來的元素與已經到來的時間戳最大的元素之間的時間差),這種方式可以接收一個表示最大預期延遲參數,具體如下:
SingleOutputStreamOperator<UserBehavior> userBehavior = env
.addSource(new MysqlSource())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBehavior>(Time.seconds(10)) {
@Override
public long extractTimestamp(UserBehavior element) {
return element.timestamp*1000;
}
} );
上述的代碼接收了一個10秒鍾延遲的參數,這10秒鍾意味着如果當前元素的事件時間與到達的元素的最大時間戳的差值在10秒之內,那么該元素會被處理,如果差值超過10秒,表示其本應該參與的計算,已經完成了,Flink稱之為遲到的數據,Flink提供了不同的策略來處理這些遲到的數據。
定點水位線分配器(AssignerWithPunctuatedWatermarks)
該方式是基於某些事件(指示系統進度的特殊元祖或標記)觸發水位線的生成與發送,基於特定的事件向流中注入一個水位線,流中的每一個元素都有機會判斷是否生成一個水位線,如果得到的水位線不為空並且大於之前的水位線,就生成水位線並注入流中。
實現AssignerWithPunctuatedWatermarks接口,重寫checkAndGetNextWatermark()方法,該方法會在針對每個事件的extractTimestamp()方法后立即調用,以此來決定是否生成一個新的水位線,如果該方法返回一個非空並且大於之前值的水位線,就會將這個新的水位線發出。
下面將會實現一個簡單的定點水位線分配器
public class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<UserBehavior> {
// 定義1分鍾的容忍間隔時間,即允許數據的最大亂序時間
private long maxOutofOrderness = 60 * 1000;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(UserBehavior element, long extractedTimestamp) {
// 如果讀取數據的用戶行為是購買,就生成水位線
if(element.action.equals("buy")){
return new Watermark(extractedTimestamp - maxOutofOrderness);
}else{
// 不發出水位線
return null;
}
}
@Override
public long extractTimestamp(UserBehavior element, long previousElementTimestamp) {
return element.timestamp;
}
}
遲到的數據
上文已經說過,現實中很難生成一個完美的水位線,水位線就是在延遲與准確性之前做的一種權衡。那么,如果生成的水位線過於緊迫,即水位線可能會大於后來數據的時間戳,這就意味着數據有延遲,關於延遲數據的處理,Flink提供了一些機制,具體如下:
直接將遲到的數據丟棄
將遲到的數據輸出到單獨的數據流中,即使用sideOutputLateData(new OutputTag<>())實現側輸出
根據遲到的事件更新並發出結果
由於篇幅限制,關於遲到數據的具體處理在本文先不做太多的討論,在后續的文章中會對其詳細進行說明。
總結
本文從Flink的時間語義開始說起,詳細介紹了三種時間語義的概念、特點及使用方式,接着對Flink處理亂序數據的一種機制---水位線進行詳細說明,主要描述了水位線的基本概念,傳播方式、生成方式,並對其中的細節部分進行了圖解,可以加深對水位線的理解。最后,簡單說明了一下Flink對於遲到數據的處理方式。
你真的了解Flink Kafka source嗎?
Flink的八種分區策略源碼解讀


本文分享自微信公眾號 - 大數據技術與數倉(gh_95306769522d)。
如有侵權,請聯系 support@oschina.cn 刪除。
本文參與“OSC源創計划”,歡迎正在閱讀的你也加入,一起分享。