(二)Flink滑動窗口單詞統計實例


需求一:通過socket實時產生單詞,使用flink實時接收數據,對指定時間窗口內(例如:2秒)的數據進行聚合統計,並把時間窗口內計算的結果打印出來。

Flink程序開發步驟:

  1、獲得一個執行環境

  2、加載/創建 初始化數據

  3、指定操作數據的transaction算子

  4、指定把計算好的數據放在哪里

  5、調用execute()觸發執行程序

      注意:Flink程序是延遲計算的,只有最后調用execute()方法的時候才會真正出發執行程序。

  延遲計算的好處是:你可以開發復雜的程序,但是Flink可以將復雜的程序轉成一個Plan,將Plan作為一個整體單元執行!

實現代碼:

Java版本:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 滑動窗口計算
 *
 * 通過socket模擬產生單詞數據
 * flink對數據進行統計計算
 *
 * 需要實現每隔1秒對最近2秒內的數據進行匯總計算
 *
 */
public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception{
        //獲取需要的端口號
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        }catch (Exception e){
            System.err.println("No port set. use default port 9000--java");
            port = 9000;
        }

        //獲取flink的運行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "hadoop100";
        String delimiter = "\n";
        //連接socket獲取輸入的數據
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // a a c

        // a 1
        // a 1
        // c 1
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定時間窗口大小為2秒,指定時間間隔為1秒
                .sum("count");//在這里使用sum或者reduce都可以
                /*.reduce(new ReduceFunction<WordWithCount>() {
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {

                                        return new WordWithCount(a.word,a.count+b.count);
                                    }
                                })*/
        //把數據打印到控制台並且設置並行度
        windowCounts.print().setParallelism(1);

        //這一行代碼一定要實現,否則程序不執行
        env.execute("Socket window count");

    }

    public static class WordWithCount{
        public String word;
        public long count;
        public  WordWithCount(){}
        public WordWithCount(String word,long count){
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

Scala版本:

后續補充......

轉載請注明地址: https://www.cnblogs.com/wynjauu/articles/10542807.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM