Flink aggregate()函数使用遇到的一点小问题


来源于  https://blog.csdn.net/qq_31866793/article/details/100690542

 

在使用aggregate()函数的时候,一直报错,先贴代码吧

 

一直不知道是什么原因,只是说方法不匹配,想想可能是keyBy函数的返回key值类型不对,改成String类型,果然是这个问题。

因为是java写,所以要注意这个问题:

 

使用对象:

  1 package com.aliyun.market;
  2  
  3  
  4 import com.alibaba.fastjson.JSON;
  5 import org.apache.flink.api.common.functions.AggregateFunction;
  6 import org.apache.flink.api.common.functions.FilterFunction;
  7 import org.apache.flink.api.common.functions.MapFunction;
  8 import org.apache.flink.api.common.serialization.SimpleStringSchema;
  9 import org.apache.flink.api.common.state.ListState;
 10 import org.apache.flink.api.common.state.ListStateDescriptor;
 11 import org.apache.flink.api.java.tuple.Tuple;
 12 import org.apache.flink.api.java.tuple.Tuple1;
 13 import org.apache.flink.api.java.utils.ParameterTool;
 14 import org.apache.flink.configuration.Configuration;
 15 import org.apache.flink.streaming.api.TimeCharacteristic;
 16 import org.apache.flink.streaming.api.datastream.DataStream;
 17 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 18 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 19 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 20 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 21 import org.apache.flink.streaming.api.windowing.time.Time;
 22 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 23 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 24 import org.apache.flink.util.Collector;
 25  
 26 import java.sql.Timestamp;
 27 import java.util.ArrayList;
 28 import java.util.Comparator;
 29 import java.util.List;
 30 import java.util.Properties;
 31  
 32  
 33 /**
 34  * 热门商品topN统计
 35  * 数据下载 curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv
 36  * 数据在resource 下 UserBehavior.csv
 37  * 参考: http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/
 38  * <p>
 39  * 知识点: aggregate函数 是专门来做统计的,一般跟着keyBy()后面
 40  * <p>
 41  * 官网使用 : aggregate(SUM, 0).and(MIN, 2)
 42  */
 43 public class HotTopDemo {
 44     public static void main(String[] args) throws Exception {
 45  
 46         // todo 1,读取kafka数据
 47         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 48  
 49         //todo 获取kafka的配置属性
 50         args = new String[]{"--input-topic", "user_behavior", "--bootstrap.servers", "node2.hadoop:9091,node3.hadoop:9091",
 51                 "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"};
 52  
 53         ParameterTool parameterTool = ParameterTool.fromArgs(args);
 54         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 55  
 56  
 57 //
 58         Properties pros = parameterTool.getProperties();
 59 //        //todo 指定输入数据为kafka topic
 60         DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>(
 61                 pros.getProperty("input-topic"),
 62                 new SimpleStringSchema(),
 63                 pros).setStartFromLatest()
 64  
 65         ).setParallelism(1);
 66  
 67  
 68  
 69         //todo 我们用 PojoCsvInputFormat 创建输入源。
 70         DataStream<UserBehavior> dataDstream =kafkaDstream.map(new MapFunction<String, UserBehavior>() {
 71             @Override
 72             public UserBehavior map(String input) throws Exception {
 73                 return  JSON.parseObject(input, UserBehavior.class);
 74             }
 75         });
 76  
 77         //给数据加上了一个水印
 78         DataStream<UserBehavior> timedData = dataDstream
 79                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
 80                     @Override
 81                     public long extractAscendingTimestamp(UserBehavior userBehavior) {
 82                         // 原始数据单位秒 ,将其转成毫秒
 83                         return userBehavior.category_id * 1000;
 84                     }
 85                 });
 86  
 87         //过滤出点击事件
 88  
 89         DataStream<UserBehavior> pvData = timedData.filter(new FilterFunction<UserBehavior>() {
 90             @Override
 91             public boolean filter(UserBehavior userBehavior) throws Exception {
 92                 return userBehavior.behavior.equals("pv");
 93             }
 94         });
 95  
 96         //todo 窗口统计点击量,设置窗口大小为1个小时,5分钟滑动一次
 97         //由于要每隔5分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。
 98  
 99         DataStream<ItemViewCount> windowedData = pvData.keyBy("item_id") //按字段分组
100                 .timeWindow(Time.seconds(20), Time.seconds(10))
101                 .aggregate(new CountAgg(), new WindowResultFunction()); //使用aggregate做增量聚合统计
102  
103 //        TopN 计算最热门商品
104         DataStream<String> topItems = windowedData
105                 .keyBy("windowEnd")
106                 .process(new TopNHotItems(2));  // 求点击量前3名的商品
107         topItems.print();
108         env.execute("Hot Items Job");
109  
110     }
111  
112  
113  
114  
115  
116  
117  
118  
119  
120  
121  
122  
123  
124  
125  
126     //todo 功能是统计窗口中的条数,即遇到一条数据就加一
127  
128     public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
129  
130         @Override
131         public Long createAccumulator() { //创建一个数据统计的容器,提供给后续操作使用。
132             return 0L;
133         }
134  
135         @Override
136         public Long add(UserBehavior userBehavior, Long acc) { //每个元素被添加进窗口的时候调用。
137             return acc + 1;
138         }
139  
140         @Override
141         public Long getResult(Long acc) {
142             ;//窗口统计事件触发时调用来返回出统计的结果。
143             return acc;
144         }
145  
146         @Override
147         public Long merge(Long acc1, Long acc2) { //只有在当窗口合并的时候调用,合并2个容器
148             return acc1 + acc2;
149         }
150     }
151  
152     // todo 指定格式输出 :将每个 key每个窗口聚合后的结果带上其他信息进行输出  进入的数据为Long 返回 ItemViewCount对象
153     public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
154  
155         @Override
156         public void apply(
157                 Tuple key,  // 窗口的主键,即 itemId
158                 TimeWindow window,  // 窗口
159                 Iterable<Long> aggregateResult, // 聚合函数的结果,即 count 值
160                 Collector<ItemViewCount> collector  // 输出类型为 ItemViewCount
161         ) throws Exception {
162             Long itemId = ((Tuple1<Long>) key).f0;
163             Long count = aggregateResult.iterator().next();
164             collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
165         }
166     }
167  
168     /**
169      * 商品点击量(窗口操作的输出类型)
170      */
171     public static class ItemViewCount {
172         public long itemId;     // 商品ID
173         public long windowEnd;  // 窗口结束时间戳
174         public long viewCount;  // 商品的点击量
175  
176         public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
177             ItemViewCount result = new ItemViewCount();
178             result.itemId = itemId;
179             result.windowEnd = windowEnd;
180             result.viewCount = viewCount;
181             return result;
182         }
183     }
184  
185     /** 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串 */
186     public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
187  
188         private final int topSize;
189  
190         public TopNHotItems(int topSize) {
191             this.topSize = topSize;
192         }
193  
194         // 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
195         private ListState<ItemViewCount> itemState;
196  
197         @Override
198         public void open(Configuration parameters) throws Exception {
199             super.open(parameters);
200             // 状态的注册
201             ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
202                     "itemState-state",
203                     ItemViewCount.class);
204             itemState = getRuntimeContext().getListState(itemsStateDesc);
205         }
206  
207         @Override
208         public void processElement(
209                 ItemViewCount input,
210                 Context context,
211                 Collector<String> collector) throws Exception {
212  
213             // 每条数据都保存到状态中
214             itemState.add(input);
215             // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
216             context.timerService().registerEventTimeTimer(input.windowEnd + 1);
217         }
218  
219         @Override
220         public void onTimer(
221                 long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
222             // 获取收到的所有商品点击量
223             List<ItemViewCount> allItems = new ArrayList<>();
224             for (ItemViewCount item : itemState.get()) {
225                 allItems.add(item);
226             }
227             // 提前清除状态中的数据,释放空间
228             itemState.clear();
229             // 按照点击量从大到小排序
230             allItems.sort(new Comparator<ItemViewCount>() {
231                 @Override
232                 public int compare(ItemViewCount o1, ItemViewCount o2) {
233                     return (int) (o2.viewCount - o1.viewCount);
234                 }
235             });
236             // 将排名信息格式化成 String, 便于打印
237             StringBuilder result = new StringBuilder();
238             result.append("====================================\n");
239             result.append("时间: ").append(new Timestamp(timestamp-1)).append("\n");
240             for (int i=0;i<topSize;i++) {
241                 ItemViewCount currentItem = allItems.get(i);
242                 // No1:  商品ID=12224  浏览量=2413
243                 result.append("No").append(i).append(":")
244                         .append("  商品ID=").append(currentItem.itemId)
245                         .append("  浏览量=").append(currentItem.viewCount)
246                         .append("\n");
247             }
248             result.append("====================================\n\n");
249             out.collect(result.toString());
250         }
251     }
252 }

 

使用fastJson 

  1 package com.aliyun.market;
  2 
  3 import com.alibaba.fastjson.JSONObject;
  4 import org.apache.flink.api.common.functions.AggregateFunction;
  5 import org.apache.flink.api.common.functions.FilterFunction;
  6 import org.apache.flink.api.common.functions.MapFunction;
  7 import org.apache.flink.api.common.serialization.SimpleStringSchema;
  8 import org.apache.flink.api.java.functions.KeySelector;
  9 import org.apache.flink.api.java.utils.ParameterTool;
 10 import org.apache.flink.streaming.api.TimeCharacteristic;
 11 import org.apache.flink.streaming.api.datastream.DataStream;
 12 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 13 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 14 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 15 import org.apache.flink.streaming.api.windowing.time.Time;
 16 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 17 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 18 import org.apache.flink.util.Collector;
 19 
 20 import java.util.Properties;
 21 
 22 /**
 23  * 商场实时数据统计,基于Flink 1.9版本
 24  */
 25 public class MarketStreamCount {
 26     public static void main(String[] args) {
 27 
 28 
 29         // todo 1,读取kafka数据
 30         final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
 31         //todo 获取kafka的配置属性
 32         args = new String[]{"--input-topic", "user_behavior", "--bootstrap.servers", "node2.hadoop:9091,node3.hadoop:9091",
 33                 "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"};
 34 
 35         ParameterTool parameterTool = ParameterTool.fromArgs(args);
 36         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 37 
 38          Properties pros = parameterTool.getProperties();
 39 //        //todo 指定输入数据为kafka topic
 40         DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>(
 41                 pros.getProperty("input-topic"),
 42                 new SimpleStringSchema(),
 43                 pros).setStartFromEarliest()
 44 
 45         ).setParallelism(1);
 46 
 47         //todo 2,每层实时顾客人数,实时顾客总数,一天的实时顾客总数
 48 
 49         // 转成json
 50         DataStream<JSONObject> kafkaDstream2 = kafkaDstream.map(new MapFunction<String, JSONObject>() {
 51             @Override
 52             public JSONObject map(String input) throws Exception {
 53                 JSONObject inputJson = null;
 54                 try {
 55                     inputJson = JSONObject.parseObject(input);
 56                     return inputJson;
 57                 } catch (Exception e) {
 58                     e.printStackTrace();
 59 //                    return null;
 60                 }
 61                 return inputJson;
 62             }
 63         }).filter(new FilterFunction<JSONObject>() {
 64             @Override
 65             public boolean filter(JSONObject jsonObject) throws Exception {
 66                 if (jsonObject!=null){
 67                     return true;
 68                 }
 69                 return false;
 70             }
 71         });
 72         //给数据加上了一个水印
 73         DataStream<JSONObject> timedData = kafkaDstream2
 74                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<JSONObject>() {
 75                     @Override
 76                     public long extractAscendingTimestamp(JSONObject json) {
 77                         // 原始数据单位秒 ,将其转成毫秒
 78                         return json.getLong("category_id") * 1000;
 79                     }
 80                 });
 81 
 82         //过滤出点击事件
 83         // 实时客人数,各个层级
 84         DataStream<JSONObject> windowedData = timedData.keyBy(new KeySelector<JSONObject, String>() {
 85             @Override
 86             public String getKey(JSONObject jsonObject) throws Exception {
 87                 return  jsonObject.getString("user_id");
 88             }
 89         }).timeWindow(Time.seconds(5L),Time.seconds(1L))
 90                 //使用aggregate做增量聚合统计
 91                 .aggregate(new CountAgg(),new WindowResultFunction());
 92 
 93 
 94 
 95         windowedData.print();
 96         try {
 97             env.execute();
 98         } catch (Exception e) {
 99             e.printStackTrace();
100         }
101     }
102 
103 
104     //todo 功能是统计窗口中的条数,即遇到一条数据就加一
105 
106     public static class CountAgg implements AggregateFunction<JSONObject, Long, Long> {
107 
108         @Override
109         public Long createAccumulator() { //创建一个数据统计的容器,提供给后续操作使用。
110             return 0L;
111         }
112 
113         @Override
114         public Long add(JSONObject json, Long acc) { //每个元素被添加进窗口的时候调用。
115             return acc + 1;
116         }
117 
118         @Override
119         public Long getResult(Long acc) {
120             //窗口统计事件触发时调用来返回出统计的结果。
121             return acc;
122         }
123 
124         @Override
125         public Long merge(Long acc1, Long acc2) { //只有在当窗口合并的时候调用,合并2个容器
126             return acc1 + acc2;
127         }
128     }
129 
130     // todo 指定格式输出
131     public static class WindowResultFunction implements WindowFunction<Long, JSONObject, String, TimeWindow> {
132 
133         @Override
134         public void apply(
135                 String key,  // 窗口的主键,即 itemId
136                 TimeWindow window,  // 窗口
137                 Iterable<Long> aggregateResult, // 聚合函数的结果,即 count 值
138                 Collector<JSONObject> collector  // 输出类型为 ItemViewCount
139         ) throws Exception {
140 
141             Long count = aggregateResult.iterator().next();
142             //窗口结束时间
143             long end = window.getEnd();
144             JSONObject json = new JSONObject();
145             json.put("key",key);
146             json.put("count",count);
147             json.put("end",end);
148             collector.collect(json);
149         }
150     }
151 
152 }

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM