1. ProcessFunction
ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of all (acyclic) streaming applications:
- events (stream elements)
- state (fault-tolerant, consistent, only available on keyed streams)
- timers (event time and processing time, only available on keyed streams)
A ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It reads Flink's keyed state through the RuntimeContext. Through the Context passed to processElement it can access the current element's timestamp as well as the TimerService for registering timers. When the watermark reaches or passes the time of a registered event-time timer, the onTimer method is invoked as a callback; during that call all state is again scoped to the key for which the timer was created, so the timer callback can manipulate keyed state.
Note: to use keyed state and timers, the ProcessFunction must be applied on a keyed stream, as shown below:
stream.keyBy(...).process(new MyProcessFunction())
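As a rough sketch of how those pieces fit together, a MyProcessFunction matching the call above might look like the following (the key/input/output types, the state name and the 10-second timer offset are illustrative placeholders, not taken from the examples below):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Minimal sketch of a KeyedProcessFunction: keyed state plus an event-time timer
public class MyProcessFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> countState; // keyed state, scoped to the current key

    @Override
    public void open(Configuration parameters) throws Exception {
        // Keyed state is obtained through the RuntimeContext
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long count = countState.value();
        countState.update(count == null ? 1L : count + 1);
        // The Context exposes the element's timestamp and the TimerService
        Long ts = ctx.timestamp();
        if (ts != null) {
            ctx.timerService().registerEventTimeTimer(ts + 10_000);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Invoked once the watermark passes the registered time; state is scoped to the timer's key
        out.collect(ctx.getCurrentKey() + " -> " + countState.value());
    }
}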
Example 1: registering a timer with ProcessFunction
The goal here is to emit the collected data periodically with a timer, without using a window function. Each input record has the form timestamp,word (e.g. 123422,hello).
ProcessFunctionWithTimerDemo

package cn._51doit.flink.day09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ProcessFunctionWithTimerDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);

        // Extract the event time and generate watermarks; the records themselves are not changed
        SingleOutputStreamOperator<String> lineWithWaterMark = lines.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String element) {
                        return Long.parseLong(element.split(",")[0]);
                    }
                });

        // Parse each line and keep the word field
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineWithWaterMark.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return Tuple2.of(fields[1], 1);
                    }
                });

        // Group by the word
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        // No window is defined; call the low-level process method directly
        keyed.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ListState<Tuple2<String, Integer>> bufferState;

            // Define the state descriptor and obtain the ListState
            @Override
            public void open(Configuration parameters) throws Exception {
                ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<>(
                        "list-state",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
                );
                bufferState = getRuntimeContext().getListState(listStateDescriptor);
            }

            // Without a window this method is called once per record, which puts a lot of pressure on the sink
            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                //out.collect(value);
                bufferState.add(value);
                // Get the current event time
                Long timestamp = ctx.timestamp();
                System.out.println("current event time is : " + timestamp);
                // Register a timer; for an event-time timer, onTimer fires once the watermark reaches the timer's time
                ctx.timerService().registerEventTimeTimer(timestamp + 10000);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                Iterable<Tuple2<String, Integer>> iterable = bufferState.get();
                for (Tuple2<String, Integer> tp : iterable) {
                    out.collect(tp);
                }
            }
        }).print();

        env.execute();
    }
}
Because each timer is registered at timestamp + 10000, entering 1000,spark registers a timer for 11000; entering 11000,spark then advances the watermark to 11000 (the allowed out-of-orderness is 0), which triggers that timer and calls onTimer(), emitting the buffered elements.
At the same time the second record registers a new timer of its own, which will fire at 21000.
Notes:
1. processElement() handles the data one record at a time.
2. This example reproduces the behavior of a tumbling window, and tumbling windows are built on essentially the same mechanism underneath: processElement() plus onTimer().
Example 2: using a timer to implement tumbling-window-like behavior
ProcessFunctionWithTimerDemo2

package cn._51doit.flink.day09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Only a keyed stream can use state and timers inside a ProcessFunction.
 */
public class ProcessFunctionWithTimerDemo2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //1000,hello
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String element) {
                        return Long.parseLong(element.split(",")[0]);
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = linesWithWaterMark.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String line) throws Exception {
                        String word = line.split(",")[1];
                        return Tuple2.of(word, 1);
                    }
                });

        // Group by the word
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        // No window is defined; call the low-level process method directly
        keyed.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ListState<Tuple2<String, Integer>> bufferState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<Tuple2<String, Integer>>(
                        "list-state",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
                );
                bufferState = getRuntimeContext().getListState(listStateDescriptor);
            }

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                //out.collect(value);
                bufferState.add(value);
                // Get the current event time
                Long timestamp = ctx.timestamp();
                // 10:14:13 -> 10:15:00
                // Events with timestamps in [10:14:00, 10:15:00) all register the same timer: 10:15:00
                System.out.println("current event time is : " + timestamp);
                // Register a timer; for an event-time timer, onTimer fires once the watermark reaches the timer's time
                long timer = timestamp - timestamp % 60000 + 60000;
                System.out.println("next timer is: " + timer);
                ctx.timerService().registerEventTimeTimer(timer);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                Iterable<Tuple2<String, Integer>> iterable = bufferState.get();
                for (Tuple2<String, Integer> tp : iterable) {
                    out.collect(tp);
                }
                // Clear the buffered data in the ListState
                bufferState.clear();
            }
        }).print();

        env.execute();
    }
}
The code to pay attention to:
// Register a timer; for an event-time timer, onTimer fires once the watermark reaches the timer's time
long timer = timestamp - timestamp % 60000 + 60000;
System.out.println("next timer is: " + timer);
ctx.timerService().registerEventTimeTimer(timer);
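The expression rounds the event time up to the next full minute. To make that concrete, here is a tiny standalone sketch of the same arithmetic with made-up epoch-millisecond timestamps (the sample values and class name are illustrative only):

public class MinuteAlignmentSketch {
    public static void main(String[] args) {
        // Round an epoch-millisecond timestamp up to the next full minute,
        // exactly as the demo does: timestamp - timestamp % 60000 + 60000
        long[] samples = {1000L, 59999L, 60000L, 73500L};
        for (long ts : samples) {
            long timer = ts - ts % 60000 + 60000;
            // 1000 -> 60000, 59999 -> 60000, 60000 -> 120000, 73500 -> 120000
            System.out.println(ts + " -> " + timer);
        }
    }
}

All events whose timestamps fall within the same minute therefore register the same timer, which is what makes this behave like a tumbling window.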
Variant: using processing time
ProcessFunctionWithTimerDemo3

package cn._51doit.flink.day09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Only a keyed stream can use state and timers inside a ProcessFunction.
 *
 * This variant uses processing-time timers.
 */
public class ProcessFunctionWithTimerDemo3 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //hello
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return Tuple2.of(word, 1);
                    }
                });

        // Group by the word
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        // No window is defined; call the low-level process method directly
        keyed.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ListState<Tuple2<String, Integer>> bufferState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<Tuple2<String, Integer>>(
                        "list-state",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
                );
                bufferState = getRuntimeContext().getListState(listStateDescriptor);
            }

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                bufferState.add(value);
                // Get the current processing time
                long currentProcessingTime = ctx.timerService().currentProcessingTime();
                // 10:14:13 -> 10:15:00
                // Events processed within [10:14:00, 10:15:00) all register the same timer: 10:15:00
                System.out.println("current processing time is : " + currentProcessingTime);
                // Register a timer; for a processing-time timer, onTimer fires once the wall-clock time
                // of the machine running the subtask reaches the timer's time
                long timer = currentProcessingTime - currentProcessingTime % 60000 + 60000;
                System.out.println("next timer is: " + timer);
                // Register the processing-time timer
                ctx.timerService().registerProcessingTimeTimer(timer);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                Iterable<Tuple2<String, Integer>> iterable = bufferState.get();
                for (Tuple2<String, Integer> tp : iterable) {
                    out.collect(tp);
                }
                // Clear the buffered data in the ListState
                bufferState.clear();
            }
        }).print();

        env.execute();
    }
}
2. Full-window aggregation with the apply method
Each time the window fires, the apply method is called once and computes over the entire contents of the window.

package cn._51doit.flink.day09;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * apply performs a full aggregation over the window contents, which wastes resources.
 */
public class HotGoodsTopN {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60000);
        env.setParallelism(1);

        // JSON strings
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<MyBehavior> behaviorDataStream = lines.process(new ProcessFunction<String, MyBehavior>() {
            @Override
            public void processElement(String value, Context ctx, Collector<MyBehavior> out) throws Exception {
                try {
                    MyBehavior behavior = JSON.parseObject(value, MyBehavior.class);
                    // Emit the parsed record
                    out.collect(behavior);
                } catch (Exception e) {
                    //e.printStackTrace();
                    //TODO record the malformed data
                }
            }
        });

        // Extract the event time and generate watermarks
        SingleOutputStreamOperator<MyBehavior> behaviorDataStreamWithWaterMark = behaviorDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyBehavior>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(MyBehavior element) {
                        return element.timestamp;
                    }
                });

        // Group by the specified fields
        KeyedStream<MyBehavior, Tuple> keyed = behaviorDataStreamWithWaterMark.keyBy("itemId", "type");

        // Window of 10 minutes, sliding every 1 minute
        WindowedStream<MyBehavior, Tuple, TimeWindow> window = keyed.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));

        //SingleOutputStreamOperator<MyBehavior> sum = window.sum("counts");

        SingleOutputStreamOperator<ItemViewCount> sum = window.apply(new WindowFunction<MyBehavior, ItemViewCount, Tuple, TimeWindow>() {
            // Called once each time the window fires; computes over the full window contents
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<MyBehavior> input, Collector<ItemViewCount> out) throws Exception {
                // Window start time
                long start = window.getStart();
                // Window end time
                long end = window.getEnd();
                // Fields of the grouping key
                String itemId = tuple.getField(0);
                String type = tuple.getField(1);
                int count = 0;
                for (MyBehavior myBehavior : input) {
                    count++;
                }
                // Emit the result
                out.collect(ItemViewCount.of(itemId, type, start, end, count));
            }
        });

        sum.print();

        env.execute();
    }
}
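The MyBehavior and ItemViewCount classes are not shown in these notes. Judging only from how they are used in this listing and the next one, minimal versions could look roughly like the sketch below (each would live in its own file; the field types and the shape of ItemViewCount.of are assumptions):

// Hypothetical minimal POJOs reconstructed from their usage; not the original classes
public class MyBehavior {
    public String itemId;   // grouping field: keyBy("itemId", "type")
    public String type;
    public long timestamp;  // used as the event time
    public long counts;     // added up by the AggregateFunction in the next section

    public MyBehavior() {}  // Flink POJOs need a public no-arg constructor and public fields (or getters/setters)
}

public class ItemViewCount {
    public String itemId;
    public String type;
    public long windowStart; // grouping fields in the TopN step: keyBy("type", "windowStart", "windowEnd")
    public long windowEnd;
    public long viewCount;

    public ItemViewCount() {}

    public static ItemViewCount of(String itemId, String type, long start, long end, long count) {
        ItemViewCount result = new ItemViewCount();
        result.itemId = itemId;
        result.type = type;
        result.windowStart = start;
        result.windowEnd = end;
        result.viewCount = count;
        return result;
    }
}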
The computation here is a full aggregation, which is not efficient: apply only runs when the window fires, so until then every record has to be buffered inside the window. That buffer is window state that Flink maintains internally (you do not define it yourself), and with a long window a lot of data piles up in it. The fix is to fold each record into the result as it arrives, i.e. to aggregate incrementally, which is more efficient because only the running count is kept in state; the next section does this with the aggregate method.
3. Incremental aggregation with the aggregate method
HotGoodsTopNAdv

package cn._51doit.flink.day09;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Incremental aggregation inside the window, which is more efficient.
 */
public class HotGoodsTopNAdv {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60000);
        env.setParallelism(1);

        // JSON strings
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<MyBehavior> behaviorDataStream = lines.process(new ProcessFunction<String, MyBehavior>() {
            @Override
            public void processElement(String value, Context ctx, Collector<MyBehavior> out) throws Exception {
                try {
                    MyBehavior behavior = JSON.parseObject(value, MyBehavior.class);
                    // Emit the parsed record
                    out.collect(behavior);
                } catch (Exception e) {
                    //e.printStackTrace();
                    //TODO record the malformed data
                }
            }
        });

        // Extract the event time and generate watermarks
        SingleOutputStreamOperator<MyBehavior> behaviorDataStreamWithWaterMark = behaviorDataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<MyBehavior>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(MyBehavior element) {
                        return element.timestamp;
                    }
                });

        // Group by the specified fields
        KeyedStream<MyBehavior, Tuple> keyed = behaviorDataStreamWithWaterMark.keyBy("itemId", "type");

        // Window of 10 minutes, sliding every 1 minute
        WindowedStream<MyBehavior, Tuple, TimeWindow> window = keyed.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));

        //SingleOutputStreamOperator<MyBehavior> counts = window.sum("counts");

        // Custom window aggregation: incremental counting plus a WindowFunction that attaches the key and window metadata
        SingleOutputStreamOperator<ItemViewCount> aggDataStream = window.aggregate(new MyWindowAggFunction(), new MyWindowFunction());

        // Group by type and window start/end, then sort the records that belong to the same window
        aggDataStream.keyBy("type", "windowStart", "windowEnd")
                .process(new KeyedProcessFunction<Tuple, ItemViewCount, List<ItemViewCount>>() {

                    private transient ValueState<List<ItemViewCount>> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<List<ItemViewCount>> stateDescriptor = new ValueStateDescriptor<List<ItemViewCount>>(
                                "list-state",
                                TypeInformation.of(new TypeHint<List<ItemViewCount>>() {})
                        );
                        valueState = getRuntimeContext().getState(stateDescriptor);
                    }

                    @Override
                    public void processElement(ItemViewCount value, Context ctx, Collector<List<ItemViewCount>> out) throws Exception {
                        // Buffer the record in state
                        List<ItemViewCount> buffer = valueState.value();
                        if (buffer == null) {
                            buffer = new ArrayList<>();
                        }
                        buffer.add(value);
                        valueState.update(buffer);
                        // Register a timer that fires once the watermark has passed the window end
                        ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<ItemViewCount>> out) throws Exception {
                        // Take the buffered records out of the ValueState
                        List<ItemViewCount> buffer = valueState.value();
                        // Sort by view count in descending order
                        buffer.sort(new Comparator<ItemViewCount>() {
                            @Override
                            public int compare(ItemViewCount o1, ItemViewCount o2) {
                                return -(int) (o1.viewCount - o2.viewCount);
                            }
                        });
                        // Clear the state
                        valueState.update(null);
                        out.collect(buffer);
                    }
                }).print(); // Print the result

        env.execute();
    }

    // Three type parameters:
    // 1st: the input type
    // 2nd: the accumulator type
    // 3rd: the output type
    public static class MyWindowAggFunction implements AggregateFunction<MyBehavior, Long, Long> {

        // Initialize the counter
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        // Called once for every input record
        @Override
        public Long add(MyBehavior value, Long accumulator) {
            return accumulator + value.counts;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        // Only used for session windows; never called for tumbling or sliding windows
        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    // Four type parameters:
    // 1st: the input type (the Long count produced by the AggregateFunction)
    // 2nd: the output type (ItemViewCount)
    // 3rd: the key type (the grouping fields)
    // 4th: the window type (carries the start and end time)
    public static class MyWindowFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
            // Fields of the grouping key
            String itemId = tuple.getField(0);
            String type = tuple.getField(1);
            // Window start time
            long start = window.getStart();
            // Window end time
            long end = window.getEnd();
            // The aggregated result of the window
            Long count = input.iterator().next();
            // Emit the result
            out.collect(ItemViewCount.of(itemId, type, start, end, count));
        }
    }
}
The key points involved:
- Custom aggregate function:
// Three type parameters:
// 1st: the input type
// 2nd: the accumulator type
// 3rd: the output type
public static class MyWindowAggFunction implements AggregateFunction<MyBehavior, Long, Long> {

    // Initialize the counter
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    // Called once for every input record
    @Override
    public Long add(MyBehavior value, Long accumulator) {
        return accumulator + value.counts;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    // Only used for session windows; never called for tumbling or sliding windows
    @Override
    public Long merge(Long a, Long b) {
        return null;
    }
}
- Custom WindowFunction:
// Four type parameters:
// 1st: the input type (the Long count produced by the AggregateFunction)
// 2nd: the output type (ItemViewCount)
// 3rd: the key type (the grouping fields)
// 4th: the window type (carries the start and end time)
public static class MyWindowFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
        // Fields of the grouping key
        String itemId = tuple.getField(0);
        String type = tuple.getField(1);
        // Window start time
        long start = window.getStart();
        // Window end time
        long end = window.getEnd();
        // The aggregated result of the window
        Long count = input.iterator().next();
        // Emit the result
        out.collect(ItemViewCount.of(itemId, type, start, end, count));
    }
}
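For reference, the two classes are plugged into the windowed stream together, exactly as in the HotGoodsTopNAdv listing above:

// Incremental aggregation with MyWindowAggFunction; MyWindowFunction adds the key and window metadata
SingleOutputStreamOperator<ItemViewCount> aggDataStream =
        window.aggregate(new MyWindowAggFunction(), new MyWindowFunction());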
4. Sorting with ProcessFunction and a timer

The sorting step is the KeyedProcessFunction inside HotGoodsTopNAdv; the full class is identical to the listing in section 3, so only the relevant part is repeated here. After the incremental window aggregation, the ItemViewCount records are keyed by type, windowStart and windowEnd, buffered in a ValueState, and an event-time timer is registered at windowEnd + 1. Because every record of the same window carries the same windowEnd, that timer fires only after the watermark has passed the window end, i.e. once all aggregates for that window have arrived, and onTimer can then sort the complete list.

// Group by type and window start/end, then sort the records that belong to the same window
aggDataStream.keyBy("type", "windowStart", "windowEnd")
        .process(new KeyedProcessFunction<Tuple, ItemViewCount, List<ItemViewCount>>() {

            private transient ValueState<List<ItemViewCount>> valueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<List<ItemViewCount>> stateDescriptor = new ValueStateDescriptor<List<ItemViewCount>>(
                        "list-state",
                        TypeInformation.of(new TypeHint<List<ItemViewCount>>() {})
                );
                valueState = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public void processElement(ItemViewCount value, Context ctx, Collector<List<ItemViewCount>> out) throws Exception {
                // Buffer the record in state
                List<ItemViewCount> buffer = valueState.value();
                if (buffer == null) {
                    buffer = new ArrayList<>();
                }
                buffer.add(value);
                valueState.update(buffer);
                // Register a timer that fires once the watermark has passed the window end
                ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<ItemViewCount>> out) throws Exception {
                // Take the buffered records out of the ValueState
                List<ItemViewCount> buffer = valueState.value();
                // Sort by view count in descending order
                buffer.sort(new Comparator<ItemViewCount>() {
                    @Override
                    public int compare(ItemViewCount o1, ItemViewCount o2) {
                        return -(int) (o1.viewCount - o2.viewCount);
                    }
                });
                // Clear the state
                valueState.update(null);
                out.collect(buffer);
            }
        }).print();