1. 概述
Flink采用的穩定版本為flink-1.12.1。以往我們所熟知的Map Reduce,Storm,Spark等框架可能在某些場景下已經沒法完全地滿足用戶的需求,或者是實現需求所付出的代價,無論是代碼量和架構的復雜程度可能都沒法滿足預期的需求。新場景的出現催產出新的技術,Flink即為實時流提供了新的選擇。Flink相對簡單的編程模型加上其高吞吐、低延遲、高性能以及支持exactly-once語義的特性,讓它在工業生產中較為出眾。
2. Flink Standalone HA安裝
2.1 環境准備
軟件配置選擇如下:
- jdk:jdk-8u144-linux-x64.tar.gz
- zookeeper:zookeeper-3.4.5.tar.gz
- hadoop:hadoop-2.7.3.tar.gz
- flink:flink-1.12.1-bin-scala_2.11.tgz
2.2 安裝規划
3台centos7機器如下:
ip | 主機名 | flink角色 |
192.168.1.105 | node1 | StandaloneSessionClusterEntrypoint、TaskManagerRunner |
192.168.1.106 | node2 | StandaloneSessionClusterEntrypoint、TaskManagerRunner |
192.168.1.107 | node3 | TaskManagerRunner |
StandaloneSessionClusterEntrypoint為jobmnager,taskManagerRunner為taskManager。
在Flink運行時涉及到的進程主要有以下兩個:
- JobManager:主要負責調度task,協調checkpoint已經錯誤恢復等。當客戶端將打包好的任務提交到JobManager之后,JobManager就會根據注冊的TaskManager資源信息將任務分配給有資源的TaskManager,然后啟動運行任務。
- TaskManager:執行數據流的task,一個task通過設置並行度,可能會有多個subtask。每個TaskManager都是作為一個獨立的JVM進程運行的,他主要負責在獨立的線程執行的operator。其中能執行多少個operator取決於每個taskManager指定的slots數量。Task slot是Flink中最小的資源單位。假如一個taskManager有3個slot,他就會給每個slot分配1/3的內存資源,目前slot不會對cpu進行隔離。同一個taskManager中的slot會共享網絡資源和心跳信息。
2.3 安裝
首先選定一台機器節點,解壓flink-1.12.1-bin-scala_2.11.tgz,並進入conf目錄修改flink-conf.yaml和master文件,修改完畢之后把flink安裝包往其他兩台機器復制
2.3.1 配置flink-conf.yaml
jobmanager.rpc.address: node1 jobmanager.rpc.port: 6123 jobmanager.memory.process.size: 1600m taskmanager.memory.process.size: 1728m taskmanager.numberOfTaskSlots: 1 parallelism.default: 1 high-availability: zookeeper high-availability.storageDir: hdfs://node/flink/ha/ high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181 high-availability.zookeeper.path.root: /flink state.backend: filesystem state.checkpoints.dir: hdfs://node/flink/checkpoints state.savepoints.dir: hdfs://node/flink/checkpoints jobmanager.execution.failover-strategy: region io.tmp.dirs: /tmp/flink/tmp historyserver.web.address: 0.0.0.0 historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs://node/flink/completed-jobs/ historyserver.archive.fs.refresh-interval: 10000
注:
紅色字體即為需要修改的內容,node1為每台節點的hostname,每台節點需要適應變動。
2.3.2 配置masters
這里配置node1和node2為高可用節點
[root@node1 conf]# vim masters node1:8081 node2:8082
2.3.3 把flink安裝包往其余兩台
[root@node1 app]# scp flink-1.12.1/ root@node2:/opt/app [root@node1 app]# scp flink-1.12.1/ root@node2:/opt/app
2.3.4 啟動集群
啟動順序:先啟動zk和hdfs,再啟動flink
啟動集群:
start-cluster.sh
2.3.5 查看Flink webUI
在瀏覽器輸入:http://node1:8081/或http://node1:8082/,即可看到flink任務執行情況
3. Flink On Yarn安裝
3.1 軟件配置選擇
JDK:1.8 (jdk1.8.0_151)
Hadoop:2.7.6 (hadoop-2.7.6.tar.gz)
HBase:2.1.2 (hbase-2.1.1-bin.tar.gz)
這里jdk、hadoop、zookeeper假定均已安裝完畢。
3.2 安裝
3.2.1 配置flink-conf.yaml文件
high-availability: zookeeper high-availability.storageDir: hdfs://node/flink/ha/ high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181 high-availability.zookeeper.path.root: /flink # high-availability.cluster-id: /cluster_one on yarn不配置,配了任務會啟動不來
3.2.2 配置FLINK_HOME和HADOOP_CLASSPATH環境變量
vi /etc/profile 或者 vi /etc/bashrc
export FLINK_HOME=/home/redpeak/app/flink-1.12.3 export PATH=$FLINK_HOME/bin:$PATH export HADOOP_CLASSPATH=`hadoop classpath`
需要在每台flink節點配置FLINK_HOME和HADOOP_CLASSPATH環境變量
3.3 flink on yarn三種部署模式
3.3.1 Session模式
Session模式是預分配資源的,也就是提前根據指定的資源參數初始化一個Flink集群,並常駐在YARN系統中,擁有固定數量的JobManager和TaskManager(注意JobManager只有一個)。提交到這個集群的作業可以直接運行,免去每次分配資源的overhead。但是Session的資源總量有限,多個作業之間又不是隔離的,故可能會造成資源的爭用;如果有一個TaskManager宕機,它上面承載着的所有作業也都會失敗。另外,啟動的作業越多,JobManager的負載也就越大。所以,Session模式一般用來部署那些對延遲非常敏感但運行時長較短的作業。
提交任務命令:
./bin/flink run -t yarn-session \ -Dyarn.application.id=application_XXXX_YY \ ./examples/streaming/TopSpeedWindowing.jar
再次連接YARN session命令:
./bin/yarn-session.sh -id application_XXXX_YY
3.3.2 Per_Job模式
顧名思義,在Per-Job模式下,每個提交到YARN上的作業會各自形成單獨的Flink集群,擁有專屬的JobManager和TaskManager。可見,以Per-Job模式提交作業的啟動延遲可能會較高,但是作業之間的資源完全隔離,一個作業的TaskManager失敗不會影響其他作業的運行,JobManager的負載也是分散開的,不存在單點問題。當作業運行完成,與它關聯的集群也就被銷毀,資源被釋放。所以,Per-Job模式一般用來部署那些長時間運行的作業。
提交任務命令:
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
注:--detached,以分離模式運行作業,detached模式在提交完任務后就退出client
任務列表和取消任務命令:
# List running job on the cluster ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY # Cancel running job ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
3.3.3 Session與Per_Job模式帶來的問題
Session模式和Per_Job模式可以用如下簡圖表示,其中紅色、藍色和綠色的圖代表不同的作業。
Deployer表示向YARN集群發起部署請求的節點,一般來講在生產環境中,也總有這樣一個節點作為所有作業的提交入口(即客戶端)。在main()方法開始執行直到env.execute()方法之前,客戶端也需要做一些工作,即:
- 獲取作業所需的依賴項;
- 通過執行環境分析並取得邏輯計划,即StreamGraph->JobGraph;
- 將依賴項和JobGraph上傳到集群中。
只有在這些都完成之后,才會通過env.execute()方法觸發Flink運行時真正地開始執行作業。如果所有用戶都在Deployer上提交作業,較大的依賴會消耗更多的帶寬,而較復雜的作業邏輯翻譯成JobGraph也需要吃掉更多的CPU和內存,客戶端的資源反而會成為瓶頸。不管Session還是Per-Job模式都存在此問題。為了解決,社區在傳統部署模式的基礎上實現了Application模式。
3.3.4 Application模式
此模式的作業提交框圖如下:
可見,原本需要客戶端做的三件事被轉移到了JobManager里,也就是說main()方法在集群中執行(入口點位於ApplicationClusterEntryPoint),Deployer只需要負責發起部署請求了。另外如果一個main()方法中有多個env.execute/executeAsync()調用,在Application模式下,這些作業會被視為同一個應用,在同一個集群中執行(如果在Per-Job模式下,就會啟動多個集群)。可見,Application模式本質上是Session和Per-Job模式的折衷。
用Application模式提交作業的示例命令如下:
bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=10 \ -Dyarn.application.name="MyFlinkApp" \ /path/to/my/flink-app/MyFlinkApp.jar
注:-t參數用來指定部署目標,目前支持YARN(yarn-application)和K8S(kubernetes-application)。-D參數則用來指定與作業相關的各項參數。
查看集群的任務Job列表以及取消任務命令,如下:
# List running job on the cluster ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY # Cancel running job ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
那么如何解決傳輸依賴項造成的帶寬占用問題呢?Flink作業必須的依賴是發行版flink-dist.jar,還有擴展卡(位於\$FLINK_HOME/lib)和插件庫(位於\$FLINK_HOME/plugin),我們將它們預先上傳到像HDFS這樣的共享存儲,再通過yarn.provided.lib.dirs參數指定存儲的路徑即可。
-Dyarn.provided.lib.dirs="hdfs://myhdfs/flink-common-deps/lib;hdfs://myhdfs/flink-common-deps/plugins"
這樣所有作業就不必各自上傳依賴,可以直接從HDFS拉取,並且YARN NodeManager也會緩存這些依賴,進一步加快作業的提交過程。同理,包含Flink作業的用戶JAR包也可以上傳到HDFS,並指定遠程路徑進行提交。
./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" hdfs://myhdfs/jars/my-application.jar
4. 總結
【參考資料】
https://www.sohu.com/a/406387061_100109711
https://zhuanlan.zhihu.com/p/354511839
https://www.jianshu.com/p/90d9f1f24937