Flink Program Guide (6) -- 窗口 (DataStream API編程指導 -- For Java)


窗口(Window

本文翻譯自文檔Windows

 

-----------------------------------

Flink使用窗口的概念,根據element的時間戳或者其他指標,將可能無限的DataStream分割為有限的數據切片(slice)。我們在處理無限數據流以及進行聚合elementtransformation時需要此種窗口分割。

 

注意:我們在此文檔中討論的大多是keyed windowing,即window是應用在KeyedStream上的。關鍵字下的窗口具有一定的優勢,即它可以在element傳遞給user function之前就能按照window和關鍵字共同二次分割element。由於不同關鍵字的element可以相互獨立處理,所以該工作可以在cluster之上分布式進行。有關non-keyed window的信息,請查看文檔non-keyed windowing

 

一、基礎部分

一個帶窗口的transformation至少需要定義一個key(見文檔specifying keys)、一個window assigner以及一個window functionkey將無限而無關鍵字的流分割成邏輯的有關鍵字數據流,而window assignerelement賦值給有限的各自關鍵字的窗口(per-key window)。最后window function會用於處理每個窗口的element

 

帶窗口transformation的基礎結構如下所示:

DataStream<T> input = ...;

input.keyBy(<key selector>)
  .
window(<window assigner>)
  .<windowed transformation>(<window function>);

我們會在接下來的一節中單獨講window assigners

 

Window transformation可以是reduce()fold()或者apply()之一,它們相對應的需要一個ReduceFunctionFoldFunctionWindowFunction。我們將會在下文window functions中具體描述定義一個帶窗口transformation的不同方式。

 

在更進一步的用例中,你可以定義一個Trigger來決定一個窗口什么時候才是ready for processing的。相關詳細內容見於本文triggers小節。

 

二、Window Assigners

Window assigner定義了數據流的element將如何分割進有限的數據切片。Flink自帶預先實現了針對多數典型用例的window assigner,以tumbling windowsliding windowsession windowglobal window命名,此外,你還可以通過繼承WindowAssigner類來自定義自己的window assigner。除了global window,所有自帶window assigner都是基於時間(可以是processing time或者event time)來分配element。有關Flink如何處理時間,請見文檔event time

 

在描述這些window assigner如何用於Flink程序之前,我們先描述它們的工作機制。我們將使用抽象圖來可視化每個assigner的工作機制:在下面的內容中,紫色圈是數據流的element,它們以不同的關鍵字進行分割(在該例中關鍵字為user1, user2, user3),x軸表示時間的進展。

 

2.1Global Windows

Global Window的定義表明我們不會進一步將element二次分割到窗口中。每個element將被分配到一個單獨的各自關鍵字的窗口中。該窗口化模式僅僅在同時擁有一個自定義trigger時才有用。否則由於global window沒有用以聚合element的常態結束,所以不會發生任何計算。

 

 

 

2.2 Tumbling Window

Tumbling window assignerelement分配到一個固定長度、無重疊的窗口,且該窗口的window size有用戶定義。例如,如果你定義window size5分鍾,window function每次調用都會得到5分鍾的element

 

 

 

2.3 Sliding Windows

Sliding window assignertumbling window一樣,將element分配到一個固定長度(等於window size),但該窗口可重疊,重疊的大小由用戶定義的參數window slide定義。由於窗口可重疊,故一個element可以分配到多個窗口中去。

 

例如,你可以定義一個window size10分鍾,且slide5分鍾。在該窗口中,每此window function會得到10分鍾的element,且5分鍾調用一次。

 

 

 

2.4 Session Windows

Session window assigner在窗口邊界需要根據到達數據調整的情況下十分適用。Tumbling windowssliding windowsassigner都將element分配到開始於固定時間點並且擁有固定window size的窗口中。而在session中,你可以讓關鍵字窗口開始於它們自己的時間點,並且在一段無活動情況(inactivity)出現時結束窗口。該窗口的配置參數session gap定義了等待新數據多長時間就結束一個session

 

 

2.5 定義一個Window Assigner

除了GlobalWindows,內置window assigner都有兩個版本,一個處理processing-time windowing,另一個處理event-time windowingProcessing-time assigner根據worker設備的當前時鍾來分配element,而event-time assigner根據element的時間戳來分配窗口。有關processing timeevent time的區別以及如何給element分配時間戳的內容,請見文檔event time

 

下面的代碼片段展示了在程序中如何使用每個window assigner

DataStream<T> input = ...;

// tumbling event-time windows
input
  .
keyBy(<key selector>)
  .
window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .<windowed transformation>(<window function>);

// sliding event-time windows
input
  .
keyBy(<key selector>)
  .
window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  .<windowed transformation>(<window function>);

// event-time session windows
input
  .
keyBy(<key selector>)
  .
window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
  .
keyBy(<key selector>)
  .
window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .<windowed transformation>(<window function>);

// sliding processing-time windows
input
  .
keyBy(<key selector>)
  .
window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  .<windowed transformation>(<window function>);

// processing-time session windows
input
  .
keyBy(<key selector>)
  .
window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
  .<windowed transformation>(<window function>);

// global windows
input
  .
keyBy(<key selector>)
  .
window(GlobalWindows.create())
  .<windowed transformation>(<window function>);

 

注意:我們可以通過Time.millisecond(x)Time.second(x)Time.minutes(x)等等方法來定義時間。

 

三、Window Functions

在系統確定一個窗口已經做好了處理的准備時(有關系統是如何確定窗口可處理,見文檔trigger),Flink將使用Window Function處理每個窗口中的element

 

Window Function可以是ReduceFunctionFlodFunctionWindowFunction。由於Flinkelement到達各自窗口時遞增地聚合它們,所以前兩個方法可以更加高效地執行。WindowFunction獲取窗口中所有elementIterable以及這些element所屬的窗口的額外元信息(meta information)

由於Flink在調用使用了WindowFunction的窗口化transformation之前,必須在內部緩存一個窗口中所有element,所以此種transformation無法像其他情況一樣高效執行。我們可以將WindowFunction和一個ReduceFunctionFoldFunction相結合,從而在遞增聚合窗口中element的同時,還可以獲取WindowFunction接收的額外信息,通過這種方式,我們可以緩解上述問題。我們在下面會對各種情況一一舉例。

 

3.1 ReduceFunction

一個reduce方法定義了兩個值如何結合形成一個elementFlink可以使用它來遞增地聚合窗口中的element

 

程序中的ReduceFunction如下例:

DataStream<Tuple2<String, Long>> input = ...;

input.keyBy(<key selector>)
  .
window(<window assigner>)
  .
reduce(new ReduceFunction<Tuple2<String, Long>> {
    public Tuple2<String, Long>
reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
      return new Tuple2<>(v1.
f0, v1.f1 + v2.f1);
    }
  });

一個ReduceFunction定義了兩個輸入中的element是如何結合產生一個輸出element的。在上面的例子中,將會計算出一個窗口中所有element的第二個域的總和。

 

3.2 FoldFunction

一個fold方法可以定義如下:

DataStream<Tuple2<String, Long>> input = ...;

input
  .
keyBy(<key selector>)
  .
window(<window assigner>)
  .
fold("", new FoldFunction<Tuple2<String, Long>, String>> {
    public String
fold(String acc, Tuple2<String, Long> value) {
      return acc + value.
f1;
    }
  });

一個FoldFunction定義了輸入中的element如何加上一個累加初始值(在本例中為"",即空字符串)。在上例中,將會計算出輸入中所有Long域的字符串連接(concatenation)結果。

 

3.3 WindowFunction - 一般情況

WindowFunction以性能開銷的增加,換來了最大的靈活性(它可以獲得keyWindow的引用)。WindowFunction由於無法遞增聚合窗口中的element,在窗口准備好處理之前,Flink都必須內部緩存整個窗口,所以帶來性能上的開銷。一個WindowFunction將得到處理的窗口中所有elementIterableWindowFunction接口的簽名如下所示:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

  /**
  
* Evaluates the window and outputs none or several elements.
  
*
  
* @param key The key for which this window is evaluated.
  
* @param window The window that is being evaluated.
  
* @param input The elements in the window being evaluated.
  
* @param out A collector for emitting elements.
  
*
  
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
  
*/
  
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

 

下面我們舉例使用WindowFunction來計算一個窗口中element數量。我們之所以選擇WindowFunction,是因為我們想在發送計數的同時,還想訪問並一同發送有關窗口的信息。這將非常低效,我們應當在練習中和一個ReduceFunction一同實現WindowFunction。在下一節中,我們將會看到將ReduceFunctionWindowFunction結合來獲取遞增地聚合以及之前添加的WindowFunction的信息

DataStream<Tuple2<String, Long>> input = ...;

input
  .
keyBy(<key selector>)
  .
window(<window assigner>)
  .
apply(new MyWindowFunction());

/* ... */

public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> {

void apply(String key, TimeWindow window, Iterable<Tuple<String, Long>> input, Collector<String> out) {
  
long count = 0;
  for (Tuple<String, Long> in: input) {
    count++;
  }
  out.
collect("Window: " + window + "count: " + count);
}

}

 

3.4 帶有遞增聚合的WindowFunction

一個WindowFunction可以與ReduceFunction或者FoldFunction結合,在結合后,ReduceFunction/FoldFunction將會用來在窗口的element到達時遞增聚合它們,而WindowFunction則會在窗口准備好處理時得到已經聚合后的結果。這種方式使我們可以即獲得窗口遞增計算的優勢,又可以獲得編寫一個WindowFunction提供的額外窗口元信息。

下面的例子為我們展示了遞增聚合方法如何與WindowFunction結合:

DataStream<Tuple2<String, Long>> input = ...;

// for folding incremental computation
input
  .
keyBy(<key selector>)
  .
window(<window assigner>)
  .
apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());

// for reducing incremental computation
input
  .
keyBy(<key selector>)
  .
window(<window assigner>)
  .
apply(new MyReduceFunction(), new MyWindowFunction());

 

四、處理遲到數據

在處理事件時間窗口時,可能會發生element遲到的情況,即Flink用來持續跟蹤事件時間進展的watermark已經晚於到達element所屬的窗口的結束時間戳了的情況。有關event time和其中遲到element等等Flink如何處理event time的詳細討論,請見event timelate element

 

你可以定義一個帶窗口transformation如何處理遲到element以及允許的遲到時間(lateness)。相關的參數為allowed lateness,該參數定義了element最多可以遲到多長時間。對於在allowed lateness之內到達的elementFlink仍然會將它們放入窗口中並且考慮到計算結果之內。而在allowed lateness之外到達的element則將被拋棄。Flink同樣保證一旦Watermark超過窗口結束時間加上allowed lateness,由窗口Operation持有的所有狀態都將進入垃圾回收。

 

默認地,allowed lateness設置為0,即在watermark之后到達的element將會被拋棄。你可以通過以下方式定義allow lateness

DataStream<T> input = ...;

input
  .
keyBy(<key selector>)
  .
window(<window assigner>)
  .
allowedLateness(<time>)
  .<windowed transformation>(<window function>);

 

注意,當使用GlobalWindowswindow assigner時,沒有數據會變為遲到數據,因為全局窗口的結束時間戳為Long.MAX_VALUE

 

五、Triggers

一個Trigger決定着窗口(由WindowAssigner賦值)什么時候准備好由window function處理。trigger觀察element如何加入窗口中,並且持續跟蹤processing timeevent time的進展。一旦一個trigger決定窗口已准備好處理,它就會被觸發。這是Window Operation獲取當前處於窗口中的數據,並且將它們傳遞給window function來產生處於觸發狀態的(firing)窗口的輸出的信號。

 

除了GlobalWindows,每個WindowAssigner都帶有一個默認trigger來適用於絕大多數用例。例如,TumblingEventTimeWindows擁有EventTimeTrigger作為默認trigger,該trigger簡單地在watermark超過窗口的結束時間時觸發。

 

你可以通過使用給定Trigger類來調用trigger()方法定義所用的trigger。一個帶窗口transformation的全貌大致如下:

DataStream<T> input = ...;

input
  .
keyBy(<key selector>)
  .
window(<window assigner>)
  .
trigger(<trigger>)
  .<windowed transformation>(<window function>);

 

Flink自帶一些開箱即用的trigger:包括上面提到的EventTimeTrigger,基於由watermark衡量的event time的進展來決定是否觸發;還有ProcessingTimeTrigger,與EventTimeTrigger大致一樣,但基於processing time;最后是CountTrigger,在一個窗口的element數量溢出給定界限時觸發。

 

注意,通過使用trigger()方法定義一個trigger,你將重寫WindowAssigner的默認trigger。例如,若你為TunmblingEventTimeWindows定義CountTriggertrigger,窗口將不會基於時間進展觸發,而僅僅依靠計數結果來觸發。在當前版本下,若你想要同時對時間和計數都做出響應,你只能自定義trigger

 

內部Trigger API在當前版本仍然處於測試階段,但如果你想要編寫自定義trigger,請檢出(check out)該代碼。Trigger.java

 

六、non-keyed windowing

你同樣可以在定義一個帶窗口transformation時忽略KeyBy()方法,該方式將使Flink無法並行處理各自不同key的窗口,本質上將transformation變成了一個非並行的Operation

 

警告:正如本文開始介紹中提到的,由於non-keyed windows不能分各自key獨立計算,所以有着無法在集群上分布式運行的缺陷,這將帶來一些性能上的影響。

 

一個帶non-keyed windowtransformation的基本結構如下代碼所示:

DataStream<T> input = ...;

input
  .
windowAll(<window assigner>)
  .<windowed transformation>(<window function>);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM