使用flink實現一個topN的程序


  topN功能是一個非常常見的功能,比如查看最近幾分鍾的閱讀最高數,購買最高數。

  flink實現topN的功能也非常方便,下面就開始構建一個flink topN的程序。

  還是像上篇博客一樣,從kafka讀取數據,然后進行計算和數據轉換,最后sink到mysql中。

  假設有個需求,實現一個統計每5分鍾最高購買數的商品。

  使用maven創建一個工程,具體步驟可以參考上邊博文。然后創建一個數據庫表,用於存儲最終的結果集。語句如下:

CREATE TABLE `itembuycount` (
  `id` mediumint NOT NULL auto_increment,
  `itemId` bigint(255) NOT NULL,
  `buyCount` bigint(11) DEFAULT NULL,
  `createDate` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci

  創建一個表對應的pojo類文件UserAction。里邊主要是用戶id,商品id,用戶的行為,pv用戶瀏覽,buy用戶購買,cart加購物車,fav加入收藏。

package myflinktopn.pojo;

/**
 * @author huangqingshi
 * @Date 2019-12-13
 */
public class UserAction {

    public long userId; //用戶id
    public long itemId; //商品id
    public int categoryId; //商品分類id
    public String behavior; //用戶行為(pv, buy, cart, fav)
    public long timestamp; //操作時間戳

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }

    public long getItemId() {
        return itemId;
    }

    public void setItemId(long itemId) {
        this.itemId = itemId;
    }

    public int getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(int categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}

  接下來創建一個kafka主題,存儲發送和接受數據使用。

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic USER_ACTION

  kafka的主題創建好了之后,寫一個程序往kafka里邊寫數據,一秒寫一條。

package myflinktopn.kafka;

import com.alibaba.fastjson.JSON;
import myflinktopn.pojo.UserAction;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class KafkaWriter {

    //本地的kafka機器列表
    public static final String BROKER_LIST = "localhost:9092";
    //kafka的topic
    public static final String TOPIC_USER_ACTION = "USER_ACTION";
    //key序列化的方式,采用字符串的形式
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    //value的序列化的方式
    public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    //用戶的行為列表
    public static final List<String> userBehaviors = Arrays.asList("pv", "buy", "cart", "fav");

    public static void writeToKafka() throws Exception{
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKER_LIST);
        props.put("key.serializer", KEY_SERIALIZER);
        props.put("value.serializer", VALUE_SERIALIZER);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        UserAction userAction = new UserAction();
        userAction.setUserId(RandomUtils.nextLong(1, 100));
        userAction.setItemId(RandomUtils.nextLong(1, 1000));
        userAction.setCategoryId(RandomUtils.nextInt(1, 30));
        userAction.setBehavior(userBehaviors.get(RandomUtils.nextInt(0, 3)));
        userAction.setTimestamp(System.currentTimeMillis());

        //轉換成JSON
        String userActionJson = JSON.toJSONString(userAction);

        //包裝成kafka發送的記錄
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_USER_ACTION, null,
                null, userActionJson);
        //發送到緩存
        producer.send(record);
        System.out.println("向kafka發送數據:" + userActionJson);
        //立即發送
        producer.flush();

    }

    public static void main(String[] args) {
        while(true) {
            try {
                //每1秒寫一條數據
                TimeUnit.SECONDS.sleep(1);
                writeToKafka();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

}

  接下來還是創建數據庫的連接工具類。

package myflinktopn.db;

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class DbUtils {

    private static DruidDataSource dataSource;

    public static Connection getConnection() throws Exception {
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/testdb");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        //設置初始化連接數,最大連接數,最小閑置數
        dataSource.setInitialSize(10);
        dataSource.setMaxActive(50);
        dataSource.setMinIdle(5);
        //返回連接
        return  dataSource.getConnection();
    }

}

  接下來寫sink到數據庫的MySqlSink類,用於將結果接數據保存到數據庫。

package myflinktopn.sink;

import myflinktopn.TopNJob;
import myflinktopn.db.DbUtils;
import myflinktopn.pojo.UserAction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.List;

/**
 * @author huangqingshi
 * @Date 2019-12-07
 */
public class MySqlSink extends RichSinkFunction<List<TopNJob.ItemBuyCount>> {

    private PreparedStatement ps;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //獲取數據庫連接,准備寫入數據庫
        connection = DbUtils.getConnection();
        String sql = "insert into itembuycount(itemId, buyCount, createDate) values (?, ?, ?); ";
        ps = connection.prepareStatement(sql);
        System.out.println("-------------open------------");
    }

    @Override
    public void close() throws Exception {
        super.close();
        //關閉並釋放資源
        if(connection != null) {
            connection.close();
        }

        if(ps != null) {
            ps.close();
        }
        System.out.println("-------------close------------");
    }

    @Override
    public void invoke(List<TopNJob.ItemBuyCount> topNItems, Context context) throws Exception {
        for(TopNJob.ItemBuyCount itemBuyCount : topNItems) {
            ps.setLong(1, itemBuyCount.itemId);
            ps.setLong(2, itemBuyCount.buyCount);
            ps.setTimestamp(3, new Timestamp(itemBuyCount.windowEnd));
            ps.addBatch();
        }

        //一次性寫入
        int[] count = ps.executeBatch();
        System.out.println("-------------invoke------------");
        System.out.println("成功寫入Mysql數量:" + count.length);

    }
}

  接下來咱們看一下實現TopNJob的全部代碼,然后再繼續分析下里邊的細節。

package myflinktopn;

import com.alibaba.fastjson.JSONObject;
import myflinktopn.kafka.KafkaWriter;
import myflinktopn.pojo.UserAction;
import myflinktopn.sink.MySqlSink;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;

/**
 * @author huangqingshi
 * @Date 2019-12-13
 */
public class TopNJob {

    //最對延遲到達的時間
    public static final long MAX_EVENT_DELAY = 10L;

    public static void main(String[] args) throws Exception {
        //構建流執行環境
        StreamExecutionEnvironment env  = StreamExecutionEnvironment.getExecutionEnvironment();
        //設置並行度1,方便打印
        env.setParallelism(1);

        /** ProcessingTime:事件被處理的時間。也就是由機器的系統時間來決定。
         EventTime:事件發生的時間。一般就是數據本身攜帶的時間。
         */
        //設置下eventTime,默認為processTime即系統處理時間,我們需要統計一小時內的數據,也就是數據帶的時間eventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //kafka
        Properties prop = new Properties();
        prop.put("bootstrap.servers", KafkaWriter.BROKER_LIST);
        prop.put("zookeeper.connect", "localhost:2181");
        prop.put("group.id", KafkaWriter.TOPIC_USER_ACTION);
        prop.put("key.serializer", KafkaWriter.KEY_SERIALIZER);
        prop.put("value.serializer", KafkaWriter.VALUE_SERIALIZER);
        prop.put("auto.offset.reset", "latest");

        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<>(
                KafkaWriter.TOPIC_USER_ACTION,
                new SimpleStringSchema(),
                prop
        ));

        //從kafka里讀取數據,轉換成UserAction對象
        DataStream<UserAction> dataStream = dataStreamSource.map(value -> JSONObject.parseObject(value, UserAction.class));

        //將亂序的數據進行抽取出來,設置watermark,數據如果晚到10秒的會被丟棄
        DataStream<UserAction> timedData = dataStream.assignTimestampsAndWatermarks(new UserActionTSExtractor());

        //為了統計5分鍾購買的最多的,所以我們需要過濾出購買的行為
        DataStream<UserAction> filterData = timedData.filter(new FilterFunction<UserAction>() {
            @Override
            public boolean filter(UserAction userAction) throws Exception {
                return userAction.getBehavior().contains("buy");
            }
        });

        //窗口統計點擊量 滑動的窗口 5分鍾一次  統計一小時最高的  比如 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)…
        DataStream<ItemBuyCount> windowedData = filterData
                .keyBy("itemId")
                .timeWindow(Time.minutes(60L), Time.minutes(5L))
                .aggregate(new CountAgg(), new WindowResultFunciton());

        //Top N 計算最熱門的商品
        DataStream<List<ItemBuyCount>> topItems = windowedData
                .keyBy("windowEnd")
                //點擊前3的商品
                .process(new TopNHotItems(3));

        topItems.addSink(new MySqlSink());
        //topItems.print();
        env.execute("Top N Job");
    }

    /**
     * 用於行為時間戳抽取器,最多十秒延遲,也就是晚到10秒的數據會被丟棄掉
     */
    public static class UserActionTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserAction> {


        public UserActionTSExtractor() {
            super(Time.seconds(MAX_EVENT_DELAY));
        }

        @Override
        public long extractTimestamp(UserAction userAction) {
            return userAction.getTimestamp();
        }
    }

    /**
     * 商品購買量(窗口操作的輸出類型)
     */
    public static class ItemBuyCount {
        public long itemId; //商品ID;
        public long windowEnd; //窗口結束時間戳
        public long buyCount; //購買數量

        public static ItemBuyCount of(long itemId, long windowEnd, long buyCount) {
            ItemBuyCount itemBuyCount = new ItemBuyCount();
            itemBuyCount.itemId = itemId;
            itemBuyCount.windowEnd = windowEnd;
            itemBuyCount.buyCount = buyCount;
            return itemBuyCount;
        }
    }

    /**
     *
     * COUNT 聚合函數實現,每出現一條記錄加一。AggregateFunction<輸入,匯總,輸出>
     */
    public static class CountAgg implements AggregateFunction<UserAction, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserAction userAction, Long acc) {
            return acc + 1;
        }

        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        @Override
        public Long merge(Long acc1, Long acc2) {
            return acc1 + acc2;
        }
    }

    /**
     * 用於輸出結果的窗口WindowFunction<輸入,輸出,鍵,窗口>
     */
    public static class WindowResultFunciton implements WindowFunction<Long, ItemBuyCount, Tuple, TimeWindow> {

        @Override
        public void apply(
                Tuple key, //窗口主鍵即itemId
                TimeWindow window, //窗口
                Iterable<Long> aggregationResult, //集合函數的結果,即count的值
                Collector<ItemBuyCount> collector //輸出類型collector
        ) throws Exception {

            Long itemId = ((Tuple1<Long>) key).f0;
            Long count =aggregationResult.iterator().next();
            collector.collect(ItemBuyCount.of(itemId, window.getEnd(), count));

        }
    }


    /**
     * 求某個窗口中前N名的熱門點擊商品,key為窗口時間戳,輸出為Top N 的結果字符串
     */
    public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemBuyCount, List<ItemBuyCount>> {

        private final int topSize;

        public TopNHotItems(int topSize) {
            this.topSize = topSize;
        }

        //用於存儲商品與購買數的狀態,待收齊同一個窗口的數據后,再觸發 Top N 計算
        private ListState<ItemBuyCount> itemState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            //狀態注冊
            ListStateDescriptor<ItemBuyCount> itemViewStateDesc = new ListStateDescriptor<ItemBuyCount>(
                    "itemState-state", ItemBuyCount.class
            );
            itemState = getRuntimeContext().getListState(itemViewStateDesc);
        }

        @Override
        public void processElement(
                ItemBuyCount input,
                Context context,
                Collector<List<ItemBuyCount>> collector
        ) throws Exception {
            //每條數據都保存到狀態
            itemState.add(input);
            //注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收集好了所有 windowEnd的商品數據
            context.timerService().registerEventTimeTimer(input.windowEnd + 1);
        }


        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<ItemBuyCount>> out) throws Exception {
            //獲取收集到的所有商品點擊量
            List<ItemBuyCount> allItems = new ArrayList<ItemBuyCount>();
            for(ItemBuyCount item : itemState.get()) {
                allItems.add(item);
            }
            //提前清除狀態中的數據,釋放空間
            itemState.clear();
            //按照點擊量從大到小排序
            allItems.sort(new Comparator<ItemBuyCount>() {
                @Override
                public int compare(ItemBuyCount o1, ItemBuyCount o2) {
                    return (int) (o2.buyCount - o1.buyCount);
                }
            });

            List<ItemBuyCount> itemBuyCounts = new ArrayList<>();
            //將排名信息格式化成String,方便打印
            StringBuilder result = new StringBuilder();
            result.append("========================================\n");
            result.append("時間:").append(new Timestamp(timestamp-1)).append("\n");
            for (int i=0;i<topSize;i++) {
                ItemBuyCount currentItem = allItems.get(i);
                // No1:  商品ID=12224  購買量=2
                result.append("No").append(i).append(":")
                        .append("  商品ID=").append(currentItem.itemId)
                        .append("  購買量=").append(currentItem.buyCount)
                        .append("\n");
                itemBuyCounts.add(currentItem);
            }
            result.append("====================================\n\n");

            out.collect(itemBuyCounts);

        }
    }

}

  以上里邊的步驟:

  1. 構建流執行的環境。

  2. 設置並行度為1,為了方便下邊打印。但是實際上可以不用設置並行度。

  3. 設置流時間的特性,即EventTime。流時間一共有三種時間:

    1)ProcessingTime時間,即算子處理的時間,默認flink的時間特性。

    2)EventTime,事件處理時間,也就是程序所帶的時間戳。此例子將使用由業務產生的時間戳作為時間戳。

    3)IntestionTime,即到達flink的時間。

  4. 連接kafka的屬性讀取數據。將kafka的數據轉換成pojo對象。

  5. 將亂序的數據使用BoundedOutOfOrdernessTimestampExtractor進行時間戳抽取和設置watermark。

  6. 進行數據過濾,將useAction為buy的數據進行過濾出來,其他的直接丟棄。

  7. 采用滑動窗口的方式對數據進行匯總並返回指定類型的數據。一小時為一個窗口,每五分鍾往后滑動一下。數據的形式為[9:00 10:00), [9:05 10:05), [9:10 10:10)依次類推。

  8. 最后是將數據排序,然后獲取topN的數據。

  接下來咱們再針對里邊的細節看一下,如果數據是生序的數據,則采用AscendingTimestampExtractor進行時間戳抽取,無序可以采用上邊5條說的類。有序的只需要進行時間戳抽取,抽取的時間戳就認為是整個的時間,也就是watermark時間,可以理解為全局時間。無序抽取的時候設置一個最晚到達的時間。舉個例子,如果一條數據時間為9:04:55, 到達的時間為9:05:05,這樣這條數據還是屬於窗口[9:00 9:05:01)里邊的,而不是[9:05:01 9:10:01)。

  這個聚合方法aggregate(new CountAgg(), new WindowResultFunciton())里邊聲明了兩個類,一個是聚合功能類,一個是結果窗口功能類。也就是使用第一個類將數據進行匯聚,第二個類將數據進行合並匯總並且輸出自己定義的窗口結果集ItemBuyCount。

  最后就是topN功能,這個類實現了KeyedProcessFunction,里邊有三個方法 open, processElement, onTimer, 里邊還有一個ListState<ItemBuyCount>,用於收集和記錄狀態信息,保證數據exactly-once語義。

  open方法用於進行狀態注冊。

  processElement把數據狀態進行添加到list里邊,然后注冊時間時間windowEnd+1,即數據超過了windowEnd的時候就會觸發onTimer時間。

  onTimer用於把五分鍾收集好的數據進行處理,從數據狀態記錄中把數據拿出來,然后清理數據狀態,釋放空間。然后將拿出來的數據進行排序,最后整理成sink所需要的結果集。

  最后就把數據進行sink,保存到數據庫。

  下面看一下數據運行的結果,kafkaWriter打印的日志。

向kafka發送數據:{"behavior":"pv","categoryId":7,"itemId":19,"timestamp":1576325365016,"userId":56}

  執行TopNJob,MysqlSink執行的日志打印。

-------------invoke------------
成功寫入Mysql數量:3
[myflinktopn.TopNJob$ItemBuyCount@611553e4, myflinktopn.TopNJob$ItemBuyCount@19739780, myflinktopn.TopNJob$ItemBuyCount@a76f1f0]
-------------invoke------------
成功寫入Mysql數量:3
[myflinktopn.TopNJob$ItemBuyCount@3db32bd3, myflinktopn.TopNJob$ItemBuyCount@10f855ae, myflinktopn.TopNJob$ItemBuyCount@6a022d0b]
-------------invoke------------
成功寫入Mysql數量:3
[myflinktopn.TopNJob$ItemBuyCount@4ae16ab8, myflinktopn.TopNJob$ItemBuyCount@6a8a29a3, myflinktopn.TopNJob$ItemBuyCount@2326adfb]

  寫入到數據庫的記錄如下:

 

 

    寫入的時間都在同一個時刻,獲取前三條購買最多的,所以三條寫入的時間都一樣的。

  還有在運行的時候要注意kafkaWriter要多謝一些,因為要收集5分鍾的數據,所以至少得跑5分鍾。

  最后我把代碼放到git上了,可以進行訪問:https://github.com/stonehqs/flink-topn ,有不對的地方,歡迎指正。 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM