Error when using the aggregate function:
// Transform: process the data
UserBehavior_data.filter(new FilterFunction<UserBehavior>() {
    public boolean filter(UserBehavior userBehavior) throws Exception {
        // Keep only "pv" records; compare strings with equals(), not ==
        return "pv".equals(userBehavior.getBehavior());
    }
})
// Use ItemId as the key
.keyBy(new KeySelector<UserBehavior, Long>() {
    public Long getKey(UserBehavior userBehavior) throws Exception {
        return userBehavior.getItemId();
    }
})
// Create a sliding window on the KeyedStream: 1 hour long, sliding every 5 minutes
.timeWindow(Time.hours(1), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction());
Error: Cannot resolve method 'aggregate(com.dianshang.Util.CountAgg, com.dianshang.Util.WindowResultFunction)'
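Cause: the key type produced by keyBy does not match the key type declared by WindowResultFunction. The KeySelector here returns Long, so the key type parameter of WindowResultFunction must also be Long.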
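The mismatch can be reproduced without Flink. The sketch below is only a model: AggregateFunction, WindowFunction, WindowedStream and Tuple are simplified stand-ins defined inside the example (not the real Flink classes), the window is represented by a plain String, and TupleKeyedResultFunction is a hypothetical name for a window function written for a different key type. What it keeps from Flink is the shape of the two-argument aggregate, where the window function's KEY parameter has to equal the stream's key type K.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class KeyTypeSketch {

    // Simplified stand-in for Flink's AggregateFunction<IN, ACC, OUT>.
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    // Simplified stand-in for Flink's WindowFunction<IN, OUT, KEY, W>.
    interface WindowFunction<IN, OUT, KEY, W> {
        void apply(KEY key, W window, Iterable<IN> input, List<OUT> out);
    }

    // Hypothetical placeholder for a tuple-typed key.
    static class Tuple { }

    // Stand-in for a windowed stream; K is fixed by the earlier keyBy step.
    static class WindowedStream<T, K, W> {
        private final K key;
        private final W window;
        private final List<T> elements;

        WindowedStream(K key, W window, List<T> elements) {
            this.key = key;
            this.window = window;
            this.elements = elements;
        }

        // The window function's third type argument must be exactly K.
        <ACC, V, R> List<R> aggregate(AggregateFunction<T, ACC, V> agg,
                                      WindowFunction<V, R, K, W> windowFunction) {
            ACC acc = agg.createAccumulator();
            for (T element : elements) {
                acc = agg.add(element, acc);
            }
            List<R> out = new ArrayList<>();
            windowFunction.apply(key, window,
                    Collections.singletonList(agg.getResult(acc)), out);
            return out;
        }
    }

    // Counts elements; the element type is a plain String in this sketch.
    static class CountAgg implements AggregateFunction<String, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(String value, Long accumulator) { return accumulator + 1; }
        public Long getResult(Long accumulator) { return accumulator; }
    }

    // KEY is Long, matching a key selector that returns Long.
    static class WindowResultFunction implements WindowFunction<Long, String, Long, String> {
        public void apply(Long key, String window, Iterable<Long> input, List<String> out) {
            out.add("itemId=" + key + " count=" + input.iterator().next());
        }
    }

    // KEY is Tuple: valid on its own, but not accepted by a stream keyed by Long.
    static class TupleKeyedResultFunction implements WindowFunction<Long, String, Tuple, String> {
        public void apply(Tuple key, String window, Iterable<Long> input, List<String> out) {
            out.add("count=" + input.iterator().next());
        }
    }

    static List<String> run() {
        WindowedStream<String, Long, String> stream =
                new WindowedStream<>(42L, "window-1", Arrays.asList("pv", "pv", "pv"));
        // Rejected by the compiler, because K is Long and not Tuple:
        // stream.aggregate(new CountAgg(), new TupleKeyedResultFunction());
        return stream.aggregate(new CountAgg(), new WindowResultFunction());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the real job the same idea applies: if WindowResultFunction was written for a stream keyed by field name (where the key type is Tuple), either change its key type parameter to Long or change the keyBy call so both sides agree.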
Reference: https://blog.csdn.net/qq_31866793/article/details/100690542