-
Overview
-
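Windows are at the heart of how Flink processes unbounded streams. A window splits a stream into "buckets" of finite size, and we can apply computations to each bucket.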
-
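Flink treats batch as a special case of streaming. Its underlying engine is therefore a streaming engine, and both stream processing and batch processing are built on top of it.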
-
Windows are the bridge from streaming to batch.
-
Flink provides a comprehensive windowing mechanism.
-
In a stream processing application, data arrives continuously, so we cannot wait until all of it has arrived before we start processing.
-
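We could, of course, process each message as soon as it arrives. Sometimes, however, we need aggregations, for example: how many users clicked on our web page in the past minute?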
-
In that case we have to define a window that collects the data of the last minute, and then run the computation over the data in that window.
-
A window can be time-driven (Time Window, e.g. every 30 seconds)
-
or data-driven (Count Window, e.g. every 100 elements).
-
Windows can further be divided into the following types:
- Tumbling Window (no overlap)
- Sliding Window (overlapping)
- Session Window (separated by gaps of inactivity)
- Global Window (not covered here)
-
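To work with windows in Flink, the source stream first has to be turned into a WindowedStream (or an AllWindowedStream for non-keyed data):

| Window operation | Transformation | Description |
|---|---|---|
| Window | KeyedStream → WindowedStream | Defines windows on an already-partitioned KeyedStream, i.e. on key-value data. |
| WindowAll | DataStream → AllWindowedStream | Defines windows on a regular DataStream, i.e. on data that is not in key-value form. |
| Window Apply | WindowedStream → DataStream; AllWindowedStream → DataStream | Applies a function to the data of the window as a whole. |
| Window Reduce | WindowedStream → DataStream | Applies a "reduce" function to aggregate the data in the window. |
| Aggregations on windows | WindowedStream → DataStream | Aggregates the data in the window: sum(), max(), min(), etc. |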
-
-
Tumbling Window
-
A tumbling window assigner splits the data stream into non-overlapping windows, so every event belongs to exactly one window.
-
Tumbling windows have a fixed size and do not overlap.
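Conceptually, a tumbling time window only has to work out which bucket a timestamp falls into. The plain-Java sketch below is a simulation, not Flink code: the class `TumblingAssignSketch` is hypothetical, and the alignment formula is assumed to match the epoch-aligned rule of Flink's `TimeWindow.getWindowStartWithOffset`. Under that assumption, a 10 s window runs from :00 to :10, :10 to :20 and so on, rather than starting at the first element.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Flink class: simulates how a tumbling window of
// fixed size puts every timestamp into exactly one bucket and sums per bucket.
class TumblingAssignSketch {

    // Start of the window containing `timestamp`. Assumed to follow the
    // epoch-aligned rule of Flink's TimeWindow.getWindowStartWithOffset
    // (used here only with non-negative timestamps and offset 0).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Sums the values of a single key per tumbling window; map keys are window starts.
    static Map<Long, Integer> sumPerWindow(long[] timestamps, int[] values, long size) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], 0, size);
            sums.merge(start, values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long size = 10_000; // 10 s in milliseconds, like timeWindow(Time.seconds(10))
        long[] ts = {1_000, 9_999, 10_000, 25_500};
        int[] vs = {3, 4, 5, 6};
        // Buckets: [0, 10000), [10000, 20000), [20000, 30000)
        System.out.println(sumPerWindow(ts, vs, size)); // {0=7, 10000=5, 20000=6}
    }
}
```

Note that 9 999 and 10 000 land in different windows even though they are 1 ms apart: the boundaries come from the clock, not from the data.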
-
[Figure: tumbling windows]
-
Code:
package com.ronnie.flink.stream.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

/**
 * Tumbling windows: windows never overlap.
 * 1. Time-driven
 * 2. Count-driven
 */
public class TumblingWindow {
    public static void main(String[] args) {
        // Set up the execution environment, similar to initializing a SparkContext in Spark
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999);
        // Map each input line to (line, random value in [0, 10)) and log its arrival time
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long timeMillis = System.currentTimeMillis();
                int random = new Random().nextInt(10);
                System.out.println("value: " + value + " random: " + random + " timestamp: " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<String, Integer>(value, random);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);
        // Time-driven: a new window every 10 s.
        // timeWindow() is deprecated since Flink 1.12; newer code uses window(TumblingProcessingTimeWindows.of(...)).
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10));
        // Count-driven: a new window every 3 elements with the same key
        // WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3);
        // apply() takes the window function that is run on the data of each window
        timeWindow.apply(new MyTimeWindowFunction()).print();
        // countWindow.apply(new MyCountWindowFunction()).print();
        try {
            // Transformations are lazy; execute() must be called explicitly to run the job
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
-
-
Time-driven
-
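Scenario 1: to count the total number of items users buy in each minute, we split the stream of user events into consecutive one-minute slices. This kind of split is called a tumbling time window. The window function applied to the time window in TumblingWindow, MyTimeWindowFunction, sums the values per key and reports the window's start and end: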
// Same package as TumblingWindow, which uses this class without an import
package com.ronnie.flink.stream.window;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        // Sum the second field of every element in this window
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        // Window boundaries in epoch milliseconds: start inclusive, end exclusive
        long start = window.getStart();
        long end = window.getEnd();
        out.collect("key:" + tuple.getField(0) + " value: " + sum + "| window_start :" + format.format(start) + " window_end :" + format.format(end));
    }
}
-
-
Count-driven
-
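Scenario 2: if every 100 purchases should drive the computation, the window is evaluated each time it has filled up with 100 elements with the same key. MyCountWindowFunction is the window function used for count windows: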
package com.ronnie.flink.stream.window;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class MyCountWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {
    @Override
    public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        // Sum the second field of every element in this window
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        // Meaningless timestamp: a GlobalWindow always reports Long.MAX_VALUE,
        // because count-based windows do not care about time.
        long maxTimestamp = window.maxTimestamp();
        out.collect("key:" + tuple.getField(0) + " value: " + sum + "| maxTimeStamp :" + maxTimestamp + "," + format.format(maxTimestamp));
    }
}
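A count window ignores time entirely: per key, it buffers elements until `size` of them have arrived, fires, and starts over. The plain-Java simulation below (hypothetical class `CountTumblingSketch`, not part of Flink) sketches the behavior we would expect from `keyedStream.countWindow(3).sum(1)`; it illustrates that assumption and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Flink code: a per-key tumbling count window
// that fires and clears every `size` elements of the same key.
class CountTumblingSketch {
    private final int size;
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    CountTumblingSketch(int size) {
        this.size = size;
    }

    // Returns the window sum when this element completes a window, else null.
    Integer add(String key, int value) {
        List<Integer> buf = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buf.add(value);
        if (buf.size() < size) {
            return null; // window for this key is not full yet
        }
        int sum = 0;
        for (int v : buf) {
            sum += v;
        }
        buf.clear(); // tumbling: purge after firing so windows never overlap
        return sum;
    }

    public static void main(String[] args) {
        CountTumblingSketch w = new CountTumblingSketch(3); // like countWindow(3)
        String[] keys = {"a", "b", "a", "a", "b", "b"};
        int[] vals = {1, 2, 3, 4, 5, 6};
        for (int i = 0; i < keys.length; i++) {
            Integer fired = w.add(keys[i], vals[i]);
            if (fired != null) {
                System.out.println(keys[i] + " -> " + fired);
            }
        }
        // a -> 8, then b -> 13
    }
}
```

Because each key keeps its own counter, interleaving "a" and "b" does not make either window fire early; a key that never reaches 3 elements never fires.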
-
-
-
Sliding Window
-
Sliding windows are similar to tumbling windows; the difference is that sliding windows can overlap.
-
In a sliding window, one element can belong to multiple windows.
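Because windows overlap, a sliding assigner returns several windows for each element, roughly size / slide of them. The sketch below is a hypothetical plain-Java simulation (class `SlidingAssignSketch`, not Flink code), modeled on our understanding of how Flink's sliding assigners enumerate windows: begin at the latest slide-aligned start and step back by `slide` while the window still covers the timestamp. It assumes non-negative timestamps and a zero offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not Flink code: lists every sliding window
// [start, start + size) that contains a given timestamp.
class SlidingAssignSketch {

    // Latest slide-aligned window start at or before `timestamp`
    // (assumes a non-negative timestamp and zero offset).
    static long lastStart(long timestamp, long slide) {
        return timestamp - timestamp % slide;
    }

    // All windows containing `timestamp`, newest first.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = lastStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 10 s windows sliding every 5 s, like timeWindow(Time.seconds(10), Time.seconds(5))
        for (long[] w : assign(12_000, 10_000, 5_000)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // [10000, 20000) then [5000, 15000)
    }
}
```

With a 60 s window sliding every 30 s (the "last minute, every 30 seconds" case), `assign` likewise returns two windows per element, which is why each element is counted twice across the output.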
-
[Figure: sliding windows]
-
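Time-based sliding window
- Scenario: every 30 seconds, compute the total number of items users bought in the last minute.
-
Count-based sliding window
- Scenario: every 10 elements with the same key, compute the sum of the last 100 elements.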
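As far as we know, Flink builds `countWindow(size, slide)` from a global window plus a count trigger (fire every `slide` elements) and a count evictor (keep only the last `size` elements). The plain-Java simulation below (hypothetical class `CountSlidingSketch`, not Flink code) mimics that combination per key. Under that assumption, `countWindow(3, 2).sum(1)` on the values 1 to 6 of a single key would emit 3, 9 and 15.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation, not Flink code: per-key sliding count window that
// fires every `slide` elements over (at most) the last `size` elements.
class CountSlidingSketch {
    private final int size;
    private final int slide;
    private final Map<String, Deque<Integer>> buffers = new HashMap<>();
    private final Map<String, Integer> sinceLastFire = new HashMap<>();

    CountSlidingSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    // Returns the window sum when this element triggers a firing, else null.
    Integer add(String key, int value) {
        Deque<Integer> buf = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
        buf.addLast(value);
        int count = sinceLastFire.merge(key, 1, Integer::sum);
        if (count < slide) {
            return null; // trigger condition not reached yet
        }
        sinceLastFire.put(key, 0);
        // Evict the oldest elements so at most `size` remain; they are kept for later firings.
        while (buf.size() > size) {
            buf.removeFirst();
        }
        int sum = 0;
        for (int v : buf) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        CountSlidingSketch w = new CountSlidingSketch(3, 2); // like countWindow(3, 2)
        for (int v = 1; v <= 6; v++) {
            Integer fired = w.add("a", v);
            if (fired != null) {
                System.out.println("a -> " + fired);
            }
        }
        // a -> 3, a -> 9, a -> 15
    }
}
```

The first firing covers only two elements: the window has not yet filled up to `size`, so "at most 3" is an upper bound rather than a guarantee.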
-
Code:
package com.ronnie.flink.stream.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

/**
 * Sliding windows: windows may overlap.
 * 1. Time-driven
 * 2. Count-driven
 */
public class SlidingWindow {
    public static void main(String[] args) {
        // Set up the execution environment, similar to initializing a SparkContext in Spark
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999);
        // Map each input line to (line, random value in [0, 10)) and log its arrival time
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long timeMillis = System.currentTimeMillis();
                int random = new Random().nextInt(10);
                System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<String, Integer>(value, random);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);
        // Time-driven: every 5 s, compute over the data of the last 10 s
        // WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));
        // Count-driven: fire every 2 elements of a key; the window size is 3,
        // so each firing covers at most the last 3 elements of that key
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3, 2);
        // timeWindow.sum(1).print();
        countWindow.sum(1).print();
        // timeWindow.apply(new MyTimeWindowFunction()).print();
        try {
            // Transformations are lazy; execute() must be called explicitly to run the job
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
-
-
Session Window
-
Session windows do not overlap and have no fixed start or end time.
-
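Unlike tumbling and sliding windows, a session window is closed when it has received no elements for a certain period of time (the session gap).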
-
Subsequent elements are assigned to a new session window.
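One way to picture this: each element opens a tentative window [t, t + gap), and windows that overlap are merged into one session. The hypothetical plain-Java sketch below (class `SessionSketch`, not Flink code) applies that rule to timestamps already sorted in ascending order. Treating an element that arrives exactly at a session's end as the start of a new session is a simplifying assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not Flink code: groups ascending timestamps into
// session windows separated by gaps of inactivity.
class SessionSketch {

    // Returns sessions as [start, end) pairs, where end = last element + gap.
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        long start = 0;
        long end = 0;
        boolean open = false;
        for (long t : sortedTimestamps) {
            if (open && t < end) {
                // Arrived within the gap: extend the current session.
                end = Math.max(end, t + gap);
            } else {
                // First element, or the gap was exceeded: close the old session, open a new one.
                if (open) {
                    result.add(new long[]{start, end});
                }
                start = t;
                end = t + gap;
                open = true;
            }
        }
        if (open) {
            result.add(new long[]{start, end});
        }
        return result;
    }

    public static void main(String[] args) {
        long gap = 10; // in seconds, like withGap(Time.seconds(10))
        for (long[] s : sessions(new long[]{0, 5, 20, 25, 40}, gap)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // [0, 15) then [20, 35) then [40, 50)
    }
}
```

The session length depends only on the data: a user who keeps acting within the gap keeps the same session open indefinitely.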
-
[Figure: session windows]
-
Example:
- Compute the total number of items each user bought while active, treating 30 seconds without activity as the end of the session.
-
Code:
package com.ronnie.flink.stream.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

public class SessionWindow {
    public static void main(String[] args) {
        // Set up the execution environment, similar to initializing a SparkContext in Spark
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999);
        // Map each input line to (line, random value in [0, 10)) and log its arrival time
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                long timeMillis = System.currentTimeMillis();
                int random = new Random().nextInt(10);
                System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<String, Integer>(value, random);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);
        // If no data arrives for a key for 10 s in a row, that key's session window is closed
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
        // window.sum(1).print();
        // Session windows are TimeWindows, so the time-window function can be reused
        window.apply(new MyTimeWindowFunction()).print();
        try {
            // Transformations are lazy; execute() must be called explicitly to run the job
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
-