DataStream:
DataStream
是 Flink 流處理 API 中最核心的數據結構。它代表了一個運行在多個分區上的並行流。一個DataStream
可以從StreamExecutionEnvironment
通過env.addSource(SourceFunction)
獲得。- DataStream 上的轉換操作都是逐條的,比如
map()
,flatMap()
,filter()
。DataStream 也可以執行rebalance
(再平衡,用來減輕數據傾斜)和broadcaseted
(廣播)等分區轉換。 如上圖的執行圖所示,DataStream 各個算子會並行運行,算子之間是數據流分區。如 Source 的第一個並行實例(S1)和 flatMap() 的第一個並行實例(m1)之間就是一個數據流分區。而在 flatMap() 和 map() 之間由於加了 rebalance(),它們之間的數據流分區就有3個子分區(m1的數據流向3個map()實例)。
KeyedStream:
KeyedStream
用來表示根據指定的key進行分組的數據流。KeyedStream
可以通過調用DataStream.keyBy()
來獲得。而在KeyedStream
上進行任何transformation都將轉變回DataStream
。在實現中,KeyedStream
是把key的信息寫入到了transformation中。每條記錄只能訪問所屬key的狀態,其上的聚合函數可以方便地操作和保存對應key的狀態。
WindowedStream & AllWindowedStream:
WindowedStream
代表了根據key分組,並且基於WindowAssigner
切分窗口的數據流。所以WindowedStream
都是從KeyedStream
衍生而來的。而在WindowedStream
上進行任何transformation也都將轉變回DataStream
。DataStream[MyType] stream = ... WindowedDataStream[MyType] windowed = stream .keyBy("userId") .window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data DataStream[ResultType] result = windowed.reduce(myReducer)
Flink 的窗口實現中會將到達的數據緩存在對應的窗口buffer中(一個數據可能會對應多個窗口)。當到達窗口發送的條件時(由Trigger控制),Flink 會對整個窗口中的數據進行處理。Flink 在聚合類窗口有一定的優化,即不會保存窗口中的所有值,而是每到一個元素執行一次聚合函數,最終只保存一份數據即可。
- 在key分組的流上進行窗口切分是比較常用的場景,也能夠很好地並行化(不同的key上的窗口聚合可以分配到不同的task去處理)。不過當我們需要在普通流上進行窗口操作時,就要用到
AllWindowedStream
。AllWindowedStream
是直接在DataStream
上進行windowAll(...)
操作。AllWindowedStream 的實現是基於 WindowedStream 的。Flink 不推薦使用AllWindowedStream
,因為在普通流上進行窗口操作,就勢必需要將所有分區的流都匯集到單個的Task中,而這個單個的Task很顯然就會成為整個Job的瓶頸。
JoinedStreams & CoGroupedStreams:
- co-group 側重的是group,是對同一個key上的兩組集合進行操作,而 join 側重的是pair,是對同一個key上的每對元素進行操作, join 只是 co-group 的一個特例。
- JoinedStreams 和 CoGroupedStreams 是基於 Window 上實現的,所以 CoGroupedStreams 最終又調用了 WindowedStream 來實現。
DataStream[MyType] firstInput = ... DataStream[AnotherType] secondInput = ... DataStream[(MyType, AnotherType)] result = firstInput.join(secondInput) .where("userId").equalTo("id") .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new JoinFunction () {...})
雙流上的數據在同一個key的會被分別分配到同一個window窗口的左右兩個籃子里,當window結束的時候,會對左右籃子進行笛卡爾積從而得到每一對pair,對每一對pair應用 JoinFunction。
ConnectedStreams:
- 在 DataStream 上有一個 union 的轉換
dataStream.union(otherStream1, otherStream2, ...)
,用來合並多個流,新的流會包含所有流中的數據。union 有一個限制,就是所有合並的流的類型必須是一致的。 - union 有一個限制,就是所有合並的流的類型必須是一致的。
ConnectedStreams
提供了和 union 類似的功能,用來連接兩個流,但是與 union 轉換有以下幾個區別:- ConnectedStreams 只能連接兩個流,而 union 可以連接多個流。
- ConnectedStreams 連接的兩個流類型可以不一致,而 union 連接的流的類型必須一致。
- ConnectedStreams 會對兩個流的數據應用不同的處理方法,並且雙流之間可以共享狀態。這在第一個流的輸入會影響第二個流時, 會非常有用。
- 如下 ConnectedStreams 的樣例,連接
input
和other
流,並在input
流上應用map1
方法,在other
上應用map2
方法,雙流可以共享狀態(比如計數)。DataStream[MyType] input = ... DataStream[AnotherType] other = ... ConnectedStreams[MyType, AnotherType] connected = input.connect(other) DataStream[ResultType] result = connected.map(new CoMapFunction[MyType, AnotherType, ResultType]() { override def map1(value: MyType): ResultType = { ... } override def map2(value: AnotherType): ResultType = { ... } })
當並行度為2時: