Flink aggregate()函數使用遇到的一點小問題


來源於  https://blog.csdn.net/qq_31866793/article/details/100690542

 

在使用aggregate()函數的時候,一直報錯,先貼代碼吧

 

一直不知道是什么原因,只是說方法不匹配,想想可能是keyBy函數的返回key值類型不對,改成String類型,果然是這個問題。

因為是java寫,所以要注意這個問題:

 

使用對象:

  1 package com.aliyun.market;
  2  
  3  
  4 import com.alibaba.fastjson.JSON;
  5 import org.apache.flink.api.common.functions.AggregateFunction;
  6 import org.apache.flink.api.common.functions.FilterFunction;
  7 import org.apache.flink.api.common.functions.MapFunction;
  8 import org.apache.flink.api.common.serialization.SimpleStringSchema;
  9 import org.apache.flink.api.common.state.ListState;
 10 import org.apache.flink.api.common.state.ListStateDescriptor;
 11 import org.apache.flink.api.java.tuple.Tuple;
 12 import org.apache.flink.api.java.tuple.Tuple1;
 13 import org.apache.flink.api.java.utils.ParameterTool;
 14 import org.apache.flink.configuration.Configuration;
 15 import org.apache.flink.streaming.api.TimeCharacteristic;
 16 import org.apache.flink.streaming.api.datastream.DataStream;
 17 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 18 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 19 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 20 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 21 import org.apache.flink.streaming.api.windowing.time.Time;
 22 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 23 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 24 import org.apache.flink.util.Collector;
 25  
 26 import java.sql.Timestamp;
 27 import java.util.ArrayList;
 28 import java.util.Comparator;
 29 import java.util.List;
 30 import java.util.Properties;
 31  
 32  
 33 /**
 34  * 熱門商品topN統計
 35  * 數據下載 curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv
 36  * 數據在resource 下 UserBehavior.csv
 37  * 參考: http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/
 38  * <p>
 39  * 知識點: aggregate函數 是專門來做統計的,一般跟着keyBy()后面
 40  * <p>
 41  * 官網使用 : aggregate(SUM, 0).and(MIN, 2)
 42  */
 43 public class HotTopDemo {
 44     public static void main(String[] args) throws Exception {
 45  
 46         // todo 1,讀取kafka數據
 47         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 48  
 49         //todo 獲取kafka的配置屬性
 50         args = new String[]{"--input-topic", "user_behavior", "--bootstrap.servers", "node2.hadoop:9091,node3.hadoop:9091",
 51                 "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"};
 52  
 53         ParameterTool parameterTool = ParameterTool.fromArgs(args);
 54         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 55  
 56  
 57 //
 58         Properties pros = parameterTool.getProperties();
 59 //        //todo 指定輸入數據為kafka topic
 60         DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>(
 61                 pros.getProperty("input-topic"),
 62                 new SimpleStringSchema(),
 63                 pros).setStartFromLatest()
 64  
 65         ).setParallelism(1);
 66  
 67  
 68  
 69         //todo 我們用 PojoCsvInputFormat 創建輸入源。
 70         DataStream<UserBehavior> dataDstream =kafkaDstream.map(new MapFunction<String, UserBehavior>() {
 71             @Override
 72             public UserBehavior map(String input) throws Exception {
 73                 return  JSON.parseObject(input, UserBehavior.class);
 74             }
 75         });
 76  
 77         //給數據加上了一個水印
 78         DataStream<UserBehavior> timedData = dataDstream
 79                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
 80                     @Override
 81                     public long extractAscendingTimestamp(UserBehavior userBehavior) {
 82                         // 原始數據單位秒 ,將其轉成毫秒
 83                         return userBehavior.category_id * 1000;
 84                     }
 85                 });
 86  
 87         //過濾出點擊事件
 88  
 89         DataStream<UserBehavior> pvData = timedData.filter(new FilterFunction<UserBehavior>() {
 90             @Override
 91             public boolean filter(UserBehavior userBehavior) throws Exception {
 92                 return userBehavior.behavior.equals("pv");
 93             }
 94         });
 95  
 96         //todo 窗口統計點擊量,設置窗口大小為1個小時,5分鍾滑動一次
 97         //由於要每隔5分鍾統計一次最近一小時每個商品的點擊量,所以窗口大小是一小時,每隔5分鍾滑動一次。即分別要統計 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的商品點擊量。是一個常見的滑動窗口需求(Sliding Window)。
 98  
 99         DataStream<ItemViewCount> windowedData = pvData.keyBy("item_id") //按字段分組
100                 .timeWindow(Time.seconds(20), Time.seconds(10))
101                 .aggregate(new CountAgg(), new WindowResultFunction()); //使用aggregate做增量聚合統計
102  
103 //        TopN 計算最熱門商品
104         DataStream<String> topItems = windowedData
105                 .keyBy("windowEnd")
106                 .process(new TopNHotItems(2));  // 求點擊量前3名的商品
107         topItems.print();
108         env.execute("Hot Items Job");
109  
110     }
111  
112  
113  
114  
115  
116  
117  
118  
119  
120  
121  
122  
123  
124  
125  
126     //todo 功能是統計窗口中的條數,即遇到一條數據就加一
127  
128     public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
129  
130         @Override
131         public Long createAccumulator() { //創建一個數據統計的容器,提供給后續操作使用。
132             return 0L;
133         }
134  
135         @Override
136         public Long add(UserBehavior userBehavior, Long acc) { //每個元素被添加進窗口的時候調用。
137             return acc + 1;
138         }
139  
140         @Override
141         public Long getResult(Long acc) {
142             ;//窗口統計事件觸發時調用來返回出統計的結果。
143             return acc;
144         }
145  
146         @Override
147         public Long merge(Long acc1, Long acc2) { //只有在當窗口合並的時候調用,合並2個容器
148             return acc1 + acc2;
149         }
150     }
151  
152     // todo 指定格式輸出 :將每個 key每個窗口聚合后的結果帶上其他信息進行輸出  進入的數據為Long 返回 ItemViewCount對象
153     public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
154  
155         @Override
156         public void apply(
157                 Tuple key,  // 窗口的主鍵,即 itemId
158                 TimeWindow window,  // 窗口
159                 Iterable<Long> aggregateResult, // 聚合函數的結果,即 count 值
160                 Collector<ItemViewCount> collector  // 輸出類型為 ItemViewCount
161         ) throws Exception {
162             Long itemId = ((Tuple1<Long>) key).f0;
163             Long count = aggregateResult.iterator().next();
164             collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
165         }
166     }
167  
168     /**
169      * 商品點擊量(窗口操作的輸出類型)
170      */
171     public static class ItemViewCount {
172         public long itemId;     // 商品ID
173         public long windowEnd;  // 窗口結束時間戳
174         public long viewCount;  // 商品的點擊量
175  
176         public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
177             ItemViewCount result = new ItemViewCount();
178             result.itemId = itemId;
179             result.windowEnd = windowEnd;
180             result.viewCount = viewCount;
181             return result;
182         }
183     }
184  
185     /** 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結果字符串 */
186     public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
187  
188         private final int topSize;
189  
190         public TopNHotItems(int topSize) {
191             this.topSize = topSize;
192         }
193  
194         // 用於存儲商品與點擊數的狀態,待收齊同一個窗口的數據后,再觸發 TopN 計算
195         private ListState<ItemViewCount> itemState;
196  
197         @Override
198         public void open(Configuration parameters) throws Exception {
199             super.open(parameters);
200             // 狀態的注冊
201             ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
202                     "itemState-state",
203                     ItemViewCount.class);
204             itemState = getRuntimeContext().getListState(itemsStateDesc);
205         }
206  
207         @Override
208         public void processElement(
209                 ItemViewCount input,
210                 Context context,
211                 Collector<String> collector) throws Exception {
212  
213             // 每條數據都保存到狀態中
214             itemState.add(input);
215             // 注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的所有商品數據
216             context.timerService().registerEventTimeTimer(input.windowEnd + 1);
217         }
218  
219         @Override
220         public void onTimer(
221                 long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
222             // 獲取收到的所有商品點擊量
223             List<ItemViewCount> allItems = new ArrayList<>();
224             for (ItemViewCount item : itemState.get()) {
225                 allItems.add(item);
226             }
227             // 提前清除狀態中的數據,釋放空間
228             itemState.clear();
229             // 按照點擊量從大到小排序
230             allItems.sort(new Comparator<ItemViewCount>() {
231                 @Override
232                 public int compare(ItemViewCount o1, ItemViewCount o2) {
233                     return (int) (o2.viewCount - o1.viewCount);
234                 }
235             });
236             // 將排名信息格式化成 String, 便於打印
237             StringBuilder result = new StringBuilder();
238             result.append("====================================\n");
239             result.append("時間: ").append(new Timestamp(timestamp-1)).append("\n");
240             for (int i=0;i<topSize;i++) {
241                 ItemViewCount currentItem = allItems.get(i);
242                 // No1:  商品ID=12224  瀏覽量=2413
243                 result.append("No").append(i).append(":")
244                         .append("  商品ID=").append(currentItem.itemId)
245                         .append("  瀏覽量=").append(currentItem.viewCount)
246                         .append("\n");
247             }
248             result.append("====================================\n\n");
249             out.collect(result.toString());
250         }
251     }
252 }

 

使用fastJson 

  1 package com.aliyun.market;
  2 
  3 import com.alibaba.fastjson.JSONObject;
  4 import org.apache.flink.api.common.functions.AggregateFunction;
  5 import org.apache.flink.api.common.functions.FilterFunction;
  6 import org.apache.flink.api.common.functions.MapFunction;
  7 import org.apache.flink.api.common.serialization.SimpleStringSchema;
  8 import org.apache.flink.api.java.functions.KeySelector;
  9 import org.apache.flink.api.java.utils.ParameterTool;
 10 import org.apache.flink.streaming.api.TimeCharacteristic;
 11 import org.apache.flink.streaming.api.datastream.DataStream;
 12 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 13 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 14 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 15 import org.apache.flink.streaming.api.windowing.time.Time;
 16 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 17 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 18 import org.apache.flink.util.Collector;
 19 
 20 import java.util.Properties;
 21 
 22 /**
 23  * 商場實時數據統計,基於Flink 1.9版本
 24  */
 25 public class MarketStreamCount {
 26     public static void main(String[] args) {
 27 
 28 
 29         // todo 1,讀取kafka數據
 30         final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
 31         //todo 獲取kafka的配置屬性
 32         args = new String[]{"--input-topic", "user_behavior", "--bootstrap.servers", "node2.hadoop:9091,node3.hadoop:9091",
 33                 "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181", "--group.id", "cc2"};
 34 
 35         ParameterTool parameterTool = ParameterTool.fromArgs(args);
 36         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 37 
 38          Properties pros = parameterTool.getProperties();
 39 //        //todo 指定輸入數據為kafka topic
 40         DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>(
 41                 pros.getProperty("input-topic"),
 42                 new SimpleStringSchema(),
 43                 pros).setStartFromEarliest()
 44 
 45         ).setParallelism(1);
 46 
 47         //todo 2,每層實時顧客人數,實時顧客總數,一天的實時顧客總數
 48 
 49         // 轉成json
 50         DataStream<JSONObject> kafkaDstream2 = kafkaDstream.map(new MapFunction<String, JSONObject>() {
 51             @Override
 52             public JSONObject map(String input) throws Exception {
 53                 JSONObject inputJson = null;
 54                 try {
 55                     inputJson = JSONObject.parseObject(input);
 56                     return inputJson;
 57                 } catch (Exception e) {
 58                     e.printStackTrace();
 59 //                    return null;
 60                 }
 61                 return inputJson;
 62             }
 63         }).filter(new FilterFunction<JSONObject>() {
 64             @Override
 65             public boolean filter(JSONObject jsonObject) throws Exception {
 66                 if (jsonObject!=null){
 67                     return true;
 68                 }
 69                 return false;
 70             }
 71         });
 72         //給數據加上了一個水印
 73         DataStream<JSONObject> timedData = kafkaDstream2
 74                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<JSONObject>() {
 75                     @Override
 76                     public long extractAscendingTimestamp(JSONObject json) {
 77                         // 原始數據單位秒 ,將其轉成毫秒
 78                         return json.getLong("category_id") * 1000;
 79                     }
 80                 });
 81 
 82         //過濾出點擊事件
 83         // 實時客人數,各個層級
 84         DataStream<JSONObject> windowedData = timedData.keyBy(new KeySelector<JSONObject, String>() {
 85             @Override
 86             public String getKey(JSONObject jsonObject) throws Exception {
 87                 return  jsonObject.getString("user_id");
 88             }
 89         }).timeWindow(Time.seconds(5L),Time.seconds(1L))
 90                 //使用aggregate做增量聚合統計
 91                 .aggregate(new CountAgg(),new WindowResultFunction());
 92 
 93 
 94 
 95         windowedData.print();
 96         try {
 97             env.execute();
 98         } catch (Exception e) {
 99             e.printStackTrace();
100         }
101     }
102 
103 
104     //todo 功能是統計窗口中的條數,即遇到一條數據就加一
105 
106     public static class CountAgg implements AggregateFunction<JSONObject, Long, Long> {
107 
108         @Override
109         public Long createAccumulator() { //創建一個數據統計的容器,提供給后續操作使用。
110             return 0L;
111         }
112 
113         @Override
114         public Long add(JSONObject json, Long acc) { //每個元素被添加進窗口的時候調用。
115             return acc + 1;
116         }
117 
118         @Override
119         public Long getResult(Long acc) {
120             //窗口統計事件觸發時調用來返回出統計的結果。
121             return acc;
122         }
123 
124         @Override
125         public Long merge(Long acc1, Long acc2) { //只有在當窗口合並的時候調用,合並2個容器
126             return acc1 + acc2;
127         }
128     }
129 
130     // todo 指定格式輸出
131     public static class WindowResultFunction implements WindowFunction<Long, JSONObject, String, TimeWindow> {
132 
133         @Override
134         public void apply(
135                 String key,  // 窗口的主鍵,即 itemId
136                 TimeWindow window,  // 窗口
137                 Iterable<Long> aggregateResult, // 聚合函數的結果,即 count 值
138                 Collector<JSONObject> collector  // 輸出類型為 ItemViewCount
139         ) throws Exception {
140 
141             Long count = aggregateResult.iterator().next();
142             //窗口結束時間
143             long end = window.getEnd();
144             JSONObject json = new JSONObject();
145             json.put("key",key);
146             json.put("count",count);
147             json.put("end",end);
148             collector.collect(json);
149         }
150     }
151 
152 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM