Flink Window——ReduceFunction、AggregateFunction、ProcessWindowFunction窗口函數詳解


1.使用 ReduceFunction函數

讓兩個元素結合起來,產生一個相同類型的元素,它是增量的,放在KeyBy函數之后

package flink.java.test;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestReduceFunctionOnWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;

        DataStreamSource<Tuple3<String,String,Integer>> input = env.fromElements(ENGLISH);
        input.keyBy(x -> x.f0)
                .countWindow(2)
                .reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception {

                        System.out.println("value1-->"+value1);
                        System.out.println("value2-->"+value2);

                        System.out.println("==========================");
                        return new Tuple3<>(value1.f0,value1.f1,value1.f2+value2.f2);
                    }
                }).print("reduce累加");

        env.execute() ;

    }

    public static final Tuple3[] ENGLISH = new Tuple3[]{
            //班級 姓名 成績
            Tuple3.of("class1","張三",100),
            Tuple3.of("class1","李四",30),
            Tuple3.of("class1","王五",70),
            Tuple3.of("class2","趙六",50),
            Tuple3.of("class2","小七",40),
            Tuple3.of("class2","小八",10),
    };

}

  

執行返回結果:

value1-->(class2,趙六,50)
value2-->(class2,小七,40)
==========================
reduce累加:1> (class2,趙六,90)
value1-->(class1,張三,100)
value2-->(class1,李四,30)
==========================
reduce累加:2> (class1,張三,130)

Process finished with exit code 0

  

2.使用AggregateFunction函數統計計算

AggregateFunction 比 ReduceFunction 更加的通用,它有三個參數,一個輸入類型(IN),一個累加器(ACC),一個輸出類型(OUT)。

輸入類型,就是輸入流的類型。接口中有一個方法,可以把輸入的元素和累加器累加。並且可以初始化一個累加器,然后把兩個累加器合並成一個累加器,獲得輸出結果。

        input.keyBy(x -> x.f0)
                .countWindow(2)

//        AggregateFunction 比 ReduceFunction 更加的通用,它有三個參數,一個輸入類型(IN),一個累加器(ACC),一個輸出類型(OUT)
                .aggregate(new AggregateFunction<Tuple3<String, String, Integer>, Tuple2<String,Integer>, Tuple2<String,Integer>>() {
//    創建累加器操作:初始化中間值 @Override
public Tuple2<String, Integer> createAccumulator() { return Tuple2.of("class1",1000); }
//    累加器操作 @Override
public Tuple2<String, Integer> add(Tuple3<String, String, Integer> value1, Tuple2<String, Integer> value2) { return Tuple2.of(value1.f0,value1.f2+value2.f1); }
//    獲取結果 @Override
public Tuple2<String, Integer> getResult(Tuple2<String, Integer> value) { return Tuple2.of(value.f0,value.f1); } //    累加器合並操作,只有會話窗口的時候才會調用! @Override public Tuple2<String, Integer> merge(Tuple2<String, Integer> value, Tuple2<String, Integer> acc1) { return Tuple2.of(value.f0,value.f1+acc1.f1); } }) .print("aggregate累加") ; env.execute() ;

執行結果:

aggregate累加:1> (class2,1090)
aggregate累加:2> (class1,1130)

Process finished with exit code 0

  

3.ProcessWindowFunction(全窗口函數)

ProcessWindowFunction 有一個 Iterable 迭代器,用來獲得窗口中所有的元素。

有一個上下文對象用來獲得時間和狀態信息,比其他的窗口函數有更大的靈活性。

但是這樣做損耗了一部分性能和資源,因為元素不能增量聚合,相反 ,在觸發窗口計算時,Flink 需要在內部緩存窗口的所有元素。

案例1:

        input.keyBy(x -> x.f0)
                .countWindow(2)
                //public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction ...
                .process(new ProcessWindowFunction<Tuple3<String, String, Integer>, Tuple3<String,String,Integer>, String, GlobalWindow>() {
                    @Override
                    public void process(String s,   //參數1:key
                                        Context context,    //參數2:上下文對象
                                        Iterable<Tuple3<String, String, Integer>> iterable, //參數3:這個窗口的所有元素
                                        //參數4:收集器,用於向下游傳遞數據
                                        Collector<Tuple3<String, String, Integer>> collector) throws Exception {
                        System.out.println(context.window().maxTimestamp());
                        int sum = 0 ;
                        String name = "" ;
                        for (Tuple3<String,String,Integer> tuple3:iterable){
                            sum += tuple3.f2 ;
                            name = tuple3.f1 ;
                        }

                        collector.collect(Tuple3.of(s,name,sum));
                    }
                }).print();

  

輸出結果:

9223372036854775807
2> (class1,李四,130)
9223372036854775807
1> (class2,小七,90)

Process finished with exit code 0

  

案例2:

.process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
    // 參數1: key 參數2: 上下文對象 參數3: 這個窗口內所有的元素 參數4: 收集器, 用於向下游傳遞數據
    @Override
    public void process(String key,
                        Context context,
                        Iterable<Tuple2<String, Long>> elements,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        System.out.println(context.window().getStart());
        long sum = 0L;
        for (Tuple2<String, Long> t : elements) {
            sum += t.f1;
        }
        out.collect(Tuple2.of(key, sum));
    }
})

參考:

Flink Window那些事——ReduceFunction窗口函數

Flink(14) 窗口函數(window function) 詳解


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM