在前面,我們介紹了Flink的窗口概述以及WindowAssigner窗口指派器,接下來,我們繼續介紹Flink窗口函數、Trigger觸發器、Evictor清除器等的相關機制。
五、Window Functions 窗口函數
引用轉載自:https://zhuanlan.zhihu.com/p/102325190 (強烈推薦)
定義好窗口分配器之后,無限流上的數據已經被我們划分到了一個個窗口里,接下來,我們需要對每個窗口中的數據進行處理。這可以通過指定Window Function來實現,一旦系統確定了某個窗口已經准備好進行處理,該函數將會處理窗口中的每個元素。
Window Function通常有這幾種:ReduceFunction,AggregateFunction,FoldFunction以及ProcessWindowFunction、WindowFunction(舊版)。
窗口函數分為兩類,一種是增量聚合,如reduce和aggregate,一種是全量聚合,如process、apply
增量聚合:
窗口保存一份中間數據,每流入一個新元素,新元素與中間數據兩兩合一,生成新的中間數據,再保存到窗口中。reduce(ReduceFunction),aggregate(AggregateFunction),fold(FoldFunction)都是這種。
全量聚合:
窗口先緩存該窗口所有元素,等到觸發條件后對窗口內的全量元素執行計算。
process(ProcessWindowFunction)就是這種。
apply(WindowFunction) --- 不過1.3之后被棄用
增量聚合和全量聚合對比:
1、增量聚合執行非常高效,因為Flink可以在每個元素到達窗口時增量的聚合這些元素。
但是增量聚合缺少窗口 的meta元數據信息。
2、全量聚合執行效率很低,因為在調用函數之前Flink必須在內部緩存窗口中的所有元素。
但是ProcessWindowFunction持有一個窗口中包含的所有元素的Iterable對象,以及元素所屬窗口的附加meta信息。【可以實現對窗口內的數據進行排序等需求】
增量+全量聚合函數結合:(推薦,也常用)
我們可以將ProcessWindowFunction和ReduceFunction,AggregateFunction, 或者FoldFunction函數結合來緩解這個問題,從而可以讓窗口增量聚合以及獲取ProcessWindowFunction接收的窗口meta數據。
1、ReduceFunction
RedceFunction的算子是reduce(),使用reduce算子時,我們要重寫一個ReduceFunction。
ReduceFunction用於指明如何將輸入流中的兩個元素組合在一起來生成一個相同類型的輸出元素。Flink使用ReduceFunction增量地聚合窗口中的元素。
這里的reduce同我們之前的reduce算子原理是一樣的。
reduce/fold/aggregate算子計算原理:
reduce將stream中元素前兩個傳給輸入函數,產生一個新的return值,將新產生的return值與RDD中下一個元素(即第三個元素)組成兩個元素,再被傳給輸入函數,這樣遞歸運作,直到最后只有一個值為止。*/
fold就是在reduce的基礎上,增加了一個初始值,這個初始值會加入到每個分區的頭部。
因為fold分為分區內的匯總,和分區間的全局匯總,在局部匯總的每個子分區頭部都會加入這個初始值,在全局匯總的頭部也會加入這個初始值。
aggregate,比reduce和fold更加靈活,reduce和fold的分區內局部匯總和分區間的全局匯總的算法都是一致的,但是aggregate可以不同。
ReduceFunction的api源碼:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
應用案例:
case class StockPrice(symbol: String, price: Double)
val input: DataStream[StockPrice] = ...
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// reduce的返回類型必須和輸入類型StockPrice一致
val sum = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(10))
.reduce((s1, s2) => StockPrice(s1.symbol, s1.price + s2.price))
上面的代碼使用Lambda表達式對兩個元組進行操作,由於對symbol字段進行了keyBy,相同symbol的數據都分組到了一起,接着我們將price加和,返回的結果必須也是StockPrice類型,否則會報錯。
這里s1、s2、和返回的數據都是相同類型,都是StockPrice類型
上面是對price價格進行聚合,s1是之前計算的結果,s2是當前輸入的元素數據。
最初始狀態時:s1、s2是流里前面兩個元素數據。
后面計算時:s1是前面計算的結果值,保存為中間狀態,和當前輸入的元素s2,繼續組成兩個參數傳入到函數當中。
優缺點:
1、使用reduce的好處是窗口的狀態數據量非常小,實現一個ReduceFunction也相對比較簡單,可以使用Lambda表達式,也可以重寫函數。
2、缺點是能實現的功能非常有限,因為中間狀態數據的數據類型、輸入類型以及輸出類型三者必須一致,而且只保存了一個中間狀態數據,當我們想對整個窗口內的數據進行操作時,僅僅一個中間狀態數據是遠遠不夠的。
2、AggregateFunction
AggregateFunction也是一種增量計算窗口函數,也只保存了一個中間狀態數據
aggregate與reduce最大的不同就是,aggregate允許輸入類型(IN),累加器類型(ACC)以及輸出類型(OUT)可以不一樣。
AggregateFunction可以稱之為廣義上的ReduceFunction,它包含三種元素類型:輸入類型(IN),累加器類型(ACC)以及輸出類型(OUT)。AggregateFunction接口中有一個用於創建初始累加器、合並兩個累加器的值到一個累加器以及從累加器中提取輸出結果的方法。
查看源碼定義:
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
// 在一次新的aggregate發起時,創建一個新的Accumulator,Accumulator是我們所說的中間狀態數據,簡稱ACC
// 這個函數一般在初始化時調用
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
// 當一個新元素流入時,將新元素與狀態數據ACC合並,返回狀態數據ACC
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
// 將中間數據轉成結果數據返回
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
// 將兩個ACC合並
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
輸入類型是IN,輸出類型是OUT,中間狀態數據是ACC,這樣復雜的設計主要是為了解決輸入類型、中間狀態和輸出類型不一致的問題,同時ACC可以自定義,我們可以在ACC里構建我們想要的數據結構。比如我們要計算一個窗口內某個字段的平均值,那么ACC中要保存總和以及個數,下面是一個平均值的示例:
case class StockPrice(symbol: String, price: Double)
// IN: StockPrice
// ACC:(String, Double, Int) - (symbol, sum, count)
// OUT: (String, Double) - (symbol, average)
class AverageAggregate extends AggregateFunction[StockPrice, (String, Double, Int), (String, Double)] {
override def createAccumulator() = ("", 0, 0)
override def add(item: StockPrice, accumulator: (String, Double, Int)) =
(item.symbol, accumulator._2 + item.price, accumulator._3 + 1)
override def getResult(accumulator:(String, Double, Int)) = (accumulator._1 ,accumulator._2 / accumulator._3)
override def merge(a: (String, Double, Int), b: (String, Double, Int)) =
(a._1 ,a._2 + b._2, a._3 + b._3)
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
val average = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(10))
.aggregate(new AverageAggregate)
這幾個函數的工作流程如下圖所示。在計算之前要創建一個新的ACC,這時ACC還沒有任何實際表示意義,當有新數據流入時,Flink會調用add方法,更新ACC,並返回最新的ACC,ACC是一個中間狀態數據。當有一些跨節點的ACC融合時,Flink會調用merge,生成新的ACC。當所有的ACC最后融合為一個ACC后,Flink調用getResult生成結果。
圖片來源:https://zhuanlan.zhihu.com/p/102325190
3、FoldFunction
FoldFunction指定如何將窗口的輸入元素與輸出類型的元素組合。對添加到窗口的每個元素和當前輸出值增量調用FoldFunction。
FoldFunction與ReduceFunction基本一致。區別在於FoldFunction需要預設一個初始值,reduce不用。
如下的示例將所有輸入的值追加到最初為空的String上。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
注意:fold()不能用於會話窗口或其他可合並的窗口
flink中已經Deprecated警告,且建議使用AggregateFunction代替。
4、ProcessWindowFunction
與前兩種方法不同,ProcessWindowFunction要對窗口內的全量數據都緩存。在Flink所有API中,process算子以及其對應的函數是最底層的實現,使用這些函數能夠訪問一些更加底層的數據,比如,直接操作狀態等。它在源碼中的定義如下:
/**
* IN 輸入類型
* OUT 輸出類型
* KEY keyBy中按照Key分組,Key的類型
* W 窗口的類型
*/
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
/**
* 對一個窗口內的元素進行處理,窗口內的元素緩存在Iterable<IN>,進行處理后輸出到Collector<OUT>中
* 我們可以輸出一到多個結果
*/
public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
* 當窗口執行完畢被清理時,刪除各類狀態數據。
*/
public void clear(Context context) throws Exception {}
/**
* 一個窗口的上下文,包含窗口的一些元數據、狀態數據等。
*/
public abstract class Context implements java.io.Serializable {
// 返回當前正在處理的Window
public abstract W window();
// 返回當前Process Time
public abstract long currentProcessingTime();
// 返回當前Event Time對應的Watermark
public abstract long currentWatermark();
// 返回某個Key下的某個Window的狀態
public abstract KeyedStateStore windowState();
// 返回某個Key下的全局狀態
public abstract KeyedStateStore globalState();
// 遲到數據發送到其他位置
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
使用時,Flink將某個Key下某個窗口的所有元素都緩存在Iterable<IN>中,我們需要對其進行處理,然后用Collector<OUT>收集輸出。我們可以使用Context獲取窗口內更多的信息,包括時間、狀態、遲到數據發送位置等。
下面的代碼是一個ProcessWindowFunction的簡單應用,我們對價格出現的次數做了統計,選出出現次數最多的輸出出來。
case class StockPrice(symbol: String, price: Double)
class FrequencyProcessFunction extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {
// 股票價格和該價格出現的次數
var countMap = scala.collection.mutable.Map[Double, Int]()
for(element <- elements) {
val count = countMap.getOrElse(element.price, 0)
countMap(element.price) = count + 1
}
// 按照出現次數從高到低排序
val sortedMap = countMap.toSeq.sortWith(_._2 > _._2)
// 選出出現次數最高的輸出到Collector
if (sortedMap.size > 0) {
out.collect((key, sortedMap(0)._1))
}
}
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
val frequency = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(10))
.process(new FrequencyProcessFunction)
Context中有兩種狀態,一種是針對Key的全局狀態,它是跨多個窗口的,多個窗口都可以訪問;另一種是該Key下單窗口的狀態,單窗口的狀態只保存該窗口的數據,主要是針對process函數多次被調用的場景,比如處理遲到數據或自定義Trigger等場景。當使用單個窗口的狀態時,要在clear函數中清理狀態。
ProcessWindowFunction相比AggregateFunction和ReduceFunction的應用場景更廣,能解決的問題也更復雜。但ProcessWindowFunction需要將窗口中所有元素作為狀態存儲起來,這將占用大量的存儲資源,尤其是在數據量大窗口多的場景下,使用不慎可能導致整個程序宕機。比如,每天的數據在TB級,我們需要Slide為十分鍾Size為一小時的滑動窗口,這種設置會導致窗口數量很多,而且一個元素會被復制好多份分給每個所屬的窗口,這將帶來巨大的內存壓力。
5、ProcessWindowFunction與增量計算相結合(常用)
為了解決ProcessWindowFunction將整個窗口元素緩存起來占用大量資源的情況,flink提供了可以將ProcessWindowFunction和reduce和aggregate組合的操作。
即當元素到達窗口時進行增量計算,當窗口結束的時候,將增量計算結果發送給ProcessWindowFunction作為輸入再進行處理。ProcessWindowFunction會將增量結果進行處理輸出結果。該組合操作即可以增量計算窗口,同時也可以訪問窗口的一些元數據、狀態信息等。
可以在ProcessWindowFunction里計算最大值,最小值,獲取窗口啟動時間,結束時間等信息。
5.1、ReduceFunction + ProcessWindowFunction
reduce算子里傳入兩個參數,一個繼承ReduceFunction 接口,實現增量聚合操作。
一個繼承ProcessWindowFunction,以返回窗口中的最小事件和窗口的開始時間
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
5.2、AggregateFunction + ProcessWindowFunction (推薦)
在aggregate算子中傳入兩個參數,一個繼承AggregateFunction,實現增量聚合操作。
一個繼承ProcessWindowFunction,獲取窗口狀態信息,以及進行整體的計算。
示例:計算元素平均值,同時輸出key值與均值。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}
5.3、FoldFunction+ ProcessWindowFunction
在fold()算子中傳入三個參數,
一個作為fold的初始值
一個繼承AggregateFunction,實現增量聚合操作。
一個繼承ProcessWindowFunction,獲取窗口狀態信息,
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())
// Function definitions
private static class MyFoldFunction
implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
Integer cur = acc.getField(2);
acc.setField(cur + 1, 2);
return acc;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
}
}
6、apply(WindowFunction) 舊版
窗口每觸發一次時,會調用一次apply方法,相當於是對窗口中的全量數據進行計算。
1、apply方法中,可以添加WindowFunction對象,會將該窗口中所有的數據先緩存,當時間到了一次性計算
* 2、需要設置4個類型,分別是:輸入類型,輸出類型,keyBy時key的類型(如果用字符串來划分key類型為Tuple,窗口類型
* 3、所有的計算都在apply中進行,可以通過window獲取窗口的信息,比如開始時間,結束時間
它跟process類似,在某些ProcessWindowFunction可以使用的地方,您也可以使用WindowFunction。這是ProcessWindowFunction的舊版本,提供較少的上下文信息,並且沒有某些高級功能,例如每個窗口的狀態。該接口將在某個時候被棄用。
apply用法:
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
WindowFunction源碼定義:
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
使用案例:
package cn._51doit.flink.day09;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
* apply是在窗口內進行全量的聚合,浪費資源
*/
public class HotGoodsTopN {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(60000);
env.setParallelism(1);
//json字符串
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<MyBehavior> behaviorDataStream = lines.process(new ProcessFunction<String, MyBehavior>() {
@Override
public void processElement(String value, Context ctx, Collector<MyBehavior> out) throws Exception {
try {
MyBehavior behavior = JSON.parseObject(value, MyBehavior.class);
//輸出
out.collect(behavior);
} catch (Exception e) {
//e.printStackTrace();
//TODO 記錄出現異常的數據
}
}
});
//提取EventTime生成WaterMark
SingleOutputStreamOperator<MyBehavior> behaviorDataStreamWithWaterMark = behaviorDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyBehavior>(Time.seconds(0)) {
@Override
public long extractTimestamp(MyBehavior element) {
return element.timestamp;
}
});
//按照指定的字段進行分組
KeyedStream<MyBehavior, Tuple> keyed = behaviorDataStreamWithWaterMark.keyBy("itemId", "type");
//窗口長度為10分組,一分鍾滑動一次
WindowedStream<MyBehavior, Tuple, TimeWindow> window = keyed.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));
//SingleOutputStreamOperator<MyBehavior> sum = window.sum("counts");
SingleOutputStreamOperator<ItemViewCount> sum = window.apply(new WindowFunction<MyBehavior, ItemViewCount, Tuple, TimeWindow>() {
//當窗口觸發是,會調用一次apply方法,相當於是對窗口中的全量數據進行計算
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<MyBehavior> input, Collector<ItemViewCount> out) throws Exception {
//窗口的起始時間
long start = window.getStart();
//窗口的結束時間
long end = window.getEnd();
//獲取分組的key
String itemId = tuple.getField(0);
String type = tuple.getField(1);
int count = 0;
for (MyBehavior myBehavior : input) {
count++;
}
//輸出結果
out.collect(ItemViewCount.of(itemId, type, start, end, count++));
}
});
sum.print();
env.execute();
}
}
此處的計算是全量計算,效率不高,因為其要等到窗口數據攢足了才觸發定時器,執行apply方法,這個apply方法相當於對窗口中的全量數據進行計算。假設窗口一直不觸發,其會將數據緩存至窗口內存中,其實就是state中,窗口內部會有state,無需自己定義。窗口若是很長的話,緩存在內存中的數據就會很多。,解決辦法是,窗口來一條數據就進行一次累加計算,即增量計算(效率更高,內存中存的知識次數)
7、Using per-window state in ProcessWindowFunction
在ProcessWindowFunction中使用每個窗口狀態
ProcessWindowFunction與WindowFunction不同點在於使用ProcessWindowFunction不僅僅可以拿到窗口的院內數據信息,還可以獲取WindowState和GlobalState。
- WindowState - 表示窗口的狀態,該狀態值和窗口綁定的,一旦窗口消亡狀態消失。
- GlobalState - 表示窗口的狀態,該狀態值和Key綁定,可以累計多個窗口的值。
如果同一窗口會多次觸發(如event-time觸發器加上允許最大延遲時間,則有肯觸發多次計算),則此功能很有用。例如可以存儲每次窗口觸發的次數以及最新一次觸發的信息,為下一次窗口觸發提供邏輯處理信息。使用Per-window State數據時要及時清理狀態數據,可以覆寫,調用ProcessWindowFunction的clear()完成狀態數據的清理。
package com.baizhi.jsy.windowFunction
import java.text.SimpleDateFormat
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object FlinkWindowProcessTumblingWithProcessWindowFunctionState {
def main(args: Array[String]): Unit = {
//1.創建流計算執⾏環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.創建DataStream - 細化
val text = env.socketTextStream("Centos",9999)
//3.執⾏行行DataStream的轉換算⼦
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(t=>t._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new UserDefineProcessWindowFunction3)
.print()
//5.執⾏流計算任務
env.execute("Aggregate Window Stream WordCount")
}
}
class UserDefineProcessWindowFunction3 extends ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
val sdf = new SimpleDateFormat("HH;mm:ss")
var wvsd:ValueStateDescriptor[Int]=_
var gvsd:ValueStateDescriptor[Int]=_
override def open(parameters: Configuration): Unit = {
wvsd=new ValueStateDescriptor[Int]("ws",createTypeInformation[Int])
gvsd=new ValueStateDescriptor[Int]("gs",createTypeInformation[Int])
}
override def process(key: String,
context: Context,
elements: Iterable[(String, Int)],
out: Collector[(String, Int)]): Unit = {
val window = context.window//獲取窗口元數據
val start = sdf.format(window.getStart)
val end = sdf.format(window.getEnd)
val sum = elements.map(_._2).sum
var wvs:ValueState[Int]=context.windowState.getState(wvsd)
var gvs:ValueState[Int]=context.globalState.getState(gvsd)
wvs.update(wvs.value()+sum)
gvs.update(gvs.value()+sum)
println("Window Count\t"+wvs.value()+"\tGlobal Count\t"+gvs.value())
out.collect((key+"\t["+start+"---"+end+"]",sum))
}
}
六、Triggers觸發器
參考:https://blog.csdn.net/x950913/article/details/106203894/
1、概述:
觸發器(Trigger)決定了何時啟動Window Function來處理窗口中的數據以及何時將窗口內的數據清理。
增量計算窗口函數對每個新流入的數據直接進行聚合,Trigger決定了在窗口結束時將聚合結果發送出去;全量計算窗口函數需要將窗口內的元素緩存,Trigger決定了在窗口結束時對所有元素進行計算然后將結果發送出去。
每個窗口都有一個默認的Trigger,比如前文這些例子都是基於Event Time的時間窗口,當到達窗口的結束時間時,Trigger以及對應的計算被觸發,觸發窗口函數計算。如果我們有一些個性化的觸發條件,比如窗口中遇到某些特定的元素、元素總數達到一定數量或窗口中的元素到達時滿足某種特定的模式時,我們可以自定義一個Trigger。我們甚至可以在Trigger中定義一些提前計算的邏輯,比如在Event Time語義中,雖然Watermark還未到達,但是我們可以定義提前計算輸出的邏輯,以快速獲取計算結果,獲得更低的延遲。
觸發器決定窗口何時將數據交給窗口函數處理。每個窗口都有一個默認觸發器。如果默認觸發器不符合業務需要,也可以使用自定義的觸發器。
trigger接口有五個方法允許trigger對不同的事件做出反應:
- onElement() :窗口每收到一個元素調用該方法,返回結果決定是否觸發算子
- onEventTime(): 當注冊一個event-time 定時器時會被調用。根據注冊的事件事件觸發
- onProcessingTime(): 當注冊一個processing-time 定時器時被調用。根據注冊的處理時間定時器觸發
- onMerge(): 窗口合並時觸發。與狀態性觸發器相關,兩個窗口合並時,合並兩個觸發器的狀態,如使用session window時,窗口會進行合並,此時調用該方法。
- clear() :窗口關閉時觸發,用於做一些清理工作。
關於上述方法,需要注意兩件事:
1) 前三個函數通過返回TriggerResult來決定如何處理它們的調用事件。操作可以是以下操作之一:
- CONTINUE:什么都不做
- FIRE:觸發計算並將結果發送給下游,不清理窗口數據。
- PURGE:清理窗口數據但不執行計算。
- FIRE_AND_PURGE:觸發計算,發送結果然后清除窗口中的元素。
2) 這些方法中的任何一個都可以用於為將來的操作注冊processing-time 定時器或event-time定時器。
在繼續介紹Trigger的使用之前,我們可以先了解一下定時器(Timer)的使用方法。我們可以把Timer理解成一個鬧鍾,使用前先注冊未來一個時間,當時間到達時,就像鬧鍾會響一樣,程序會啟用一個回調函數,來執行某個時間相關的任務。對於自定義Trigger來說,我們需要考慮注冊時間的邏輯,當到達這個時間時,Flink會啟動Window Function,清理窗口數據。
2、觸發與清除
一旦一個觸發器認為一個窗口已經可以進行處理,它將觸發並返回FIRE或者FIRE_AND_PURGE。這意味着當前窗口馬上就要觸發計算,並將元素發送給計算方法。如一個帶有ProcessWindowFunction的窗口,當觸發器觸發fire后,所有元素都被傳遞給ProcessWindowFunction(如果有剔除器,則先經過剔除器)。
如果窗口的計算函數時ReduceFunction、AggregateFunction或FoldFunction,則只發出它們聚合的結果,因為在窗口內部已經由這些預聚合方法進行ji's。
觸發器觸發的方式有兩種:FIRE或者FIRE_AND_PURGE。如果是FIRE的話,將保留window中的內容,FIRE_AND_PURGE則會清除window的內容。默認情況下,觸發器使用的是FIRE。
注意:清除操作僅清除window的內容,但會保留窗口的元數據信息和觸發器狀態。
3、默認觸發器和自定義觸發器
WindowAssigner的默認觸發器適用於各種用例。例如,所有事件時間窗口分配程序都有一個event time trigger作為默認觸發器。一旦watermark 大於窗口的endtime,那么這個觸發器就會觸發。
PS:GlobalWindow默認的觸發器時NeverTrigger,該觸發器從不出發,所以在使用GlobalWindow時必須自定義觸發器。
注意:通過使用trigger()指定觸發器后,將覆蓋WindowAssigner的默認觸發器。例如,如果為TumblingEventTimeWindows指定CountTrigger,則不再根據時間進度而僅按count觸發窗口,兩者不同時生效。所以,如果想同時基於時間和計數觸發窗口,就必須編寫自定義觸發器。
flink的默認觸發器有四種:
- EventTimeTrigger:事件時間觸發器,根據watermark觸發。
- ProcessingTimeTrigger:處理時間觸發器,根據元素在被處理的那台機器的系統時間觸發。
- CountTrigger:計數觸發器,根據元素的個數觸發。
- PurgingTrigger :清除觸發器,將另一個觸發器作為參數,將其轉換為清除觸發器,即在原有觸發器的基礎上,添加清除窗口內容功能。
如果需要自定義觸發器的話,則需要實現Trigger抽象類 。
4、Trigger接口源碼
我們看一下Flink的Trigger源碼。
/**
* T為元素類型
* W為窗口
*/
public abstract class Trigger<T, W extends Window> implements Serializable {
/**
* 當某窗口增加一個元素時調用onElement方法,返回一個TriggerResult
*/
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
/**
* 當一個基於Processing Time的Timer觸發了FIRE時調用onProcessTime方法
*/
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
/**
* 當一個基於Event Time的Timer觸發了FIRE時調用onEventTime方法
*/
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
/**
* 如果這個Trigger支持狀態合並,則返回true
*/
public boolean canMerge() {
return false;
}
/**
* 當多個窗口被合並時調用onMerge
*/
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
/**
* 當窗口數據被清理時,調用clear方法來清理所有的Trigger狀態數據
*/
public abstract void clear(W window, TriggerContext ctx) throws Exception
/**
* 上下文,保存了時間、狀態、監控以及定時器
*/
public interface TriggerContext {
/**
* 返回當前Processing Time
*/
long getCurrentProcessingTime();
/**
* 返回MetricGroup
*/
MetricGroup getMetricGroup();
/**
* 返回當前Watermark時間
*/
long getCurrentWatermark();
/**
* 將某個time注冊為一個Timer,當系統時間到達time這個時間點時,onProcessingTime方法會被調用
*/
void registerProcessingTimeTimer(long time);
/**
* 將某個time注冊為一個Timer,當Watermark時間到達time這個時間點時,onEventTime方法會被調用
*/
void registerEventTimeTimer(long time);
/**
* 將注冊的Timer刪除
*/
void deleteProcessingTimeTimer(long time);
/**
* 將注冊的Timer刪除
*/
void deleteEventTimeTimer(long time);
/**
* 獲取該窗口Trigger下的狀態
*/
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
/**
* 將多個窗口下Trigger狀態合並
*/
public interface OnMergeContext extends TriggerContext {
<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
}
5、應用案例:
案例一:
案例二:
接下來我們以一個提前計算的案例來解釋如何使用自定義的Trigger。在股票或任何交易場景中,我們比較關注價格急跌的情況,默認窗口長度是60秒,如果價格跌幅超過5%,則立即執行Window Function,如果價格跌幅在1%到5%之內,那么10秒后觸發Window Function。
class MyTrigger extends Trigger[StockPrice, TimeWindow] {
override def onElement(element: StockPrice,
time: Long,
window: TimeWindow,
triggerContext: Trigger.TriggerContext): TriggerResult = {
val lastPriceState: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPriceState", classOf[Double]))
// 設置返回默認值為CONTINUE
var triggerResult: TriggerResult = TriggerResult.CONTINUE
// 第一次使用lastPriceState時狀態是空的,需要先進行判斷
// 狀態數據由Java端生成,如果是空,返回一個null
// 如果直接使用Scala的Double,需要使用下面的方法判斷是否為空
if (Option(lastPriceState.value()).isDefined) {
if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.05) {
// 如果價格跌幅大於5%,直接FIRE_AND_PURGE
triggerResult = TriggerResult.FIRE_AND_PURGE
} else if ((lastPriceState.value() - element.price) > lastPriceState.value() * 0.01) {
val t = triggerContext.getCurrentProcessingTime + (10 * 1000 - (triggerContext.getCurrentProcessingTime % 10 * 1000))
// 給10秒后注冊一個Timer
triggerContext.registerProcessingTimeTimer(t)
}
}
lastPriceState.update(element.price)
triggerResult
}
// 我們不用EventTime,直接返回一個CONTINUE
override def onEventTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
TriggerResult.FIRE_AND_PURGE
}
override def clear(window: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
val lastPrice: ValueState[Double] = triggerContext.getPartitionedState(new ValueStateDescriptor[Double]("lastPrice", classOf[Double]))
lastPrice.clear()
}
}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
val average = input
.keyBy(s => s.symbol)
.timeWindow(Time.seconds(60))
.trigger(new MyTrigger)
.aggregate(new AverageAggregate)
注意:在自定義Trigger時,如果使用了狀態,一定要使用clear方法將狀態數據清理,否則隨着窗口越來越多,狀態數據會越積越多。
七、Evictors 驅逐器
Flink的窗口模型允許除了WindowAssigner和Trigger之外,還指定一個可選的Evictor,用於從窗口中移除元素。
Evictor作用在觸發器啟動之后、窗口函數作用之前或之后移出窗口中的元素。
api調用如下:
/**
* T為元素類型
* W為窗口
*/
public interface Evictor<T, W extends Window> extends Serializable {
/**
* 在Window Function前調用,即可以在窗口處理之前剔除數據
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* 在Window Function后調用
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* Evictor的上下文
*/
interface EvictorContext {
long getCurrentProcessingTime();
MetricGroup getMetricGroup();
long getCurrentWatermark();
}
}
- victBefore():用於在窗口函數執行之前剔除元素。
- evictAfter():用於在窗口函數執行之后剔除元素。
窗口中所有的元素被放在Iterable<TimestampedValue<T>>中,我們可以實現自己的清除邏輯。對於增量計算如ReduceFunction和AggregateFunction,沒必要使用Evictor。
Flink提供了三種已實現的Evictor:
- CountEvictor:保存指定數量的元素,多余的元素按照從前往后的順序剔除
- DeltaEvictor:需要傳入一個DeltaFunction和一個threshold,使用DeltaFunction計算Window中最后一個元素與其余每個元素之間的增量(delta),丟棄增量大於或等於閾值(threshold)的元素
- TimeEvictor:對於給定的窗口,提供一個以毫秒為單位間隔的參數interval,找到最大的時間max_ts,然后刪除所有時間戳小於max_ts-interval的元素。
默認情況下:所有預定義的Evictor均會在窗口函數作用之前執行。
注意:
1、如果指定了剔除器,則預聚合不生效,因為在進行計算之前,每一條元素都會經過剔除器才會進入窗口。
2、Flink不能保證窗口中元素的順序。這意味着盡管剔除器即使從窗口的開頭移除元素,但這些元素不一定是最先到達或最后到達的元素。
3、默認情況下,Evictor都在窗口函數調用之前執行。
八、如何處理遲到的事件元素?
1、Allowed Lateness
使用事件時間(event-time)窗口時,可能會發生元素到達晚的情況,即本應該在上一個窗口處理的元素,由於延遲到達flink,而watermark已經超過窗口的endtime而啟動計算,導致這個元素沒有在上一個窗口中處理。
關於watermark,可以參考:https://blog.csdn.net/x950913/article/details/106246807
默認情況下,當watermark超過窗口的endtime時,將刪除延遲到達的元素。但是,Flink可以為窗口指定一個最大延遲時間。Allowed lateness指定watermark超過endtime多少時間后,再收到事件時間在該窗口內的元素時,會再次觸發窗口的計算,其默認值為0。watermark超過endtime后,會觸發一次計算,在允許延遲的時間范圍內到達的元素,仍然會添加到窗口中,且再次觸發計算。所以如果使用的是EventTimeTrigger觸發器,延遲但未丟棄的元素可能會導致窗口再次觸發。
默認情況下,允許的延遲設置為0。也就是說,在endtime后到達的元素將被刪除。
指定允許延遲時間,如下所示:
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>)
另外,當使用GlobalWindows窗口時,任何數據都不會被認為是延遲的,因為全局窗口的結束時間戳是Long的最大值.
2、將遲到元素從側輸出流輸出
可以將延遲到達的元素從側輸出流中輸出。
首先需要創建一個OutputTag用於接收延遲數據。然后,指定將窗口中的延遲數據發送到OutputTag中:
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)
3、遲到元素處理注意事項
當指定允許的延遲大於0時,在watermark超過窗口的endtime后,仍會保留窗口及其內容。在這些情況下,當一個延遲但未被丟棄的元素到達時,它可能會再次觸發這個窗口的觸發器。此時這些觸發器的Fire被稱為late firings,因為它們是由遲到元素觸發的,而主Fire是窗口的第一次Fire。在會話窗口(session windows)的情況下,延遲觸發可能進一步導致窗口合並,因為它們可能“橋接”兩個預先存在的未合並窗口。
注意:延遲到達的元素所觸發的計算應被視為對先前計算的結果的更新,即watermark到達endtime后窗口會進行一次計算,之后延遲到達的元素會觸發新的計算來更新計算結果,所以數據流將包含同一計算的多個結果。根據應用場景不同,需要考慮是否需要消除重復數據。
九、窗口計算后還可以做什么?
使用窗口計算后得出的結果仍是一個DataStream,該DataStream中的元素不會保留有關窗口的信息。所以,如果需要保存窗口的元數據信息,就必須編寫代碼,在ProcessWindowFunction中將窗口的元數據信息與元素進行整合。輸出元素中的時間戳是唯一與窗口相關的信息,可以將其設置為窗口的最大允許延遲時間,即endtime-1,因為endtime之前的元素才屬於這個窗口,大於等於endtime的則屬於下一個窗口(event-time windows 和processing-time windows都是這樣的)。
在經過窗口函數處理后的元素總會包含一個時間戳,可以是event-time時間戳也可以是 processing-time時間戳。
對於 processing-time 窗口可能沒什么作用,但對於event-time窗口,配合watermark機制,可以將event-time屬於同一窗口的元素放在另一個窗口中處理,即連續的窗口操作,在下面有介紹。
watermarks對窗口的作用
在此稍微提及一點關於watermark和窗口的作用。
觸發器基於watermark有兩種處理方式:
- 當watermark大於等於窗口的endtime時,觸發窗口計算。
- 當watermark小於於窗口的endtime時,則將watermark轉發給下游操作(維護watermark為窗口中所有元素中的最大時間戳)
連續的窗口操作
如上所述,經過窗口函數計算后的結果仍然帶有時間戳,且與watermark搭配使用,就可以連續使用多個窗口操作。如在上游窗口計算之后,仍可以對得出的結果以不同的key、不同的function進行計算。如:
DataStream<Integer> input = ...;
DataStream<Integer> resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer());
DataStream<Integer> globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction());
上述的例子中,事件時間戳在0~5秒的元素(包含0秒,不含5秒),經過第一個窗口計算后,產生的結果傳入第二個窗口時也屬於0~5秒的窗口,即同窗口的元素在下游窗口中仍屬於同一窗口。如第一個窗口計算0~5秒內每個key的和,再在第二個窗口中取出0~5秒內key的和的TopK。
十、如何估計窗口存儲大小?
定義窗口的時間范圍時,可以定義一個很長的時間,甚至是幾天、幾周或幾月。因此窗口可以累積相當多的數據。在估計窗口計算的存儲量時,需要記住以下幾個規則:
Flink會為每個窗口的所有元素都創建一個副本,因此,滾動窗口對每個元素僅保留一個副本,因為一個元素恰好僅屬於一個窗口,除非它被擱置(dropped late)。相反,滑動窗口會為元素保存多個副本,因為一個元素可能屬於多個窗口,每個窗口都會保存一次。所以,如果使用滑動窗口,那么應該盡量避免窗口大小太大,而滑動步長小的情況,如窗口大小為1天,滑動步長為1秒。
ReduceFunction,AggregateFunction和FoldFunction可以大大減少存儲需求,因為他們會盡量早地對元素進行聚合,且每個窗口僅存儲一個值,而不是所有元素。相反,ProcessWindowFunction需要存儲每個元素。
使用剔除器Evictor會阻止所有預聚合操作,因為在算之前,必須將窗口的所有元素傳遞到剔除器。
參考引用:
官網:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
https://segmentfault.com/a/1190000022106275 (贊)
https://zhuanlan.zhihu.com/p/102325190 (爆贊)
https://blog.csdn.net/x950913/article/details/106203894/ (爆贊)


