Flink on YARN and JobManager High Availability (HA)


On YARN setup documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/yarn_setup.html


Flink on YARN: two deployment modes

Mode 1: YARN session mode — start a long-running Flink cluster on YARN

In YARN session mode, we first start a yarn-session, which is effectively a long-running YARN application whose resource allocation stays fixed. When we later submit jobs to this session with flink run, a job waits if the session does not have enough free resources, until other jobs release theirs. When the yarn-session is killed, all jobs running in it stop.

 

Copy the YARN and HDFS configuration files into Flink's conf directory, or point Flink directly at the paths where those configuration files live:

export HADOOP_CONF_DIR=/root/flink-1.8.2/conf
cd flink-1.8.2/
./bin/yarn-session.sh -jm 1024m -tm 4096m -s 16

-jm: JobManager memory; -tm: memory per TaskManager; -s: the number of processing slots per TaskManager.

The log output:

[root@master01 flink-1.8.2]# ./bin/yarn-session.sh -jm 1024m -tm 4096m -s 16
2019-12-10 10:05:40,010 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, master01.hadoop.xxx.cn
2019-12-10 10:05:40,012 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-12-10 10:05:40,012 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-12-10 10:05:40,012 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-12-10 10:05:40,012 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-12-10 10:05:40,012 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-12-10 10:05:40,067 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-12-10 10:05:40,399 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to root (auth:SIMPLE)
2019-12-10 10:05:40,459 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at master01.hadoop.xxx.cn/xxx.xx.x.xxx:8032
2019-12-10 10:05:40,634 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096, numberTaskManagers=1, slotsPerTaskManager=16}
2019-12-10 10:05:40,857 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-12-10 10:05:40,873 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/root/flink-1.8.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-12-10 10:05:42,434 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1570496850779_0463
2019-12-10 10:05:42,457 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1570496850779_0463
2019-12-10 10:05:42,457 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-12-10 10:05:42,458 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-12-10 10:05:46,234 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2019-12-10 10:05:46,597 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
Flink JobManager is now running on worker03.hadoop.xxx.cn:38055 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://worker03.hadoop.xxx.cn:38055

 

To view the web UI, either go straight to the YARN ResourceManager UI, or use the JobManager web interface address printed in the log above.

 

 

Submit a test job; jobs are submitted with ./bin/flink:

cd flink-1.8.2/
./bin/flink run ./examples/batch/WordCount.jar

The log output:

[root@master01 flink-1.8.2]# ./bin/flink run ./examples/batch/WordCount.jar
2019-12-10 11:01:43,553 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
2019-12-10 11:01:43,785 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 16
2019-12-10 11:01:43,812 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at master01.hadoop.xxx.cn/xxx.xx.x.211:8032
2019-12-10 11:01:43,904 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-12-10 11:01:43,956 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'worker02.hadoop.xxx.cn' and port '39095' from supplied application id 'application_1570496850779_0467'
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
......

Question: when submitting a Flink job, how does the client find the right cluster?

The answer is in the log line "Found Yarn properties file under /tmp/.yarn-properties-root." — let's look at that file's contents:

[root@master01 flink-1.8.2]# more /tmp/.yarn-properties-root
#Generated YARN properties file
#Tue Dec 10 10:40:29 CST 2019
parallelism=16
dynamicPropertiesString=
applicationID=application_1570496850779_0467

This applicationID is exactly the id of the Flink cluster we submitted to YARN.
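To make the lookup concrete, here is a minimal, self-contained sketch (the file path and application id are samples copied from the output above, purely illustrative) of extracting the applicationID from such a properties file:

```shell
# Recreate a properties file shaped like /tmp/.yarn-properties-root
# (sample values; purely illustrative).
cat > /tmp/demo-yarn-properties <<'EOF'
#Generated YARN properties file
#Tue Dec 10 10:40:29 CST 2019
parallelism=16
dynamicPropertiesString=
applicationID=application_1570496850779_0467
EOF

# Extract the applicationID -- this is the id of the target YARN application.
appid=$(sed -n 's/^applicationID=//p' /tmp/demo-yarn-properties)
echo "$appid"   # prints application_1570496850779_0467
```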

 

Check the job record in the Flink web UI.

 

In addition, when starting the Flink-on-YARN cluster you can pass -d or --detached to run it in the background. In that mode, stop the cluster with yarn application -kill <appId>.

 

Mode 2: run a single Flink job on YARN

Mode 1 above starts a Flink cluster on YARN and then submits jobs to that cluster. Alternatively, you can run a single Flink job directly on YARN, similar in spirit to spark-submit.

[root@master01 flink-1.8.2]# ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar

The log output:

2019-12-10 11:44:56,912 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at master01.hadoop.xxx.cn/xxx.xx.x.xxx:8032
2019-12-10 11:44:57,004 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-12-10 11:44:57,101 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-12-10 11:44:57,379 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/root/flink-1.8.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-12-10 11:45:01,058 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1570496850779_0470
2019-12-10 11:45:01,093 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1570496850779_0470
2019-12-10 11:45:01,093 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-12-10 11:45:01,094 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-12-10 11:45:05,621 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
......

As you can see, the first thing the client does is connect to YARN's ResourceManager.

./bin/flink run command breakdown:

run [OPTIONS] <jar-file> <arguments>
"run" options:
-c,--class <classname>          entry class of the program; required if the jar's manifest does not specify one
-m,--jobmanager <host:port>     address of the JobManager (master) to connect to; lets you target a JobManager different from the one in the configuration file
-p,--parallelism <parallelism>  parallelism of the program; overrides the default from the configuration file
Default behavior — look up the JobManager of the existing yarn-session recorded in /tmp/.yarn-properties-root:
./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
Connect to a JobManager at a specific host and port:
./bin/flink run -m hadoop100:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
Start a new yarn-session (per-job cluster):
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
Note: the yarn-session command-line options are also available through ./bin/flink; they all carry a y or yarn prefix.
For example: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

 

How Flink on YARN works internally

 

Since Flink runs on YARN, it must know the YARN and HDFS configuration. That configuration is located as follows:

1. The client first checks whether any of the YARN_CONF_DIR, HADOOP_CONF_DIR, or HADOOP_CONF_PATH environment variables is set; if one of them is, the environment configuration is read from there.

2. If none of those is set, the client falls back to the HADOOP_HOME environment variable and reads the configuration files under $HADOOP_HOME/etc/hadoop.

When a job is submitted, the client first checks that the requested resources (memory and CPU) are available, then uploads the Flink jar to HDFS.

Next, the client requests a container in which to start the ApplicationMaster; the selected NodeManager initializes the container (for example, by downloading the required files) and then launches the ApplicationMaster.

The JobManager runs in the same container as the AM, so the AM knows the JobManager's address. It then generates a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager); this file is uploaded to HDFS as well. The AM container also serves Flink's web interface. (All ports allocated by YARN are ephemeral, which allows users to run multiple Flink applications in parallel.)

After that, the AM allocates containers for Flink's TaskManagers, which download the jar and the modified configuration from HDFS. The cluster can then accept and execute jobs.


 

HA

See the official documentation for reference.

Because of the single point of failure (SPOF), we need HA. There are two HA setups: Flink standalone mode and Flink on YARN mode.

HA in Flink standalone mode

Run multiple JobManagers: one is the leader and the rest are standbys, with failover coordinated through ZooKeeper.

 

Configuration:

1. List the JobManager hosts and ports in conf/masters. In my environment:

[root@master01 conf]# more masters 
master01.hadoop.xxx.cn:8081
worker03.hadoop.xxx.cn:8081

2. Edit conf/flink-conf.yaml, mainly to enable ZooKeeper-based HA:

(I am reusing an already-running, healthy CDH cluster here.)

high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: master01.hadoop.xxx.cn:2181,worker01.hadoop.xxx.cn:2181,worker03.hadoop.xxx.cn:2181

Note that ZooKeeper keeps its metadata under the /flink znode (similar to how HBase uses /hbase), and what ZooKeeper stores is not the actual recovery metadata: the data itself lives in HDFS, and ZooKeeper stores only pointers to the HDFS paths.

3. Distribute the Flink package to every node.

4. Run bin/start-cluster.sh.

 

Check the web UI.

 

 

You can see that HA is enabled and which ZooKeeper ensemble is used; the current leader is the master01 node. The znode layout in ZooKeeper looks like this:

[zk: localhost:2181(CONNECTED) 0] ls /
[flink, hive_zookeeper_namespace_hive, zookeeper, solr]
[zk: localhost:2181(CONNECTED) 1] ls /flink
[default]
[zk: localhost:2181(CONNECTED) 2] ls /flink/default
[jobgraphs, leader, leaderlatch]

 

Kill the JobManager process on master01 to verify that failover works. The process:

83819 StandaloneSessionClusterEntrypoint

Then visit the web UI again:

 

Flink on YARN HA

Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/jobmanager_high_availability.html#yarn-cluster-high-availability

 

Unlike a standalone cluster, the HA setup for Flink on YARN needs only a single running JobManager: when the JobManager fails, YARN is responsible for restarting it.

We need to change the following setting in yarn-site.xml:

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>

yarn.resourcemanager.am.max-attempts is YARN's maximum number of ApplicationMaster execution attempts.

Besides this YARN-side setting, we also need to configure the maximum number of attempts in flink-conf.yaml (default: 2):

yarn.application-attempts: 10

When yarn.application-attempts is set to 10:

This means that if the application fails to start, YARN retries it 9 more times (9 retries + 1 initial attempt); only after all 10 attempts fail does YARN mark the application as failed. If the process is preempted, a node suffers a hardware failure or reboot, or a NodeManager resyncs, YARN keeps trying to start the application, and those restarts do not count against yarn.application-attempts.
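The attempt accounting above, together with the cap imposed by YARN (per the Flink HA documentation, yarn.resourcemanager.am.max-attempts is an upper bound that yarn.application-attempts cannot exceed), can be sketched with plain shell arithmetic using the example values from this section:

```shell
# Example values from this section (illustrative only).
rm_max_attempts=4    # yarn.resourcemanager.am.max-attempts (yarn-site.xml)
flink_attempts=10    # yarn.application-attempts (flink-conf.yaml)

# The effective limit is the smaller of the two values.
effective=$(( flink_attempts < rm_max_attempts ? flink_attempts : rm_max_attempts ))
retries=$(( effective - 1 ))   # 1 initial start + (effective - 1) restarts

echo "effective attempts: $effective (1 start + $retries retries)"
```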

 

The official documentation also gives an important note: container shutdown behavior differs across YARN versions:

  • YARN 2.3.0 < version < 2.4.0: all containers are restarted when the ApplicationMaster fails.

  • YARN 2.4.0 < version < 2.6.0: TaskManager containers keep running across an ApplicationMaster failure. The advantages: faster startup, and no need for all TaskManagers to re-request resources when they start.

  • YARN version >= 2.6.0: the failure-validity interval is set to Akka's timeout; the application is marked as failed only if the maximum number of failures is reached within one such interval.

Also note: if your ZooKeeper ensemble runs in Kerberos secure mode, you can add the following configuration options as needed:

zookeeper.sasl.service-name
zookeeper.sasl.login-context-name
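For illustration, here is a flink-conf.yaml fragment with these two options set to their documented default values (the values shown are Flink's defaults, not site-specific settings):

```yaml
# Only relevant when ZooKeeper runs in Kerberos/SASL secure mode.
zookeeper.sasl.service-name: zookeeper       # default value
zookeeper.sasl.login-context-name: Client    # default value
```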

 

