Flink安裝部署


1. 概述

   Flink采用的穩定版本為flink-1.12.1。以往我們所熟知的Map Reduce,Storm,Spark等框架可能在某些場景下已經沒法完全地滿足用戶的需求,或者是實現需求所付出的代價,無論是代碼量和架構的復雜程度可能都沒法滿足預期的需求。新場景的出現催產出新的技術,Flink即為實時流提供了新的選擇。Flink相對簡單的編程模型加上其高吞吐、低延遲、高性能以及支持exactly-once語義的特性,讓它在工業生產中較為出眾。

2. Flink Standalone HA安裝

2.1 環境准備

  軟件配置選擇如下:

  • jdk:jdk-8u144-linux-x64.tar.gz
  • zookeeper:zookeeper-3.4.5.tar.gz
  • hadoop:hadoop-2.7.3.tar.gz
  • flink:flink-1.12.1-bin-scala_2.11.tgz

2.2 安裝規划

  3台centos7機器如下:

ip 主機名 flink角色
192.168.1.105 node1 StandaloneSessionClusterEntrypoint、TaskManagerRunner
192.168.1.106 node2 StandaloneSessionClusterEntrypoint、TaskManagerRunner
192.168.1.107 node3 TaskManagerRunner

   StandaloneSessionClusterEntrypoint為jobmnager,taskManagerRunner為taskManager。

  在Flink運行時涉及到的進程主要有以下兩個:

  • JobManager:主要負責調度task,協調checkpoint已經錯誤恢復等。當客戶端將打包好的任務提交到JobManager之后,JobManager就會根據注冊的TaskManager資源信息將任務分配給有資源的TaskManager,然后啟動運行任務。
  • TaskManager:執行數據流的task,一個task通過設置並行度,可能會有多個subtask。每個TaskManager都是作為一個獨立的JVM進程運行的,他主要負責在獨立的線程執行的operator。其中能執行多少個operator取決於每個taskManager指定的slots數量。Task slot是Flink中最小的資源單位。假如一個taskManager有3個slot,他就會給每個slot分配1/3的內存資源,目前slot不會對cpu進行隔離。同一個taskManager中的slot會共享網絡資源和心跳信息。

2.3 安裝 

  首先選定一台機器節點,解壓flink-1.12.1-bin-scala_2.11.tgz,並進入conf目錄修改flink-conf.yaml和master文件,修改完畢之后把flink安裝包往其他兩台機器復制

2.3.1 配置flink-conf.yaml

jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
high-availability: zookeeper
high-availability.storageDir: hdfs://node/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink
state.backend: filesystem
state.checkpoints.dir: hdfs://node/flink/checkpoints
state.savepoints.dir: hdfs://node/flink/checkpoints
jobmanager.execution.failover-strategy: region
io.tmp.dirs: /tmp/flink/tmp
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://node/flink/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000

  注:

    紅色字體即為需要修改的內容,node1為每台節點的hostname,每台節點需要適應變動。

2.3.2 配置masters

  這里配置node1和node2為高可用節點

[root@node1 conf]# vim masters 

node1:8081
node2:8082

2.3.3 把flink安裝包往其余兩台

[root@node1 app]# scp flink-1.12.1/ root@node2:/opt/app
[root@node1 app]# scp flink-1.12.1/ root@node2:/opt/app

2.3.4 啟動集群

  啟動順序:先啟動zk和hdfs,再啟動flink

  啟動集群:

start-cluster.sh

2.3.5 查看Flink webUI

  在瀏覽器輸入:http://node1:8081/或http://node1:8082/,即可看到flink任務執行情況

3. Flink On Yarn安裝

3.1 軟件配置選擇

JDK:1.8 (jdk1.8.0_151)
Hadoop:2.7.6 (hadoop-2.7.6.tar.gz)
HBase:2.1.2 (hbase-2.1.1-bin.tar.gz)

這里jdk、hadoop、zookeeper假定均已安裝完畢。

3.2 安裝 

3.2.1 配置flink-conf.yaml文件

high-availability: zookeeper
high-availability.storageDir: hdfs://node/flink/ha/
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink
# high-availability.cluster-id: /cluster_one       on yarn不配置,配了任務會啟動不來

3.2.2 配置FLINK_HOME和HADOOP_CLASSPATH環境變量

vi /etc/profile 或者 vi /etc/bashrc

export FLINK_HOME=/home/redpeak/app/flink-1.12.3 export PATH=$FLINK_HOME/bin:$PATH export HADOOP_CLASSPATH=`hadoop classpath`

  需要在每台flink節點配置FLINK_HOME和HADOOP_CLASSPATH環境變量

3.3 flink on yarn三種部署模式

3.3.1 Session模式

   Session模式是預分配資源的,也就是提前根據指定的資源參數初始化一個Flink集群,並常駐在YARN系統中,擁有固定數量的JobManager和TaskManager(注意JobManager只有一個)。提交到這個集群的作業可以直接運行,免去每次分配資源的overhead。但是Session的資源總量有限,多個作業之間又不是隔離的,故可能會造成資源的爭用;如果有一個TaskManager宕機,它上面承載着的所有作業也都會失敗。另外,啟動的作業越多,JobManager的負載也就越大。所以,Session模式一般用來部署那些對延遲非常敏感但運行時長較短的作業。

  提交任務命令:

./bin/flink run -t yarn-session \
  -Dyarn.application.id=application_XXXX_YY \
  ./examples/streaming/TopSpeedWindowing.jar

  再次連接YARN session命令:

./bin/yarn-session.sh -id application_XXXX_YY

3.3.2 Per_Job模式

  顧名思義,在Per-Job模式下,每個提交到YARN上的作業會各自形成單獨的Flink集群,擁有專屬的JobManager和TaskManager。可見,以Per-Job模式提交作業的啟動延遲可能會較高,但是作業之間的資源完全隔離,一個作業的TaskManager失敗不會影響其他作業的運行,JobManager的負載也是分散開的,不存在單點問題。當作業運行完成,與它關聯的集群也就被銷毀,資源被釋放。所以,Per-Job模式一般用來部署那些長時間運行的作業。

  提交任務命令:

./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

  注:--detached,以分離模式運行作業,detached模式在提交完任務后就退出client

  任務列表和取消任務命令:

# List running job on the cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

3.3.3 Session與Per_Job模式帶來的問題

  Session模式和Per_Job模式可以用如下簡圖表示,其中紅色、藍色和綠色的圖代表不同的作業。

 

 

   Deployer表示向YARN集群發起部署請求的節點,一般來講在生產環境中,也總有這樣一個節點作為所有作業的提交入口(即客戶端)。在main()方法開始執行直到env.execute()方法之前,客戶端也需要做一些工作,即:

  • 獲取作業所需的依賴項;
  • 通過執行環境分析並取得邏輯計划,即StreamGraph->JobGraph;
  • 將依賴項和JobGraph上傳到集群中。

  只有在這些都完成之后,才會通過env.execute()方法觸發Flink運行時真正地開始執行作業。如果所有用戶都在Deployer上提交作業,較大的依賴會消耗更多的帶寬,而較復雜的作業邏輯翻譯成JobGraph也需要吃掉更多的CPU和內存,客戶端的資源反而會成為瓶頸。不管Session還是Per-Job模式都存在此問題。為了解決,社區在傳統部署模式的基礎上實現了Application模式。

3.3.4 Application模式

   此模式的作業提交框圖如下:

 

 

   可見,原本需要客戶端做的三件事被轉移到了JobManager里,也就是說main()方法在集群中執行(入口點位於ApplicationClusterEntryPoint),Deployer只需要負責發起部署請求了。另外如果一個main()方法中有多個env.execute/executeAsync()調用,在Application模式下,這些作業會被視為同一個應用,在同一個集群中執行(如果在Per-Job模式下,就會啟動多個集群)。可見,Application模式本質上是Session和Per-Job模式的折衷。

  用Application模式提交作業的示例命令如下:

bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=10 \
-Dyarn.application.name="MyFlinkApp" \
/path/to/my/flink-app/MyFlinkApp.jar

  注:-t參數用來指定部署目標,目前支持YARN(yarn-application)和K8S(kubernetes-application)。-D參數則用來指定與作業相關的各項參數。

  查看集群的任務Job列表以及取消任務命令,如下:

# List running job on the cluster
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

  那么如何解決傳輸依賴項造成的帶寬占用問題呢?Flink作業必須的依賴是發行版flink-dist.jar,還有擴展卡(位於\$FLINK_HOME/lib)和插件庫(位於\$FLINK_HOME/plugin),我們將它們預先上傳到像HDFS這樣的共享存儲,再通過yarn.provided.lib.dirs參數指定存儲的路徑即可。

-Dyarn.provided.lib.dirs="hdfs://myhdfs/flink-common-deps/lib;hdfs://myhdfs/flink-common-deps/plugins"

  這樣所有作業就不必各自上傳依賴,可以直接從HDFS拉取,並且YARN NodeManager也會緩存這些依賴,進一步加快作業的提交過程。同理,包含Flink作業的用戶JAR包也可以上傳到HDFS,並指定遠程路徑進行提交。

./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" hdfs://myhdfs/jars/my-application.jar

4. 總結

【參考資料】

https://www.sohu.com/a/406387061_100109711

https://zhuanlan.zhihu.com/p/354511839

https://www.jianshu.com/p/90d9f1f24937


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM