基於flink快速開發實時TopN程序


轉發請注明原創地址:https://www.cnblogs.com/dongxiao-yang/p/9198977.html

 

TopN 是統計報表和大屏非常常見的功能,主要用來實時計算排行榜。流式的TopN可以使業務方在內存中按照某個統計指標(如出現次數)計算排名並快速出發出更新后的排行榜。

我們以統計詞頻為例展示一下如何快速開發一個計算TopN的flink程序。

 

flink支持各種各樣的流數據接口作為數據的數據源,本次demo我們采用內置的socketTextStream作為數據數據源。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作為時間語義

DataStream<String> text = env.socketTextStream(hostName, port); //監聽指定socket端口作為輸入

 

與離線wordcount類似,程序首先需要把輸入的整句文字按照分隔符split成一個一個單詞,然后按照單詞為key實現累加

DataStream<Tuple2<String, Integer>> ds = text
                .flatMap(new LineSplitter()); //將輸入語句split成一個一個單詞並初始化count值為1的Tuple2<String, Integer>類型


private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }

 

    DataStream<Tuple2<String, Integer>> wcount = ds
                .keyBy(0) //按照Tuple2<String, Integer>的第一個元素為key,也就是單詞
                .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20))) 
                //key之后的元素進入一個總時間長度為600s,每20s向后滑動一次的滑動窗口
                .sum(1);// 將相同的key的元素第二個count值相加

 

全局TopN

數據流經過前面的處理后會每20s計算一次各個單詞的count值並發送到下游窗口

        DataStream<Tuple2<String, Integer>> ret = wcount
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20))) 
                //所有key元素進入一個20s長的窗口(選20秒是因為上游窗口每20s計算一輪數據,topN窗口一次計算只統計一個窗口時間內的變化)
                .process(new TopNAllFunction(5));//計算該窗口TopN

windowAll是一個全局並發為1的特殊操作,也就是所有元素都會進入到一個窗口內進行計算。

 

    private static class TopNAllFunction
            extends
            ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

        private int topSize = 10;

        public TopNAllFunction(int topSize) {
            // TODO Auto-generated constructor stub

            this.topSize = topSize;
        }

        @Override
        public void process(
                ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0,
                Iterable<Tuple2<String, Integer>> input,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // TODO Auto-generated method stub

            TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
                    new Comparator<Integer>() {

                        @Override
                        public int compare(Integer y, Integer x) {
                            // TODO Auto-generated method stub
                            return (x < y) ? -1 : 1;
                        }

                    }); //treemap按照key降序排列,相同count值不覆蓋

            for (Tuple2<String, Integer> element : input) {
                treemap.put(element.f1, element);
                if (treemap.size() > topSize) { //只保留前面TopN個元素
                    treemap.pollLastEntry();
                }
            }

            for (Entry<Integer, Tuple2<String, Integer>> entry : treemap
                    .entrySet()) {
                out.collect(entry.getValue());
            }

        }

    }

 

分組TopN

在部分場景下,用戶希望根據不同的分組進行排序,計算出每個分組的一個排行榜。

    wcount.keyBy(new TupleKeySelectorByStart()) // 按照首字母分組
                .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) //20s窗口統計上游數據
                .process(new TopNFunction(5)) //分組TopN統計

 

    private static class TupleKeySelectorByStart implements
            KeySelector<Tuple2<String, Integer>, String> {

        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            // TODO Auto-generated method stub
            return value.f0.substring(0, 1); //取首字母做key
        }

    }

 

/**
     * 
     *針對keyby window的TopN函數,繼承自ProcessWindowFunction
     *
     */
    private static class TopNFunction
            extends
            ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {

        private int topSize = 10;

        public TopNFunction(int topSize) {
            // TODO Auto-generated constructor stub
            this.topSize = topSize;
        }

        @Override
        public void process(
                String arg0,
                ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>.Context arg1,
                Iterable<Tuple2<String, Integer>> input,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // TODO Auto-generated method stub

            TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
                    new Comparator<Integer>() {

                        @Override
                        public int compare(Integer y, Integer x) {
                            // TODO Auto-generated method stub
                            return (x < y) ? -1 : 1;
                        }

                    });

            for (Tuple2<String, Integer> element : input) {
                treemap.put(element.f1, element);
                if (treemap.size() > topSize) {
                    treemap.pollLastEntry();
                }
            }

            for (Entry<Integer, Tuple2<String, Integer>> entry : treemap
                    .entrySet()) {
                out.collect(entry.getValue());
            }

        }

    }

 

上面的代碼實現了按照首字母分組,取每組元素count最高的TopN方法。

 

嵌套TopN

全局topN的缺陷是,由於windowall是一個全局並發為1的操作,所有的數據只能匯集到一個節點進行 TopN 的計算,那么計算能力就會受限於單台機器,容易產生數據熱點問題。

解決思路就是使用嵌套 TopN,或者說兩層 TopN。在原先的 TopN 前面,再加一層 TopN,用於分散熱點。例如可以先加一層分組 TopN,第一層會計算出每一組的 TopN,而后在第二層中進行合並匯總,得到最終的全網TopN。第二層雖然仍是單點,但是大量的計算量由第一層分擔了,而第一層是可以水平擴展的。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM