Flink aggregate 的一個小問題 Cannot resolve method 'aggregate(com.dianshang.Util.CountAgg, com.dianshang.Util.WindowResultFunction)'


使用aggregate 函數報錯:

 //以ItemId作為key
                .keyBy(new KeySelector<UserBehavior, Long>() {
                    public Long getKey(UserBehavior userBehavior) throws Exception {
                        return userBehavior.getItemId();
                    }
                })
                //基於KeyedStream創建一個華窗,長度一個小時,滑動步長5min
                .timeWindow(Time.hours(1),Time.minutes(5))
                .aggregate(new CountAgg(),new WindowResultFunction()); //transform處理數據
        UserBehavior_data.filter(new FilterFunction<UserBehavior>() {
            public boolean filter(UserBehavior userBehavior) throws Exception {
                if(userBehavior.getBehavior()=="pv"){
                    return true;
                }
                return false;
            }
        })
                //以ItemId作為key
                .keyBy(new KeySelector<UserBehavior, Long>() {
                    public Long getKey(UserBehavior userBehavior) throws Exception {
                        return userBehavior.getItemId();
                    }
                })
                //基於KeyedStream創建一個華窗,長度一個小時,滑動步長5min
                .timeWindow(Time.hours(1),Time.minutes(5))
                .aggregate(new CountAgg(),new WindowResultFunction());

報錯:Cannot resolve method 'aggregate(com.dianshang.Util.CountAgg, com.dianshang.Util.WindowResultFunction)'
報錯原因:KeyBy函數的key值類型和WindowResultFunction的窗口主鍵類型不匹配
參考:https://blog.csdn.net/qq_31866793/article/details/100690542


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM