來源於 https://blog.csdn.net/qq_31866793/article/details/100690542
在使用aggregate()函數的時候,一直報錯,先貼代碼吧
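一開始一直找不到原因,編譯器只提示 aggregate() 的方法簽名不匹配。後來想到可能是 keyBy() 返回的 key 類型不對:用字段名分組(例如 keyBy("item_id"))時,key 的類型是 Tuple,WindowFunction 的 key 泛型也必須寫成 Tuple;如果改用 KeySelector 返回 String,WindowFunction 的 key 泛型就要相應寫成 String。把兩邊的 key 類型對上之後,果然就是這個問題。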
因為是用 Java 寫的,泛型類型必須顯式對應,所以要特別注意這個問題:
使用對象:
package com.aliyun.market;


import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;


/**
 * Top-N hot items job.
 *
 * <p>Sample data can be downloaded with
 * {@code curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv}
 * (kept under {@code resources/UserBehavior.csv}).
 * Reference: http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/
 *
 * <p>Key point: {@code aggregate(...)} performs incremental aggregation and is normally applied
 * to a keyed, windowed stream. The key type produced by {@code keyBy(...)} must match the key
 * type parameter of the {@link WindowFunction} passed to {@code aggregate(...)}; keying by a
 * field name gives a {@link Tuple} key.
 */
public class HotTopDemo {

    /**
     * Builds and runs the pipeline: Kafka source, JSON parsing, event-time assignment,
     * "pv" filter, sliding-window count per item, and Top-N per window, printed to stdout.
     *
     * @param args ignored; they are overwritten below with hard-coded demo settings
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {

        // Step 1: read from Kafka.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration. NOTE: this deliberately replaces whatever was passed on the
        // command line with hard-coded demo values.
        args = new String[]{"--input-topic", "user_behavior", "--bootstrap.servers", "node2.hadoop:9091,node3.hadoop:9091",
                "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"};

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties pros = parameterTool.getProperties();
        // Use the Kafka topic as the input, starting from the latest offsets.
        DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>(
                pros.getProperty("input-topic"),
                new SimpleStringSchema(),
                pros).setStartFromLatest()

        ).setParallelism(1);

        // Parse each Kafka record (a JSON string) into a UserBehavior object.
        DataStream<UserBehavior> dataDstream = kafkaDstream.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String input) throws Exception {
                return JSON.parseObject(input, UserBehavior.class);
            }
        });

        // Assign event timestamps and (ascending) watermarks.
        DataStream<UserBehavior> timedData = dataDstream
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior userBehavior) {
                        // The value is treated as seconds and converted to milliseconds.
                        // NOTE(review): this reads category_id, not a timestamp field, which looks
                        // like a mistake; confirm against the UserBehavior class (not visible here).
                        return userBehavior.category_id * 1000;
                    }
                });

        // Keep only page-view ("pv") events. The constant-first equals avoids an NPE when the
        // behavior field is missing from the record.
        DataStream<UserBehavior> pvData = timedData.filter(new FilterFunction<UserBehavior>() {
            @Override
            public boolean filter(UserBehavior userBehavior) throws Exception {
                return "pv".equals(userBehavior.behavior);
            }
        });

        // Count views per item in a sliding window: 20-second windows that slide every 10 seconds.
        // (The referenced tutorial uses a 1-hour window sliding every 5 minutes; this demo uses
        // shorter windows.)
        DataStream<ItemViewCount> windowedData = pvData.keyBy("item_id") // group by field name => Tuple key
                .timeWindow(Time.seconds(20), Time.seconds(10))
                .aggregate(new CountAgg(), new WindowResultFunction()); // incremental aggregation

        // Top-N hottest items per window.
        DataStream<String> topItems = windowedData
                .keyBy("windowEnd")
                .process(new TopNHotItems(2)); // top 2 items by view count
        topItems.print();
        env.execute("Hot Items Job");

    }

    /** Counts the elements of a window: every incoming element adds one to the accumulator. */
    public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

        /** Creates the initial accumulator (zero). */
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        /** Called for every element added to the window; increments the count. */
        @Override
        public Long add(UserBehavior userBehavior, Long acc) {
            return acc + 1;
        }

        /** Called when the window fires; returns the accumulated count. */
        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        /** Called only when windows are merged; combines two accumulators. */
        @Override
        public Long merge(Long acc1, Long acc2) {
            return acc1 + acc2;
        }
    }

    /**
     * Wraps the aggregated count of one key and one window into an {@link ItemViewCount}.
     * The key type parameter is {@link Tuple} because the stream is keyed by field name.
     */
    public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

        @Override
        public void apply(
                Tuple key,                          // window key, i.e. the item id
                TimeWindow window,                  // the window
                Iterable<Long> aggregateResult,     // result of the aggregate function, i.e. the count
                Collector<ItemViewCount> collector  // output collector
        ) throws Exception {
            // Assumes UserBehavior.item_id is a Long-typed field (class not visible here).
            Long itemId = ((Tuple1<Long>) key).f0;
            Long count = aggregateResult.iterator().next();
            collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
        }
    }

    /**
     * View count of one item in one window (output type of the window operation).
     */
    public static class ItemViewCount {
        public long itemId;     // item id
        public long windowEnd;  // window end timestamp
        public long viewCount;  // number of views of the item

        /** Creates an instance with all three fields set. */
        public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
            ItemViewCount result = new ItemViewCount();
            result.itemId = itemId;
            result.windowEnd = windowEnd;
            result.viewCount = viewCount;
            return result;
        }
    }

    /**
     * Emits the Top-N most viewed items of a window. The key is the window end timestamp and
     * the output is a formatted result string.
     */
    public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {

        private final int topSize;

        public TopNHotItems(int topSize) {
            this.topSize = topSize;
        }

        // Buffers the item counts of one window until all of them have arrived; the Top-N
        // computation is then triggered by a timer.
        private ListState<ItemViewCount> itemState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Register the state.
            ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                    "itemState-state",
                    ItemViewCount.class);
            itemState = getRuntimeContext().getListState(itemsStateDesc);
        }

        @Override
        public void processElement(
                ItemViewCount input,
                Context context,
                Collector<String> collector) throws Exception {

            // Buffer every element in state.
            itemState.add(input);
            // Register an event-time timer at windowEnd + 1; when it fires, all item counts
            // belonging to the window ending at windowEnd have been received.
            context.timerService().registerEventTimeTimer(input.windowEnd + 1);
        }

        @Override
        public void onTimer(
                long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Collect all buffered item counts.
            List<ItemViewCount> allItems = new ArrayList<>();
            for (ItemViewCount item : itemState.get()) {
                allItems.add(item);
            }
            // Clear the state early to free space.
            itemState.clear();
            // Sort by view count, descending. Long.compare avoids the wrong sign that a
            // narrowing (int) cast of the difference can produce.
            allItems.sort(new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return Long.compare(o2.viewCount, o1.viewCount);
                }
            });
            // Format the ranking as a String for printing.
            StringBuilder result = new StringBuilder();
            result.append("====================================\n");
            result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n");
            // A window may contain fewer than topSize distinct items, so bound the loop by
            // the list size as well to avoid an IndexOutOfBoundsException.
            int limit = Math.min(topSize, allItems.size());
            for (int i = 0; i < limit; i++) {
                ItemViewCount currentItem = allItems.get(i);
                // Produces one line per item; the rank index i is zero-based.
                result.append("No").append(i).append(":")
                        .append(" 商品ID=").append(currentItem.itemId)
                        .append(" 瀏覽量=").append(currentItem.viewCount)
                        .append("\n");
            }
            result.append("====================================\n\n");
            out.collect(result.toString());
        }
    }
}
使用fastJson
package com.aliyun.market;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Real-time shopping-mall statistics, based on Flink 1.9.
 *
 * <p>This variant keys the stream with a {@link KeySelector} that returns a {@code String}, so
 * the {@link WindowFunction} passed to {@code aggregate(...)} declares {@code String} as its
 * key type.
 */
public class MarketStreamCount {

    /**
     * Builds and runs the pipeline: Kafka source, JSON parsing, event-time assignment, and a
     * sliding-window count per {@code user_id}, printed to stdout.
     *
     * @param args ignored; they are overwritten below with hard-coded demo settings
     */
    public static void main(String[] args) {

        // Step 1: read from Kafka, using a local environment with parallelism 8.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
        // Kafka configuration. NOTE: this deliberately replaces whatever was passed on the
        // command line with hard-coded demo values.
        args = new String[]{"--input-topic", "user_behavior", "--bootstrap.servers", "node2.hadoop:9091,node3.hadoop:9091",
                "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"};

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties pros = parameterTool.getProperties();
        // Use the Kafka topic as the input, starting from the earliest offsets.
        DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>(
                pros.getProperty("input-topic"),
                new SimpleStringSchema(),
                pros).setStartFromEarliest()

        ).setParallelism(1);

        // Step 2 (goal): real-time customer count per floor, in total, and per day.

        // Parse each record into a JSONObject. Records that fail to parse, or that parse to
        // null, are dropped here instead of being emitted as null records.
        DataStream<JSONObject> kafkaDstream2 = kafkaDstream.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String input, Collector<JSONObject> out) throws Exception {
                JSONObject inputJson;
                try {
                    inputJson = JSONObject.parseObject(input);
                } catch (Exception e) {
                    // Best effort: report the malformed record and skip it.
                    e.printStackTrace();
                    return;
                }
                // Emit outside the try block so that failures in downstream operators are
                // not swallowed by the catch above.
                if (inputJson != null) {
                    out.collect(inputJson);
                }
            }
        });

        // Assign event timestamps and (ascending) watermarks.
        DataStream<JSONObject> timedData = kafkaDstream2
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<JSONObject>() {
                    @Override
                    public long extractAscendingTimestamp(JSONObject json) {
                        // The value is treated as seconds and converted to milliseconds.
                        // NOTE(review): this reads "category_id", not a timestamp field, which
                        // looks like a mistake; it also throws an NPE when the field is absent.
                        // Confirm the intended field against the message schema.
                        return json.getLong("category_id") * 1000;
                    }
                });

        // Count records per user_id in a sliding window: 5-second windows sliding every second.
        // The KeySelector returns String, so the window function's key type is String.
        DataStream<JSONObject> windowedData = timedData.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject jsonObject) throws Exception {
                return jsonObject.getString("user_id");
            }
        }).timeWindow(Time.seconds(5L), Time.seconds(1L))
                // incremental aggregation
                .aggregate(new CountAgg(), new WindowResultFunction());

        windowedData.print();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Counts the elements of a window: every incoming element adds one to the accumulator. */
    public static class CountAgg implements AggregateFunction<JSONObject, Long, Long> {

        /** Creates the initial accumulator (zero). */
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        /** Called for every element added to the window; increments the count. */
        @Override
        public Long add(JSONObject json, Long acc) {
            return acc + 1;
        }

        /** Called when the window fires; returns the accumulated count. */
        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        /** Called only when windows are merged; combines two accumulators. */
        @Override
        public Long merge(Long acc1, Long acc2) {
            return acc1 + acc2;
        }
    }

    /**
     * Formats the aggregated count of one key and one window as a JSON object with the
     * fields {@code key}, {@code count} and {@code end}.
     */
    public static class WindowResultFunction implements WindowFunction<Long, JSONObject, String, TimeWindow> {

        @Override
        public void apply(
                String key,                       // window key, i.e. the user_id returned by the KeySelector
                TimeWindow window,                // the window
                Iterable<Long> aggregateResult,   // result of the aggregate function, i.e. the count
                Collector<JSONObject> collector   // output collector
        ) throws Exception {

            Long count = aggregateResult.iterator().next();
            // window end time
            long end = window.getEnd();
            JSONObject json = new JSONObject();
            json.put("key", key);
            json.put("count", count);
            json.put("end", end);
            collector.collect(json);
        }
    }

}