Flink系列文章
- 第01講:Flink 的應用場景和架構模型
- 第02講:Flink 入門程序 WordCount 和 SQL 實現
- 第03講:Flink 的編程模型與其他框架比較
- 第04講:Flink 常用的 DataSet 和 DataStream API
- 第05講:Flink SQL & Table 編程和案例
- 第06講:Flink 集群安裝部署和 HA 配置
- 第07講:Flink 常見核心概念分析
- 第08講:Flink 窗口、時間和水印
- 第09講:Flink 狀態與容錯
我們在這一課時將講解 Flink 常見的部署模式:本地模式、Standalone 模式和 Flink On Yarn 模式,然后分別講解三種模式的使用場景和部署中常見的問題,最后將講解在生產環境中 Flink 集群的高可用配置。
Flink 常見的部署模式
環境准備
在絕大多數情況下,我們的 Flink 都是運行在 Unix 環境中的,推薦在 Mac OS 或者 Linux 環境下運行 Flink。如果是集群模式,那么可以在自己電腦上安裝虛擬機,保證有一個 master 節點和兩個 slave 節點。
同時,要注意在所有的機器上都應該安裝 JDK 和 SSH。JDK 是我們運行 JVM 語言程序必須的,而 SSH 是為了在服務器之間進行跳轉和執行命令所必須的。關於服務器之間通過 SSH 配置公鑰登錄,你可以直接搜索安裝和配置方法,我們不做過度展開。
Flink 的安裝包可以在這里下載。需要注意的是,如果你要和 Hadoop 進行集成,那么我們需要使用到對應的 Hadoop 依賴,下面將會詳細講解。
Local 模式
Local 模式是 Flink 提供的最簡單部署模式,一般用來本地測試和演示使用。
我們在這里下載 Apache Flink 1.10.0 for Scala 2.11 版本進行演示,該版本對應 Scala 2.11 版本。
將壓縮包下載到本地,並且直接進行解壓,使用 Flink 默認的端口配置,直接運行腳本啟動:
➜ [SoftWare]# tar -zxvf flink-1.10.0-bin-scala_2.11.tgz

上圖則為解壓完成后的目錄情況。
然后,我們可以直接運行腳本啟動 Flink :
復制代碼
➜ [flink-1.10.0]# ./bin/start-cluster.sh

上圖顯示我們的 Flink 啟動成功。
我們直接訪問本地的 8081 端口,可以看到 Flink 的后台管理界面,驗證 Flink 是否成功啟動。

可以看到 Flink 已經成功啟動。當然,我們也可以查看運行日志來確認 Flink 是不是成功啟動了,在 log 目錄下有程序的啟動日志:

我們嘗試提交一個測試任務:
復制代碼
./bin/flink run examples/batch/WordCount.jar

我們在控制台直接看到輸出。同樣,在 Flink 的后台管理界面 Completed Jobs 一欄可以看到剛才提交執行的程序:

Standalone 模式
Standalone 模式是集群模式的一種,但是這種模式一般並不運行在生產環境中,原因和 on yarn 模式相比:
- Standalone 模式的部署相對簡單,可以支持小規模,少量的任務運行;
- Stabdalone 模式缺少系統層面對集群中 Job 的管理,容易遭成資源分配不均勻;
- 資源隔離相對簡單,任務之間資源競爭嚴重。
我們在 3 台虛擬機之間搭建 standalone 集群:

在 master 節點,將 Apache Flink 1.10.0 for Scala 2.11 包進行解壓:
復制代碼
➜ [SoftWare]# tar -zxvf flink-1.10.0-bin-scala_2.11.tgz
重點來啦,我們需要修改 Flink 的配置文件,並且將修改好的解壓目錄完整的拷貝到兩個從節點中去。在這里,我強烈建議主節點和從節點的目錄要保持一致。
我們修改 conf 目錄下的 flink-conf.yaml:

flink-conf.yaml 文件中有大量的配置參數,我們挑選其中必填的最基本參數進行修改:
復制代碼
jobmanager.rpc.address: master
jobmanager.heap.size: 1024m
jobmanager.rpc.port: 6123
taskmanager.memory.process.size: 1568m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: region
io.tmp.dirs: /tmp
它們分別代表:

如果你對其他的參數有興趣的話,可以直接參考官網。接下來我們修改 conf 目錄下的 master 和 slave 文件。vim master,將內容修改為:
master
vim slave,將內容修改為:
slave01
slave02
然后,將整個修改好的 Flink 解壓目錄使用 scp 遠程拷貝命令發送到從節點:
scp -r /SoftWare/flink-1.10.0 slave01:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave02:/SoftWare/
在 master、slave01、slave02 上分別配置環境變量,vim /etc/profile,將內容修改為:
export FLINK_HOME=/SoftWare/flink-1.10.0
export PATH=$PATH:$FLINK_HOME/bin
到此為止,我們整個的基礎配置已經完成,下面需要啟動集群,登錄 master 節點執行:
/SoftWare/flink-1.10.0/bin/start-cluster.sh
可以在瀏覽器訪問:http://192.168.2.100:8081/ 檢查集群是否啟動成功。
集群搭建過程中,可能出現的問題:
- 端口被占用,我們需要手動殺掉占用端口的程序;
- 目錄找不到或者文件找不到,我們在 flink-conf.yaml 中配置過 io.tmp.dirs ,這個目錄需要手動創建。
On Yarn 模式和 HA 配置

上圖是 Flink on Yarn 模式下,Flink 和 Yarn 的交互流程。Yarn 是 Hadoop 三駕馬車之一,主要用來做資源管理。我們在 Flink on Yarn 模式中也是借助 Yarn 的資源管理優勢,需要在三個節點中配置 YARN_CONF_DIR、HADOOP_CONF_DIR、HADOOP_CONF_PATH 中的任意一個環境變量即可。
本課時中集群的高可用 HA 配置是基於獨立的 ZooKeeper 集群。當然,Flink 本身提供了內置 ZooKeeper 插件,可以直接修改 conf/zoo.cfg,並且使用 /bin/start-zookeeper-quorum.sh 直接啟動。
環境准備:
- ZooKeeper-3.x
- Flink-1.10.0
- Hadoop-2.6.5
我們使用 5 台虛擬機搭建 on yarn 的高可用集群:

如果你在使用 Flink 的最新版本 1.10.0 時,那么需要在本地安裝 Hadoop 環境並進行下面的操作。
首先,添加環境變量:
vi /etc/profile
# 添加環境變量
export HADOOP_CONF_DIR=/Software/hadoop-2.6.5/etc/hadoop
# 環境變量生效
source /etc/profile
其次,下載對應的的依賴包,並將對應的 Hadoop 依賴復制到 flink 的 lib 目錄下,對應的 hadoop 依賴可以在這里下載。

與 standalone 集群不同的是,我們需要修改 flink-conf.yaml 文件中的一些配置:
high-availability: zookeeper
high-availability.storageDir: hdfs://cluster/flinkha/
high-availability.zookeeper.quorum: slave01:2181,slave02:2181,slave03:2181
它們分別代表:

然后分別修改 master、slave、zoo.cfg 三個配置文件。
vim master,將內容修改為:
master01:8081
master02:8081
vim slave,將內容修改為:
slave01
slave02
slave03
vim zoo.cfg,將內容修改為:
server.1=slave01:2888:3888
server.2=slave02:2888:3888
server.3=slave03:2888:3888
然后,我們將整個修改好的 Flink 解壓目錄使用 scp 遠程拷貝命令發送到從節點:
scp -r /SoftWare/flink-1.10.0 slave01:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave02:/SoftWare/
scp -r /SoftWare/flink-1.10.0 slave03:/SoftWare/
分別啟動 Hadoop 和 ZooKeeper,然后在主節點,使用命令啟動集群:
/SoftWare/flink-1.10.0/bin/start-cluster.sh
我們同樣直接訪問 http://192.168.2.100:8081/ 端口,可以看到 Flink 的后台管理界面,驗證 Flink 是否成功啟動。
在 Flink on yarn 模式下,啟動集群的方式有兩種:
- 直接在 yarn 上運行任務
- yarn session 模式
直接在 yarn 上運行任務相當於將 job 直接提交到 yarn 上,每個任務會根據用戶的指定進行資源申請,任務之間互不影響。
./bin/flink run -yjm 1024m -ytm 4096m -ys 2 ./examples/batch/WordCount.jar
更多關於參數的含義,可以參考官網。使用 yarn session 模式,我們需要先啟動一個 yarn-session 會話,相當於啟動了一個 yarn 任務,這個任務所占用的資源不會變化,並且一直運行。我們在使用 flink run 向這個 session 任務提交作業時,如果 session 的資源不足,那么任務會等待,直到其他資源釋放。當這個 yarn-session 被殺死時,所有任務都會停止。
例如我們啟動一個 yarn session 任務,該任務擁有 8G 內存、32 個槽位。
./bin/yarn-session.sh -tm 8192 -s 32
我們在 yarn 的界面上可以看到這個任務的 ID,然后向這個 session ID 提交 Flink 任務:
./bin/flink run -m yarn-cluster -yid application_xxxx ./examples/batch/WordCount.jar
其中,application_xxxx 即為上述的 yarn session 任務 ID。
總結
本課時我們講解了 Flink 的三種部署模式和高可用配置,並且對這三種部署模式的適用場景進行了講解。在生產上,我們最常用的方式當然是 Flink on Yarn,借助 Yarn 在資源管理上的絕對優勢,確保集群和任務的穩定。
關注公眾號:
大數據技術派,回復資料,領取1024G資料。
