轉發請注明原創地址:https://www.cnblogs.com/dongxiao-yang/p/9198977.html
TopN 是統計報表和大屏非常常見的功能,主要用來實時計算排行榜。流式的TopN可以使業務方在內存中按照某個統計指標(如出現次數)計算排名並快速出發出更新后的排行榜。
我們以統計詞頻為例展示一下如何快速開發一個計算TopN的flink程序。
flink支持各種各樣的流數據接口作為數據的數據源,本次demo我們采用內置的socketTextStream作為數據數據源。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作為時間語義 DataStream<String> text = env.socketTextStream(hostName, port); //監聽指定socket端口作為輸入
與離線wordcount類似,程序首先需要把輸入的整句文字按照分隔符split成一個一個單詞,然后按照單詞為key實現累加
DataStream<Tuple2<String, Integer>> ds = text .flatMap(new LineSplitter()); //將輸入語句split成一個一個單詞並初始化count值為1的Tuple2<String, Integer>類型
private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }
DataStream<Tuple2<String, Integer>> wcount = ds .keyBy(0) //按照Tuple2<String, Integer>的第一個元素為key,也就是單詞 .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20))) //key之后的元素進入一個總時間長度為600s,每20s向后滑動一次的滑動窗口 .sum(1);// 將相同的key的元素第二個count值相加
全局TopN
數據流經過前面的處理后會每20s計算一次各個單詞的count值並發送到下游窗口
DataStream<Tuple2<String, Integer>> ret = wcount .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20))) //所有key元素進入一個20s長的窗口(選20秒是因為上游窗口每20s計算一輪數據,topN窗口一次計算只統計一個窗口時間內的變化) .process(new TopNAllFunction(5));//計算該窗口TopN
windowAll是一個全局並發為1的特殊操作,也就是所有元素都會進入到一個窗口內進行計算。
private static class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> { private int topSize = 10; public TopNAllFunction(int topSize) { // TODO Auto-generated constructor stub this.topSize = topSize; } @Override public void process( ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context arg0, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { // TODO Auto-generated method stub TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>( new Comparator<Integer>() { @Override public int compare(Integer y, Integer x) { // TODO Auto-generated method stub return (x < y) ? -1 : 1; } }); //treemap按照key降序排列,相同count值不覆蓋 for (Tuple2<String, Integer> element : input) { treemap.put(element.f1, element); if (treemap.size() > topSize) { //只保留前面TopN個元素 treemap.pollLastEntry(); } } for (Entry<Integer, Tuple2<String, Integer>> entry : treemap .entrySet()) { out.collect(entry.getValue()); } } }
分組TopN
在部分場景下,用戶希望根據不同的分組進行排序,計算出每個分組的一個排行榜。
wcount.keyBy(new TupleKeySelectorByStart()) // 按照首字母分組 .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) //20s窗口統計上游數據 .process(new TopNFunction(5)) //分組TopN統計
private static class TupleKeySelectorByStart implements KeySelector<Tuple2<String, Integer>, String> { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { // TODO Auto-generated method stub return value.f0.substring(0, 1); //取首字母做key } }
/** * *針對keyby window的TopN函數,繼承自ProcessWindowFunction * */ private static class TopNFunction extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> { private int topSize = 10; public TopNFunction(int topSize) { // TODO Auto-generated constructor stub this.topSize = topSize; } @Override public void process( String arg0, ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>.Context arg1, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { // TODO Auto-generated method stub TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>( new Comparator<Integer>() { @Override public int compare(Integer y, Integer x) { // TODO Auto-generated method stub return (x < y) ? -1 : 1; } }); for (Tuple2<String, Integer> element : input) { treemap.put(element.f1, element); if (treemap.size() > topSize) { treemap.pollLastEntry(); } } for (Entry<Integer, Tuple2<String, Integer>> entry : treemap .entrySet()) { out.collect(entry.getValue()); } } }
上面的代碼實現了按照首字母分組,取每組元素count最高的TopN方法。
嵌套TopN
全局topN的缺陷是,由於windowall是一個全局並發為1的操作,所有的數據只能匯集到一個節點進行 TopN 的計算,那么計算能力就會受限於單台機器,容易產生數據熱點問題。
解決思路就是使用嵌套 TopN,或者說兩層 TopN。在原先的 TopN 前面,再加一層 TopN,用於分散熱點。例如可以先加一層分組 TopN,第一層會計算出每一組的 TopN,而后在第二層中進行合並匯總,得到最終的全網TopN。第二層雖然仍是單點,但是大量的計算量由第一層分擔了,而第一層是可以水平擴展的。