Implementing TopN in Flink with Two-Stage Aggregation: Out-of-Order Data


I. Background

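The previous article implemented the TopN computation, but late data that arrives after a window has fired could not be included in that window's ranking. The keyed state needs to be optimized to handle it.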

Previous article: Implementing TopN in Flink with Two-Stage Aggregation

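The requirement this time: every 5 seconds, output the N URLs with the most visits in the last 10 minutes. A preview of the data stream (sent one line at a time through a socket port):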

208.115.111.72 - - 17/05/2015:10:25:49 +0000 GET /?N=A&page=21   // belongs to window 10:15:50-10:25:50
208.115.111.72 - - 17/05/2015:10:25:50 +0000 GET /?N=A&page=21
208.115.111.72 - - 17/05/2015:10:25:51 +0000 GET /?N=A&page=21
208.115.111.72 - - 17/05/2015:10:25:52 +0000 GET /?N=A&page=21   // first firing, window 10:15:50-10:25:50
208.115.111.72 - - 17/05/2015:10:25:47 +0000 GET /?N=A&          // late record, different URL
208.115.111.72 - - 17/05/2015:10:25:53 +0000 GET /?N=A&page=21   // second firing, window 10:15:50-10:25:50
208.115.111.72 - - 17/05/2015:10:25:46 +0000 GET /?N=A&page=21   // late record
208.115.111.72 - - 17/05/2015:10:25:54 +0000 GET /?N=A&page=21   // third firing

The final output is as follows (all late records fall into the window ending at 10:25:50):

==============2015-05-17 10:25:50.0==============               // result of the first firing
Top1 Url:/?N=A&page=21 Counts:1
==============2015-05-17 10:25:50.0==============

==============2015-05-17 10:25:50.0==============               // result of the second firing
Top1 Url:/?N=A&page=21 Counts:1
Top2 Url:/?N=A& Counts:1
==============2015-05-17 10:25:50.0==============

==============2015-05-17 10:25:50.0==============               // result of the third firing
Top1 Url:/?N=A&page=21 Counts:2
Top2 Url:/?N=A& Counts:1
==============2015-05-17 10:25:50.0==============
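The window boundaries in the comments above can be checked with the sliding-window assignment arithmetic. The sketch below is an illustration under stated assumptions, not Flink's source: it assumes zero window offset, uses millisecond offsets within one day instead of epoch timestamps, and `SlidingWindowDemo` and `timeOfDay` are hypothetical names. It shows that each record belongs to 600 s / 5 s = 120 windows, that the late records 10:25:47 and 10:25:46 still fall into the window ending at 10:25:50, and that 10:25:52 does not.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding-window assignment (size 10 min, slide 5 s, offset 0).
// Times are millisecond offsets within one day (an assumption that keeps the
// example independent of time zones); this is not Flink's implementation.
class SlidingWindowDemo {
    static final long SIZE = 10 * 60 * 1000L; // window size: 10 minutes
    static final long SLIDE = 5 * 1000L;      // slide: 5 seconds

    static long timeOfDay(int h, int m, int s) {
        return ((h * 60L + m) * 60L + s) * 1000L;
    }

    // End timestamps (exclusive) of every window [start, start + size) that contains ts,
    // starting from the latest slide-aligned start and stepping back one slide at a time.
    static List<Long> windowEnds(long ts, long size, long slide) {
        List<Long> ends = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            ends.add(start + size);
        }
        return ends;
    }

    public static void main(String[] args) {
        List<Long> ends = windowEnds(timeOfDay(10, 25, 49), SIZE, SLIDE);
        System.out.println("windows containing 10:25:49: " + ends.size());
        // The last element is the earliest window end, because starts are visited in descending order.
        System.out.println("earliest of them ends at 10:25:50: "
                + (ends.get(ends.size() - 1) == timeOfDay(10, 25, 50)));
    }
}
```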

II. Implementation

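  1. Approach:
    ① Create the environment and set the parallelism and checkpointing (CK). (The complete code below only sets the parallelism.)
    ② Define the watermark strategy and event time, read the data, and map it to a JavaBean.
    ③ First aggregation: key by URL, open a window, and count incrementally with the aggregate operator.
    ④ Second aggregation: key by window and keep the data in MapState. A first timer, registered at window end + 1 ms, fires once the watermark passes it and sorts and outputs the window's data; a second timer clears the state only after the window has finally closed.
    ⑤ Print the result and execute.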
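Steps ② to ④ interact through the watermark. The following sketch replays the sample event times to show when each stage fires. It assumes, as a simplification, that a watermark is emitted right after every record (Flink emits them periodically), and `WatermarkDemo` and its methods are hypothetical. The watermark is the largest event time seen minus 1 s minus 1 ms; the first-stage window ending at 10:25:50 fires once the watermark reaches 10:25:49.999, and the second-stage timer at window end + 1 ms fires once it reaches 10:25:50.001.

```java
// Sketch of when the two stages fire for the sample stream. Simplifying assumption:
// a watermark is emitted right after each record (Flink actually emits periodically).
class WatermarkDemo {
    static final long OUT_OF_ORDERNESS = 1000L;   // forBoundedOutOfOrderness(Duration.ofSeconds(1))
    static final long ALLOWED_LATENESS = 60_000L; // allowedLateness(Time.minutes(1))

    static long sec(int h, int m, int s) {
        return ((h * 60L + m) * 60L + s) * 1000L;
    }

    // Event times of the sample records, in arrival order.
    static long[] sampleEvents() {
        return new long[] {sec(10, 25, 49), sec(10, 25, 50), sec(10, 25, 51), sec(10, 25, 52),
                sec(10, 25, 47), sec(10, 25, 53), sec(10, 25, 46), sec(10, 25, 54)};
    }

    // Watermark after each record: max event time seen so far - out-of-orderness - 1 ms.
    static long[] watermarks(long[] events) {
        long[] result = new long[events.length];
        long maxTs = Long.MIN_VALUE;
        for (int i = 0; i < events.length; i++) {
            maxTs = Math.max(maxTs, events[i]);
            result[i] = maxTs - OUT_OF_ORDERNESS - 1;
        }
        return result;
    }

    // Index of the first record after which the watermark reaches `time`, or -1.
    static int firstReaching(long[] events, long time) {
        long[] wms = watermarks(events);
        for (int i = 0; i < wms.length; i++) {
            if (wms[i] >= time) return i;
        }
        return -1;
    }

    // Stage 1 still accepts records for a window while end - 1 + allowedLateness > watermark.
    static boolean windowStillOpen(long windowEnd, long watermark) {
        return windowEnd - 1 + ALLOWED_LATENESS > watermark;
    }

    public static void main(String[] args) {
        long end = sec(10, 25, 50);
        // Stage 1 fires the window once the watermark reaches end - 1;
        // the stage-2 ranking timer fires once it reaches end + 1.
        System.out.println("stage 1 fires after record #" + (firstReaching(sampleEvents(), end - 1) + 1));
        System.out.println("stage 2 timer fires after record #" + (firstReaching(sampleEvents(), end + 1) + 1));
    }
}
```

With the sample stream, the first stage fires after the third record (10:25:51) and the ranking timer after the fourth (10:25:52), which matches the first output above. The late record 10:25:47 arrives when the watermark is already 10:25:50.999, yet the window is still open thanks to allowedLateness. When it is re-aggregated, the timer at window end + 1 ms is registered again for a time the watermark has already passed; such a timer fires on the next watermark advance, which is consistent with the second and third rankings appearing with the records 10:25:53 and 10:25:54.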

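PS: Out-of-order data cannot be tested by reading a local text file. The file is loaded too quickly to observe how late data is handled, so for developing and testing out-of-order handling the data is read from a server port instead.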
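If typing lines into a terminal is inconvenient, a small replay server can send the sample records with a fixed pause between them. This is a hypothetical helper, not part of the original setup: `LogReplayServer`, the 2-second pause and the default port 9999 are assumptions, and the job's `socketTextStream("hadoop102", 9999)` would have to point at the host running it. The pause gives the periodic watermark (emitted every 200 ms by default) time to advance between records, which is what makes the late records observable.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for typing lines into a socket by hand: serves the sample
// records one line at a time, pausing between lines.
class LogReplayServer {
    static final List<String> SAMPLE = Arrays.asList(
            "208.115.111.72 - - 17/05/2015:10:25:49 +0000 GET /?N=A&page=21",
            "208.115.111.72 - - 17/05/2015:10:25:50 +0000 GET /?N=A&page=21",
            "208.115.111.72 - - 17/05/2015:10:25:51 +0000 GET /?N=A&page=21",
            "208.115.111.72 - - 17/05/2015:10:25:52 +0000 GET /?N=A&page=21",
            "208.115.111.72 - - 17/05/2015:10:25:47 +0000 GET /?N=A&",
            "208.115.111.72 - - 17/05/2015:10:25:53 +0000 GET /?N=A&page=21",
            "208.115.111.72 - - 17/05/2015:10:25:46 +0000 GET /?N=A&page=21",
            "208.115.111.72 - - 17/05/2015:10:25:54 +0000 GET /?N=A&page=21");

    // Writes each line plus '\n' (socketTextStream's default delimiter), flushing and pausing after each.
    static void replay(Writer out, List<String> lines, long delayMs) throws IOException, InterruptedException {
        for (String line : lines) {
            out.write(line);
            out.write('\n');
            out.flush();
            Thread.sleep(delayMs);
        }
    }

    // Replays into a string without pauses, for checking the output without a socket.
    static String replayToString() {
        StringWriter sw = new StringWriter();
        try {
            replay(sw, SAMPLE, 0L);
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sw.toString();
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9999;
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept(); // the Flink socket source connects as a client
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            replay(out, SAMPLE, 2000L);
            System.out.println("Replay finished; press Enter to close the connection.");
            System.in.read();
        }
    }
}
```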

  2. Code details:

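Only the optimized code is explained here; the rest is the same as in the in-order data article. Below is an excerpt of the overridden KeyedProcessFunction methods: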

@Override
public void processElement(UrlCount value, Context ctx, Collector<String> out) throws Exception {
	// Put (or overwrite) this URL's count in the state
	mapState.put(value.getUrl(), value);
	// Timer 1 ms after the window end: sorts and outputs the ranking
	ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
	// Second timer that clears the state only after the window is closed, so the window can still be ranked when late data arrives
	ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 61001L);
}
// Timer callback
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
	if (timestamp == ctx.getCurrentKey() + 61001L) {
		mapState.clear();
		return;
	}
...
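  • MapState replaces ListState here. With ListState, a late record would make the same URL appear several times, with different counts, in the same window, because list state does not de-duplicate. MapState keyed by URL de-duplicates: the newer count overwrites the older one.
  • The state is now cleared by a timer. The previous version cleared the state right after sorting at the end of onTimer, so when a late record arrived, the window's other data had already been removed and the ranking could not be output. Here the timer clears the state 61001 ms after the window end.
  • 61001 ms = 1 s of allowed out-of-orderness (forBoundedOutOfOrderness) + 1 min of window lateness (allowedLateness) + 1 ms for the first timer. Because allowedLateness is measured against the watermark, which already lags the event time by 1 s, the extra second acts as a safety margin rather than a strict requirement.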
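The de-duplication argument can be reproduced without Flink. In this sketch a `List` stands in for `ListState` and a `HashMap` for `MapState`; `StateDedupDemo` and its trimmed `UrlCount` are hypothetical. The emissions model what the first stage sends for the window ending 10:25:50 in the sample run: the on-time count, then one re-fired result per late record, each only for that record's URL (the first stage is keyed by URL).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the second-stage state for the window ending 10:25:50.
// A List plays the role of ListState and a HashMap the role of MapState; no Flink involved.
class StateDedupDemo {
    // Minimal stand-in for the article's UrlCount bean (hypothetical, without Lombok).
    static final class UrlCount {
        final String url;
        final int count;

        UrlCount(String url, int count) {
            this.url = url;
            this.count = count;
        }
    }

    // What stage 1 emits for this window in the sample run: the on-time result,
    // then one re-firing per late record, each only for the late record's own URL.
    static List<UrlCount> sampleEmissions() {
        List<UrlCount> out = new ArrayList<>();
        out.add(new UrlCount("/?N=A&page=21", 1)); // on-time firing
        out.add(new UrlCount("/?N=A&", 1));        // late record 10:25:47
        out.add(new UrlCount("/?N=A&page=21", 2)); // late record 10:25:46 updates page=21
        return out;
    }

    // ListState-like: every emission is appended, so the stale count stays next to the new one.
    static List<UrlCount> listState(List<UrlCount> emissions) {
        List<UrlCount> state = new ArrayList<>();
        for (UrlCount c : emissions) state.add(c);
        return state;
    }

    // MapState-like: keyed by URL, so the newest count overwrites the stale one.
    static Map<String, UrlCount> mapState(List<UrlCount> emissions) {
        Map<String, UrlCount> state = new HashMap<>();
        for (UrlCount c : emissions) state.put(c.url, c);
        return state;
    }

    // Number of entries for `url` that the ranking step would see.
    static long entriesFor(String url, Iterable<UrlCount> state) {
        long n = 0;
        for (UrlCount c : state) {
            if (c.url.equals(url)) n++;
        }
        return n;
    }

    public static void main(String[] args) {
        List<UrlCount> e = sampleEmissions();
        System.out.println("list entries for page=21: " + entriesFor("/?N=A&page=21", listState(e)));
        System.out.println("map entries for page=21:  " + entriesFor("/?N=A&page=21", mapState(e).values()));
    }
}
```

The list ends up with two entries for `/?N=A&page=21` (counts 1 and 2), so that URL would be ranked twice; the map keeps only the latest count, 2, as in the third output above.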

III. Complete Code

package com.test.topN;

import bean.ApacheLog;
import bean.UrlCount;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * @author: Rango
 * @create: 2021-05-26 10:16
 * @description: Every 5 seconds, output the top N URLs by visit count in the last 10 minutes
 **/
public class URLTopN3 {
    public static void main(String[] args) throws Exception {

        //1. Create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //2. Read socket data, map it to a JavaBean, and define the watermark (event time)
        WatermarkStrategy<ApacheLog> wms = WatermarkStrategy
                .<ApacheLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(new SerializableTimestampAssigner<ApacheLog>() {
                    @Override
                    public long extractTimestamp(ApacheLog element, long recordTimestamp) {
                        return element.getTs();
                    }});

        SingleOutputStreamOperator<ApacheLog> apacheLogDS = env.socketTextStream("hadoop102", 9999)
                .map(new MapFunction<String, ApacheLog>() {
                    @Override
                    public ApacheLog map(String value) throws Exception {
                        // e.g. "17/05/2015:10:25:49"; the "+0000" offset in split[4] is ignored
                        SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                        String[] split = value.split(" ");
                        return new ApacheLog(split[0],
                                split[2],
                                sdf.parse(split[3]).getTime(),
                                split[5],
                                split[6]);
                    }})
                .assignTimestampsAndWatermarks(wms);

        //3. First aggregation: map to Tuple2 keyed by URL, open a sliding window, aggregate incrementally
        SingleOutputStreamOperator<UrlCount> aggregateDS = apacheLogDS
                .map(new MapFunction<ApacheLog, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(ApacheLog value) throws Exception {
                        return new Tuple2<>(value.getUrl(), 1);
                    }})
                .keyBy(data -> data.f0)
                .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(5)))
                .allowedLateness(Time.minutes(1))
                .aggregate(new HotUrlAggFunc(), new HotUrlWindowFunc());

        //4. Second aggregation: key the first-stage output by window end and rank each window
        //   with timers; a new window end (and ranking) arrives every 5 seconds. 5 is the N of TopN
        SingleOutputStreamOperator<String> processDS = aggregateDS
                .keyBy(data -> data.getWindowEnd())
                .process(new HotUrlProcessFunc(5));

        processDS.print();
        env.execute();
    }

    // AggregateFunction: count the records of one URL within a window
    public static class HotUrlAggFunc implements AggregateFunction<Tuple2<String, Integer>, Integer, Integer> {
        @Override
        public Integer createAccumulator() { return 0; }
        @Override
        public Integer add(Tuple2<String, Integer> value, Integer accumulator) { return accumulator + 1; }
        @Override
        public Integer getResult(Integer accumulator) { return accumulator; }
        @Override
        public Integer merge(Integer a, Integer b) { return a + b; }
    }

    // WindowFunction: wrap the count from the aggregate function in a UrlCount bean,
    // so that the second stage can key the results by window end
    public static class HotUrlWindowFunc implements WindowFunction<Integer, UrlCount, String, TimeWindow> {
        @Override
        public void apply(String url, TimeWindow window, Iterable<Integer> input, Collector<UrlCount> out) throws Exception {
            // After pre-aggregation the iterable holds exactly one value: the count for this URL
            Integer count = input.iterator().next();
            out.collect(new UrlCount(url, window.getEnd(), count));
        }
    }

    // KeyedProcessFunction keyed by window end: overrides processElement and onTimer
    public static class HotUrlProcessFunc extends KeyedProcessFunction<Long, UrlCount, String> {
        // N of TopN, passed in through the constructor
        private Integer TopN;
        public HotUrlProcessFunc(Integer topN) {
            TopN = topN;
        }
        // State: URL -> latest UrlCount of this window (MapState de-duplicates by URL)
        private MapState<String, UrlCount> mapState;
        // Initialize the state in open()
        @Override
        public void open(Configuration parameters) throws Exception {
            mapState = getRuntimeContext()
                    .getMapState(new MapStateDescriptor<String, UrlCount>("map-state", String.class, UrlCount.class));
        }
        @Override
        public void processElement(UrlCount value, Context ctx, Collector<String> out) throws Exception {
            // Put (or overwrite) this URL's count in the state
            mapState.put(value.getUrl(), value);
            // Timer 1 ms after the window end: sorts and outputs the ranking
            ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
            // Second timer that clears the state only after the window is closed,
            // so the window can still be ranked when late data arrives
            ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 61001L);
        }
        // Timer callback
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Cleanup timer: drop this window's state and stop
            if (timestamp == ctx.getCurrentKey() + 61001L) {
                mapState.clear();
                return;
            }

            // Copy the state entries into a list
            List<Map.Entry<String, UrlCount>> entries = new ArrayList<>();
            for (Map.Entry<String, UrlCount> entry : mapState.entries()) {
                entries.add(entry);
            }

            // Sort by count, descending
            entries.sort((o1, o2) -> Integer.compare(o2.getValue().getCount(), o1.getValue().getCount()));

            // Build the TopN output in a StringBuilder
            StringBuilder sb = new StringBuilder();
            sb.append("==============")
                    .append(new Timestamp(timestamp - 1L))
                    .append("==============")
                    .append("\n");
            for (int i = 0; i < Math.min(TopN, entries.size()); i++) {
                UrlCount urlCount = entries.get(i).getValue();
                sb.append("Top").append(i + 1);
                sb.append(" Url:").append(urlCount.getUrl());
                sb.append(" Counts:").append(urlCount.getCount());
                sb.append("\n");
            }
            sb.append("==============")
                    .append(new Timestamp(timestamp - 1L))
                    .append("==============")
                    .append("\n")
                    .append("\n");

            out.collect(sb.toString());
            // Demo only: slows down printing so each firing can be observed; remove in real jobs
            Thread.sleep(200);
        }
    }
}
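Once the state has been copied into a list, the ranking in `onTimer` is plain Java, so it can be tried in isolation. This sketch assumes a `Map<String, Integer>` of URL counts in place of `MapState<String, UrlCount>`; `TopNFormatDemo` and `sampleCounts` are hypothetical names. It sorts in descending order with `Integer.compare` and prints at most `topN` entries in the same layout as the job's output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free sketch of the ranking in onTimer: sort one window's URL counts in
// descending order and format the first N. The Map stands in for MapState.
class TopNFormatDemo {
    static List<Map.Entry<String, Integer>> rank(Map<String, Integer> counts) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        // Integer.compare avoids the overflow risk of subtracting counts.
        entries.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return entries;
    }

    static String format(String windowLabel, Map<String, Integer> counts, int topN) {
        List<Map.Entry<String, Integer>> entries = rank(counts);
        StringBuilder sb = new StringBuilder();
        sb.append("==============").append(windowLabel).append("==============\n");
        for (int i = 0; i < Math.min(topN, entries.size()); i++) {
            sb.append("Top").append(i + 1)
              .append(" Url:").append(entries.get(i).getKey())
              .append(" Counts:").append(entries.get(i).getValue())
              .append('\n');
        }
        sb.append("==============").append(windowLabel).append("==============\n");
        return sb.toString();
    }

    // Counts of the window ending 10:25:50 after both late records (third firing).
    static Map<String, Integer> sampleCounts() {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("/?N=A&page=21", 2);
        counts.put("/?N=A&", 1);
        return counts;
    }

    public static void main(String[] args) {
        System.out.print(format("2015-05-17 10:25:50.0", sampleCounts(), 5));
    }
}
```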
           

JavaBean for the source data (ApacheLog):

package bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ApacheLog {
    private String ip;
    private String userId;
    private Long ts;
    private String method;
    private String url;
}
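The `ApacheLog` fields are filled by splitting one log line on spaces in the job's `map()`. The sketch below isolates that step; `LogParseDemo` and `ParsedLog` (a Lombok-free stand-in for `ApacheLog`) are hypothetical. As in the job, the `+0000` offset is ignored, so timestamps are interpreted in the JVM's default time zone.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Flink-free sketch of the job's map() step for one access-log line.
// ParsedLog is a hypothetical stand-in for the Lombok ApacheLog bean.
class LogParseDemo {
    static final class ParsedLog {
        final String ip;
        final String userId;
        final long ts;
        final String method;
        final String url;

        ParsedLog(String ip, String userId, long ts, String method, String url) {
            this.ip = ip;
            this.userId = userId;
            this.ts = ts;
            this.method = method;
            this.url = url;
        }
    }

    // Field layout after split(" "): [0] ip, [1] "-", [2] user id ("-"), [3] date-time,
    // [4] zone offset (ignored, so the JVM default time zone applies), [5] method, [6] url.
    static ParsedLog parse(String line) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
        String[] f = line.split(" ");
        try {
            return new ParsedLog(f[0], f[2], sdf.parse(f[3]).getTime(), f[5], f[6]);
        } catch (ParseException e) {
            throw new IllegalArgumentException("bad timestamp in: " + line, e);
        }
    }

    public static void main(String[] args) {
        ParsedLog log = parse("208.115.111.72 - - 17/05/2015:10:25:47 +0000 GET /?N=A&");
        System.out.println(log.method + " " + log.url + " @ " + log.ts);
    }
}
```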

JavaBean for the output of the first aggregation (UrlCount):

package bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UrlCount {
    private String url;
    private Long windowEnd;
    private Integer count;
}

Shared for learning and discussion; if you spot any problems, please point them out in the comments.

