Flink學習(三) 批流版本的wordcount JAVA版本


Flink 開發環境
通常來講,任何一門大數據框架在實際生產環境中都是以集群的形式運行,而我們調試代碼大多數會在本地搭建一個模板工程,Flink 也不例外。

Flink 一個以 Java 及 Scala 作為開發語言的開源大數據項目,通常我們推薦使用 Java 來作為開發語言,Maven 作為編譯和包管理工具進行項目構建和編譯。對於大多數開發者而言,JDK、Maven 和 Git 這三個開發工具是必不可少的。

關於 JDK、Maven 和 Git 的安裝建議如下表所示:

 

 

 

工程創建
一般來說,我們在通過 IDE 創建工程,可以自己新建工程,添加 Maven 依賴,或者直接用 mvn 命令創建應用:

mvn   archetype:generate  \
        -DarchetypeGroupId=org.apache.flink \
        -DarchetypeArtifactId=flink-quickstart-java \
        -DarchetypeVersion=1.10.0

 

 

 這里需要的主要的是,自動生成的項目 pom.xml 文件中對於 Flink 的依賴注釋掉 scope:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
   <!--<scope>provided</scope>-->
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <!--<scope>provided</scope>-->
</dependency>

 

DataSet WordCount (批處理)
WordCount 程序是大數據處理框架的入門程序,俗稱“單詞計數”。用來統計一段文字每個單詞的出現次數,該程序主要分為兩個部分:一部分是將文字拆分成單詞;另一部分是單詞進行分組計數並打印輸出結果。

    public static void main(String[] args) throws Exception {

      // 創建Flink運行的上下文環境
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      // 創建DataSet,這里我們的輸入是一行一行的文本
      DataSet<String> text = env.fromElements(
            "Flink Spark Storm",
            "Flink Flink Flink",
            "Spark Spark Spark",
            "Storm Storm Storm"
      );
      // 通過Flink內置的轉換函數進行計算
      DataSet<Tuple2<String, Integer>> counts =
            text.flatMap(new LineSplitter())
                  .groupBy(0)
                  .sum(1);
      //結果打印
      counts.printToErr();

   }

   public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

      @Override
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
         // 將文本分割
         String[] tokens = value.toLowerCase().split("\\W+");

         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<String, Integer>(token, 1));
            }
         }
      }
    }

實現的整個過程中分為以下幾個步驟。

首先,我們需要創建 Flink 的上下文運行環境:

復制ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
然后,使用 fromElements 函數創建一個 DataSet 對象,該對象中包含了我們的輸入,使用 FlatMap、GroupBy、SUM 函數進行轉換。

最后,直接在控制台打印輸出。

我們可以直接右鍵運行一下 main 方法,在控制台會出現我們打印的計算結果:

 

 

DataStream WordCount (流處理)
為了模仿一個流式計算環境,我們選擇監聽一個本地的 Socket 端口,並且使用 Flink 中的滾動窗口,每 5 秒打印一次計算結果。代碼如下:

public class StreamingJob {

    public static void main(String[] args) throws Exception {

        // 創建Flink的流式計算環境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 監聽本地9000端口
        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");

        // 將接收的數據進行拆分,分組,窗口計算並且進行聚合輸出
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // 打印結果
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

整個流式計算的過程分為以下幾步。

首先創建一個流式計算環境:

復制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
然后進行監聽本地 9000 端口,將接收的數據進行拆分、分組、窗口計算並且進行聚合輸出。代碼中使用了 Flink 的窗口函數,我們在后面的課程中將詳細講解。

我們在本地使用 netcat 命令啟動一個端口:

nc -lk 9000
然后直接運行我們的 main 方法:

 

 在 nc 中輸入:

$ nc -lk 9000
Flink Flink Flink 
Flink Spark Storm

可以在控制台看到:

Flink : 4
Spark : 1
Storm : 1

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM