topN功能是一個非常常見的功能,比如查看最近幾分鍾的閱讀最高數,購買最高數。
flink實現topN的功能也非常方便,下面就開始構建一個flink topN的程序。
還是像上篇博客一樣,從kafka讀取數據,然后進行計算和數據轉換,最后sink到mysql中。
假設有個需求,實現一個統計每5分鍾最高購買數的商品。
使用maven創建一個工程,具體步驟可以參考上邊博文。然后創建一個數據庫表,用於存儲最終的結果集。語句如下:
CREATE TABLE `itembuycount` ( `id` mediumint NOT NULL auto_increment, `itemId` bigint(255) NOT NULL, `buyCount` bigint(11) DEFAULT NULL, `createDate` timestamp NULL DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
創建一個表對應的pojo類文件UserAction。里邊主要是用戶id,商品id,用戶的行為,pv用戶瀏覽,buy用戶購買,cart加購物車,fav加入收藏。
package myflinktopn.pojo; /** * @author huangqingshi * @Date 2019-12-13 */ public class UserAction { public long userId; //用戶id public long itemId; //商品id public int categoryId; //商品分類id public String behavior; //用戶行為(pv, buy, cart, fav) public long timestamp; //操作時間戳 public long getUserId() { return userId; } public void setUserId(long userId) { this.userId = userId; } public long getItemId() { return itemId; } public void setItemId(long itemId) { this.itemId = itemId; } public int getCategoryId() { return categoryId; } public void setCategoryId(int categoryId) { this.categoryId = categoryId; } public String getBehavior() { return behavior; } public void setBehavior(String behavior) { this.behavior = behavior; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } }
接下來創建一個kafka主題,存儲發送和接受數據使用。
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic USER_ACTION
kafka的主題創建好了之后,寫一個程序往kafka里邊寫數據,一秒寫一條。
package myflinktopn.kafka; import com.alibaba.fastjson.JSON; import myflinktopn.pojo.UserAction; import org.apache.commons.lang3.RandomUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; /** * @author huangqingshi * @Date 2019-12-07 */ public class KafkaWriter { //本地的kafka機器列表 public static final String BROKER_LIST = "localhost:9092"; //kafka的topic public static final String TOPIC_USER_ACTION = "USER_ACTION"; //key序列化的方式,采用字符串的形式 public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; //value的序列化的方式 public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; //用戶的行為列表 public static final List<String> userBehaviors = Arrays.asList("pv", "buy", "cart", "fav"); public static void writeToKafka() throws Exception{ Properties props = new Properties(); props.put("bootstrap.servers", BROKER_LIST); props.put("key.serializer", KEY_SERIALIZER); props.put("value.serializer", VALUE_SERIALIZER); KafkaProducer<String, String> producer = new KafkaProducer<>(props); UserAction userAction = new UserAction(); userAction.setUserId(RandomUtils.nextLong(1, 100)); userAction.setItemId(RandomUtils.nextLong(1, 1000)); userAction.setCategoryId(RandomUtils.nextInt(1, 30)); userAction.setBehavior(userBehaviors.get(RandomUtils.nextInt(0, 3))); userAction.setTimestamp(System.currentTimeMillis()); //轉換成JSON String userActionJson = JSON.toJSONString(userAction); //包裝成kafka發送的記錄 ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_USER_ACTION, null, null, userActionJson); //發送到緩存 producer.send(record); System.out.println("向kafka發送數據:" + userActionJson); //立即發送 producer.flush(); } public static void main(String[] args) { while(true) { try { //每1秒寫一條數據 TimeUnit.SECONDS.sleep(1); writeToKafka(); } catch (Exception e) { e.printStackTrace(); } } } }
接下來還是創建數據庫的連接工具類。
package myflinktopn.db; import com.alibaba.druid.pool.DruidDataSource; import java.sql.Connection; /** * @author huangqingshi * @Date 2019-12-07 */ public class DbUtils { private static DruidDataSource dataSource; public static Connection getConnection() throws Exception { dataSource = new DruidDataSource(); dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver"); dataSource.setUrl("jdbc:mysql://localhost:3306/testdb"); dataSource.setUsername("root"); dataSource.setPassword("root"); //設置初始化連接數,最大連接數,最小閑置數 dataSource.setInitialSize(10); dataSource.setMaxActive(50); dataSource.setMinIdle(5); //返回連接 return dataSource.getConnection(); } }
接下來寫sink到數據庫的MySqlSink類,用於將結果接數據保存到數據庫。
package myflinktopn.sink; import myflinktopn.TopNJob; import myflinktopn.db.DbUtils; import myflinktopn.pojo.UserAction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.Timestamp; import java.util.List; /** * @author huangqingshi * @Date 2019-12-07 */ public class MySqlSink extends RichSinkFunction<List<TopNJob.ItemBuyCount>> { private PreparedStatement ps; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //獲取數據庫連接,准備寫入數據庫 connection = DbUtils.getConnection(); String sql = "insert into itembuycount(itemId, buyCount, createDate) values (?, ?, ?); "; ps = connection.prepareStatement(sql); System.out.println("-------------open------------"); } @Override public void close() throws Exception { super.close(); //關閉並釋放資源 if(connection != null) { connection.close(); } if(ps != null) { ps.close(); } System.out.println("-------------close------------"); } @Override public void invoke(List<TopNJob.ItemBuyCount> topNItems, Context context) throws Exception { for(TopNJob.ItemBuyCount itemBuyCount : topNItems) { ps.setLong(1, itemBuyCount.itemId); ps.setLong(2, itemBuyCount.buyCount); ps.setTimestamp(3, new Timestamp(itemBuyCount.windowEnd)); ps.addBatch(); } //一次性寫入 int[] count = ps.executeBatch(); System.out.println("-------------invoke------------"); System.out.println("成功寫入Mysql數量:" + count.length); } }
接下來咱們看一下實現TopNJob的全部代碼,然后再繼續分析下里邊的細節。
package myflinktopn; import com.alibaba.fastjson.JSONObject; import myflinktopn.kafka.KafkaWriter; import myflinktopn.pojo.UserAction; import myflinktopn.sink.MySqlSink; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.util.Collector; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Properties; /** * @author huangqingshi * @Date 2019-12-13 */ public class TopNJob { //最對延遲到達的時間 public static final long MAX_EVENT_DELAY = 10L; public static void main(String[] args) throws Exception { //構建流執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設置並行度1,方便打印 env.setParallelism(1); /** ProcessingTime:事件被處理的時間。也就是由機器的系統時間來決定。 EventTime:事件發生的時間。一般就是數據本身攜帶的時間。 */ //設置下eventTime,默認為processTime即系統處理時間,我們需要統計一小時內的數據,也就是數據帶的時間eventTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //kafka Properties prop = new Properties(); prop.put("bootstrap.servers", KafkaWriter.BROKER_LIST); prop.put("zookeeper.connect", "localhost:2181"); prop.put("group.id", KafkaWriter.TOPIC_USER_ACTION); prop.put("key.serializer", KafkaWriter.KEY_SERIALIZER); prop.put("value.serializer", KafkaWriter.VALUE_SERIALIZER); prop.put("auto.offset.reset", "latest"); DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<>( KafkaWriter.TOPIC_USER_ACTION, new SimpleStringSchema(), prop )); //從kafka里讀取數據,轉換成UserAction對象 DataStream<UserAction> dataStream = dataStreamSource.map(value -> JSONObject.parseObject(value, UserAction.class)); //將亂序的數據進行抽取出來,設置watermark,數據如果晚到10秒的會被丟棄 DataStream<UserAction> timedData = dataStream.assignTimestampsAndWatermarks(new UserActionTSExtractor()); //為了統計5分鍾購買的最多的,所以我們需要過濾出購買的行為 DataStream<UserAction> filterData = timedData.filter(new FilterFunction<UserAction>() { @Override public boolean filter(UserAction userAction) throws Exception { return userAction.getBehavior().contains("buy"); } }); //窗口統計點擊量 滑動的窗口 5分鍾一次 統計一小時最高的 比如 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… DataStream<ItemBuyCount> windowedData = filterData .keyBy("itemId") .timeWindow(Time.minutes(60L), Time.minutes(5L)) .aggregate(new CountAgg(), new WindowResultFunciton()); //Top N 計算最熱門的商品 DataStream<List<ItemBuyCount>> topItems = windowedData .keyBy("windowEnd") //點擊前3的商品 .process(new TopNHotItems(3)); topItems.addSink(new MySqlSink()); //topItems.print(); env.execute("Top N Job"); } /** * 用於行為時間戳抽取器,最多十秒延遲,也就是晚到10秒的數據會被丟棄掉 */ public static class UserActionTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserAction> { public UserActionTSExtractor() { super(Time.seconds(MAX_EVENT_DELAY)); } @Override public long extractTimestamp(UserAction userAction) { return userAction.getTimestamp(); } } /** * 商品購買量(窗口操作的輸出類型) */ public static class ItemBuyCount { public long itemId; //商品ID; public long windowEnd; //窗口結束時間戳 public long buyCount; //購買數量 public static ItemBuyCount of(long itemId, long windowEnd, long buyCount) { ItemBuyCount itemBuyCount = new ItemBuyCount(); itemBuyCount.itemId = itemId; itemBuyCount.windowEnd = windowEnd; itemBuyCount.buyCount = buyCount; return itemBuyCount; } } /** * * COUNT 聚合函數實現,每出現一條記錄加一。AggregateFunction<輸入,匯總,輸出> */ public static class CountAgg implements AggregateFunction<UserAction, Long, Long> { @Override public Long createAccumulator() { return 0L; } @Override public Long add(UserAction userAction, Long acc) { return acc + 1; } @Override public Long getResult(Long acc) { return acc; } @Override public Long merge(Long acc1, Long acc2) { return acc1 + acc2; } } /** * 用於輸出結果的窗口WindowFunction<輸入,輸出,鍵,窗口> */ public static class WindowResultFunciton implements WindowFunction<Long, ItemBuyCount, Tuple, TimeWindow> { @Override public void apply( Tuple key, //窗口主鍵即itemId TimeWindow window, //窗口 Iterable<Long> aggregationResult, //集合函數的結果,即count的值 Collector<ItemBuyCount> collector //輸出類型collector ) throws Exception { Long itemId = ((Tuple1<Long>) key).f0; Long count =aggregationResult.iterator().next(); collector.collect(ItemBuyCount.of(itemId, window.getEnd(), count)); } } /** * 求某個窗口中前N名的熱門點擊商品,key為窗口時間戳,輸出為Top N 的結果字符串 */ public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemBuyCount, List<ItemBuyCount>> { private final int topSize; public TopNHotItems(int topSize) { this.topSize = topSize; } //用於存儲商品與購買數的狀態,待收齊同一個窗口的數據后,再觸發 Top N 計算 private ListState<ItemBuyCount> itemState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //狀態注冊 ListStateDescriptor<ItemBuyCount> itemViewStateDesc = new ListStateDescriptor<ItemBuyCount>( "itemState-state", ItemBuyCount.class ); itemState = getRuntimeContext().getListState(itemViewStateDesc); } @Override public void processElement( ItemBuyCount input, Context context, Collector<List<ItemBuyCount>> collector ) throws Exception { //每條數據都保存到狀態 itemState.add(input); //注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收集好了所有 windowEnd的商品數據 context.timerService().registerEventTimeTimer(input.windowEnd + 1); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<ItemBuyCount>> out) throws Exception { //獲取收集到的所有商品點擊量 List<ItemBuyCount> allItems = new ArrayList<ItemBuyCount>(); for(ItemBuyCount item : itemState.get()) { allItems.add(item); } //提前清除狀態中的數據,釋放空間 itemState.clear(); //按照點擊量從大到小排序 allItems.sort(new Comparator<ItemBuyCount>() { @Override public int compare(ItemBuyCount o1, ItemBuyCount o2) { return (int) (o2.buyCount - o1.buyCount); } }); List<ItemBuyCount> itemBuyCounts = new ArrayList<>(); //將排名信息格式化成String,方便打印 StringBuilder result = new StringBuilder(); result.append("========================================\n"); result.append("時間:").append(new Timestamp(timestamp-1)).append("\n"); for (int i=0;i<topSize;i++) { ItemBuyCount currentItem = allItems.get(i); // No1: 商品ID=12224 購買量=2 result.append("No").append(i).append(":") .append(" 商品ID=").append(currentItem.itemId) .append(" 購買量=").append(currentItem.buyCount) .append("\n"); itemBuyCounts.add(currentItem); } result.append("====================================\n\n"); out.collect(itemBuyCounts); } } }
以上里邊的步驟:
1. 構建流執行的環境。
2. 設置並行度為1,為了方便下邊打印。但是實際上可以不用設置並行度。
3. 設置流時間的特性,即EventTime。流時間一共有三種時間:
1)ProcessingTime時間,即算子處理的時間,默認flink的時間特性。
2)EventTime,事件處理時間,也就是程序所帶的時間戳。此例子將使用由業務產生的時間戳作為時間戳。
3)IntestionTime,即到達flink的時間。
4. 連接kafka的屬性讀取數據。將kafka的數據轉換成pojo對象。
5. 將亂序的數據使用BoundedOutOfOrdernessTimestampExtractor進行時間戳抽取和設置watermark。
6. 進行數據過濾,將useAction為buy的數據進行過濾出來,其他的直接丟棄。
7. 采用滑動窗口的方式對數據進行匯總並返回指定類型的數據。一小時為一個窗口,每五分鍾往后滑動一下。數據的形式為[9:00 10:00), [9:05 10:05), [9:10 10:10)依次類推。
8. 最后是將數據排序,然后獲取topN的數據。
接下來咱們再針對里邊的細節看一下,如果數據是生序的數據,則采用AscendingTimestampExtractor進行時間戳抽取,無序可以采用上邊5條說的類。有序的只需要進行時間戳抽取,抽取的時間戳就認為是整個的時間,也就是watermark時間,可以理解為全局時間。無序抽取的時候設置一個最晚到達的時間。舉個例子,如果一條數據時間為9:04:55, 到達的時間為9:05:05,這樣這條數據還是屬於窗口[9:00 9:05:01)里邊的,而不是[9:05:01 9:10:01)。
這個聚合方法aggregate(new CountAgg(), new WindowResultFunciton())里邊聲明了兩個類,一個是聚合功能類,一個是結果窗口功能類。也就是使用第一個類將數據進行匯聚,第二個類將數據進行合並匯總並且輸出自己定義的窗口結果集ItemBuyCount。
最后就是topN功能,這個類實現了KeyedProcessFunction,里邊有三個方法 open, processElement, onTimer, 里邊還有一個ListState<ItemBuyCount>,用於收集和記錄狀態信息,保證數據exactly-once語義。
open方法用於進行狀態注冊。
processElement把數據狀態進行添加到list里邊,然后注冊時間時間windowEnd+1,即數據超過了windowEnd的時候就會觸發onTimer時間。
onTimer用於把五分鍾收集好的數據進行處理,從數據狀態記錄中把數據拿出來,然后清理數據狀態,釋放空間。然后將拿出來的數據進行排序,最后整理成sink所需要的結果集。
最后就把數據進行sink,保存到數據庫。
下面看一下數據運行的結果,kafkaWriter打印的日志。
向kafka發送數據:{"behavior":"pv","categoryId":7,"itemId":19,"timestamp":1576325365016,"userId":56}
執行TopNJob,MysqlSink執行的日志打印。
-------------invoke------------ 成功寫入Mysql數量:3 [myflinktopn.TopNJob$ItemBuyCount@611553e4, myflinktopn.TopNJob$ItemBuyCount@19739780, myflinktopn.TopNJob$ItemBuyCount@a76f1f0] -------------invoke------------ 成功寫入Mysql數量:3 [myflinktopn.TopNJob$ItemBuyCount@3db32bd3, myflinktopn.TopNJob$ItemBuyCount@10f855ae, myflinktopn.TopNJob$ItemBuyCount@6a022d0b] -------------invoke------------ 成功寫入Mysql數量:3 [myflinktopn.TopNJob$ItemBuyCount@4ae16ab8, myflinktopn.TopNJob$ItemBuyCount@6a8a29a3, myflinktopn.TopNJob$ItemBuyCount@2326adfb]
寫入到數據庫的記錄如下:
寫入的時間都在同一個時刻,獲取前三條購買最多的,所以三條寫入的時間都一樣的。
還有在運行的時候要注意kafkaWriter要多謝一些,因為要收集5分鍾的數據,所以至少得跑5分鍾。
最后我把代碼放到git上了,可以進行訪問:https://github.com/stonehqs/flink-topn ,有不對的地方,歡迎指正。