1.ReduceFunction
增量聚合,輸入輸出元素類型相同。
2.AggregateFunction
增量聚合,輸入輸出元素類型可以不相同。
3.ProcessWindowFunction
一些業務場景,我們需要收集窗口內所有的數據進行計算,例如計算窗口數據的中位數,或者計算窗口數據中出現頻率最高的值。這樣的需求,使用ReduceFunction和AggregateFunction就無法實現了。這個時候就需要ProcessWindowFunction了。
process()方法接受的參數為:
window的key,
Iterable迭代器包含窗口的所有元素,
Collector用於輸出結果流。
Context參數和別的process方法一樣。而ProcessWindowFunction的Context對象還可以訪問window的元數據(窗口開始和結束時間),當前處理時間和水位線,per-window state和per-key global state,side outputs。
- per-window state: 用於保存一些信息,這些信息可以被process()訪問,只要process所處理的元素屬於這個窗口。
- per-key global state: 同一個key,也就是在一條KeyedStream上,不同的window可以訪問per-key global state保存的值。
4.結合使用
ReduceFunction/AggregateFunction和ProcessWindowFunction結合使用,分配到某個窗口的元素將被提前聚合,而當窗口的trigger觸發時,也就是窗口收集完數據關閉時,將會把聚合結果發送到ProcessWindowFunction中,這時Iterable參數將會只有一個值,就是前面聚合的值。
input
.keyBy(...)
.timeWindow(...)
.reduce(
incrAggregator: ReduceFunction[IN],
function: ProcessWindowFunction[IN, OUT, K, W])
input
.keyBy(...)
.timeWindow(...)
.aggregate(
incrAggregator: AggregateFunction[IN, ACC, V],
windowFunction: ProcessWindowFunction[V, OUT, K, W])
結合使用:
- 可將
ProcessWindowFunction
與增量聚合函數ReduceFunction
、AggregateFunction
結合。 - 元素到達窗口時增量聚合,當窗口關閉時對增量聚合的結果用
ProcessWindowFunction
再進行全量聚合。 - 既可以增量聚合,也可以訪問窗口的元數據信息(如開始結束時間、狀態等)。