来源于 https://blog.csdn.net/qq_31866793/article/details/100690542
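在使用 aggregate() 函数时一直报错，先贴代码吧。
一直不知道是什么原因，报错信息只是说方法不匹配。后来想到可能是 keyBy 返回的 key 类型不对：aggregate(aggFunction, windowFunction) 要求 WindowFunction 的 key 类型参数与 keyBy 产生的 key 类型一致——按字段名 keyBy("item_id") 得到的 key 类型是 Tuple，而用返回 String 的 KeySelector 时 key 类型就是 String。把 key 类型改成与之匹配的 String 类型后，问题果然解决了。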
因为是用 Java 写的（泛型类型需要显式声明并且必须匹配），所以要特别注意这个问题：
使用 POJO 对象（keyBy 按字段名分组，key 类型为 Tuple）：
1 package com.aliyun.market; 2 3 4 import com.alibaba.fastjson.JSON; 5 import org.apache.flink.api.common.functions.AggregateFunction; 6 import org.apache.flink.api.common.functions.FilterFunction; 7 import org.apache.flink.api.common.functions.MapFunction; 8 import org.apache.flink.api.common.serialization.SimpleStringSchema; 9 import org.apache.flink.api.common.state.ListState; 10 import org.apache.flink.api.common.state.ListStateDescriptor; 11 import org.apache.flink.api.java.tuple.Tuple; 12 import org.apache.flink.api.java.tuple.Tuple1; 13 import org.apache.flink.api.java.utils.ParameterTool; 14 import org.apache.flink.configuration.Configuration; 15 import org.apache.flink.streaming.api.TimeCharacteristic; 16 import org.apache.flink.streaming.api.datastream.DataStream; 17 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 18 import org.apache.flink.streaming.api.functions.KeyedProcessFunction; 19 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; 20 import org.apache.flink.streaming.api.functions.windowing.WindowFunction; 21 import org.apache.flink.streaming.api.windowing.time.Time; 22 import org.apache.flink.streaming.api.windowing.windows.TimeWindow; 23 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; 24 import org.apache.flink.util.Collector; 25 26 import java.sql.Timestamp; 27 import java.util.ArrayList; 28 import java.util.Comparator; 29 import java.util.List; 30 import java.util.Properties; 31 32 33 /** 34 * 热门商品topN统计 35 * 数据下载 curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv 36 * 数据在resource 下 UserBehavior.csv 37 * 参考: http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/ 38 * <p> 39 * 知识点: aggregate函数 是专门来做统计的,一般跟着keyBy()后面 40 * <p> 41 * 官网使用 : aggregate(SUM, 0).and(MIN, 2) 42 */ 43 public class HotTopDemo { 44 public static void main(String[] args) throws Exception { 45 46 // 
todo 1,读取kafka数据 47 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 48 49 //todo 获取kafka的配置属性 50 args = new String[]{"--input-topic", "user_behavior", "--bootstrap.servers", "node2.hadoop:9091,node3.hadoop:9091", 51 "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"}; 52 53 ParameterTool parameterTool = ParameterTool.fromArgs(args); 54 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 55 56 57 // 58 Properties pros = parameterTool.getProperties(); 59 // //todo 指定输入数据为kafka topic 60 DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>( 61 pros.getProperty("input-topic"), 62 new SimpleStringSchema(), 63 pros).setStartFromLatest() 64 65 ).setParallelism(1); 66 67 68 69 //todo 我们用 PojoCsvInputFormat 创建输入源。 70 DataStream<UserBehavior> dataDstream =kafkaDstream.map(new MapFunction<String, UserBehavior>() { 71 @Override 72 public UserBehavior map(String input) throws Exception { 73 return JSON.parseObject(input, UserBehavior.class); 74 } 75 }); 76 77 //给数据加上了一个水印 78 DataStream<UserBehavior> timedData = dataDstream 79 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() { 80 @Override 81 public long extractAscendingTimestamp(UserBehavior userBehavior) { 82 // 原始数据单位秒 ,将其转成毫秒 83 return userBehavior.category_id * 1000; 84 } 85 }); 86 87 //过滤出点击事件 88 89 DataStream<UserBehavior> pvData = timedData.filter(new FilterFunction<UserBehavior>() { 90 @Override 91 public boolean filter(UserBehavior userBehavior) throws Exception { 92 return userBehavior.behavior.equals("pv"); 93 } 94 }); 95 96 //todo 窗口统计点击量,设置窗口大小为1个小时,5分钟滑动一次 97 //由于要每隔5分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。 98 99 DataStream<ItemViewCount> windowedData = pvData.keyBy("item_id") //按字段分组 100 .timeWindow(Time.seconds(20), Time.seconds(10)) 101 .aggregate(new CountAgg(), 
new WindowResultFunction()); //使用aggregate做增量聚合统计 102 103 // TopN 计算最热门商品 104 DataStream<String> topItems = windowedData 105 .keyBy("windowEnd") 106 .process(new TopNHotItems(2)); // 求点击量前3名的商品 107 topItems.print(); 108 env.execute("Hot Items Job"); 109 110 } 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 //todo 功能是统计窗口中的条数,即遇到一条数据就加一 127 128 public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> { 129 130 @Override 131 public Long createAccumulator() { //创建一个数据统计的容器,提供给后续操作使用。 132 return 0L; 133 } 134 135 @Override 136 public Long add(UserBehavior userBehavior, Long acc) { //每个元素被添加进窗口的时候调用。 137 return acc + 1; 138 } 139 140 @Override 141 public Long getResult(Long acc) { 142 ;//窗口统计事件触发时调用来返回出统计的结果。 143 return acc; 144 } 145 146 @Override 147 public Long merge(Long acc1, Long acc2) { //只有在当窗口合并的时候调用,合并2个容器 148 return acc1 + acc2; 149 } 150 } 151 152 // todo 指定格式输出 :将每个 key每个窗口聚合后的结果带上其他信息进行输出 进入的数据为Long 返回 ItemViewCount对象 153 public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> { 154 155 @Override 156 public void apply( 157 Tuple key, // 窗口的主键,即 itemId 158 TimeWindow window, // 窗口 159 Iterable<Long> aggregateResult, // 聚合函数的结果,即 count 值 160 Collector<ItemViewCount> collector // 输出类型为 ItemViewCount 161 ) throws Exception { 162 Long itemId = ((Tuple1<Long>) key).f0; 163 Long count = aggregateResult.iterator().next(); 164 collector.collect(ItemViewCount.of(itemId, window.getEnd(), count)); 165 } 166 } 167 168 /** 169 * 商品点击量(窗口操作的输出类型) 170 */ 171 public static class ItemViewCount { 172 public long itemId; // 商品ID 173 public long windowEnd; // 窗口结束时间戳 174 public long viewCount; // 商品的点击量 175 176 public static ItemViewCount of(long itemId, long windowEnd, long viewCount) { 177 ItemViewCount result = new ItemViewCount(); 178 result.itemId = itemId; 179 result.windowEnd = windowEnd; 180 result.viewCount = viewCount; 181 return result; 182 } 183 } 184 185 /** 求某个窗口中前 N 
名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串 */ 186 public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> { 187 188 private final int topSize; 189 190 public TopNHotItems(int topSize) { 191 this.topSize = topSize; 192 } 193 194 // 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算 195 private ListState<ItemViewCount> itemState; 196 197 @Override 198 public void open(Configuration parameters) throws Exception { 199 super.open(parameters); 200 // 状态的注册 201 ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>( 202 "itemState-state", 203 ItemViewCount.class); 204 itemState = getRuntimeContext().getListState(itemsStateDesc); 205 } 206 207 @Override 208 public void processElement( 209 ItemViewCount input, 210 Context context, 211 Collector<String> collector) throws Exception { 212 213 // 每条数据都保存到状态中 214 itemState.add(input); 215 // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据 216 context.timerService().registerEventTimeTimer(input.windowEnd + 1); 217 } 218 219 @Override 220 public void onTimer( 221 long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { 222 // 获取收到的所有商品点击量 223 List<ItemViewCount> allItems = new ArrayList<>(); 224 for (ItemViewCount item : itemState.get()) { 225 allItems.add(item); 226 } 227 // 提前清除状态中的数据,释放空间 228 itemState.clear(); 229 // 按照点击量从大到小排序 230 allItems.sort(new Comparator<ItemViewCount>() { 231 @Override 232 public int compare(ItemViewCount o1, ItemViewCount o2) { 233 return (int) (o2.viewCount - o1.viewCount); 234 } 235 }); 236 // 将排名信息格式化成 String, 便于打印 237 StringBuilder result = new StringBuilder(); 238 result.append("====================================\n"); 239 result.append("时间: ").append(new Timestamp(timestamp-1)).append("\n"); 240 for (int i=0;i<topSize;i++) { 241 ItemViewCount currentItem = allItems.get(i); 242 // No1: 商品ID=12224 浏览量=2413 243 result.append("No").append(i).append(":") 244 .append(" 商品ID=").append(currentItem.itemId) 245 .append(" 
浏览量=").append(currentItem.viewCount) 246 .append("\n"); 247 } 248 result.append("====================================\n\n"); 249 out.collect(result.toString()); 250 } 251 } 252 }
使用 fastjson 的 JSONObject（keyBy 使用返回 String 的 KeySelector，key 类型为 String）：
1 package com.aliyun.market; 2 3 import com.alibaba.fastjson.JSONObject; 4 import org.apache.flink.api.common.functions.AggregateFunction; 5 import org.apache.flink.api.common.functions.FilterFunction; 6 import org.apache.flink.api.common.functions.MapFunction; 7 import org.apache.flink.api.common.serialization.SimpleStringSchema; 8 import org.apache.flink.api.java.functions.KeySelector; 9 import org.apache.flink.api.java.utils.ParameterTool; 10 import org.apache.flink.streaming.api.TimeCharacteristic; 11 import org.apache.flink.streaming.api.datastream.DataStream; 12 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 13 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; 14 import org.apache.flink.streaming.api.functions.windowing.WindowFunction; 15 import org.apache.flink.streaming.api.windowing.time.Time; 16 import org.apache.flink.streaming.api.windowing.windows.TimeWindow; 17 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; 18 import org.apache.flink.util.Collector; 19 20 import java.util.Properties; 21 22 /** 23 * 商场实时数据统计,基于Flink 1.9版本 24 */ 25 public class MarketStreamCount { 26 public static void main(String[] args) { 27 28 29 // todo 1,读取kafka数据 30 final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8); 31 //todo 获取kafka的配置属性 32 args = new String[]{"--input-topic", "user_behavior", "--bootstrap.servers", "node2.hadoop:9091,node3.hadoop:9091", 33 "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"}; 34 35 ParameterTool parameterTool = ParameterTool.fromArgs(args); 36 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 37 38 Properties pros = parameterTool.getProperties(); 39 // //todo 指定输入数据为kafka topic 40 DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>( 41 pros.getProperty("input-topic"), 42 new SimpleStringSchema(), 43 
pros).setStartFromEarliest() 44 45 ).setParallelism(1); 46 47 //todo 2,每层实时顾客人数,实时顾客总数,一天的实时顾客总数 48 49 // 转成json 50 DataStream<JSONObject> kafkaDstream2 = kafkaDstream.map(new MapFunction<String, JSONObject>() { 51 @Override 52 public JSONObject map(String input) throws Exception { 53 JSONObject inputJson = null; 54 try { 55 inputJson = JSONObject.parseObject(input); 56 return inputJson; 57 } catch (Exception e) { 58 e.printStackTrace(); 59 // return null; 60 } 61 return inputJson; 62 } 63 }).filter(new FilterFunction<JSONObject>() { 64 @Override 65 public boolean filter(JSONObject jsonObject) throws Exception { 66 if (jsonObject!=null){ 67 return true; 68 } 69 return false; 70 } 71 }); 72 //给数据加上了一个水印 73 DataStream<JSONObject> timedData = kafkaDstream2 74 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<JSONObject>() { 75 @Override 76 public long extractAscendingTimestamp(JSONObject json) { 77 // 原始数据单位秒 ,将其转成毫秒 78 return json.getLong("category_id") * 1000; 79 } 80 }); 81 82 //过滤出点击事件 83 // 实时客人数,各个层级 84 DataStream<JSONObject> windowedData = timedData.keyBy(new KeySelector<JSONObject, String>() { 85 @Override 86 public String getKey(JSONObject jsonObject) throws Exception { 87 return jsonObject.getString("user_id"); 88 } 89 }).timeWindow(Time.seconds(5L),Time.seconds(1L)) 90 //使用aggregate做增量聚合统计 91 .aggregate(new CountAgg(),new WindowResultFunction()); 92 93 94 95 windowedData.print(); 96 try { 97 env.execute(); 98 } catch (Exception e) { 99 e.printStackTrace(); 100 } 101 } 102 103 104 //todo 功能是统计窗口中的条数,即遇到一条数据就加一 105 106 public static class CountAgg implements AggregateFunction<JSONObject, Long, Long> { 107 108 @Override 109 public Long createAccumulator() { //创建一个数据统计的容器,提供给后续操作使用。 110 return 0L; 111 } 112 113 @Override 114 public Long add(JSONObject json, Long acc) { //每个元素被添加进窗口的时候调用。 115 return acc + 1; 116 } 117 118 @Override 119 public Long getResult(Long acc) { 120 //窗口统计事件触发时调用来返回出统计的结果。 121 return acc; 122 } 123 124 @Override 125 public 
Long merge(Long acc1, Long acc2) { //只有在当窗口合并的时候调用,合并2个容器 126 return acc1 + acc2; 127 } 128 } 129 130 // todo 指定格式输出 131 public static class WindowResultFunction implements WindowFunction<Long, JSONObject, String, TimeWindow> { 132 133 @Override 134 public void apply( 135 String key, // 窗口的主键,即 itemId 136 TimeWindow window, // 窗口 137 Iterable<Long> aggregateResult, // 聚合函数的结果,即 count 值 138 Collector<JSONObject> collector // 输出类型为 ItemViewCount 139 ) throws Exception { 140 141 Long count = aggregateResult.iterator().next(); 142 //窗口结束时间 143 long end = window.getEnd(); 144 JSONObject json = new JSONObject(); 145 json.put("key",key); 146 json.put("count",count); 147 json.put("end",end); 148 collector.collect(json); 149 } 150 } 151 152 }