flink real-time project, day05: 1. ProcessFunction 2. Full aggregation over a window with apply 3. Incremental aggregation with aggregate 4. Sorting with a ProcessFunction and timers


1. ProcessFunction

  ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of all (acyclic) streaming applications:

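  • events (stream elements)
  • state (fault-tolerant and consistent; only available on keyed streams)
  • timers (event time and processing time; only available on keyed streams)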

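  A ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It reaches Flink's keyed state through the RuntimeContext, and the Context passed to processElement gives access to the element's timestamp and to the TimerService, which registers timers. For an event-time timer, once the watermark is greater than or equal to the timer's timestamp, onTimer is called (it acts as a callback). During that call all state is again scoped to the key for which the timer was created, which lets the timer manipulate keyed state.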
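To make the callback mechanism concrete, here is a toy model in plain Java. It is not Flink code and not how Flink implements timers: values are buffered per key, a timer is registered per key, and advancing the watermark fires every timer whose time has been reached, with onTimer touching only its own key's state. Keeping timers in a set mimics Flink's documented behavior of holding at most one timer per key and timestamp. ToyKeyedTimers and its methods are hypothetical names, and ordering timers with equal timestamps by key is only there to make the toy deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Toy model, NOT Flink code: per-key buffered state plus event-time timers
// that fire once the watermark reaches them.
final class ToyKeyedTimers {
    // A pending timer (fire time, key). Ordered by time, then by key (for determinism only).
    static final class Timer implements Comparable<Timer> {
        final long time;
        final String key;
        Timer(long time, String key) { this.time = time; this.key = key; }
        @Override
        public int compareTo(Timer o) {
            int c = Long.compare(time, o.time);
            return c != 0 ? c : key.compareTo(o.key);
        }
    }

    final Map<String, List<String>> state = new HashMap<>(); // stands in for keyed ListState
    final TreeSet<Timer> timers = new TreeSet<>();            // a set: one timer per (time, key)
    final List<String> output = new ArrayList<>();

    // Like processElement(): buffer the value under its key and register a timer for that key.
    void processElement(String key, String value, long timerTime) {
        state.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        timers.add(new Timer(timerTime, key));
    }

    // Like a watermark arriving: fire every timer whose time is <= the watermark, in time order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first().time <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // Like onTimer(): only the timer's own key's state is touched; here it is emitted and cleared.
    void onTimer(Timer t) {
        List<String> buffer = state.remove(t.key);
        if (buffer != null) {
            for (String v : buffer) {
                output.add(t.time + ":" + t.key + "=" + v);
            }
        }
    }

    public static void main(String[] args) {
        ToyKeyedTimers op = new ToyKeyedTimers();
        op.processElement("spark", "a", 11_000);
        op.processElement("hello", "b", 11_000);
        op.processElement("spark", "c", 11_000); // same (time, key): still one timer for spark
        System.out.println(op.timers.size());    // 2
        op.advanceWatermark(10_999);              // nothing fires yet
        op.advanceWatermark(11_000);              // both keys' timers fire
        System.out.println(op.output);            // [11000:hello=b, 11000:spark=a, 11000:spark=c]
    }
}
```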

Note: to use keyed state and timers, the ProcessFunction must be applied to a keyed stream, as shown below:

stream.keyBy(...).process(new MyProcessFunction())

 

Case 1: registering a timer in a ProcessFunction

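  Goal: use a timer to emit buffered data at a later time, without using a window function. Each input line has the form timestamp,word, for example 123422,hello.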

ProcessFunctionWithTimerDemo
package cn._51doit.flink.day09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ProcessFunctionWithTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Extract timestamps and generate watermarks; the records themselves are not modified
        SingleOutputStreamOperator<String> lineWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });
        // Parse each line and keep the word field as (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], 1);
            }
        });
        // Group by word with keyBy
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // No window: call the low-level process() method directly
        keyed.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String,Integer>>() {
            private transient ListState<Tuple2<String, Integer>> bufferState;
            // Create the state descriptor and obtain the keyed ListState
            @Override
            public void open(Configuration parameters) throws Exception {
                ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<>(
                        "list-state",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})
                );
                bufferState = getRuntimeContext().getListState(listStateDescriptor);
            }
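            // Without a window, this method is called once per record; emitting every record immediately would put heavy pressure on the sink, so records are buffered in state instead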
            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                //out.collect(value);
                bufferState.add(value);
                // Get the event time of the current record
                Long timestamp = ctx.timestamp();
                System.out.println("current event time is : " + timestamp);

                // Register an event-time timer: onTimer() is called once the watermark is >= the timer's timestamp
                ctx.timerService().registerEventTimeTimer(timestamp+10000);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
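                // Emit everything buffered for this key.
                // Note: bufferState is never cleared here, so every later timer emits the earlier records again
                Iterable<Tuple2<String, Integer>> iterable = bufferState.get();
                for (Tuple2<String, Integer> tp : iterable) {
                    out.collect(tp);
                }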
            }
        }).print();

        env.execute();
    }
}

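Because the timer is set to timestamp+10000, entering 1000,spark and then 11000,spark produces output when the second record arrives: it advances the watermark to 11000, which fires the timer registered by the first record and calls onTimer(), emitting the records buffered for spark.

Processing the second record also registers a new timer, which fires at 21000.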

Notes

1. processElement() handles records one at a time.

2. This example approximates a tumbling window, and tumbling windows work on a similar principle underneath: processElement() plus onTimer().
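The behavior of case 1 can be traced in plain Java for a single key. This is a hypothetical simulation (Case1Trace is not Flink code) under two assumptions: the watermark equals the largest timestamp seen so far (0 ms out-of-orderness), and each record is processed before the watermark it produces reaches the function, since watermarks are emitted periodically. It also shows that because case 1 never clears bufferState, every timer re-emits all earlier records; the clearOnTimer flag switches on the clear() that case 2 adds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical single-key trace of case 1 (not Flink code).
// Assumes watermark = max timestamp seen, applied right after each record is processed.
final class Case1Trace {
    static List<String> run(long[] timestamps, boolean clearOnTimer) {
        List<String> buffer = new ArrayList<>();  // stands in for bufferState
        TreeSet<Long> timers = new TreeSet<>();    // pending event-time timers
        List<String> out = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            buffer.add("spark@" + ts);             // processElement: buffer the record
            timers.add(ts + 10_000);               // ... and register timestamp + 10000
            watermark = Math.max(watermark, ts);   // then the watermark catches up
            while (!timers.isEmpty() && timers.first() <= watermark) {
                long fired = timers.pollFirst();    // onTimer(fired)
                for (String r : buffer) {
                    out.add(fired + " -> " + r);
                }
                if (clearOnTimer) {
                    buffer.clear();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] input = {1_000, 11_000, 21_000};
        System.out.println(run(input, false)); // case 1: buffer never cleared
        System.out.println(run(input, true));  // with clear(), as in case 2
    }
}
```

With the three inputs above, the uncleared version emits 5 lines (spark@1000 appears under both the 11000 and 21000 timers), while the cleared version emits 3.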

 

Case 2: using timers to implement tumbling-window-like behavior

ProcessFunctionWithTimerDemo2

package cn._51doit.flink.day09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * State and timers can only be used when a ProcessFunction is applied to a KeyedStream
 */
public class ProcessFunctionWithTimerDemo2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Input: timestamp,word, e.g. 1000,hello
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> linesWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.parseLong(element.split(",")[0]);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = linesWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String word = line.split(",")[1];
                return Tuple2.of(word, 1);
            }
        });

        // Group by word with keyBy
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        // No window: call the low-level process() method directly
        keyed.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ListState<Tuple2<String, Integer>> bufferState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<Tuple2<String, Integer>>(
                        "list-state",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){})
                );

                bufferState = getRuntimeContext().getListState(listStateDescriptor);
            }

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                //out.collect(value);

                bufferState.add(value);
                // Get the event time of the current record
                Long timestamp = ctx.timestamp();

                // e.g. 10:14:13 -> timer at 10:15:00
                // every event time in [10:14:00, 10:15:00) registers the same timer, 10:15:00
                System.out.println("current event time is : " + timestamp);

                // Register an event-time timer: onTimer() is called once the watermark is >= the timer's timestamp
                long timer = timestamp - timestamp % 60000 + 60000;
                System.out.println("next timer is: " + timer);
                ctx.timerService().registerEventTimeTimer(timer);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                Iterable<Tuple2<String, Integer>> iterable = bufferState.get();

                for (Tuple2<String, Integer> tp : iterable) {
                    out.collect(tp);
                }

                // Clear this key's ListState so the next minute starts empty
                bufferState.clear();
            }
        }).print();

        env.execute();


    }
}

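Code to note: the timer is rounded up to the start of the next minute, so every record in the same minute registers the same timer.

                // Register an event-time timer: onTimer() is called once the watermark is >= the timer's timestamp
                long timer = timestamp - timestamp % 60000 + 60000;
                System.out.println("next timer is: " + timer);
                ctx.timerService().registerEventTimeTimer(timer);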
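The rounding can be checked in isolation. This hypothetical helper (MinuteTimer is not part of the original code) evaluates the same expression, timestamp - timestamp % 60000 + 60000, for a few timestamps and collects the results in a set, mirroring the fact that Flink keeps at most one timer per key and timestamp. The expression assumes non-negative timestamps, because Java's % returns a negative remainder for negative operands.

```java
import java.util.TreeSet;

// Hypothetical helper mirroring: timestamp - timestamp % 60000 + 60000
final class MinuteTimer {
    static final long MINUTE = 60_000L;

    // Start of the next minute strictly after ts (for non-negative ts)
    static long nextMinuteBoundary(long ts) {
        return ts - ts % MINUTE + MINUTE;
    }

    public static void main(String[] args) {
        long[] events = {61_000, 90_500, 119_999, 120_000};
        TreeSet<Long> timers = new TreeSet<>(); // duplicates collapse, like one timer per key and timestamp
        for (long ts : events) {
            timers.add(nextMinuteBoundary(ts));
        }
        System.out.println(timers); // [120000, 180000]
    }
}
```

A record exactly on a minute boundary (120000) belongs to the next bucket, which matches the half-open interval [10:14:00, 10:15:00) in the code comments.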

Variation: using processing time

ProcessFunctionWithTimerDemo3

package cn._51doit.flink.day09;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * State and timers can only be used when a ProcessFunction is applied to a KeyedStream
 *
 * Processing-time timers
 *
 */
public class ProcessFunctionWithTimerDemo3 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Input: one word per line, e.g. hello
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        // Group by word with keyBy
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

        // No window: call the low-level process() method directly
        keyed.process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private transient ListState<Tuple2<String, Integer>> bufferState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<Tuple2<String, Integer>>(
                        "list-state",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){})
                );

                bufferState = getRuntimeContext().getListState(listStateDescriptor);
            }

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                bufferState.add(value);
                // Get the current processing time
                long currentProcessingTime = ctx.timerService().currentProcessingTime();

                // e.g. 10:14:13 -> timer at 10:15:00
                // every processing time in [10:14:00, 10:15:00) registers the same timer, 10:15:00
                System.out.println("current processing time is : " + currentProcessingTime);

                // For a processing-time timer, onTimer() is called once the processing time of the machine running the subtask is >= the timer's timestamp
                long timer = currentProcessingTime - currentProcessingTime % 60000 + 60000;
                System.out.println("next timer is: " + timer);
                // Register the processing-time timer
                ctx.timerService().registerProcessingTimeTimer(timer);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {

                Iterable<Tuple2<String, Integer>> iterable = bufferState.get();

                for (Tuple2<String, Integer> tp : iterable) {
                    out.collect(tp);
                }

                // Clear this key's ListState so the next minute starts empty
                bufferState.clear();
            }
        }).print();

        env.execute();


    }
}

 

2. Full aggregation over a window with the apply method

   Each time the window fires, apply is called once, so it computes over all of the data in the window.
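The program below uses a 10-minute window that slides every minute, so every record belongs to ten overlapping windows, and with apply each of those windows keeps the record in its own state until it fires. The sketch is a standalone re-implementation of the window-start arithmetic that Flink's SlidingEventTimeWindows uses, assuming offset 0 and non-negative timestamps; SlidingAssign and windowStarts are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-implementation of sliding-window assignment (offset 0, ts >= 0); not the Flink class.
final class SlidingAssign {
    // Start of every window [start, start + size) that contains ts
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts + slide) % slide;    // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 10 * 60_000L;   // 10-minute window
        long slide = 60_000L;       // slides every minute
        List<Long> starts = windowStarts(1_000_000L, size, slide);
        // prints: 10 windows, first start 960000, last start 420000
        System.out.println(starts.size() + " windows, first start " + starts.get(0)
                + ", last start " + starts.get(starts.size() - 1));
    }
}
```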

package cn._51doit.flink.day09;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


/**
 * apply aggregates over the full window contents at once, which wastes resources
 */
public class HotGoodsTopN {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60000);
        env.setParallelism(1);
        // Each input line is a JSON string
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<MyBehavior> behaviorDataStream = lines.process(new ProcessFunction<String, MyBehavior>() {
            @Override
            public void processElement(String value, Context ctx, Collector<MyBehavior> out) throws Exception {
                try {
                    MyBehavior behavior = JSON.parseObject(value, MyBehavior.class);
                    // Emit the parsed record
                    out.collect(behavior);
                } catch (Exception e) {
                    //e.printStackTrace();
                    //TODO record the malformed input
                }
            }
        });

        // Extract the event time and generate watermarks
        SingleOutputStreamOperator<MyBehavior> behaviorDataStreamWithWaterMark = behaviorDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyBehavior>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(MyBehavior element) {
                return element.timestamp;
            }
        });
        
        // Group by the given fields
        KeyedStream<MyBehavior, Tuple> keyed = behaviorDataStreamWithWaterMark.keyBy("itemId", "type");

        // Window length 10 minutes, sliding every 1 minute
        WindowedStream<MyBehavior, Tuple, TimeWindow> window = keyed.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));

        //SingleOutputStreamOperator<MyBehavior> sum = window.sum("counts");
        SingleOutputStreamOperator<ItemViewCount> sum = window.apply(new WindowFunction<MyBehavior, ItemViewCount, Tuple, TimeWindow>() {

            // apply is called once each time the window fires, i.e. it computes over the full contents of the window
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<MyBehavior> input, Collector<ItemViewCount> out) throws Exception {
                // Window start time
                long start = window.getStart();
                // Window end time
                long end = window.getEnd();
                // The key fields
                String itemId = tuple.getField(0);
                String type = tuple.getField(1);

                // Count the records buffered in this window
                int count = 0;
                for (MyBehavior myBehavior : input) {
                    count++;
                }
                // Emit the result
                out.collect(ItemViewCount.of(itemId, type, start, end, count));
            }
        });

        sum.print();

        env.execute();

    }
}

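This is a full computation and it is inefficient: nothing is computed until the window has collected all of its data and fires, and only then is apply run over the entire window contents. Until the window fires, its records are buffered in the window's own state (Flink manages this state internally; you do not define it yourself), so for a long window a lot of data piles up in memory. The fix is incremental computation: fold each record into the result as soon as it arrives, so the state holds only a running count instead of every record, which is more efficient.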
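A plain-Java way to see the difference: full aggregation keeps every record until the window fires, while incremental aggregation folds each record into one accumulator. The Agg interface below is a hypothetical, simplified stand-in that only mirrors the method shape of Flink's AggregateFunction (createAccumulator, add, getResult, merge); it is not the Flink interface. Its merge returns a + b, which is what a counting aggregate needs when merging windows such as session windows combine their accumulators.

```java
import java.util.ArrayList;
import java.util.List;

final class FullVsIncremental {
    // Hypothetical, simplified stand-in mirroring the method shape of Flink's AggregateFunction
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    // Sums a per-record count field, like MyWindowAggFunction does with value.counts
    static final Agg<Long, Long, Long> SUM_COUNTS = new Agg<Long, Long, Long>() {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Long counts, Long acc) { return acc + counts; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    };

    // Full aggregation: every record is kept until the window fires, then summed in one pass
    static long full(List<Long> buffered) {
        long total = 0;
        for (long c : buffered) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        long[] counts = {1, 1, 2, 1};
        List<Long> buffer = new ArrayList<>();      // full: grows with every record
        Long acc = SUM_COUNTS.createAccumulator();  // incremental: always one Long
        for (long c : counts) {
            buffer.add(c);
            acc = SUM_COUNTS.add(c, acc);
        }
        // prints: full=5 (kept 4 records), incremental=5 (kept 1 accumulator)
        System.out.println("full=" + full(buffer) + " (kept " + buffer.size() + " records), incremental="
                + SUM_COUNTS.getResult(acc) + " (kept 1 accumulator)");
    }
}
```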

 

3. Incremental aggregation with the aggregate method

HotGoodsTopNAdv  

package cn._51doit.flink.day09;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Incremental aggregation inside the window, which is more efficient
 */
public class HotGoodsTopNAdv {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60000);
        env.setParallelism(1);
        // Each input line is a JSON string
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<MyBehavior> behaviorDataStream = lines.process(new ProcessFunction<String, MyBehavior>() {
            @Override
            public void processElement(String value, Context ctx, Collector<MyBehavior> out) throws Exception {
                try {
                    MyBehavior behavior = JSON.parseObject(value, MyBehavior.class);
                    // Emit the parsed record
                    out.collect(behavior);
                } catch (Exception e) {
                    //e.printStackTrace();
                    //TODO record the malformed input
                }
            }
        });

        // Extract the event time and generate watermarks
        SingleOutputStreamOperator<MyBehavior> behaviorDataStreamWithWaterMark = behaviorDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyBehavior>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(MyBehavior element) {
                return element.timestamp;
            }
        });
        
        // Group by the given fields
        KeyedStream<MyBehavior, Tuple> keyed = behaviorDataStreamWithWaterMark.keyBy("itemId", "type");

        // Window length 10 minutes, sliding every 1 minute
        WindowedStream<MyBehavior, Tuple, TimeWindow> window = keyed.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)));


        //SingleOutputStreamOperator<MyBehavior> counts = window.sum("counts");
        // Custom incremental aggregate function, plus a window function that attaches the key and window info
        SingleOutputStreamOperator<ItemViewCount> aggDataStream = window.aggregate(new MyWindowAggFunction(), new MyWindowFunction());

        // Key the results by type, window start and window end, so the results of the same window are sorted together
        aggDataStream.keyBy("type", "windowStart", "windowEnd")
                .process(new KeyedProcessFunction<Tuple, ItemViewCount, List<ItemViewCount>>() {

                    private transient ValueState<List<ItemViewCount>> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<List<ItemViewCount>> stateDescriptor = new ValueStateDescriptor<List<ItemViewCount>>(
                                "list-state",
                                TypeInformation.of(new TypeHint<List<ItemViewCount>>() {})
                        );

                        valueState = getRuntimeContext().getState(stateDescriptor);
                    }

                    @Override
                    public void processElement(ItemViewCount value, Context ctx, Collector<List<ItemViewCount>> out) throws Exception {

                        // Buffer the result in state
                        List<ItemViewCount> buffer = valueState.value();
                        if(buffer == null) {
                            buffer = new ArrayList<>();
                        }
                        buffer.add(value);
                        valueState.update(buffer);
                        // Register a timer just after the window end; every result of this window registers the same timer
                        ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<ItemViewCount>> out) throws Exception {

                        // Read the buffered results from the ValueState
                        List<ItemViewCount> buffer = valueState.value();
                        // Sort by count in descending order (Long.compare avoids the overflow of casting a long difference to int)
                        buffer.sort(new Comparator<ItemViewCount>() {
                            @Override
                            public int compare(ItemViewCount o1, ItemViewCount o2) {
                                return Long.compare(o2.viewCount, o1.viewCount);
                            }
                        });
                        // Clear the state
                        valueState.clear();
                        out.collect(buffer);
                    }
                }).print(); // print the result


        env.execute();

    }


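    // Three type parameters:
    // 1st: input type
    // 2nd: accumulator (counter) type
    // 3rd: output type
    public static class MyWindowAggFunction implements AggregateFunction<MyBehavior, Long, Long> {

        // Create the initial accumulator (a counter starting at 0)
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        // Called once for every incoming record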
        @Override
        public Long add(MyBehavior value, Long accumulator) {
            return accumulator + value.counts;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        // Only called for merging windows (session windows); tumbling and sliding windows never call it
        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    // Four type parameters:
    // 1st: input type (the Long count produced by the AggregateFunction)
    // 2nd: output type (ItemViewCount)
    // 3rd: key type (the grouping fields)
    // 4th: window type (provides start and end time)
    public static class MyWindowFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
            // The key fields
            String itemId = tuple.getField(0);
            String type = tuple.getField(1);
            // Window start time
            long start = window.getStart();
            // Window end time
            long end = window.getEnd();
            // The single pre-aggregated result for this key and window
            Long count = input.iterator().next();
            // Emit the result
            out.collect(ItemViewCount.of(itemId, type, start, end, count));
        }
    }
}

Key points:

  • Custom aggregate function (AggregateFunction):
    // Three type parameters:
    // 1st: input type
    // 2nd: accumulator (counter) type
    // 3rd: output type
    public static class MyWindowAggFunction implements AggregateFunction<MyBehavior, Long, Long> {
        // Create the initial accumulator (a counter starting at 0)
        @Override
        public Long createAccumulator() {
            return 0L;
        }
        // Called once for every incoming record
        @Override
        public Long add(MyBehavior value, Long accumulator) {
            return accumulator + value.counts;
        }
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }
        // Only called for merging windows (session windows); tumbling and sliding windows never call it
        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }
  • Custom WindowFunction:
    // Four type parameters:
    // 1st: input type (the Long count produced by the AggregateFunction)
    // 2nd: output type (ItemViewCount)
    // 3rd: key type (the grouping fields)
    // 4th: window type (provides start and end time)
    public static class MyWindowFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<ItemViewCount> out) throws Exception {
            // The key fields
            String itemId = tuple.getField(0);
            String type = tuple.getField(1);
            // Window start time
            long start = window.getStart();
            // Window end time
            long end = window.getEnd();
            // The single pre-aggregated result for this key and window
            Long count = input.iterator().next();
            // Emit the result
            out.collect(ItemViewCount.of(itemId, type, start, end, count));
        }
    }

  4. Sorting with a ProcessFunction and timers

The complete program for this step is the same HotGoodsTopNAdv listed in section 3, so it is not repeated here. The sorting is done by the KeyedProcessFunction applied after aggregate(): the per-item results are keyed by type and window, buffered in state, and sorted when an event-time timer registered at windowEnd + 1 fires.

        // Key the results by type, window start and window end, so the results of the same window are sorted together
        aggDataStream.keyBy("type", "windowStart", "windowEnd")
                .process(new KeyedProcessFunction<Tuple, ItemViewCount, List<ItemViewCount>>() {

                    private transient ValueState<List<ItemViewCount>> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<List<ItemViewCount>> stateDescriptor = new ValueStateDescriptor<List<ItemViewCount>>(
                                "list-state",
                                TypeInformation.of(new TypeHint<List<ItemViewCount>>() {})
                        );

                        valueState = getRuntimeContext().getState(stateDescriptor);
                    }

                    @Override
                    public void processElement(ItemViewCount value, Context ctx, Collector<List<ItemViewCount>> out) throws Exception {

                        // Buffer the result in state
                        List<ItemViewCount> buffer = valueState.value();
                        if(buffer == null) {
                            buffer = new ArrayList<>();
                        }
                        buffer.add(value);
                        valueState.update(buffer);
                        // Register a timer just after the window end; every result of this window registers the same timer
                        ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<ItemViewCount>> out) throws Exception {

                        // Read the buffered results from the ValueState
                        List<ItemViewCount> buffer = valueState.value();
                        // Sort by count in descending order (Long.compare avoids the overflow of casting a long difference to int)
                        buffer.sort(new Comparator<ItemViewCount>() {
                            @Override
                            public int compare(ItemViewCount o1, ItemViewCount o2) {
                                return Long.compare(o2.viewCount, o1.viewCount);
                            }
                        });
                        // Clear the state
                        valueState.clear();
                        out.collect(buffer);
                    }
                }).print(); // print the result
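The sorting step can be sketched without Flink: buffer each window's results under that window, register one timer at windowEnd + 1, and sort in descending order when the watermark reaches it. WindowSortSketch and ViewCount are hypothetical names (ViewCount stands in for ItemViewCount, and results are keyed here by windowEnd only), and the loop that fires timers is a simplified model of Flink's TimerService, not its implementation. Comparing with Long.compare(b, a) sorts in descending order without casting a long difference to int.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of "buffer per window, sort when the timer fires"
final class WindowSortSketch {
    // Hypothetical stand-in for ItemViewCount
    static final class ViewCount {
        final String itemId;
        final long windowEnd;
        final long viewCount;
        ViewCount(String itemId, long windowEnd, long viewCount) {
            this.itemId = itemId;
            this.windowEnd = windowEnd;
            this.viewCount = viewCount;
        }
        @Override
        public String toString() { return itemId + "=" + viewCount; }
    }

    final Map<Long, List<ViewCount>> buffers = new HashMap<>(); // per-window buffer, like the ValueState
    final TreeMap<Long, Long> timers = new TreeMap<>();          // timer time -> windowEnd; put() dedups
    final List<List<ViewCount>> out = new ArrayList<>();

    // Like processElement(): buffer the result and register a timer at windowEnd + 1
    void processElement(ViewCount v) {
        buffers.computeIfAbsent(v.windowEnd, k -> new ArrayList<>()).add(v);
        timers.put(v.windowEnd + 1, v.windowEnd);
    }

    // Like the watermark advancing: fire due timers, sort that window's results, emit, clear
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            long windowEnd = timers.pollFirstEntry().getValue();
            List<ViewCount> buffer = buffers.remove(windowEnd);  // read and clear the state
            if (buffer == null) {
                continue;
            }
            buffer.sort((a, b) -> Long.compare(b.viewCount, a.viewCount)); // descending by count
            out.add(buffer);
        }
    }

    public static void main(String[] args) {
        WindowSortSketch op = new WindowSortSketch();
        op.processElement(new ViewCount("i1", 600_000, 3));
        op.processElement(new ViewCount("i2", 600_000, 7));
        op.processElement(new ViewCount("i3", 600_000, 5));
        op.advanceWatermark(600_000);   // timer is at 600001, nothing fires yet
        op.advanceWatermark(600_001);
        System.out.println(op.out);     // [[i2=7, i3=5, i1=3]]
    }
}
```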

 

