Flink 開發環境
通常來講,任何一門大數據框架在實際生產環境中都是以集群的形式運行,而我們調試代碼大多數會在本地搭建一個模板工程,Flink 也不例外。
Flink 一個以 Java 及 Scala 作為開發語言的開源大數據項目,通常我們推薦使用 Java 來作為開發語言,Maven 作為編譯和包管理工具進行項目構建和編譯。對於大多數開發者而言,JDK、Maven 和 Git 這三個開發工具是必不可少的。
關於 JDK、Maven 和 Git 的安裝建議如下表所示:
工程創建
一般來說,我們在通過 IDE 創建工程,可以自己新建工程,添加 Maven 依賴,或者直接用 mvn 命令創建應用:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.10.0
這里需要的主要的是,自動生成的項目 pom.xml 文件中對於 Flink 的依賴注釋掉 scope:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency>
DataSet WordCount (批處理)
WordCount 程序是大數據處理框架的入門程序,俗稱“單詞計數”。用來統計一段文字每個單詞的出現次數,該程序主要分為兩個部分:一部分是將文字拆分成單詞;另一部分是單詞進行分組計數並打印輸出結果。
public static void main(String[] args) throws Exception { // 創建Flink運行的上下文環境 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 創建DataSet,這里我們的輸入是一行一行的文本 DataSet<String> text = env.fromElements( "Flink Spark Storm", "Flink Flink Flink", "Spark Spark Spark", "Storm Storm Storm" ); // 通過Flink內置的轉換函數進行計算 DataSet<Tuple2<String, Integer>> counts = text.flatMap(new LineSplitter()) .groupBy(0) .sum(1); //結果打印 counts.printToErr(); } public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // 將文本分割 String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }
實現的整個過程中分為以下幾個步驟。
首先,我們需要創建 Flink 的上下文運行環境:
復制ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
然后,使用 fromElements 函數創建一個 DataSet 對象,該對象中包含了我們的輸入,使用 FlatMap、GroupBy、SUM 函數進行轉換。
最后,直接在控制台打印輸出。
我們可以直接右鍵運行一下 main 方法,在控制台會出現我們打印的計算結果:
DataStream WordCount (流處理)
為了模仿一個流式計算環境,我們選擇監聽一個本地的 Socket 端口,並且使用 Flink 中的滾動窗口,每 5 秒打印一次計算結果。代碼如下:
public class StreamingJob { public static void main(String[] args) throws Exception { // 創建Flink的流式計算環境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 監聽本地9000端口 DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n"); // 將接收的數據進行拆分,分組,窗口計算並且進行聚合輸出 DataStream<WordWithCount> windowCounts = text .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) { for (String word : value.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count + b.count); } }); // 打印結果 windowCounts.print().setParallelism(1); env.execute("Socket Window WordCount"); } // Data type for words with count public static class WordWithCount { public String word; public long count; public WordWithCount() {} public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + " : " + count; } } }
整個流式計算的過程分為以下幾步。
首先創建一個流式計算環境:
復制StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
然后進行監聽本地 9000 端口,將接收的數據進行拆分、分組、窗口計算並且進行聚合輸出。代碼中使用了 Flink 的窗口函數,我們在后面的課程中將詳細講解。
我們在本地使用 netcat 命令啟動一個端口:
nc -lk 9000
然后直接運行我們的 main 方法:
在 nc 中輸入:
$ nc -lk 9000
Flink Flink Flink
Flink Spark Storm
可以在控制台看到:
Flink : 4 Spark : 1 Storm : 1