Flink使用二次聚合實現TopN計算


一、背景說明:

有需求需要對數據進行統計,要求每隔5分鍾輸出最近1小時內點擊量最多的前N個商品,數據格式預覽如下:

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
578814,176722,982926,pv,1511658000
....

最后統計輸出結果如下:

==============2017-11-26 09:05:00.0==============
Top1 ItemId:5051027 Counts:3
Top2 ItemId:3493253 Counts:3
Top3 ItemId:4261030 Counts:3
Top4 ItemId:4894670 Counts:2
Top5 ItemId:3781391 Counts:2
==============2017-11-26 09:05:00.0==============

==============2017-11-26 09:10:00.0==============
Top1 ItemId:812879 Counts:5
Top2 ItemId:2600165 Counts:4
Top3 ItemId:2828948 Counts:4
Top4 ItemId:2338453 Counts:4
Top5 ItemId:4261030 Counts:4
==============2017-11-26 09:10:00.0==============

二、實現過程

  1. 實現思路:
    ①建立環境,設置並行度及CK。
    ②定義watermark策略及事件時間,獲取數據並對應到JavaBean,篩選pv數據。
    ③第一次聚合,按商品id分組開窗聚合,使用aggregate算子進行增量計算。
    ④第二次聚合,按窗口聚合,使用ListState存放數據,並定義定時器,在watermark達到后1秒觸發,對窗口數據排序輸出。
    ⑤打印結果及執行。

  2. 代碼細節說明:
    2.1 、第一次聚合代碼:

//第一次聚合
SingleOutputStreamOperator<ItemCount> aggregateDS = userBehaviorDS
		.map(new MapFunction<UserBehavior, Tuple2<Long, Integer>>() {
			@Override
			public Tuple2<Long, Integer> map(UserBehavior value) throws Exception {
				return new Tuple2<>(value.getItemId(), 1);
			}})
		.keyBy(data -> data.f0)
		.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
		.aggregate(new ItemCountAggFunc(), new ItemCountWindowFunc());

①第一次聚合這里將商品id進行提取並轉換為Tuple2<id,1>的格式,再對id進行keyby后聚合,避免直接使用對應的JavaBean進行分組聚合提高效率:

②這里使用aggregate算子進行增量計算,Flink的window function來負責一旦窗口關閉, 去計算處理窗口中的每個元素。window function 是如下三種:

  • ReduceFunction (增量聚合函數) 輸入及輸出類型得一致
  • AggregateFunction(增量聚合函數)輸入及輸出類型可以不一致
  • ProcessWindowFunction(全窗口函數)

ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以對到來的元素進行增量聚合 .
ProcessWindowFunction 可以得到一個包含這個窗口中所有元素的迭代器, 以及這些元素所屬窗口的一些元數據信息.
ProcessWindowFunction不能被高效執行的原因是Flink在執行這個函數之前, 需要在內部緩存這個窗口上所有的元素

2.2、重寫AggregateFunction函數代碼

public static class ItemCountAggFunc implements AggregateFunction<Tuple2<Long,Integer>,Integer,Integer>{
	@Override
	public Integer createAccumulator() { return 0; }
	@Override
	public Integer add(Tuple2<Long, Integer> value, Integer accumulator) { return accumulator+1; }
	@Override
	public Integer getResult(Integer accumulator) { return accumulator; }
	@Override
	public Integer merge(Integer a, Integer b) { return a+b; }
}

這里對AggregateFunction函數里面四個方法進行重寫自定義計數規則,入參<IN,ACC,OUT>對應為Tuple2,累加器用Integer過度,輸出結果為Integer。

  • createAccumulator
    這個方法首先要創建一個累加器,要進行一些初始化的工作,這里初始值為0.
  • add
    add方法就是做聚合的時候的核心邏輯,這里這是對tuple的第二位整數進行累加。
  • merge
    Flink是一個分布式計算框架,可能計算是分布在很多節點上同時進行的,如果計算在多個節點進行,需要對結果進行合並,這個merge方法就是做這個工作的,所以入參和出參的類型都是中間結果類型ACC。
  • getResult
    這個方法就是將每個用戶最后聚合的結果經過處理之后,按照OUT的類型返回,返回的結果也就是聚合函數的輸出結果了。

這里也是AggregateFunction和ReduceFunction區別的地方,reduce的input為Tuple2,則output也必須是Tuple2。

三、完整代碼

這里處理的是順序數據,如果是亂序數據,在窗口觸發計算后遲到數據統計會有問題,優化思路為在窗口關閉后再觸發鍵控狀態的清除,及使用MapState來避免同個產品Id多個結果的問題。

package com.test.topN;

import bean.ItemCount;
import bean.UserBehavior;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
/**
 * @author: Rango
 * @create: 2021-05-24 10:37
 * @description: 每隔5分鍾輸出最近1小時內點擊量最多的前N個商品
 **/
public class ProductTopN {
    public static void main(String[] args) throws Exception {
        //1.建立環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //2.設置watermark及定義事件時間,從socket獲取數據並對應到JavaBean,篩選只取pv數據
        WatermarkStrategy<UserBehavior> wms = WatermarkStrategy
                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                    @Override
                    public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                        return element.getTimestamp() * 1000L;
                    }
                });
        SingleOutputStreamOperator<UserBehavior> userBehaviorDS = env
                //.socketTextStream("hadoop102", 9999)
                .readTextFile("input/UserBehavior.csv")
                .map(new MapFunction<String, UserBehavior>() {
                    @Override
                    public UserBehavior map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new UserBehavior(Long.parseLong(split[0]),
                                Long.parseLong(split[1]),
                                Integer.parseInt(split[2]),
                                split[3],
                                Long.parseLong(split[4]));
                    }
                })
                .filter(data -> "pv".equals(data.getBehavior()))
                .assignTimestampsAndWatermarks(wms);

        //3.第一次聚合,按商品id分組開窗聚合,使用aggregate進行增量計算,將商品id用tuple2抽離出來提高效率
        SingleOutputStreamOperator<ItemCount> aggregateDS = userBehaviorDS
                .map(new MapFunction<UserBehavior, Tuple2<Long, Integer>>() {
                    @Override
                    public Tuple2<Long, Integer> map(UserBehavior value) throws Exception {
                        return new Tuple2<>(value.getItemId(), 1);
                    }})
                .keyBy(data -> data.f0)
                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
                .aggregate(new ItemCountAggFunc(), new ItemCountWindowFunc());

        //4.第二次聚合,按窗口聚合,基於狀態編程實現窗口內有序
        SingleOutputStreamOperator<String> processDS = aggregateDS.keyBy(ItemCount::getTime)
                .process(new ItemCountProcessFunc(5));

        //5.打印結果並執行
        processDS.print();
        env.execute();
    }

    public static class ItemCountAggFunc implements AggregateFunction<Tuple2<Long,Integer>,Integer,Integer>{
        @Override
        public Integer createAccumulator() { return 0; }
        @Override
        public Integer add(Tuple2<Long, Integer> value, Integer accumulator) { return accumulator+1; }
        @Override
        public Integer getResult(Integer accumulator) { return accumulator; }
        @Override
        public Integer merge(Integer a, Integer b) { return a+b; }
    }
    public static class ItemCountWindowFunc implements WindowFunction<Integer, ItemCount,Long, TimeWindow>{
        @Override
        public void apply(Long key, TimeWindow window, Iterable<Integer> input, Collector<ItemCount> out) throws Exception {
            Integer next = input.iterator().next();
            out.collect(new ItemCount(key,new Timestamp(window.getEnd()).toString(),next));
        }
    }
    public static class ItemCountProcessFunc extends KeyedProcessFunction<String,ItemCount,String>{
        //定義構造器可以按入參取排名
        private Integer topN;
        public ItemCountProcessFunc(Integer topN) {
            this.topN = topN;
        }

        //使用liststatus並初始化
        private ListState <ItemCount>listState;
        @Override
        public void open(Configuration parameters) throws Exception {
            listState= getRuntimeContext()
                    .getListState(new ListStateDescriptor<ItemCount>("list-state",ItemCount.class));
        }
        //定時器
        @Override
        public void processElement(ItemCount value, Context ctx, Collector<String> out) throws Exception {
            listState.add(value);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            ctx.timerService().registerEventTimeTimer(sdf.parse(value.getTime()).getTime()+1000L);
        }
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            //1.獲取狀態的數據並轉為List
            Iterator<ItemCount> iterator = listState.get().iterator();
            ArrayList<ItemCount> itemCounts = Lists.newArrayList(iterator);

            //2.排序
            itemCounts.sort(((o1, o2) -> o2.getCount() - o1.getCount()));

            //3.獲取前n
            StringBuilder sb = new StringBuilder();
            sb.append("==============")
                    .append(new Timestamp(timestamp - 1000L))
                    .append("==============")
                    .append("\n");
            for (int i = 0; i < Math.min(topN,itemCounts.size()); i++) {
                ItemCount itemCount = itemCounts.get(i);
                sb.append("Top").append(i+1);
                sb.append(" ItemId:").append(itemCount.getItem());
                sb.append(" Counts:").append(itemCount.getCount());
                sb.append("\n");
            }
            sb.append("==============")
                    .append(new Timestamp(timestamp - 1000L))
                    .append("==============")
                    .append("\n")
                    .append("\n");
            listState.clear();
            out.collect(sb.toString());
            Thread.sleep(200);//方便查看結果時間休眠
        }}}

學習交流,有任何問題還請隨時評論指出交流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM