窗口(Window)
本文翻譯自文檔Windows
-----------------------------------
Flink使用窗口的概念,根據element的時間戳或者其他指標,將可能無限的DataStream分割為有限的數據切片(slice)。我們在處理無限數據流以及進行聚合element的transformation時需要此種窗口分割。
注意:我們在此文檔中討論的大多是keyed windowing,即window是應用在KeyedStream上的。關鍵字下的窗口具有一定的優勢,即它可以在element傳遞給user function之前就能按照window和關鍵字共同二次分割element。由於不同關鍵字的element可以相互獨立處理,所以該工作可以在cluster之上分布式進行。有關non-keyed window的信息,請查看文檔non-keyed windowing
一、基礎部分
一個帶窗口的transformation至少需要定義一個key(見文檔specifying keys)、一個window assigner以及一個window function。key將無限而無關鍵字的流分割成邏輯的有關鍵字數據流,而window assigner將element賦值給有限的各自關鍵字的窗口(per-key window)。最后window function會用於處理每個窗口的element。
帶窗口transformation的基礎結構如下所示:
DataStream<T> input = ...;
input.keyBy(<key selector>)
.window(<window assigner>)
.<windowed transformation>(<window function>);
我們會在接下來的一節中單獨講window assigners。
Window transformation可以是reduce(),fold()或者apply()之一,它們相對應的需要一個ReduceFunction,FoldFunction或WindowFunction。我們將會在下文window functions中具體描述定義一個帶窗口transformation的不同方式。
在更進一步的用例中,你可以定義一個Trigger來決定一個窗口什么時候才是ready for processing的。相關詳細內容見於本文triggers小節。
二、Window Assigners
Window assigner定義了數據流的element將如何分割進有限的數據切片。Flink自帶預先實現了針對多數典型用例的window assigner,以tumbling window,sliding window,session window和global window命名,此外,你還可以通過繼承WindowAssigner類來自定義自己的window assigner。除了global window,所有自帶window assigner都是基於時間(可以是processing time或者event time)來分配element。有關Flink如何處理時間,請見文檔event time。
在描述這些window assigner如何用於Flink程序之前,我們先描述它們的工作機制。我們將使用抽象圖來可視化每個assigner的工作機制:在下面的內容中,紫色圈是數據流的element,它們以不同的關鍵字進行分割(在該例中關鍵字為user1, user2, user3),x軸表示時間的進展。
2.1Global Windows
Global Window的定義表明我們不會進一步將element二次分割到窗口中。每個element將被分配到一個單獨的各自關鍵字的窗口中。該窗口化模式僅僅在同時擁有一個自定義trigger時才有用。否則由於global window沒有用以聚合element的常態結束,所以不會發生任何計算。
2.2 Tumbling Window
Tumbling window assigner將element分配到一個固定長度、無重疊的窗口,且該窗口的window size有用戶定義。例如,如果你定義window size為5分鍾,window function每次調用都會得到5分鍾的element。
2.3 Sliding Windows
Sliding window assigner和tumbling window一樣,將element分配到一個固定長度(等於window size),但該窗口可重疊,重疊的大小由用戶定義的參數window slide定義。由於窗口可重疊,故一個element可以分配到多個窗口中去。
例如,你可以定義一個window size為10分鍾,且slide為5分鍾。在該窗口中,每此window function會得到10分鍾的element,且5分鍾調用一次。
2.4 Session Windows
Session window assigner在窗口邊界需要根據到達數據調整的情況下十分適用。Tumbling windows和sliding windows的assigner都將element分配到開始於固定時間點並且擁有固定window size的窗口中。而在session中,你可以讓關鍵字窗口開始於它們自己的時間點,並且在一段無活動情況(inactivity)出現時結束窗口。該窗口的配置參數session gap定義了等待新數據多長時間就結束一個session。
2.5 定義一個Window Assigner
除了GlobalWindows,內置window assigner都有兩個版本,一個處理processing-time windowing,另一個處理event-time windowing。Processing-time assigner根據worker設備的當前時鍾來分配element,而event-time assigner根據element的時間戳來分配窗口。有關processing time和event time的區別以及如何給element分配時間戳的內容,請見文檔event time。
下面的代碼片段展示了在程序中如何使用每個window assigner:
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// event-time session windows
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// processing-time session windows
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// global windows
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
注意:我們可以通過Time.millisecond(x),Time.second(x),Time.minutes(x)等等方法來定義時間。
三、Window Functions
在系統確定一個窗口已經做好了處理的准備時(有關系統是如何確定窗口可處理,見文檔trigger),Flink將使用Window Function處理每個窗口中的element。
Window Function可以是ReduceFunction、FlodFunction或WindowFunction。由於Flink在element到達各自窗口時遞增地聚合它們,所以前兩個方法可以更加高效地執行。WindowFunction獲取窗口中所有element的Iterable以及這些element所屬的窗口的額外元信息(meta information)。
由於Flink在調用使用了WindowFunction的窗口化transformation之前,必須在內部緩存一個窗口中所有element,所以此種transformation無法像其他情況一樣高效執行。我們可以將WindowFunction和一個ReduceFunction或FoldFunction相結合,從而在遞增聚合窗口中element的同時,還可以獲取WindowFunction接收的額外信息,通過這種方式,我們可以緩解上述問題。我們在下面會對各種情況一一舉例。
3.1 ReduceFunction
一個reduce方法定義了兩個值如何結合形成一個element。Flink可以使用它來遞增地聚合窗口中的element。
程序中的ReduceFunction如下例:
DataStream<Tuple2<String, Long>> input = ...;
input.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
一個ReduceFunction定義了兩個輸入中的element是如何結合產生一個輸出element的。在上面的例子中,將會計算出一個窗口中所有element的第二個域的總和。
3.2 FoldFunction
一個fold方法可以定義如下:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
一個FoldFunction定義了輸入中的element如何加上一個累加初始值(在本例中為"",即空字符串)。在上例中,將會計算出輸入中所有Long域的字符串連接(concatenation)結果。
3.3 WindowFunction - 一般情況
WindowFunction以性能開銷的增加,換來了最大的靈活性(它可以獲得key和Window的引用)。WindowFunction由於無法遞增聚合窗口中的element,在窗口准備好處理之前,Flink都必須內部緩存整個窗口,所以帶來性能上的開銷。一個WindowFunction將得到處理的窗口中所有element的Iterable。WindowFunction接口的簽名如下所示:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
下面我們舉例使用WindowFunction來計算一個窗口中element數量。我們之所以選擇WindowFunction,是因為我們想在發送計數的同時,還想訪問並一同發送有關窗口的信息。這將非常低效,我們應當在練習中和一個ReduceFunction一同實現WindowFunction。在下一節中,我們將會看到將ReduceFunction和WindowFunction結合來獲取遞增地聚合以及之前添加的WindowFunction的信息
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
/* ... */
public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
void apply(String key, TimeWindow window, Iterable<Tuple<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple<String, Long> in: input) {
count++;
}
out.collect("Window: " + window + "count: " + count);
}
}
3.4 帶有遞增聚合的WindowFunction
一個WindowFunction可以與ReduceFunction或者FoldFunction結合,在結合后,ReduceFunction/FoldFunction將會用來在窗口的element到達時遞增聚合它們,而WindowFunction則會在窗口准備好處理時得到已經聚合后的結果。這種方式使我們可以即獲得窗口遞增計算的優勢,又可以獲得編寫一個WindowFunction提供的額外窗口元信息。
下面的例子為我們展示了遞增聚合方法如何與WindowFunction結合:
DataStream<Tuple2<String, Long>> input = ...;
// for folding incremental computation
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
// for reducing incremental computation
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyReduceFunction(), new MyWindowFunction());
四、處理遲到數據
在處理事件時間窗口時,可能會發生element遲到的情況,即Flink用來持續跟蹤事件時間進展的watermark已經晚於到達element所屬的窗口的結束時間戳了的情況。有關event time和其中遲到element等等Flink如何處理event time的詳細討論,請見event time和late element。
你可以定義一個帶窗口transformation如何處理遲到element以及允許的遲到時間(lateness)。相關的參數為allowed lateness,該參數定義了element最多可以遲到多長時間。對於在allowed lateness之內到達的element,Flink仍然會將它們放入窗口中並且考慮到計算結果之內。而在allowed lateness之外到達的element則將被拋棄。Flink同樣保證一旦Watermark超過窗口結束時間加上allowed lateness,由窗口Operation持有的所有狀態都將進入垃圾回收。
默認地,allowed lateness設置為0,即在watermark之后到達的element將會被拋棄。你可以通過以下方式定義allow lateness:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
注意,當使用GlobalWindows的window assigner時,沒有數據會變為遲到數據,因為全局窗口的結束時間戳為Long.MAX_VALUE
五、Triggers
一個Trigger決定着窗口(由WindowAssigner賦值)什么時候准備好由window function處理。trigger觀察element如何加入窗口中,並且持續跟蹤processing time和event time的進展。一旦一個trigger決定窗口已准備好處理,它就會被觸發。這是Window Operation獲取當前處於窗口中的數據,並且將它們傳遞給window function來產生處於觸發狀態的(firing)窗口的輸出的信號。
除了GlobalWindows,每個WindowAssigner都帶有一個默認trigger來適用於絕大多數用例。例如,TumblingEventTimeWindows擁有EventTimeTrigger作為默認trigger,該trigger簡單地在watermark超過窗口的結束時間時觸發。
你可以通過使用給定Trigger類來調用trigger()方法定義所用的trigger。一個帶窗口transformation的全貌大致如下:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.trigger(<trigger>)
.<windowed transformation>(<window function>);
Flink自帶一些開箱即用的trigger:包括上面提到的EventTimeTrigger,基於由watermark衡量的event time的進展來決定是否觸發;還有ProcessingTimeTrigger,與EventTimeTrigger大致一樣,但基於processing time;最后是CountTrigger,在一個窗口的element數量溢出給定界限時觸發。
注意,通過使用trigger()方法定義一個trigger,你將重寫WindowAssigner的默認trigger。例如,若你為TunmblingEventTimeWindows定義CountTrigger為trigger,窗口將不會基於時間進展觸發,而僅僅依靠計數結果來觸發。在當前版本下,若你想要同時對時間和計數都做出響應,你只能自定義trigger。
內部Trigger API在當前版本仍然處於測試階段,但如果你想要編寫自定義trigger,請檢出(check out)該代碼。Trigger.java
六、non-keyed windowing
你同樣可以在定義一個帶窗口transformation時忽略KeyBy()方法,該方式將使Flink無法並行處理各自不同key的窗口,本質上將transformation變成了一個非並行的Operation。
警告:正如本文開始介紹中提到的,由於non-keyed windows不能分各自key獨立計算,所以有着無法在集群上分布式運行的缺陷,這將帶來一些性能上的影響。
一個帶non-keyed window的transformation的基本結構如下代碼所示:
DataStream<T> input = ...;
input
.windowAll(<window assigner>)
.<windowed transformation>(<window function>);