Apache Flink 開發環境搭建和應用的配置、部署及運行


https://mp.weixin.qq.com/s/noD2Jv6m-somEMtjWTJh3w

本文是根據 Apache Flink 系列直播課程整理而成,由阿里巴巴高級開發工程師沙晟陽分享,主要面向於初次接觸 Flink、或者對 Flink 有了解但是沒有實際操作過的同學。希望幫助大家更順利地上手使用 Flink,並着手相關開發調試工作。

 

主要內容:

  • Flink 開發環境的部署和配置

  • 運行 Flink 應用

  • 單機 Standalone 模式

  • 多機 Standalone 模式

  • Yarn 集群模式

 

一. Flink 開發環境部署和配置

 

 

Flink 是一個以 Java 及 Scala 作為開發語言的開源大數據項目,代碼開源在 GitHub 上,並使用 Maven 來編譯和構建項目。對於大部分使用 Flink 的同學來說,Java、Maven 和 Git 這三個工具是必不可少的,另外一個強大的 IDE 有助於我們更快的閱讀代碼、開發新功能以及修復 Bug。因為篇幅所限,我們不會詳述每個工具的安裝細節,但會給出必要的安裝建議。

 

關於開發測試環境,Mac OS、Linux 系統或者 Windows 都可以。如果使用的是 Windows 10 系統,建議使用 Windows 10 系統的 Linux 子系統來編譯和運行。

 

工具 注釋
Java Java 版本至少是 Java 8,且最好選用 Java 8u51 及以上版本
Maven 必須使用 Maven 3,建議使用 Maven 3.2.5。Maven 3.3.x 能夠編譯成功,但是在 Shade 一些 Dependencies 的過程中有些問題
Git Flink 的代碼倉庫是: https://github.com/apache/flink

 

建議選用社區已發布的穩定分支,比如 Release-1.6 或者 Release-1.7。

 

1. 編譯 Flink 代碼

 

在我們配置好之前的幾個工具后,編譯 Flink 就非常簡單了,執行如下命令即可:

 

mvn clean install -DskipTests# 或者mvn clean package -DskipTests 

 

常用編譯參數:

 

 

Dfast  主要是忽略QA plugins和JavaDocs的編譯Dhadoop.version=2.6.1  指定hadoop版本settings=${maven_file_path}  顯式指定maven settings.xml配置文件 

 

當成功編譯完成后,能在當前 Flink 代碼目錄下的 flink-dist/target/子目錄 中看到如下文件(不同的 Flink 代碼分支編譯出的版本號不同,這里的版本號是 Flink 1.5.1):

 

 

其中有三個文件可以留意一下:

 

版本 注釋
flink-1.5.1.tar.gz Binary 的壓縮包
flink-1.5.1-bin/flink-1.5.1 解壓后的 Flink binary 目錄
flink-dist_2.11-1.5.1.jar 包含 Flink 核心功能的 jar 包

 

 

注意: 國內用戶在編譯時可能遇到編譯失敗“Build Failure”(且有 MapR 相關報錯),一般都和 MapR 相關依賴的下載失敗有關,即使使用了推薦的 settings.xml 配置(其中 Aliyun Maven 源專門為 MapR 相關依賴做了代理),還是可能出現下載失敗的情況。問題主要和 MapR 的 Jar 包比較大有關。遇到這些問題時,重試即可。在重試之前,要先根據失敗信息刪除 Maven local repository 中對應的目錄,否則需要等待 Maven 下載的超時時間才能再次出發下載依賴到本地。

 

2. 開發環境准備

 

推薦使用 IntelliJ IDEA IDE 作為 Flink 的 IDE 工具。官方不建議使用 Eclipse IDE,主要原因是 Eclipse 的 Scala IDE 和 Flink 用 Scala 的不兼容。

 

如果你需要做一些 Flink 代碼的開發工作,則需要根據 Flink 代碼的 tools/maven/目錄 下的配置文件來配置 Checkstyle ,因為 Flink 在編譯時會強制代碼風格的檢查,如果代碼風格不符合規范,可能會直接編譯失敗。 

 

二、運行 Flink 應用

 

1. 基本概念

 

運行 Flink 應用其實非常簡單,但是在運行 Flink 應用之前,還是有必要了解 Flink 運行時的各個組件,因為這涉及到 Flink 應用的配置問題。圖 1 所示,這是用戶用 DataStream API 寫的一個數據處理程序。可以看到,在一個 DAG 圖中不能被 Chain 在一起的 Operator 會被分隔到不同的 Task 中,也就是說 Task 是 Flink 中資源調度的最小單位。

 

圖 1 Parallel Dataflows

 

圖 2 所示,Flink 實際運行時包括兩類進程:

 

  • JobManager(又稱為 JobMaster):協調 Task 的分布式執行,包括調度 Task、協調創 Checkpoint 以及當 Job failover 時協調各個 Task 從 Checkpoint 恢復等。

  • TaskManager(又稱為 Worker):執行 Dataflow 中的 Tasks,包括內存 Buffer 的分配、Data Stream 的傳遞等。

 

 

圖 2 Flink Runtime 架構圖

 

圖 3 所示,Task Slot 是一個 TaskManager 中的最小資源分配單位,一個 TaskManager 中有多少個 Task Slot 就意味着能支持多少並發的 Task 處理。需要注意的是,一個 Task Slot 中可以執行多個 Operator,一般這些 Operator 是能被 Chain 在一起處理的。

 

圖 3 Process

 

2. 運行環境准備

 

  • 准備 Flink binary 

  • 直接從 Flink 官網上下載 Flink binary 的壓縮包 

  • 或者從 Flink 源碼編譯而來

  • 安裝 Java,並配置 JAVA_HOME 環境變量

 

3. 單機 Standalone 的方式運行 Flink

 

(1)基本的啟動流程

 

最簡單的運行 Flink 應用的方法就是以單機 Standalone 的方式運行。

 

啟動集群:

 

./bin/start-cluster.sh

 

打開 http://127.0.0.1:8081/ 就能看到 Flink 的 Web 界面。嘗試提交 Word Count 任務:

 

./bin/flink run examples/streaming/WordCount.jar

 

大家可以自行探索 Web 界面中展示的信息,比如,我們可以看看 TaskManager 的 stdout 日志,就可以看到 Word Count 的計算結果。

 

我們還可以嘗試通過“–input”參數指定我們自己的本地文件作為輸入,然后執行:

 

./bin/flink run examples/streaming/WordCount.jar --input ${your_source_file}

 

停止集群:

 

./bin/stop-cluster.sh

 

(2)常用配置介紹

 

  • conf / slaves

 

conf / slaves 用於配置 TaskManager 的部署,默認配置下只會啟動一個 TaskManager 進程,如果想增加一個 TaskManager 進程的,只需要文件中追加一行“localhost”。

 

也可以直接通過“ ./bin/taskmanager.sh start ”這個命令來追加一個新的 TaskManager:

 

 

./bin/taskmanager.sh start|start-foreground|stop|stop-allconf/flink-conf.yaml

 

  • conf/flink-conf.yaml 

 

用於配置 JM 和 TM 的運行參數,常用配置有:

 

 

# The heap size for the JobManager JVMjobmanager.heap.mb: 1024
# The heap size for the TaskManager JVMtaskmanager.heap.mb: 1024
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.taskmanager.numberOfTaskSlots: 4# the managed memory size for each task manager.taskmanager.managed.memory.size: 256

 

Standalone 集群啟動后,我們可以嘗試分析一下 Flink 相關進程的運行情況。執行 jps 命令,可以看到 Flink 相關的進程主要有兩個,一個是 JobManager 進程,另一個是 TaskManager 進程。我們可以進一步用 ps 命令看看進程的啟動參數中“-Xmx”和“-Xms”的配置。然后我們可以嘗試修改 flink-conf.yaml 中若干配置,然后重啟 Standalone 集群看看發生了什么變化。

 

需要補充的是,在 Blink 開源分支上,TaskManager 的內存計算上相對於現在的社區版本要更精細化,TaskManager 進程的堆內存限制(-Xmx)一般的計算方法是:

 

TotalHeapMemory = taskmanager.heap.mb + taskmanager.managed.memory.size + taskmanager.process.heap.memory.mb(默認值為128MB)

 

而最新的 Flink 社區版本 Release-1.7 中 JobManager 和 TaskManager 默認內存配置方式為:

 

 

# The heap size for the JobManager JVMjobmanager.heap.size: 1024m
# The heap size for the TaskManager JVMtaskmanager.heap.size: 1024m

 

Flink 社區 Release-1.7 版本中的“taskmanager.heap.size”配置實際上指的不是 Java heap 的內存限制,而是 TaskManager 進程總的內存限制。我們可以同樣用上述方法查看 Release-1.7 版本的 Flink binary 啟動的 TaskManager 進程的 -Xmx 配置,會發現實際進程上的 -Xmx 要小於配置的“taskmanager.heap.size”的值,原因在於從中扣除了 Network buffer 用的內存,因為 Network buffer 用的內存一定是 Direct memory,所以不應該算在堆內存限制中。

 

(3)日志的查看和配置

 

JobManager 和 TaskManager 的啟動日志可以在 Flink binary 目錄下的 Log 子目錄中找到。Log 目錄中以“flink-user−standalonesession−{id}-${hostname}”為前綴的文件對應的是 JobManager 的輸出,其中有三個文件:

 

  • flink-user−standalonesession−{id}-${hostname}.log:代碼中的日志輸出

  • flink-user−standalonesession−{id}-${hostname}.out:進程執行時的 stdout 輸出

  • flink-user−standalonesession−{id}-${hostname}-gc.log:JVM 的 GC 的日志

 

Log 目錄中以“flink-user−taskexecutor−{id}-${hostname}”為前綴的文件對應的是 TaskManager 的輸出,也包括三個文件,和 JobManager 的輸出一致。

日志的配置文件在 Flink binary 目錄的 conf 子目錄下,其中:

 

  • log4j-cli.properties:用 Flink 命令行時用的 log 配置,比如執行“ flink run”命令

  • log4j-yarn-session.properties:用 yarn-session.sh 啟動時命令行執行時用的 log 配置

  • log4j.properties:無論是 Standalone 還是 Yarn 模式,JobManager 和 TaskManager 上用的 log 配置都是 log4j.properties

 

這三個“log4j.*properties”文件分別有三個“logback.*xml”文件與之對應,如果想使用 Logback 的同學,之需要把與之對應的“log4j.*properties”文件刪掉即可,對應關系如下:

 

  • log4j-cli.properties -> logback-console.xml

  • log4j-yarn-session.properties -> logback-yarn.xml

  • log4j.properties -> logback.xml

 

需要注意的是,“flink-user−standalonesession−{id}-hostname”和“flink−{user}-taskexecutor-id−{hostname}”都帶有“id”,“{id}”表示本進程在本機上該角色(JobManager 或 TaskManager)的所有進程中的啟動順序,默認從 0 開始。

 

(4)進一步探索

 

嘗試重復執行“./bin/start-cluster.sh”命令,然后看看 Web 頁面(或者執行 jps 命令),看看會發生什么?可以嘗試看看啟動腳本,分析一下原因。接着可以重復執行“./bin/stop-cluster.sh”,每次執行完后,看看會發生什么。

 

4. 多機部署 Flink Standalone 集群

 

部署前要注意的要點:

 

  • 每台機器上配置好 Java 以及 JAVA_HOME 環境變量

  • 每台機器上部署的 Flink binary 的目錄要保證是同一個目錄

  • 如果需要用 HDFS,需要配置 HADOOP_CONF_DIR 環境變量配置

 

根據你的集群信息修改 conf/masters 和 conf/slaves 配置。

 

修改 conf/flink-conf.yaml 配置,注意要確保和 Masters 文件中的地址一致:

 

jobmanager.rpc.address: z05f06378.sqa.zth.tbsite.net

 

確保所有機器的 Flink binary 目錄中 conf 中的配置文件相同,特別是以下三個:

 

 

conf/mastersconf/slavesconf/flink-conf.yaml

 

然后啟動 Flink 集群:

 

./bin/start-cluster.sh

 

提交 WordCount 作業:

 

./bin/flink run examples/streaming/WordCount.jar

 

上傳 WordCount 的 Input 文件:

 

hdfs dfs -copyFromLocal story /test_dir/input_dir/story

 

提交讀寫 HDFS 的 WordCount 作業:

 

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

 

增加 WordCount 作業的並發度(注意輸出文件重名會提交失敗):

 

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output --parallelism 20

 

5. Standalone 模式的 HighAvailability(HA)部署和配置

 

通過圖 2 Flink Runtime 架構圖,我們可以看到 JobManager 是整個系統中最可能導致系統不可用的角色。如果一個 TaskManager 掛了,在資源足夠的情況下,只需要把相關 Task 調度到其他空閑 TaskSlot 上,然后 Job 從 Checkpoint 中恢復即可。而如果當前集群中只配置了一個 JobManager,則一旦 JobManager 掛了,就必須等待這個 JobManager 重新恢復,如果恢復時間過長,就可能導致整個 Job 失敗。

 

因此如果在生產業務使用 Standalone 模式,則需要部署配置 HighAvailability,這樣同時可以有多個 JobManager 待命,從而使得 JobManager 能夠持續服務。

 

圖 4 Flink JobManager HA 示意圖

 

注意:

  • 如果想使用 Flink standalone HA 模式,需要確保基於 Flink Release-1.6.1 及以上版本,因為這里社區有個 bug 會導致這個模式下主 JobManager 不能正常工作。

  • 接下來的實驗中需要用到 HDFS,所以需要下載帶有 Hadoop 支持的 Flink Binary 包。

 

(1)(可選)使用 Flink 自帶的腳本部署 Zookeeper

 

Flink 目前支持基於 Zookeeper 的 HA。如果你的集群中沒有部署 ZK,Flink 提供了啟動 Zookeeper 集群的腳本。首先修改配置文件“conf/zoo.cfg”,根據你要部署的 Zookeeper Server 的機器數來配置“server.X=addressX:peerPort:leaderPort”,其中“X”是一個 Zookeeper Server 的唯一 ID,且必須是數字。

 

 

# The port at which the clients will connectclientPort=3181server.1=z05f06378.sqa.zth.tbsite.net:4888:5888server.2=z05c19426.sqa.zth.tbsite.net:4888:5888server.3=z05f10219.sqa.zth.tbsite.net:4888:5888

 

然后啟動 Zookeeper:

 

./bin/start-zookeeper-quorum.sh

 

jps 命令看到 Zookeeper 進程已經啟動:

 

 

停掉 Zookeeper 集群的命令:

 

./bin/stop-zookeeper-quorum.sh

 

(2)修改 Flink Standalone 集群的配置

 

修改 conf/masters 文件,增加一個 JobManager:

 

 

$cat conf/mastersz05f06378.sqa.zth.tbsite.net:8081z05c19426.sqa.zth.tbsite.net:8081

 

之前修改過的 conf/slaves 文件保持不變:

 

 

$cat conf/slavesz05f06378.sqa.zth.tbsite.netz05c19426.sqa.zth.tbsite.netz05f10219.sqa.zth.tbsite.net

 

修改 conf/flink-conf.yaml 文件:

 

 

# 配置 high-availability modehigh-availability: zookeeper# 配置 zookeeper quorum(hostname 和端口需要依據對應 zk 的實際配置)high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181# (可選)設置 zookeeper 的 root 目錄high-availability.zookeeper.path.root: /test_dir/test_standalone2_root# (可選)相當於是這個 standalone 集群中創建的 zk node 的 namespacehigh-availability.cluster-id: /test_dir/test_standalone2# JobManager 的 meta 信息放在 dfs,在 zk 上主要會保存一個指向 dfs 路徑的指針high-availability.storageDir: hdfs:///test_dir/recovery2/

 

需要注意的是,在 HA 模式下 conf/flink-conf.yaml 中的這兩個配置都失效了(想想為什么)。

 

 

jobmanager.rpc.addressjobmanager.rpc.port

 

修改完成后,確保配置同步到其他機器。

 

啟動 Zookeeper 集群:

 

./bin/start-zookeeper-quorum.sh

 

再啟動 Standalone 集群(要確保之前的 Standalone 集群已經停掉):

 

./bin/start-cluster.sh

 

分別打開兩個 Master 節點上的 JobManager Web 頁面:

 

http://z05f06378.sqa.zth.tbsite.net:8081

http://z05c19426.sqa.zth.tbsite.net:8081

 

可以看到兩個頁面最后都轉到了同一個地址上,這個地址就是當前主 JobManager 所在機器,另一個就是 Standby JobManager。以上我們就完成了 Standalone 模式下 HA 的配置。

 

接下來我們可以測試驗證 HA 的有效性。當我們知道主 JobManager 的機器后,我們可以把主 JobManager 進程 Kill 掉,比如當前主 JobManager 在 z05c19426.sqa.zth.tbsite.net 這個機器上,就把這個進程殺掉。

 

接着,再打開這兩個鏈接:

 

http://z05f06378.sqa.zth.tbsite.net:8081

http://z05c19426.sqa.zth.tbsite.net:8081

 

可以發現后一個鏈接已經不能展示了,而前一個鏈接可以展示,說明發生主備切換。

 

然后我們再重啟前一次的主 JobManager:

 

./bin/jobmanager.sh start z05c19426.sqa.zth.tbsite.net 8081

 

再打開 (http://z05c19426.sqa.zth.tbsite.net:8081) 這個鏈接,會發現現在這個鏈接可以轉到 (http://z05f06378.sqa.zth.tbsite.net:8081) 這個頁面上了。說明這個 JobManager 完成了一個 Failover Recovery。

 

6. 使用 Yarn 模式跑 Flink job

 

圖 5 Flink Yarn 部署流程圖

 

相對於 Standalone 模式,Yarn 模式允許 Flink job 的好處有:

 

  • 資源按需使用,提高集群的資源利用率

  • 任務有優先級,根據優先級運行作業

  • 基於 Yarn 調度系統,能夠自動化地處理各個角色的 Failover

  • JobManager 進程和 TaskManager 進程都由 Yarn NodeManager 監控

  • 如果 JobManager 進程異常退出,則 Yarn ResourceManager 會重新調度 JobManager 到其他機器

  • 如果 TaskManager 進程異常退出,JobManager 會收到消息並重新向 Yarn ResourceManager 申請資源,重新啟動 TaskManager

 

(1)在 Yarn 上啟動 Long Running 的 Flink 集群(Session Cluster 模式)

 

查看命令參數:

 

 

./bin/yarn-session.sh -h

 

創建一個 Yarn 模式的 Flink 集群:

 

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

 

其中用到的參數是:

 

  • -n,–container Number of TaskManagers

  • -jm,–jobManagerMemory Memory for JobManager Container with optional unit (default: MB)

  • -tm,–taskManagerMemory Memory per TaskManager Container with optional unit (default: MB)

  • -qu,–queue Specify YARN queue.

  • -s,–slots Number of slots per TaskManager

  • -t,–ship Ship files in the specified directory (t for transfer)

 

提交一個 Flink job 到 Flink 集群:

 

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

 

這次提交 Flink job,雖然沒有指定對應 Yarn application 的信息,卻可以提交到對應的 Flink 集群,原因在於“/tmp/.yarn-properties-${user}”文件中保存了上一次創建 Yarn session 的集群信息。所以如果同一用戶在同一機器上再次創建一個 Yarn session,則這個文件會被覆蓋掉。

 

  • 如果刪掉“/tmp/.yarn-properties-${user}”或者在另一個機器上提交作業能否提交到預期到 yarn session 中呢?

    可以配置了“high-availability.cluster-id”參數,據此從 Zookeeper 上獲取到 JobManager 的地址和端口,從而提交作業。

  • 如果 Yarn session 沒有配置 HA,又該如何提交呢?

 

這個時候就必須要在提交 Flink job 的命令中指明 Yarn 上的 Application ID,通過“-yid”參數傳入:

 

/bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

 

我們可以發現,每次跑完任務不久,TaskManager 就被釋放了,下次在提交任務的時候,TaskManager 又會重新拉起來。如果希望延長空閑 TaskManager 的超時時間,可以在 conf/flink-conf.yaml 文件中配置下面這個參數,單位是 milliseconds:

 

slotmanager.taskmanager-timeout: 30000L         # deprecated, used in release-1.5resourcemanager.taskmanager-timeout: 30000L

 

(2)在 Yarn 上運行單個 Flink job(Job Cluster 模式)

 

如果你只想運行單個 Flink Job 后就退出,那么可以用下面這個命令:

 

./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

 

常用的配置有:

 

  • -yn,–yarncontainer Number of Task Managers

  • -yqu,–yarnqueue Specify YARN queue.

  • -ys,–yarnslots Number of slots per TaskManager

  • -yqu,–yarnqueue Specify YARN queue.

 

可以通過 Help 命令查看 Run 的可用參數:

 

 

./bin/flink run -h

 

我們可以看到,“./bin/flink run -h”看到的“Options for yarn-cluster mode”中的“-y”和“–yarn”為前綴的參數其實和“./bin/yarn-session.sh -h”命令是一一對應的,語義上也基本一致。

 

關於“-n”(在 yarn session 模式下)、“-yn”在(yarn single job 模式下)與“-p”參數的關系:

 

  • “-n”和“-yn”在社區版本中(Release-1.5 ~ Release-1.7)中沒有實際的控制作用,實際的資源是根據“-p”參數來申請的,並且 TM 使用完后就會歸還

  • 在 Blink 的開源版本中,“-n”(在 Yarn Session 模式下)的作用就是一開始啟動指定數量的 TaskManager,之后即使 Job 需要更多的 Slot,也不會申請新的 TaskManager

  • 在 Blink 的開源版本中,Yarn single job 模式“-yn”表示的是初始 TaskManager 的數量,不設置 TaskManager 的上限。(需要特別注意的是,只有加上“-yd”參數才能用 Single job 模式(例如:命令“./bin/flink run -yd -m yarn-cluster xxx”)

 

7. Yarn 模式下的 HighAvailability 配置

 

首先要確保啟動 Yarn 集群用的“yarn-site.xml”文件中的這個配置,這個是 Yarn 集群級別 AM 重啟的上限。

 

 <property> <name>yarn.resourcemanager.am.max-attempts</name> <value>100</value> </property>

 

然后在 conf/flink-conf.yaml 文件中配置這個 Flink job 的 JobManager 能夠重啟的次數。

 

yarn.application-attempts: 10 # 1+ 9 retries

 

最后再在 conf/flink-conf.yaml 文件中配置上 ZK 相關配置,這幾個配置的配置方法和 Standalone 的 HA 配置方法基本一致,如下所示。

 

 

# 配置 high-availability modehigh-availability: zookeeper# 配置 zookeeper quorum(hostname 和端口需要依據對應 zk 的實際配置)high-availability.zookeeper.quorum: z05f02321.sqa.zth.tbsite.net:2181,z05f10215.sqa.zth.tbsite.net:2181# (可選)設置 zookeeper 的 root 目錄high-availability.zookeeper.path.root: /test_dir/test_standalone2_root# 刪除這個配置# high-availability.cluster-id: /test_dir/test_standalone2# JobManager 的 meta 信息放在 dfs,在 zk 上主要會保存一個指向 dfs 路徑的指針high-availability.storageDir: hdfs:///test_dir/recovery2/

 

需要特別注意的是:“high-availability.cluster-id”這個配置最好去掉,因為在 Yarn(以及 Mesos)模式下,cluster-id 如果不配置的話,會配置成 Yarn 上的 Application ID ,從而可以保證唯一性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM