Download the Flink release that matches your operating system and target version
# First, check the Java environment
$ java -version
java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
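# Install on Linux
$ wget https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-scala_2.11.tgz # download the binary package (the closer.lua link returns a mirror-selection page, not the archive)
$ tar xzf flink-*.tgz # extract the archive
$ cd flink-1.7.2 # change into the extracted directory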
# Install on Mac
$ brew install apache-flink
...
# Check the version
$ flink --version
Version: 1.2.0, Commit ID: 1c659cf
# Check the install location
$ brew info apache-flink
https://flink.apache.org/
/usr/local/Cellar/apache-flink/1.7.1 (170 files, 321.1MB) *
Built from source on 2019-02-14 at 09:32:35
From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/apache-flink.rb
==> Requirements
Required: java = 1.8 ✔
==> Options
--HEAD
Install HEAD version
==> Analytics
install: 915 (30 days), 3,279 (90 days), 9,094 (365 days)
install_on_request: 899 (30 days), 3,226 (90 days), 8,878 (365 days)
build_error: 0 (30 days)
# Notes for Mac
# On Mac, the counterpart of the Linux bin directory is under /usr/local/Cellar/apache-flink/1.7.1/libexec
Configure and run
$ cd flink-1.7.2
$ ./bin/start-cluster.sh
# The web UI listens on localhost:8081
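A quick way to confirm that the cluster came up is to check that something accepts TCP connections on port 8081. The sketch below does this in plain Java. The names PortCheck and isListening and the one-second timeout are illustrative assumptions, and a successful connection only shows that the port is open, not that Flink is the process answering.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative helper, not part of Flink.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 8081 is the port named in the notes above; the timeout is an arbitrary choice.
        System.out.println("localhost:8081 listening: " + isListening("localhost", 8081, 1000));
    }
}
```

If this reports false right after start-cluster.sh, the files under the log directory are the next place to look.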
Create a Maven project in IDEA -> Add Archetype
GroupId: org.apache.flink
ArtifactId: flink-quickstart-java
Version: 1.7.1
Add the code
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");
        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
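The listing above breaks off inside the flatMap call, so the rest of the pipeline is not shown here. As a rough illustration of what its last comment describes (parse the data, group it, aggregate the counts), the following plain-Java sketch splits lines into words and sums a count per word. It uses no Flink API. The class and method names are hypothetical, splitting on whitespace is an assumed tokenization, and all lines are treated as one window because the window settings are not visible in the listing.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is not the Flink job itself.
class WordCountSketch {

    // Splits each line on whitespace (assumed tokenization) and sums a count per word.
    // All lines are treated as belonging to a single window (assumption).
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same sample input that is typed into nc later in these notes.
        List<String> lines = Arrays.asList("hello hello hello", "hehe", "your world");
        countWords(lines).forEach((word, count) -> System.out.println(word + " : " + count));
    }
}
```

In the real job the counts also depend on when each line arrives relative to the window boundaries, so the numbers in the .out file can differ from this single-window total.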
Package and run
$ mvn clean package -Dmaven.test.skip=true
Open a listening port
$ nc -l 9000
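Run the SocketWindowWordCount job
# The job connects to the port opened above; first change to the directory that contains the jar built above
$ flink run -c <package>.SocketWindowWordCount <path-to-jar> --port 9000 # <package> is the Java package of the SocketWindowWordCount class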
Now type some input into nc
$ nc -l 9000
hello hello hello
hehe
your world
View the results
$ tail -f log/flink-*-taskexecutor-*.out
Stop Flink
$ ./bin/stop-cluster.sh