Flink Window窗口機制


  1. 總覽

    • Window 是flink處理無限流的核心,Windows將流拆分為有限大小的“桶”,我們可以在其上應用計算。

    • Flink 認為 Batch 是 Streaming 的一個特例,所以 Flink 底層引擎是一個流式引擎,在上面實現了流處理和批處理。

    • 而窗口(window)就是從 Streaming 到 Batch 的一個橋梁。

    • Flink 提供了非常完善的窗口機制。

    • 在流處理應用中,數據是連續不斷的,因此我們不可能等到所有數據都到了才開始處理。

    • 當然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鍾內有多少用戶點擊了我們的網頁。

    • 在這種情況下,我們必須定義一個窗口,用來收集最近一分鍾內的數據,並對這個窗口內的數據進行計算。

    • 窗口可以是基於時間驅動的(Time Window,例如:每30秒鍾)

    • 也可以是基於數據驅動的(Count Window,例如:每一百個元素)

    • 同時基於不同事件驅動的窗口又可以分成以下幾類:

      • 翻滾窗口 (Tumbling Window, 無重疊)
      • 滑動窗口 (Sliding Window, 有重疊)
      • 會話窗口 (Session Window, 活動間隙)
      • 全局窗口 (略)
    • Flink要操作窗口,先得將StreamSource 轉成WindowedStream

      Window操作 其作用
      Window Keyed Streaming → WindowedStream 可以在已經分區的KeyedStream上定義Windows,即K,V格式的數據。
      WindowAll DataStream → AllWindowedStream 對常規的DataStream上定義Window,即非K,V格式的數據
      Window Apply WindowedStream → AllWindowedStream AllWindowedStream → DataStream 將函數應用於整個窗口中的數據
      Window Reduce WindowedStream → DataStream 對窗口里的數據進行”reduce”減少聚合統計
      Aggregations on windows WindowedStream → DataStream 對窗口里的數據進行聚合操作: sum(), max(), min()
  2. Tumbling Window(翻滾窗口)

    • 翻滾窗口能將數據流切分成不重疊的窗口,每一個事件只能屬於一個窗口

    • 翻滾窗具有固定的尺寸,不重疊。

    • 例圖:

      image-20191113092146338

      • 代碼

        package com.ronnie.flink.stream.window;
        
        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.api.java.tuple.Tuple;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.datastream.DataStreamSource;
        import org.apache.flink.streaming.api.datastream.KeyedStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.datastream.WindowedStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.windowing.time.Time;
        import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
        import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
        
        import java.text.SimpleDateFormat;
        import java.util.Random;
        
        /**
         * 翻滾窗口:窗口不可重疊
         * 1、基於時間驅動
         * 2、基於事件驅動
         */
        public class TumblingWindow {
        
            public static void main(String[] args) {
            //設置執行環境,類似spark中初始化sparkContext
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                
                env.setParallelism(1);
        
                DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999);
        
                SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
        
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        
                        long timeMillis = System.currentTimeMillis();
        
                        int random = new Random().nextInt(10);
        
                        System.out.println("value: " + value + " random: " + random + "timestamp: " + timeMillis + "|" + format.format(timeMillis));
        
                        return new Tuple2<String, Integer>(value, random);
                    }
                });
        
                KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);
        
        
                // 基於時間驅動,每隔10s划分一個窗口
                WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10));
        
                // 基於事件驅動, 每相隔3個事件(即三個相同key的數據), 划分一個窗口進行計算
                // WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3);
        
                // apply是窗口的應用函數,即apply里的函數將應用在此窗口的數據上。
                timeWindow.apply(new MyTimeWindowFunction()).print();
                // countWindow.apply(new MyCountWindowFunction()).print();
        
                try {
                    // 轉換算子都是lazy init的, 最后要顯式調用 執行程序
                    env.execute();
                } catch (Exception e) {
                    e.printStackTrace();
                }
        
            }
        }
        
        
    • 基於時間驅動

      • 場景1:我們需要統計每一分鍾中用戶購買的商品的總數,需要將用戶的行為事件按每一分鍾進行切分,這種切分被成為翻滾時間窗口(Tumbling Time Window)。

        package com.shsxt.flink.stream.window;
        
        import org.apache.flink.api.java.tuple.Tuple;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
        import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
        import org.apache.flink.util.Collector;
        
        import java.text.SimpleDateFormat;
        
        public class MyTimeWindowFunction implements WindowFunction<Tuple2<String,Integer>, String, Tuple, TimeWindow> {
        
            @Override
            public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        
                int sum = 0;
        
                for(Tuple2<String,Integer> tuple2 : input){
                    sum +=tuple2.f1;
                }
        
                long start = window.getStart();
                long end = window.getEnd();
        
                out.collect("key:" + tuple.getField(0) + " value: " + sum + "| window_start :"
                        + format.format(start) + "  window_end :" + format.format(end)
                );
        
            }
        }
        
        
    • 基於事件驅動

      • 場景2:當我們想要每100個用戶的購買行為作為驅動,那么每當窗口中填滿100個”相同”元素了,就會對窗口進行計算。

        package com.ronnie.flink.stream.window;
        
        import org.apache.flink.api.java.tuple.Tuple;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
        import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
        import org.apache.flink.util.Collector;
        
        import java.text.SimpleDateFormat;
        
        public class MyCountWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {
        
            @Override
            public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
                SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        
                int sum = 0;
        
                for (Tuple2<String, Integer> tuple2 : input){
                    sum += tuple2.f1;
                }
                //無用的時間戳,默認值為: Long.MAX_VALUE,因為基於事件計數的情況下,不關心時間。
                long maxTimestamp = window.maxTimestamp();
        
                out.collect("key:" + tuple.getField(0) + " value: " + sum + "| maxTimeStamp :"
                        + maxTimestamp + "," + format.format(maxTimestamp)
                );
            }
        }
        
  3. Sliding Window(滑動窗口)

    • 滑動窗口和翻滾窗口類似,區別在於:滑動窗口可以有重疊的部分。

    • 在滑窗中,一個元素可以對應多個窗口。

    • 例圖:

      image-20191113102254911

    • 基於時間的滑動窗口

      • 場景: 我們可以每30秒計算一次最近一分鍾用戶購買的商品總數。
    • 基於事件的滑動窗口

      • 場景: 每10個 “相同”元素計算一次最近100個元素的總和.
    • 代碼:

      package com.ronnie.flink.stream.window;
      
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.KeyedStream;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.datastream.WindowedStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
      
      import java.text.SimpleDateFormat;
      import java.util.Random;
      
      /**
       * 滑動窗口:窗口可重疊
       * 1、基於時間驅動
       * 2、基於事件驅動
       */
      public class SlidingWindow {
      
          public static void main(String[] args) {
              // 設置執行環境, 類似spark中初始化SparkContext
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              
              env.setParallelism(1);
      
              DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999);
      
              SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
                  @Override
                  public Tuple2<String, Integer> map(String value) throws Exception {
                      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                      long timeMillis = System.currentTimeMillis();
      
                      int random = new Random().nextInt(10);
                      System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis));
      
                      return new Tuple2<String, Integer>(value, random);
                  }
              });
              KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);
      
              //基於時間驅動,每隔5s計算一下最近10s的數據
           //   WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));
              //基於事件驅動,每隔2個事件,觸發一次計算,本次窗口的大小為3,代表窗口里的每種事件最多為3個
              WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3, 2);
      
           //   timeWindow.sum(1).print();
      
              countWindow.sum(1).print();
      
           //   timeWindow.apply(new MyTimeWindowFunction()).print();
      
              try {
                  env.execute();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
      
  4. Session Window(會話窗口)

    • 會話窗口不重疊,沒有固定的開始和結束時間

    • 與翻滾窗口和滑動窗口相反, 當會話窗口在一段時間內沒有接收到元素時會關閉會話窗口。

    • 后續的元素將會被分配給新的會話窗口

    • 例圖:

      image-20191113102605969

    • 舉例:

      • 計算每個用戶在活躍期間總共購買的商品數量,如果用戶30秒沒有活動則視為會話斷開。
    • 代碼:

      package com.ronnie.flink.stream.window;
      
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.KeyedStream;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.datastream.WindowedStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
      
      import java.text.SimpleDateFormat;
      import java.util.Random;
      
      public class SessionWindow {
      
          public static void main(String[] args) {
      
              // 設置執行環境, 類似spark中初始化sparkContext
      
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              env.setParallelism(1);
      
              DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999);
      
              SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
                  @Override
                  public Tuple2<String, Integer> map(String value) throws Exception {
                      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                      long timeMillis = System.currentTimeMillis();
      
                      int random = new Random().nextInt(10);
      
                      System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis));
      
                      return new Tuple2<String, Integer>(value, random);
                  }
              });
              KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);
      
              //如果連續10s內,沒有數據進來,則會話窗口斷開。
              WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));
      
              // window.sum(1).print();
              
              window.apply(new MyTimeWindowFunction()).print();
      
              try {
                  env.execute();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM