大數據基礎---Flink_Standalone_集群部署


一、部署模式

Flink 支持使用多種部署模式來滿足不同規模應用的需求,常見的有單機模式,Standalone Cluster 模式,同時 Flink 也支持部署在其他第三方平台上,如 YARN,Mesos,Docker,Kubernetes 等。以下主要介紹其單機模式和 Standalone Cluster 模式的部署。

二、單機模式

單機模式是一種開箱即用的模式,可以在單台服務器上運行,適用於日常的開發和調試。具體操作步驟如下:

2.1 安裝部署

1. 前置條件

Flink 的運行依賴 JAVA 環境,故需要預先安裝好 JDK,具體步驟可以參考:Linux 環境下 JDK 安裝

2. 下載 & 解壓 & 運行

Flink 所有版本的安裝包可以直接從其官網進行下載,這里我下載的 Flink 的版本為 1.9.1 ,要求的 JDK 版本為 1.8.x +。 下載后解壓到指定目錄:

tar -zxvf flink-1.9.1-bin-scala_2.12.tgz  -C /usr/app

不需要進行任何配置,直接使用以下命令就可以啟動單機版本的 Flink:

bin/start-cluster.sh

3. WEB UI 界面

Flink 提供了 WEB 界面用於直觀的管理 Flink 集群,訪問端口為 8081

Flink 的 WEB UI 界面支持大多數常用功能,如提交作業,取消作業,查看各個節點運行情況,查看作業執行情況等,大家可以在部署完成后,進入該頁面進行詳細的瀏覽。

2.2 作業提交

啟動后可以運行安裝包中自帶的詞頻統計案例,具體步驟如下:

1. 開啟端口

nc -lk 9999

2. 提交作業

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999

該 JAR 包的源碼可以在 Flink 官方的 GitHub 倉庫中找到,地址為 :SocketWindowWordCount ,可選傳參有 hostname, port,對應的詞頻數據需要使用空格進行分割。

3. 輸入測試數據

a a b b c c c a e

4. 查看控制台輸出

可以通過 WEB UI 的控制台查看作業統運行情況:

也可以通過 WEB 控制台查看到統計結果:

2.3 停止作業

可以直接在 WEB 界面上點擊對應作業的 Cancel Job 按鈕進行取消,也可以使用命令行進行取消。使用命令行進行取消時,需要先獲取到作業的 JobId,可以使用 flink list 命令查看,輸出如下:

[root@hadoop001 flink-1.9.1]# ./bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
05.11.2019 08:19:53 : ba2b1cc41a5e241c32d574c93de8a2bc : Socket Window WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

獲取到 JobId 后,就可以使用 flink cancel 命令取消作業:

bin/flink cancel ba2b1cc41a5e241c32d574c93de8a2bc

命令如下:

bin/stop-cluster.sh

三、Standalone Cluster

Standalone Cluster 模式是 Flink 自帶的一種集群模式,具體配置步驟如下:

3.1 前置條件

使用該模式前,需要確保所有服務器間都已經配置好 SSH 免密登錄服務。這里我以三台服務器為例,主機名分別為 hadoop001,hadoop002,hadoop003 , 其中 hadoop001 為 master 節點,其余兩台為 slave 節點,搭建步驟如下:

3.2 搭建步驟

修改 conf/flink-conf.yaml 中 jobmanager 節點的通訊地址為 hadoop001:

jobmanager.rpc.address: hadoop001

修改 conf/slaves 配置文件,將 hadoop002 和 hadoop003 配置為 slave 節點:

hadoop002
hadoop003

將配置好的 Flink 安裝包分發到其他兩台服務器上:

 scp -r /usr/app/flink-1.9.1 hadoop002:/usr/app
 scp -r /usr/app/flink-1.9.1 hadoop003:/usr/app

在 hadoop001 上使用和單機模式相同的命令來啟動集群:

bin/start-cluster.sh

此時控制台輸出如下:

啟動完成后可以使用 Jps 命令或者通過 WEB 界面來查看是否啟動成功。

3.3 可選配置

除了上面介紹的 jobmanager.rpc.address 是必選配置外,Flink h還支持使用其他可選參數來優化集群性能,主要如下:

  • jobmanager.heap.size:JobManager 的 JVM 堆內存大小,默認為 1024m 。
  • taskmanager.heap.size:Taskmanager 的 JVM 堆內存大小,默認為 1024m 。
  • taskmanager.numberOfTaskSlots:Taskmanager 上 slots 的數量,通常設置為 CPU 核心的數量,或其一半。
  • parallelism.default:任務默認的並行度。
  • io.tmp.dirs:存儲臨時文件的路徑,如果沒有配置,則默認采用服務器的臨時目錄,如 LInux 的 /tmp 目錄。

更多配置可以參考 Flink 的官方手冊:Configuration

四、Standalone Cluster HA

上面我們配置的 Standalone 集群實際上只有一個 JobManager,此時是存在單點故障的,所以官方提供了 Standalone Cluster HA 模式來實現集群高可用。

4.1 前置條件

在 Standalone Cluster HA 模式下,集群可以由多個 JobManager,但只有一個處於 active 狀態,其余的則處於備用狀態,Flink 使用 ZooKeeper 來選舉出 Active JobManager,並依賴其來提供一致性協調服務,所以需要預先安裝 ZooKeeper 。

另外在高可用模式下,還需要使用分布式文件系統來持久化存儲 JobManager 的元數據,最常用的就是 HDFS,所以 Hadoop 也需要預先安裝。關於 Hadoop 集群和 ZooKeeper 集群的搭建可以參考:

4.2 搭建步驟

修改 conf/flink-conf.yaml 文件,增加如下配置:

# 配置使用zookeeper來開啟高可用模式
high-availability: zookeeper
# 配置zookeeper的地址,采用zookeeper集群時,可以使用逗號來分隔多個節點地址
high-availability.zookeeper.quorum: hadoop003:2181
# 在zookeeper上存儲flink集群元信息的路徑
high-availability.zookeeper.path.root: /flink
# 集群id
high-availability.cluster-id: /standalone_cluster_one
# 持久化存儲JobManager元數據的地址,zookeeper上存儲的只是指向該元數據的指針信息
high-availability.storageDir: hdfs://hadoop001:8020/flink/recovery

修改 conf/masters 文件,將 hadoop001 和 hadoop002 都配置為 master 節點:

hadoop001:8081
hadoop002:8081

確保 Hadoop 和 ZooKeeper 已經啟動后,使用以下命令來啟動集群:

bin/start-cluster.sh

此時輸出如下:

可以看到集群已經以 HA 的模式啟動,此時還需要在各個節點上使用 jps 命令來查看進程是否啟動成功,正常情況如下:

只有 hadoop001 和 hadoop002 的 JobManager 進程,hadoop002 和 hadoop003 上的 TaskManager 進程都已經完全啟動,才表示 Standalone Cluster HA 模式搭建成功。

4.3 常見異常

如果進程沒有啟動,可以通過查看 log 目錄下的日志來定位錯誤,常見的一個錯誤如下:

2019-11-05 09:18:35,877 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint      
- Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics
java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
.......
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file 
system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no 
Hadoop file system to support this scheme could be loaded.
.....
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in 
the classpath/dependencies.
......

可以看到是因為在 classpath 目錄下找不到 Hadoop 的相關依賴,此時需要檢查是否在環境變量中配置了 Hadoop 的安裝路徑,如果路徑已經配置但仍然存在上面的問題,可以從 Flink 官網下載對應版本的 Hadoop 組件包:

下載完成后,將該 JAR 包上傳至所有 Flink 安裝目錄的 lib 目錄即可。

參考資料

系列傳送門


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM