https://mp.weixin.qq.com/s/noD2Jv6m-somEMtjWTJh3w
本文是根據 Apache Flink 系列直播課程整理而成,由阿里巴巴高級開發工程師沙晟陽分享,主要面向於初次接觸 Flink、或者對 Flink 有了解但是沒有實際操作過的同學。希望幫助大家更順利地上手使用 Flink,並着手相關開發調試工作。
主要內容:
-
Flink 開發環境的部署和配置
-
運行 Flink 應用
-
單機 Standalone 模式
-
多機 Standalone 模式
-
Yarn 集群模式
一. Flink 開發環境部署和配置
Flink 是一個以 Java 及 Scala 作為開發語言的開源大數據項目,代碼開源在 GitHub 上,並使用 Maven 來編譯和構建項目。對於大部分使用 Flink 的同學來說,Java、Maven 和 Git 這三個工具是必不可少的,另外一個強大的 IDE 有助於我們更快的閱讀代碼、開發新功能以及修復 Bug。因為篇幅所限,我們不會詳述每個工具的安裝細節,但會給出必要的安裝建議。
關於開發測試環境,Mac OS、Linux 系統或者 Windows 都可以。如果使用的是 Windows 10 系統,建議使用 Windows 10 系統的 Linux 子系統來編譯和運行。
工具 | 注釋 |
---|---|
Java | Java 版本至少是 Java 8,且最好選用 Java 8u51 及以上版本 |
Maven | 必須使用 Maven 3,建議使用 Maven 3.2.5。Maven 3.3.x 能夠編譯成功,但是在 Shade 一些 Dependencies 的過程中有些問題 |
Git | Flink 的代碼倉庫是: https://github.com/apache/flink |
建議選用社區已發布的穩定分支,比如 Release-1.6 或者 Release-1.7。
1. 編譯 Flink 代碼
在我們配置好之前的幾個工具后,編譯 Flink 就非常簡單了,執行如下命令即可:
mvn clean install -DskipTests
# 或者
mvn clean package -DskipTests
常用編譯參數:
Dfast 主要是忽略QA plugins和JavaDocs的編譯
Dhadoop.version=2.6.1 指定hadoop版本
settings=${maven_file_path} 顯式指定maven settings.xml配置文件
當成功編譯完成后,能在當前 Flink 代碼目錄下的 flink-dist/target/子目錄 中看到如下文件(不同的 Flink 代碼分支編譯出的版本號不同,這里的版本號是 Flink 1.5.1):
其中有三個文件可以留意一下:
版本 | 注釋 |
---|---|
flink-1.5.1.tar.gz | Binary 的壓縮包 |
flink-1.5.1-bin/flink-1.5.1 | 解壓后的 Flink binary 目錄 |
flink-dist_2.11-1.5.1.jar | 包含 Flink 核心功能的 jar 包 |
注意: 國內用戶在編譯時可能遇到編譯失敗“Build Failure”(且有 MapR 相關報錯),一般都和 MapR 相關依賴的下載失敗有關,即使使用了推薦的 settings.xml 配置(其中 Aliyun Maven 源專門為 MapR 相關依賴做了代理),還是可能出現下載失敗的情況。問題主要和 MapR 的 Jar 包比較大有關。遇到這些問題時,重試即可。在重試之前,要先根據失敗信息刪除 Maven local repository 中對應的目錄,否則需要等待 Maven 下載的超時時間才能再次出發下載依賴到本地。
2. 開發環境准備
推薦使用 IntelliJ IDEA IDE 作為 Flink 的 IDE 工具。官方不建議使用 Eclipse IDE,主要原因是 Eclipse 的 Scala IDE 和 Flink 用 Scala 的不兼容。
如果你需要做一些 Flink 代碼的開發工作,則需要根據 Flink 代碼的 tools/maven/目錄 下的配置文件來配置 Checkstyle ,因為 Flink 在編譯時會強制代碼風格的檢查,如果代碼風格不符合規范,可能會直接編譯失敗。
二、運行 Flink 應用
1. 基本概念
運行 Flink 應用其實非常簡單,但是在運行 Flink 應用之前,還是有必要了解 Flink 運行時的各個組件,因為這涉及到 Flink 應用的配置問題。圖 1 所示,這是用戶用 DataStream API 寫的一個數據處理程序。可以看到,在一個 DAG 圖中不能被 Chain 在一起的 Operator 會被分隔到不同的 Task 中,也就是說 Task 是 Flink 中資源調度的最小單位。
圖 1 Parallel Dataflows
圖 2 所示,Flink 實際運行時包括兩類進程:
-
JobManager(又稱為 JobMaster):協調 Task 的分布式執行,包括調度 Task、協調創 Checkpoint 以及當 Job failover 時協調各個 Task 從 Checkpoint 恢復等。
-
TaskManager(又稱為 Worker):執行 Dataflow 中的 Tasks,包括內存 Buffer 的分配、Data Stream 的傳遞等。
圖 2 Flink Runtime 架構圖
圖 3 所示,Task Slot 是一個 TaskManager 中的最小資源分配單位,一個 TaskManager 中有多少個 Task Slot 就意味着能支持多少並發的 Task 處理。需要注意的是,一個 Task Slot 中可以執行多個 Operator,一般這些 Operator 是能被 Chain 在一起處理的。
圖 3 Process
2. 運行環境准備
-
准備 Flink binary
-
直接從 Flink 官網上下載 Flink binary 的壓縮包
-
或者從 Flink 源碼編譯而來
-
安裝 Java,並配置 JAVA_HOME 環境變量
3. 單機 Standalone 的方式運行 Flink
(1)基本的啟動流程
最簡單的運行 Flink 應用的方法就是以單機 Standalone 的方式運行。
啟動集群:
./bin/start-cluster.sh
打開 http://127.0.0.1:8081/ 就能看到 Flink 的 Web 界面。嘗試提交 Word Count 任務:
./bin/flink run examples/streaming/WordCount.jar
大家可以自行探索 Web 界面中展示的信息,比如,我們可以看看 TaskManager 的 stdout 日志,就可以看到 Word Count 的計算結果。
我們還可以嘗試通過“–input”參數指定我們自己的本地文件作為輸入,然后執行:
./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}
停止集群:
./bin/stop-cluster.sh
(2)常用配置介紹
-
conf / slaves
conf / slaves 用於配置 TaskManager 的部署,默認配置下只會啟動一個 TaskManager 進程,如果想增加一個 TaskManager 進程的,只需要文件中追加一行“localhost”。
也可以直接通過“ ./bin/taskmanager.sh start ”這個命令來追加一個新的 TaskManager:
./bin/taskmanager.sh start|start-foreground|stop|stop-all
conf/flink-conf.yaml
-
conf/flink-conf.yaml
用於配置 JM 和 TM 的運行參數,常用配置有:
# The heap size for the JobManager JVM
jobmanager.heap.mb: 1024
# The heap size for the TaskManager JVM
taskmanager.heap.mb: 1024
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 4
# the managed memory size for each task manager.
taskmanager.managed.memory.size: 256
Standalone 集群啟動后,我們可以嘗試分析一下 Flink 相關進程的運行情況。執行 jps 命令,可以看到 Flink 相關的進程主要有兩個,一個是 JobManager 進程,另一個是 TaskManager 進程。我們可以進一步用 ps 命令看看進程的啟動參數中“-Xmx”和“-Xms”的配置。然后我們可以嘗試修改 flink-conf.yaml 中若干配置,然后重啟 Standalone 集群看看發生了什么變化。
需要補充的是,在 Blink 開源分支上,TaskManager 的內存計算上相對於現在的社區版本要更精細化,TaskManager 進程的堆內存限制(-Xmx)一般的計算方法是:
TotalHeapMemory = taskmanager.heap.mb + taskmanager.managed.memory.size + taskmanager.process.heap.memory.mb(默認值為128MB)
而最新的 Flink 社區版本 Release-1.7 中 JobManager 和 TaskManager 默認內存配置方式為:
# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m
Flink 社區 Release-1.7 版本中的“taskmanager.heap.size”配置實際上指的不是 Java heap 的內存限制,而是 TaskManager 進程總的內存限制。我們可以同樣用上述方法查看 Release-1.7 版本的 Flink binary 啟動的 TaskManager 進程的 -Xmx 配置,會發現實際進程上的 -Xmx 要小於配置的“taskmanager.heap.size”的值,原因在於從中扣除了 Network buffer 用的內存,因為 Network buffer 用的內存一定是 Direct memory,所以不應該算在堆內存限制中。
(3)日志的查看和配置
JobManager 和 TaskManager 的啟動日志可以在 Flink binary 目錄下的 Log 子目錄中找到。Log 目錄中以“flink-user−standalonesession−{id}-${hostname}”為前綴的文件對應的是 JobManager 的輸出,其中有三個文件:
-
flink-user−standalonesession−{id}-${hostname}.log:代碼中的日志輸出
-
flink-user−standalonesession−{id}-${hostname}.out:進程執行時的 stdout 輸出
-
flink-user−standalonesession−{id}-${hostname}-gc.log:JVM 的 GC 的日志
Log 目錄中以“flink-user−taskexecutor−{id}-${hostname}”為前綴的文件對應的是 TaskManager 的輸出,也包括三個文件,和 JobManager 的輸出一致。
日志的配置文件在 Flink binary 目錄的 conf 子目錄下,其中:
-
log4j-cli.properties:用 Flink 命令行時用的 log 配置,比如執行“ flink run”命令
-
log4j-yarn-session.properties:用 yarn-session.sh 啟動時命令行執行時用的 log 配置
-
log4j.properties:無論是 Standalone 還是 Yarn 模式,JobManager 和 TaskManager 上用的 log 配置都是 log4j.properties
這三個“log4j.*properties”文件分別有三個“logback.*xml”文件與之對應,如果想使用 Logback 的同學,之需要把與之對應的“log4j.*properties”文件刪掉即可,對應關系如下:
-
log4j-cli.properties -> logback-console.xml
-
log4j-yarn-session.properties -> logback-yarn.xml
-
log4j.properties -> logback.xml
需要注意的是,“flink-user−standalonesession−{id}-hostname”和“flink−{user}-taskexecutor-id−{hostname}”都帶有“id”,“{id}”表示本進程在本機上該角色(JobManager 或 TaskManager)的所有進程中的啟動順序,默認從 0 開始。
(4)進一步探索
嘗試重復執行“./bin/start-cluster.sh”命令,然后看看 Web 頁面(或者執行 jps 命令),看看會發生什么?可以嘗試看看啟動腳本,分析一下原因。接着可以重復執行“./bin/stop-cluster.sh”,每次執行完后,看看會發生什么。
4. 多機部署 Flink Standalone 集群
部署前要注意的要點:
-
每台機器上配置好 Java 以及 JAVA_HOME 環境變量
-
每台機器上部署的 Flink binary 的目錄要保證是同一個目錄
-
如果需要用 HDFS,需要配置 HADOOP_CONF_DIR 環境變量配置
根據你的集群信息修改 conf/masters 和 conf/slaves 配置。
修改 conf/flink-conf.yaml 配置,注意要確保和 Masters 文件中的地址一致:
jobmanager.rpc.address: z05f06378.sqa.zth.tbsite.net
確保所有機器的 Flink binary 目錄中 conf 中的配置文件相同,特別是以下三個:
conf/masters
conf/slaves
conf/flink-conf.yaml
然后啟動 Flink 集群:
./bin/start-cluster.sh
提交 WordCount 作業:
./bin/flink run examples/streaming/WordCount.jar
上傳 WordCount 的 Input 文件:
hdfs dfs -copyFromLocal story /test_dir/input_dir/story
提交讀寫 HDFS 的 WordCount 作業:
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
增加 WordCount 作業的並發度(注意輸出文件重名會提交失敗):
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20
5. Standalone 模式的 HighAvailability(HA)部署和配置
通過圖 2 Flink Runtime 架構圖,我們可以看到 JobManager 是整個系統中最可能導致系統不可用的角色。如果一個 TaskManager 掛了,在資源足夠的情況下,只需要把相關 Task 調度到其他空閑 TaskSlot 上,然后 Job 從 Checkpoint 中恢復即可。而如果當前集群中只配置了一個 JobManager,則一旦 JobManager 掛了,就必須等待這個 JobManager 重新恢復,如果恢復時間過長,就可能導致整個 Job 失敗。
因此如果在生產業務使用 Standalone 模式,則需要部署配置 HighAvailability,這樣同時可以有多個 JobManager 待命,從而使得 JobManager 能夠持續服務。
圖 4 Flink JobManager HA 示意圖
注意:
如果想使用 Flink standalone HA 模式,需要確保基於 Flink Release-1.6.1 及以上版本,因為這里社區有個 bug 會導致這個模式下主 JobManager 不能正常工作。
接下來的實驗中需要用到 HDFS,所以需要下載帶有 Hadoop 支持的 Flink Binary 包。
(1)(可選)使用 Flink 自帶的腳本部署 Zookeeper
Flink 目前支持基於 Zookeeper 的 HA。如果你的集群中沒有部署 ZK,Flink 提供了啟動 Zookeeper 集群的腳本。首先修改配置文件“conf/zoo.cfg”,根據你要部署的 Zookeeper Server 的機器數來配置“server.X=addressX:peerPort:leaderPort”,其中“X”是一個 Zookeeper Server 的唯一 ID,且必須是數字。
# The port at which the clients will connect
clientPort=3181
server.1=z05f06378.sqa.zth.tbsite.net:4888:5888
server.2=z05c19426.sqa.zth.tbsite.net:4888:5888
server.3=z05f10219.sqa.zth.tbsite.net:4888:5888
然后啟動 Zookeeper:
./bin/start-zookeeper-quorum.sh
jps 命令看到 Zookeeper 進程已經啟動:
停掉 Zookeeper 集群的命令:
./bin/stop-zookeeper-quorum.sh
(2)修改 Flink Standalone 集群的配置
修改 conf/masters 文件,增加一個 JobManager:
$cat conf/masters
z05f06378.sqa.zth.tbsite.net:8081
z05c19426.sqa.zth.tbsite.net:8081
之前修改過的 conf/slaves 文件保持不變:
$cat conf/slaves
z05f06378.sqa.zth.tbsite.net
z05c19426.sqa.zth.tbsite.net
z05f10219.sqa.zth.tbsite.net
修改 conf/flink-conf.yaml 文件:
# 配置 high-availability mode
high-availability: zookeeper
# 配置 zookeeper quorum(hostname 和端口需要依據對應 zk 的實際配置)
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181
# (可選)設置 zookeeper 的 root 目錄
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
# (可選)相當於是這個 standalone 集群中創建的 zk node 的 namespace
high-availability.cluster-id: /test_dir/test_standalone2
# JobManager 的 meta 信息放在 dfs,在 zk 上主要會保存一個指向 dfs 路徑的指針
high-availability.storageDir: hdfs:///test_dir/recovery2/
需要注意的是,在 HA 模式下 conf/flink-conf.yaml 中的這兩個配置都失效了(想想為什么)。
jobmanager.rpc.address
jobmanager.rpc.port
修改完成后,確保配置同步到其他機器。
啟動 Zookeeper 集群:
./bin/start-zookeeper-quorum.sh
再啟動 Standalone 集群(要確保之前的 Standalone 集群已經停掉):
./bin/start-cluster.sh
分別打開兩個 Master 節點上的 JobManager Web 頁面:
http://z05f06378.sqa.zth.tbsite.net:8081
http://z05c19426.sqa.zth.tbsite.net:8081
可以看到兩個頁面最后都轉到了同一個地址上,這個地址就是當前主 JobManager 所在機器,另一個就是 Standby JobManager。以上我們就完成了 Standalone 模式下 HA 的配置。
接下來我們可以測試驗證 HA 的有效性。當我們知道主 JobManager 的機器后,我們可以把主 JobManager 進程 Kill 掉,比如當前主 JobManager 在 z05c19426.sqa.zth.tbsite.net 這個機器上,就把這個進程殺掉。
接着,再打開這兩個鏈接:
http://z05f06378.sqa.zth.tbsite.net:8081
http://z05c19426.sqa.zth.tbsite.net:8081
可以發現后一個鏈接已經不能展示了,而前一個鏈接可以展示,說明發生主備切換。
然后我們再重啟前一次的主 JobManager:
./bin/jobmanager.sh start z05c19426.sqa.zth.tbsite.net 8081
再打開 (http://z05c19426.sqa.zth.tbsite.net:8081) 這個鏈接,會發現現在這個鏈接可以轉到 (http://z05f06378.sqa.zth.tbsite.net:8081) 這個頁面上了。說明這個 JobManager 完成了一個 Failover Recovery。
6. 使用 Yarn 模式跑 Flink job
圖 5 Flink Yarn 部署流程圖
相對於 Standalone 模式,Yarn 模式允許 Flink job 的好處有:
-
資源按需使用,提高集群的資源利用率
-
任務有優先級,根據優先級運行作業
-
基於 Yarn 調度系統,能夠自動化地處理各個角色的 Failover
-
JobManager 進程和 TaskManager 進程都由 Yarn NodeManager 監控
-
如果 JobManager 進程異常退出,則 Yarn ResourceManager 會重新調度 JobManager 到其他機器
-
如果 TaskManager 進程異常退出,JobManager 會收到消息並重新向 Yarn ResourceManager 申請資源,重新啟動 TaskManager
(1)在 Yarn 上啟動 Long Running 的 Flink 集群(Session Cluster 模式)
查看命令參數:
./bin/yarn-session.sh -h
創建一個 Yarn 模式的 Flink 集群:
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
其中用到的參數是:
-
-n,–container Number of TaskManagers
-
-jm,–jobManagerMemory Memory for JobManager Container with optional unit (default: MB)
-
-tm,–taskManagerMemory Memory per TaskManager Container with optional unit (default: MB)
-
-qu,–queue Specify YARN queue.
-
-s,–slots Number of slots per TaskManager
-
-t,–ship Ship files in the specified directory (t for transfer)
提交一個 Flink job 到 Flink 集群:
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
這次提交 Flink job,雖然沒有指定對應 Yarn application 的信息,卻可以提交到對應的 Flink 集群,原因在於“/tmp/.yarn-properties-${user}”文件中保存了上一次創建 Yarn session 的集群信息。所以如果同一用戶在同一機器上再次創建一個 Yarn session,則這個文件會被覆蓋掉。
-
如果刪掉“/tmp/.yarn-properties-${user}”或者在另一個機器上提交作業能否提交到預期到 yarn session 中呢?
可以配置了“high-availability.cluster-id”參數,據此從 Zookeeper 上獲取到 JobManager 的地址和端口,從而提交作業。
-
如果 Yarn session 沒有配置 HA,又該如何提交呢?
這個時候就必須要在提交 Flink job 的命令中指明 Yarn 上的 Application ID,通過“-yid”參數傳入:
/bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
我們可以發現,每次跑完任務不久,TaskManager 就被釋放了,下次在提交任務的時候,TaskManager 又會重新拉起來。如果希望延長空閑 TaskManager 的超時時間,可以在 conf/flink-conf.yaml 文件中配置下面這個參數,單位是 milliseconds:
slotmanager.taskmanager-timeout: 30000L # deprecated, used in release-1.5
resourcemanager.taskmanager-timeout: 30000L
(2)在 Yarn 上運行單個 Flink job(Job Cluster 模式)
如果你只想運行單個 Flink Job 后就退出,那么可以用下面這個命令:
./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
常用的配置有:
-
-yn,–yarncontainer Number of Task Managers
-
-yqu,–yarnqueue Specify YARN queue.
-
-ys,–yarnslots Number of slots per TaskManager
-
-yqu,–yarnqueue Specify YARN queue.
可以通過 Help 命令查看 Run 的可用參數:
./bin/flink run -h
我們可以看到,“./bin/flink run -h”看到的“Options for yarn-cluster mode”中的“-y”和“–yarn”為前綴的參數其實和“./bin/yarn-session.sh -h”命令是一一對應的,語義上也基本一致。
關於“-n”(在 yarn session 模式下)、“-yn”在(yarn single job 模式下)與“-p”參數的關系:
-
“-n”和“-yn”在社區版本中(Release-1.5 ~ Release-1.7)中沒有實際的控制作用,實際的資源是根據“-p”參數來申請的,並且 TM 使用完后就會歸還
-
在 Blink 的開源版本中,“-n”(在 Yarn Session 模式下)的作用就是一開始啟動指定數量的 TaskManager,之后即使 Job 需要更多的 Slot,也不會申請新的 TaskManager
-
在 Blink 的開源版本中,Yarn single job 模式“-yn”表示的是初始 TaskManager 的數量,不設置 TaskManager 的上限。(需要特別注意的是,只有加上“-yd”參數才能用 Single job 模式(例如:命令“./bin/flink run -yd -m yarn-cluster xxx”)
7. Yarn 模式下的 HighAvailability 配置
首先要確保啟動 Yarn 集群用的“yarn-site.xml”文件中的這個配置,這個是 Yarn 集群級別 AM 重啟的上限。
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>100</value>
</property>
然后在 conf/flink-conf.yaml 文件中配置這個 Flink job 的 JobManager 能夠重啟的次數。
yarn.application-attempts: 10 # 1+ 9 retries
最后再在 conf/flink-conf.yaml 文件中配置上 ZK 相關配置,這幾個配置的配置方法和 Standalone 的 HA 配置方法基本一致,如下所示。
# 配置 high-availability mode
high-availability: zookeeper
# 配置 zookeeper quorum(hostname 和端口需要依據對應 zk 的實際配置)
high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181
# (可選)設置 zookeeper 的 root 目錄
high-availability.zookeeper.path.root: /test_dir/test_standalone2_root
# 刪除這個配置
# high-availability.cluster-id: /test_dir/test_standalone2
# JobManager 的 meta 信息放在 dfs,在 zk 上主要會保存一個指向 dfs 路徑的指針
high-availability.storageDir: hdfs:///test_dir/recovery2/
需要特別注意的是:“high-availability.cluster-id”這個配置最好去掉,因為在 Yarn(以及 Mesos)模式下,cluster-id 如果不配置的話,會配置成 Yarn 上的 Application ID ,從而可以保證唯一性。