一、部署模式
Flink 支持使用多種部署模式來滿足不同規模應用的需求,常見的有單機模式,Standalone Cluster 模式,同時 Flink 也支持部署在其他第三方平台上,如 YARN,Mesos,Docker,Kubernetes 等。以下主要介紹其單機模式和 Standalone Cluster 模式的部署。
二、單機模式
單機模式是一種開箱即用的模式,可以在單台服務器上運行,適用於日常的開發和調試。具體操作步驟如下:
2.1 安裝部署
1. 前置條件
Flink 的運行依賴 JAVA 環境,故需要預先安裝好 JDK,具體步驟可以參考:Linux 環境下 JDK 安裝
2. 下載 & 解壓 & 運行
Flink 所有版本的安裝包可以直接從其官網進行下載,這里我下載的 Flink 的版本為 1.9.1
,要求的 JDK 版本為 1.8.x +
。 下載后解壓到指定目錄:
tar -zxvf flink-1.9.1-bin-scala_2.12.tgz -C /usr/app
不需要進行任何配置,直接使用以下命令就可以啟動單機版本的 Flink:
bin/start-cluster.sh
3. WEB UI 界面
Flink 提供了 WEB 界面用於直觀的管理 Flink 集群,訪問端口為 8081
:
Flink 的 WEB UI 界面支持大多數常用功能,如提交作業,取消作業,查看各個節點運行情況,查看作業執行情況等,大家可以在部署完成后,進入該頁面進行詳細的瀏覽。
2.2 作業提交
啟動后可以運行安裝包中自帶的詞頻統計案例,具體步驟如下:
1. 開啟端口
nc -lk 9999
2. 提交作業
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
該 JAR 包的源碼可以在 Flink 官方的 GitHub 倉庫中找到,地址為 :SocketWindowWordCount ,可選傳參有 hostname, port,對應的詞頻數據需要使用空格進行分割。
3. 輸入測試數據
a a b b c c c a e
4. 查看控制台輸出
可以通過 WEB UI 的控制台查看作業統運行情況:
也可以通過 WEB 控制台查看到統計結果:
2.3 停止作業
可以直接在 WEB 界面上點擊對應作業的 Cancel Job
按鈕進行取消,也可以使用命令行進行取消。使用命令行進行取消時,需要先獲取到作業的 JobId,可以使用 flink list
命令查看,輸出如下:
[root@hadoop001 flink-1.9.1]# ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
05.11.2019 08:19:53 : ba2b1cc41a5e241c32d574c93de8a2bc : Socket Window WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
獲取到 JobId 后,就可以使用 flink cancel
命令取消作業:
bin/flink cancel ba2b1cc41a5e241c32d574c93de8a2bc
2.4 停止 Flink
命令如下:
bin/stop-cluster.sh
三、Standalone Cluster
Standalone Cluster 模式是 Flink 自帶的一種集群模式,具體配置步驟如下:
3.1 前置條件
使用該模式前,需要確保所有服務器間都已經配置好 SSH 免密登錄服務。這里我以三台服務器為例,主機名分別為 hadoop001,hadoop002,hadoop003 , 其中 hadoop001 為 master 節點,其余兩台為 slave 節點,搭建步驟如下:
3.2 搭建步驟
修改 conf/flink-conf.yaml
中 jobmanager 節點的通訊地址為 hadoop001:
jobmanager.rpc.address: hadoop001
修改 conf/slaves
配置文件,將 hadoop002 和 hadoop003 配置為 slave 節點:
hadoop002
hadoop003
將配置好的 Flink 安裝包分發到其他兩台服務器上:
scp -r /usr/app/flink-1.9.1 hadoop002:/usr/app
scp -r /usr/app/flink-1.9.1 hadoop003:/usr/app
在 hadoop001 上使用和單機模式相同的命令來啟動集群:
bin/start-cluster.sh
此時控制台輸出如下:
啟動完成后可以使用 Jps
命令或者通過 WEB 界面來查看是否啟動成功。
3.3 可選配置
除了上面介紹的 jobmanager.rpc.address 是必選配置外,Flink h還支持使用其他可選參數來優化集群性能,主要如下:
- jobmanager.heap.size:JobManager 的 JVM 堆內存大小,默認為 1024m 。
- taskmanager.heap.size:Taskmanager 的 JVM 堆內存大小,默認為 1024m 。
- taskmanager.numberOfTaskSlots:Taskmanager 上 slots 的數量,通常設置為 CPU 核心的數量,或其一半。
- parallelism.default:任務默認的並行度。
- io.tmp.dirs:存儲臨時文件的路徑,如果沒有配置,則默認采用服務器的臨時目錄,如 LInux 的
/tmp
目錄。
更多配置可以參考 Flink 的官方手冊:Configuration
四、Standalone Cluster HA
上面我們配置的 Standalone 集群實際上只有一個 JobManager,此時是存在單點故障的,所以官方提供了 Standalone Cluster HA 模式來實現集群高可用。
4.1 前置條件
在 Standalone Cluster HA 模式下,集群可以由多個 JobManager,但只有一個處於 active 狀態,其余的則處於備用狀態,Flink 使用 ZooKeeper 來選舉出 Active JobManager,並依賴其來提供一致性協調服務,所以需要預先安裝 ZooKeeper 。
另外在高可用模式下,還需要使用分布式文件系統來持久化存儲 JobManager 的元數據,最常用的就是 HDFS,所以 Hadoop 也需要預先安裝。關於 Hadoop 集群和 ZooKeeper 集群的搭建可以參考:
4.2 搭建步驟
修改 conf/flink-conf.yaml
文件,增加如下配置:
# 配置使用zookeeper來開啟高可用模式
high-availability: zookeeper
# 配置zookeeper的地址,采用zookeeper集群時,可以使用逗號來分隔多個節點地址
high-availability.zookeeper.quorum: hadoop003:2181
# 在zookeeper上存儲flink集群元信息的路徑
high-availability.zookeeper.path.root: /flink
# 集群id
high-availability.cluster-id: /standalone_cluster_one
# 持久化存儲JobManager元數據的地址,zookeeper上存儲的只是指向該元數據的指針信息
high-availability.storageDir: hdfs://hadoop001:8020/flink/recovery
修改 conf/masters
文件,將 hadoop001 和 hadoop002 都配置為 master 節點:
hadoop001:8081
hadoop002:8081
確保 Hadoop 和 ZooKeeper 已經啟動后,使用以下命令來啟動集群:
bin/start-cluster.sh
此時輸出如下:
可以看到集群已經以 HA 的模式啟動,此時還需要在各個節點上使用 jps
命令來查看進程是否啟動成功,正常情況如下:
只有 hadoop001 和 hadoop002 的 JobManager 進程,hadoop002 和 hadoop003 上的 TaskManager 進程都已經完全啟動,才表示 Standalone Cluster HA 模式搭建成功。
4.3 常見異常
如果進程沒有啟動,可以通過查看 log
目錄下的日志來定位錯誤,常見的一個錯誤如下:
2019-11-05 09:18:35,877 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
- Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics
java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
.......
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file
system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no
Hadoop file system to support this scheme could be loaded.
.....
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in
the classpath/dependencies.
......
可以看到是因為在 classpath 目錄下找不到 Hadoop 的相關依賴,此時需要檢查是否在環境變量中配置了 Hadoop 的安裝路徑,如果路徑已經配置但仍然存在上面的問題,可以從 Flink 官網下載對應版本的 Hadoop 組件包:
下載完成后,將該 JAR 包上傳至所有 Flink 安裝目錄的 lib
目錄即可。