flink基礎之window


  flink會把數據分成不同的窗口,然后進行匯總和統計。

  flink的窗口分為timeWindow, countWindow, sessionWindow, gapWindow。

  timeWindow分為基於時間的滾動窗口和滑動窗口。

  舉個例子,統計每60秒的訪問量需要的就是滾動窗口;每5分鍾統計一次一個小時內的訪問量或者獲取訪問前幾的top值,這個時候就需要用到滑動窗口了。

  如果還不明白,看下面的圖,圖片來源flink官網的blog里。有一個傳感器一直錄入值,然后需要統計每個窗口里邊的匯總值,效果就是這個樣子。

 

  再來看一下滑動窗口的圖,假如sensor給到的是15秒鍾汽車穿過馬路的數量,現在需要每30秒統計1分鍾的穿過馬路的數量。第一次 9+6+8+4 = 22, 然后往右邊滑兩個數,8+4+7+3=22, 然后再往右邊滑兩個數,依此類推。

 

   這里邊需要注意的是,窗口的大小和滑動大小。分為三種情況:

  1. 窗口的大小=滑動的大小,那么效果和滾動窗口是一樣的。

  2. 窗口的大小>滑動的大小,數據就會被重復計算,上邊舉的這樣例子就是這樣的。

  3. 窗口的大小<滑動的大小,那么統計的時候會出現丟數據。

  countWindow也分為滑動計數窗口,滾動計數窗口,也就是這個窗口達到了指定個數后即觸發統計計算,滑動窗口比如countWindow(5,2),說明只要有2個數據到達后就可以往后統計5個數據的值。

  sessionWindow就是多久的session為一個窗口,假如設置的sessionWindow位5秒,那么5秒鍾時間內只要有數據這個窗口就會一直存在,5秒鍾之內沒有任何數據,那么這個就觸發一個窗口進行統計匯總。

  好了,咱們來用代碼看一下,我把所有代碼放到一個代碼中。如下:

package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowCount {

    public static void main(String[] args) throws Exception{

        //創建env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //監聽本地的9000端口
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");


        //將輸入的單詞進行解析和收集
        DataStream<WordCount> wordCountStream = text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> out) throws Exception {
                for(String word : value.split("\\s")) {
                    out.collect(WordCount.of(word, 1L));
                }
            }
        });

        //timeWindow 滾動窗口 將收集的單詞進行分組和計數
        DataStream<WordCount> windowsCounts = wordCountStream.
                keyBy("word").
                timeWindow(Time.seconds(5)).
                sum("count");

        //timeWindow 滑動窗口 將收集的單詞進行分組和計數
//        DataStream<WordCount> windowsCounts = wordCountStream.
//                keyBy("word").
//                timeWindow(Time.seconds(10), Time.seconds(2)).
//                sum("count");

        //countWindow 滾動窗口
//        DataStream<WordCount> windowsCounts = wordCountStream.
//                keyBy("word").
//                countWindow(2).
//                sum("count");

        //countWindow 滾動窗口
//        DataStream<WordCount> windowsCounts = wordCountStream.
//        keyBy("word").
//        countWindow(5L, 2L).
//        sum("count");

        //sessionWindow 窗口
//        DataStream<WordCount> windowsCounts = wordCountStream.
//                keyBy("word").
//                window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).
//                sum("count");



        //打印時采用單線程打印
        windowsCounts.print().setParallelism(1);

        //提交所設置的執行
        env.execute("Socket Window WordCount");

    }

    public static class WordCount {

        public String word;
        public Long count;

        public static WordCount of(String word, Long count) {
            WordCount wordCount = new WordCount();
            wordCount.word = word;
            wordCount.count = count;
            return wordCount;
        }

        @Override
        public String toString() {
            return "word:" + word + " count:" + count;
        }
    }

}

  里邊幾種場景都涉及到了,下面只運行第一種情況。在電腦里邊輸入命令:nc -lk 9000,這個工具就是創建9000的socket服務,並且可以往里邊輸入數據。

  運行本地程序,第一個例子用timeWindow進行滾動窗口統計每五秒的單詞數量。

  前五秒的時候輸入  hello world hello, 然后又輸入了 hello this, 如下圖:

 

   輸出的結果如下:

 

   這樣就統計出來了,你是否get到了?有問題歡迎指正。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM