Flink Window那些事——ProcessWindowFunction/ProcessAllWindowFunction


全量聚合: 窗口需要維護全部原始數據,窗口觸發進行全量聚合。

ProcessWindowFunction獲得一個包含窗口所有元素的可迭代器,以及一個具有時間和狀態信息訪問權的上下文對象,這使得它比其他窗口函數提供更大的靈活性。這是以性能和資源消耗為代價的,因為元素不能增量地聚合,而是需要在內部緩沖,直到認為窗口可以處理為止。

WindowFunction的升級版,可以跟ReduceFunction/AggregateFunction/FoldFunction結合使用(推薦用法)

package com.lynch.stream.window; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector; /** * 測試ProcessWinFunction * * @author dajiangtai * @create 2019-06-11-18:37 */
public class TestProcessWinFunctionOnWindow { public static void main(String[] args) throws Exception{ //獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //讀取數據
        DataStream<Tuple3<String,String,Long>> input = env.fromElements(ENGLISH); //求各班級英語成績平均分
        DataStream<Double> avgScore = input.keyBy(0) .countWindow(2) .process(new MyProcessWindowFunction()); avgScore.print(); env.execute("TestProcessWinFunctionOnWindow"); } public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String,String,Long>,Double, Tuple, GlobalWindow>{ //iterable 輸入流中的元素類型集合
 @Override public void process(Tuple tuple, Context context, Iterable<Tuple3<String, String, Long>> iterable, Collector<Double> out) throws Exception { long sum = 0; long count = 0; for (Tuple3<String,String,Long> in :iterable){ sum+=in.f2; count++; } out.collect((double)(sum/count)); } } public static final Tuple3[] ENGLISH = new Tuple3[]{ Tuple3.of("class1","張三",100L), Tuple3.of("class1","李四",78L), Tuple3.of("class1","王五",99L), Tuple3.of("class2","趙六",81L), Tuple3.of("class2","小七",59L), Tuple3.of("class2","小八",97L), }; }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM