windows環境下flink入門demo實例


Apache Flink是什么?

Apache Flink 是一個分布式大數據處理引擎,可對有限數據流和無限數據流進行有狀態計算。可部署在各種集群環境,對各種大小的數據規模進行快速計算。上面是非常官方的描述,說白了我們為什么選擇Flink,是因為他在社區口碑非常不錯。在國內的話有阿里這種大數據大流量的公司一直在輸出,當然像騰訊、華為、餓了么、滴滴等也都有使用Apache Flink。

進入正題

本篇博文涉及到的軟件工具以及下載地址:

Apache Flink :https://flink.apache.org/downloads.html

Netcat:https://eternallybored.org/misc/netcat/

Netcat是一個有“瑞士軍刀”美譽的網絡工具,這里用來綁定端口等待Apache Flink的連接

第一步:啟動Flink

從上面的地址下載Flink后是一個壓縮包,解壓后的目錄結構如下:

/conf/flink-conf.yaml里有一些Flink的基本配置信息,如,jobmanager、taskmanager的端口和jvm內存(默認1024M)大小,web控制台的端口(默認8081)等。我們可以不該任何配置,然后進入到bin下,執行start-cluster.bat。這里要注意不是並不是flink.bat。flink.bat是用來提交job的。還有要確保相關的端口沒有被占用

運行成功后會有兩個java黑窗口(一個TaskManager、一個JobManager),如果只有一個java黑窗口,很可能是你的TaskManager因為端口占用沒有啟動起來,成功后訪問:http://localhost:8081.就會看到如下的web管理控制台了:

如果啟動失敗的話,上面箭頭所指向的地方應該是0.

第二步:job任務編寫

1.首先需要新建一個maven工程,然后導入Flink的接口依賴

  1. <groupId>org.apache.flink</groupId>
  2. <artifactId>flink-java</artifactId>
  3. <version>1.7.1</version>

 

  1. <groupId>org.apache.flink</groupId>
  2. <artifactId>flink-streaming-java_2.11</artifactId>
  3. <version>1.7.1</version>

 

  1. <groupId>org.apache.flink</groupId>
  2. <artifactId>flink-clients_2.11</artifactId>
  3. <version>1.7.1</version>

2.編寫具體的job,官方提供了一個單詞統計的demo

package com.kl;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {

  1. // the host and the port to connect to
  2. final String hostname;
  3. final int port;
  4. try {
  5. final ParameterTool params = ParameterTool.fromArgs(args);
  6. hostname = params.has("hostname") ? params.get("hostname") : "localhost";
  7. port = params.has("port") ? params.getInt("port"):9000;
  8. } catch (Exception e) {
  9. System.err.println( "No port specified. Please run 'SocketWindowWordCount " +
  10. "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
  11. "and port is the address of the text server");
  12. System.err.println( "To start a simple text server, run 'netcat -l <port>' and " +
  13. "type the input text into the command line");
  14. return;
  15. }
  16. // get the execution environment
  17. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  18. // get input data by connecting to the socket
  19. DataStream<String> text = env.socketTextStream(hostname, port, "\\n");
  20. // parse the data, group it, window it, and aggregate the counts
  21. DataStream<WordWithCount> windowCounts = text
  22. .flatMap( new FlatMapFunction<String, WordWithCount>() {
  23. public void flatMap(String value, Collector<WordWithCount> out) {
  24. for (String word : value.split("\\\s")) {
  25. out.collect(new WordWithCount(word, 1L));
  26. } }})
  27. .keyBy( "word")
  28. .timeWindow(Time.seconds( 5))
  29. .reduce( new ReduceFunction<WordWithCount>() {
  30. public WordWithCount reduce(WordWithCount a, WordWithCount b) {
  31. return new WordWithCount(a.word, a.count + b.count);
  32. }});
  33. // print the results with a single thread, rather than in parallel
  34. windowCounts.print().setParallelism( 1);
  35. env.execute( "Socket Window WordCount");

}
/**

  1. \* Data type for words with count.
  2. */

public static class WordWithCount {

  1. public String word;
  2. public long count;
  3. public WordWithCount() {}
  4. public WordWithCount(String word, long count) {
  5. this.word = word;
  6. this.count = count;
  7. }
  8. @Override
  9. public String toString() {
  10. return word + " : " + count;
  11. }

}
}

上面demo實現了從啟動參數中獲取ip和端口,然后連接從輸入流接收文本信息,然后統計文本里單詞出現的次數。因為要打成可運行的jar,所以,還需要引入maven的jar打包插件,如下:

  1. <plugins>
  2. <plugin>
  3. <groupId>org.apache.maven.plugins</groupId>
  4. <artifactId>maven-shade-plugin</artifactId>
  5. <version>1.2.1</version>
  6. <executions>
  7. <execution>
  8. <phase>package</phase>
  9. <goals>
  10. <goal>shade</goal>
  11. </goals>
  12. <configuration>
  13. <transformers>
  14. <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  15. <mainClass>com.kl.SocketWindowWordCount</mainClass>
  16. </transformer>
  17. </transformers>
  18. </configuration>
  19. </execution>
  20. </executions>
  21. </plugin>
  22. </plugins>

mainClass標簽中就是你的main方法所在類全類名。然后mvn install就可以打出一個可運行的jar包了。

第三步:Netcat監聽端口,等待連接

從上面貼的地址下載Netcat后,是一個壓縮包,有些安全軟件可能會報病毒,請忽略就好了。然后解壓文件目錄如下:

進入到這個目錄,然后執行: nc64.exe -l -p 9000。相當於打開了9000端口,並監聽了入站信息。最后實現的效果就是從這個窗口中輸入的數據,回車后會發送Apache Flink中我們提交的job中處理輸出,所以這里的9000端口,要和我們等下啟動job的啟動參數端口一致。

第四步:提交job運行

運行job有兩種方式:可以通過Flink.bat運行,也可以通過web控制台運行。

命令行運行:

flink run E:\flinkWorkingspce\flinkdemo\target\finlk-demo-1.0-SNAPSHOT.jar --port 9000

web控制台運行:

如上圖,點擊Add New后選擇你的jar包然后上傳,上傳成功就會在列表里列出來。然后選中你上傳的jar。就會出現如下圖的輸入框,可以輸入你的啟動參數,然后點擊submit提交就可以了

第五步:驗證效果

提交后如果沒有問題,job的詳情頁面如下:

這個時候我們從Netcat的監聽的黑窗口中敲入一些長文本,就會在Flink的job里統計輸出出來如:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM