Apache Flink是什么?
Apache Flink 是一個分布式大數據處理引擎,可對有限數據流和無限數據流進行有狀態計算。可部署在各種集群環境,對各種大小的數據規模進行快速計算。上面是非常官方的描述,說白了我們為什么選擇Flink,是因為他在社區口碑非常不錯。在國內的話有阿里這種大數據大流量的公司一直在輸出,當然像騰訊、華為、餓了么、滴滴等也都有使用Apache Flink。
進入正題
本篇博文涉及到的軟件工具以及下載地址:
Apache Flink :https://flink.apache.org/downloads.html
Netcat:https://eternallybored.org/misc/netcat/
Netcat是一個有“瑞士軍刀”美譽的網絡工具,這里用來綁定端口等待Apache Flink的連接
第一步:啟動Flink
從上面的地址下載Flink后是一個壓縮包,解壓后的目錄結構如下:
/conf/flink-conf.yaml里有一些Flink的基本配置信息,如,jobmanager、taskmanager的端口和jvm內存(默認1024M)大小,web控制台的端口(默認8081)等。我們可以不該任何配置,然后進入到bin下,執行start-cluster.bat。這里要注意不是並不是flink.bat。flink.bat是用來提交job的。還有要確保相關的端口沒有被占用
運行成功后會有兩個java黑窗口(一個TaskManager、一個JobManager),如果只有一個java黑窗口,很可能是你的TaskManager因為端口占用沒有啟動起來,成功后訪問:http://localhost:8081.就會看到如下的web管理控制台了:
如果啟動失敗的話,上面箭頭所指向的地方應該是0.
第二步:job任務編寫
1.首先需要新建一個maven工程,然后導入Flink的接口依賴
-
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-java</artifactId>
-
<version>1.7.1</version>
-
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-streaming-java_2.11</artifactId>
-
<version>1.7.1</version>
-
<groupId>org.apache.flink</groupId>
-
<artifactId>flink-clients_2.11</artifactId>
-
<version>1.7.1</version>
2.編寫具體的job,官方提供了一個單詞統計的demo
package com.kl;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
-
// the host and the port to connect to
-
final String hostname;
-
final int port;
-
try {
-
final ParameterTool params = ParameterTool.fromArgs(args);
-
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
-
port = params.has("port") ? params.getInt("port"):9000;
-
} catch (Exception e) {
-
System.err.println( "No port specified. Please run 'SocketWindowWordCount " +
-
"--hostname <hostname> --port <port>', where hostname (localhost by default) " +
-
"and port is the address of the text server");
-
System.err.println( "To start a simple text server, run 'netcat -l <port>' and " +
-
"type the input text into the command line");
-
return;
-
}
-
// get the execution environment
-
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
// get input data by connecting to the socket
-
DataStream<String> text = env.socketTextStream(hostname, port, "\\n");
-
// parse the data, group it, window it, and aggregate the counts
-
DataStream<WordWithCount> windowCounts = text
-
.flatMap( new FlatMapFunction<String, WordWithCount>() {
-
public void flatMap(String value, Collector<WordWithCount> out) {
-
for (String word : value.split("\\\s")) {
-
out.collect(new WordWithCount(word, 1L));
-
} }})
-
.keyBy( "word")
-
.timeWindow(Time.seconds( 5))
-
.reduce( new ReduceFunction<WordWithCount>() {
-
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
-
return new WordWithCount(a.word, a.count + b.count);
-
}});
-
// print the results with a single thread, rather than in parallel
-
windowCounts.print().setParallelism( 1);
-
env.execute( "Socket Window WordCount");
}
/**
-
\* Data type for words with count.
-
*/
public static class WordWithCount {
-
public String word;
-
public long count;
-
public WordWithCount() {}
-
public WordWithCount(String word, long count) {
-
this.word = word;
-
this.count = count;
-
}
-
-
public String toString() {
-
return word + " : " + count;
-
}
}
}
上面demo實現了從啟動參數中獲取ip和端口,然后連接從輸入流接收文本信息,然后統計文本里單詞出現的次數。因為要打成可運行的jar,所以,還需要引入maven的jar打包插件,如下:
-
<plugins>
-
<plugin>
-
<groupId>org.apache.maven.plugins</groupId>
-
<artifactId>maven-shade-plugin</artifactId>
-
<version>1.2.1</version>
-
<executions>
-
<execution>
-
<phase>package</phase>
-
<goals>
-
<goal>shade</goal>
-
</goals>
-
<configuration>
-
<transformers>
-
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-
<mainClass>com.kl.SocketWindowWordCount</mainClass>
-
</transformer>
-
</transformers>
-
</configuration>
-
</execution>
-
</executions>
-
</plugin>
-
</plugins>
mainClass標簽中就是你的main方法所在類全類名。然后mvn install就可以打出一個可運行的jar包了。
第三步:Netcat監聽端口,等待連接
從上面貼的地址下載Netcat后,是一個壓縮包,有些安全軟件可能會報病毒,請忽略就好了。然后解壓文件目錄如下:
進入到這個目錄,然后執行: nc64.exe -l -p 9000。相當於打開了9000端口,並監聽了入站信息。最后實現的效果就是從這個窗口中輸入的數據,回車后會發送Apache Flink中我們提交的job中處理輸出,所以這里的9000端口,要和我們等下啟動job的啟動參數端口一致。
第四步:提交job運行
運行job有兩種方式:可以通過Flink.bat運行,也可以通過web控制台運行。
命令行運行:
flink run E:\flinkWorkingspce\flinkdemo\target\finlk-demo-1.0-SNAPSHOT.jar --port 9000
web控制台運行:
如上圖,點擊Add New后選擇你的jar包然后上傳,上傳成功就會在列表里列出來。然后選中你上傳的jar。就會出現如下圖的輸入框,可以輸入你的啟動參數,然后點擊submit提交就可以了
第五步:驗證效果
提交后如果沒有問題,job的詳情頁面如下:
這個時候我們從Netcat的監聽的黑窗口中敲入一些長文本,就會在Flink的job里統計輸出出來如: