Flink Window那些事——AggregateFunction窗口函數


AggregateFunction 比 ReduceFunction 更加的通用,它有三個參數:輸入類型(IN)、累加器類型(ACC)和輸出類型(OUT)

輸入類型是輸入流中的元素類型,AggregateFunction有一個add方
法可以將一個輸入元素添加到一個累加器中。該接口還具有創建初始累加器(createAccumulator方法)、將兩個累加器合並到一個累加器(merge方法)以及從累加器中提取輸出(類型為OUT)的方法。

package com.lynch.stream.window; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * 測試AggFunction——求各個班級英語成績平均分 * */
public class TestAggFunctionOnWindow { public static void main(String[] args) throws Exception { // 獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 讀取數據
        DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH); // 求各個班級英語成績平均分
        DataStream<Double> avgScore = input.keyBy(0).countWindow(3).aggregate(new AverageAggrate()); avgScore.print(); env.execute("TestAggFunctionOnWindow"); } public static final Tuple3[] ENGLISH = new Tuple3[] { Tuple3.of("class1", "張三", 100L), Tuple3.of("class1", "李四", 40L), Tuple3.of("class1", "王五", 60L), Tuple3.of("class2", "趙六", 20L), Tuple3.of("class2", "小七", 30L), Tuple3.of("class2", "小八", 50L), }; //Tuple3<String, String, Long> 輸入類型 //Tuple2<Long, Long> 累加器ACC類型,保存中間狀態 //Double 輸出類型
    public static class AverageAggrate implements AggregateFunction<Tuple3<String, String, Long>, Tuple2<Long, Long>, Double> { /** * 創建累加器保存中間狀態(sum count) * * sum 英語總成績 * count 學生個數 * * @return
         */ @Override public Tuple2<Long, Long> createAccumulator() { return new Tuple2<>(0L, 0L); } /** * 將元素添加到累加器並返回新的累加器 * * @param value 輸入類型 * @param acc 累加器ACC類型 * * @return 返回新的累加器 */ @Override public Tuple2<Long, Long> add(Tuple3<String, String, Long> value, Tuple2<Long, Long> acc) { //acc.f0 總成績 //value.f2 表示成績 //acc.f1 人數
            return new Tuple2<>(acc.f0 + value.f2, acc.f1 + 1L); } /** * 從累加器提取結果 * * @param longLongTuple2 * @return
         */ @Override public Double getResult(Tuple2<Long, Long> acc) { return ((double) acc.f0) / acc.f1; } /** * 累加器合並 * * @param longLongTuple2 * @param acc1 * @return
         */ @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) { return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1); } } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM