Flink window mechanism, by example:
What is a window, and what is it used for?
A window splits an unbounded stream into finite "buckets" of elements, over which computations such as aggregations can be applied. Windows fall into two broad categories:
1. Time-based windows
2. Count-based windows
The Flink DataStream API provides both time and count windows, and adds session-based windows on top of them.
For special requirements, the DataStream API also exposes customizable window operations, so users can define their own windows.
Below we mainly cover time-based windows, count-based windows, and custom window operations; session-based windows will be covered in a later article.
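For orientation, here is a minimal sketch of what these window types look like on a keyed stream in the Flink 1.x DataStream API (the stream name keyedStream, the window sizes, and the session gap are illustrative assumptions, not taken from the examples below):

// Tumbling and sliding time windows (KeyedStream shortcuts)
keyedStream.timeWindow(Time.minutes(1));                     // tumbling: one result per minute
keyedStream.timeWindow(Time.minutes(1), Time.seconds(10));   // sliding: 1-minute window, evaluated every 10 seconds

// Tumbling and sliding count windows
keyedStream.countWindow(100);                                // tumbling: fires every 100 elements per key
keyedStream.countWindow(100, 10);                            // sliding: last 100 elements, evaluated every 10 elements

// Session window: closes after 5 minutes of inactivity for a key
keyedStream.window(EventTimeSessionWindows.withGap(Time.minutes(5)));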
1. Time-Based Window
Time-based windows can be further divided into:
incremental aggregation and full-window aggregation.
-------------------------------- Incremental aggregation --------------------------------

Similar to a group window in Flink SQL: the result is updated incrementally as each element arrives.
------------------------------------------------------------------------------
Code example:
1.1 Tumbling window
The window operation is applied on a keyed stream. When timeWindow is called with a single argument, it creates a tumbling window; in the example below the window size is 60 minutes. The code is written in Java:
Incremental aggregation -- counting:
// TODO: apply operators after obtaining the data source
DataStream<StartAppCount> windowedData = startupInfoData
        .keyBy("appId")                 // group by appId
        .timeWindow(Time.minutes(60))   // 60-minute tumbling window
        .aggregate(new CountAgg(), new WindowResultFunction());
windowedData.print();
CountAgg is a custom function that implements the AggregateFunction interface:
public class CountAgg implements AggregateFunction<StartupInfoData, Long, Long> {

    @Override
    public Long createAccumulator() {
        // initialize the accumulator
        return 0L;
    }

    @Override
    public Long add(StartupInfoData startupInfoData, Long acc) {
        // for each incoming element, increment the accumulator by 1
        return acc + 1;
    }

    @Override
    public Long getResult(Long acc) {
        // return the final count for the window
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
        // merge two accumulators by adding their counts
        return acc1 + acc2;
    }
}
The window output function:
public class WindowResultFunction implements WindowFunction<Long, StartAppCount, Tuple, TimeWindow> {

    @Override
    public void apply(
            Tuple key,                          // key of the window, i.e. the appId
            TimeWindow window,                  // the window
            Iterable<Long> aggregateResult,     // result of the aggregate function, i.e. the count
            Collector<StartAppCount> collector  // output type is StartAppCount
    ) throws Exception {
        String appId = ((Tuple1<String>) key).f0;
        Long count = aggregateResult.iterator().next();
        collector.collect(StartAppCount.of(appId, window.getEnd(), count));
    }
}
The custom output class:
public class StartAppCount {
    public String appId;      // app ID
    public long windowEnd;    // window end timestamp
    public long count;        // number of startups in the window

    public static StartAppCount of(String appId, long windowEnd, long count) {
        StartAppCount result = new StartAppCount();
        result.appId = appId;
        result.windowEnd = windowEnd;
        result.count = count;
        return result;
    }

    @Override
    public String toString() {
        return "StartAppCount{" +
                "appId='" + appId + '\'' +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }
}
Incremental aggregation -- computing an average:
public class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> acc) {
        // the accumulator caches the intermediate state:
        // f0 holds the running sum of values, f1 holds the number of elements seen so far
        return new Tuple2<>(acc.f0 + value.f1, acc.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return (double) acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
        // merge two accumulators by summing both fields
        return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
    }
}
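A minimal sketch of how AverageAggregate could be wired into a window, assuming a hypothetical keyed input stream of Tuple2<String, Long> named input (not part of the original example):

DataStream<Double> averages = input
        .keyBy(0)                           // key by the String field of the tuple
        .timeWindow(Time.minutes(1))        // 1-minute tumbling window
        .aggregate(new AverageAggregate()); // emits one Double (the average) per key and window
averages.print();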
Summing with sum():
DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 seconds, sliding every 1 second
        .sum("count");                                // either sum or reduce works here
        /*.reduce(new ReduceFunction<WordWithCount>() {
            public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                return new WordWithCount(a.word, a.count + b.count);
            }
        })*/

// print the result to the console with parallelism 1
windowCounts.print().setParallelism(1);
Summing with reduce():
DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 seconds, sliding every 1 second
        //.sum("count");                              // either sum or reduce works here
        .reduce(new ReduceFunction<WordWithCount>() {
            public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                return new WordWithCount(a.word, a.count + b.count);
            }
        });
-------------------------------- Full-window aggregation --------------------------------

Unlike incremental aggregation, a full-window function (such as ProcessWindowFunction) buffers all elements of the window and computes the result once, when the window fires.
Code example:
public class MyprocessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<String> out) throws Exception {
        // all elements of the window are handed over in one call; iterate to count them
        long count = 0;
        for (Tuple2<String, Long> in : iterable) {
            count++;
        }
        out.collect("Window: " + context.window() + " count: " + count);
    }
}
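A minimal sketch of how MyprocessWindowFunction might be applied, assuming a hypothetical keyed input stream of Tuple2<String, Long> named input; the key selector returns the String field so that the key type matches the function's String key parameter (an assumption for illustration, not from the original post):

DataStream<String> result = input
        .keyBy(t -> t.f0)                         // key type is String, matching the function's key parameter
        .timeWindow(Time.minutes(1))              // 1-minute tumbling window
        .process(new MyprocessWindowFunction());  // buffers all window elements and fires once per window
result.print();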
1.2 Sliding window
// TODO: apply operators after obtaining the data source
DataStream<StartAppCount> windowedData = startupInfoData
        .keyBy("appId")                                   // group by appId
        .timeWindow(Time.minutes(60), Time.seconds(5))    // 60-minute window, evaluated every 5 seconds
        .aggregate(new CountAgg(), new WindowResultFunction());
windowedData.print();
2. Count-Based Window
2.1 Tumbling window (tumbling count window)
Like time-based windows, count-based windows support both tumbling and sliding variants. The example below, on a keyed stream, aggregates every 100 elements per key:
public class FlinkCountWindowDemo {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);

        final int windowSize = params.getInt("window", 100);

        // read source data
        DataStreamSource<Tuple2<String, String>> inStream = env.addSource(new StreamDataSource());

        // calculate
        DataStream<Tuple2<String, String>> outStream = inStream
                .keyBy(0)
                .countWindow(windowSize)
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + "" + value2.f1);
                    }
                });

        outStream.print();
        env.execute("WindowWordCount");
    }
}
2.2 Sliding Window
Borrowing a figure from "Flink 原理與實現:Window 機制": suppose a sliding count window computes the sum of the most recent 4 elements once for every 2 elements that arrive; the window then works as illustrated below:

Code example:
public class FlinkCountWindowDemo {

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);

        final int windowSize = params.getInt("window", 3);
        final int slideSize = params.getInt("slide", 2);

        // read source data
        DataStreamSource<Tuple2<String, String>> inStream = env.addSource(new StreamDataSource());

        // calculate
        DataStream<Tuple2<String, String>> outStream = inStream
                .keyBy(0)
                .countWindow(windowSize, slideSize)
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + "" + value2.f1);
                    }
                });

        outStream.print();
        env.execute("WindowWordCount");
    }
}
3. Advanced Window (custom windows)
A custom window requires specifying three functions.
3.1 Window Assigner: responsible for assigning each element to one or more windows.
The Window API provides the WindowAssigner interface for custom assigners; we implement its public abstract Collection<W> assignWindows(T element, long timestamp) method. For count-based windows, the GlobalWindows assigner is used by default, e.g. keyValue.window(GlobalWindows.create()).
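A minimal sketch of a custom window assigner, assuming a recent Flink 1.x version in which assignWindows also receives a WindowAssignerContext argument; the class name FixedSizeAssigner and the 60-second window size are illustrative assumptions, not from the original post:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.Collection;
import java.util.Collections;

public class FixedSizeAssigner extends WindowAssigner<Object, TimeWindow> {

    private static final long SIZE = 60_000L; // 60-second windows (illustrative assumption)

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // align the window start to a multiple of SIZE, like a tumbling window would
        long start = timestamp - (timestamp % SIZE);
        return Collections.singletonList(new TimeWindow(start, start + SIZE));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        // fire when the event-time watermark passes the end of the window
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}

Such an assigner would then be attached with keyedStream.window(new FixedSizeAssigner()).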
