Flink的高可用集群環境


Flink的高可用集群環境

Flink簡介

       Flink核心是一個流式的數據流執行引擎,其針對數據流的分布式計算提供了數據分布,數據通信以及容錯機制等功能。

         因現在主要Flink這一塊做先關方面的學習,因此准備要開通Apache Flink專欄這一塊定期發布一些文章。今天在自己的博客因為專欄無法申請通過,所以先在此記錄第一篇關於Flink部署的文章。

         在這里順便打個小廣告,Flink社區第一季線下meetup,已在上海,北京舉辦。接下來分別會在成都和深圳舉辦接下來的幾期,也希望小伙伴們踴躍的加入到Flink社區來,下載釘釘,掃描下方二維碼即可加入大群。


     首先今天先介紹一下Flink的安裝,安裝部署最新1.6版本支持有8種安裝方式,詳細可以參考安裝部署方式【Clusters & Deployment】 。下面主要介紹Standalone Cluster模式和on yarn模式 。

軟件包下載地址

一.Flink獨立集群模式安裝(Cluster Standalone)

1.1.解壓安裝

[root@h001 soft]# tar -zxvf flink-1.2.0-bin-hadoop26-scala_2.11.tgz -C /usr/bigdata/
1
1.2.Flink配置(Configuring Flink)

對其進行相關的配置。主要涉及到的配置文件是conf/flink-conf.yaml

flink-conf.yaml配置

jobmanager.rpc.address:值設置成你master節點的IP地址
taskmanager.heap.mb:每個TaskManager可用的總內存
taskmanager.numberOfTaskSlots:每台機器上可用CPU的總數
parallelism.default:每個Job運行時默認的並行度(這個參數在文檔中介紹好像有問題)
taskmanager.tmp.dirs:臨時目錄
jobmanager.heap.mb:每個節點的JVM能夠分配的最大內存
jobmanager.rpc.port: 6123
jobmanager.web.port: 8081

[root@h001 conf]# vim flink-conf.yaml
jobmanager.rpc.address:h001
taskmanager.heap.mb:2048
taskmanager.numberOfTaskSlots:4
parallelism.default:10
taskmanager.tmp.dirs:/tmp
jobmanager.heap.mb:2048
jobmanager.web.port: 8081
jobmanager.rpc.port: 6123
1
2
3
4
5
6
7
8
9
10
主節點與從節點配置

[root@h002 conf]# vim slaves
h002
h003
h004
h005
[root@h001 conf]# vim masters
h001:8082
1
2
3
4
5
6
7
1.3.Flink安裝包分發到所有的worker節點上

[root@h001 bigdata]# clush -v -w h[002-005] --copy flink-1.5.1 --dest /usr/bigdata/
1
1.4.啟動Flink(Starting Flink)

       在master節點上運行下面的腳本,那么這台機器上將會啟動一個JobManager,並通過SSH連接列在slaves文件中的所有節點以便在每個節點上啟動TaskManager

[root@h001 flink-1.5.1]# bin/start-cluster.sh
1
如果停止集群,可以在master節點上運行下面的命令

[root@h001 flink-1.5.1]# bin/stop-cluster.sh
1
1.5. 在已經運行的集群中添加JobManager/TaskManager

      通過bin/taskmanager.sh或者bin/jobmanager.sh腳本在已經運行的集群中添加JobManager或者TaskManager節點

[root@h001 flink-1.2.0]# bin/jobmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)

[root@h001 flink-1.2.0]# bin/taskmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)
1
2
3
二.JobManager高可用(HA)

       JobManager協調每一個Flink集群環境,它負責作業調度和資源管理。默認情況下,一個Flink集群中只有一個JobManager實例,這很容易造成單點故障(SPOF)。如果JobManager奔潰了,那么將沒有新的程序被提交,同時運行的程序將失敗。
       對於JobManager高可用來說,我們可以從失敗的JobManager中恢復,因此可以消除單點故障的問題。我們可以配置Standalone模式和YARN集群模式下的高可用JobManager的HA,是通過Zookeeper實現的,因此需要先搭建好Zookeeper集群,同時HA的信息,還要存儲在HDFS中,因此也需要Hadoop集群,最后修改Flink中的配置文件。

根據部署方式不同,Flink Jobmanager HA配置分為2種:
        1、standalone cluster HA
        2、Yarn cluster HA

2.1. Standalone集群模式高可用

        對於Standalone集群模式下的JobManager高可用通常的方案是:Flink集群的任一時刻只有一個leading JobManager,並且有多個standby JobManager。當leader失敗后,standby通過選舉出一個JobManager作為新的leader。這個方案可以保證沒有單點故障的問題。對於standby和master JobManager實例來說,其實沒有明確的區別,每一個JobManager能夠當擔master或standby角色。

2.1.1.相關配置

      為了保證JobManager高可用,你需要設置Zookeeper為recovery mode(恢復模式),配置一個Zookeeper quorum並且對所有的JobManager節點和它們的Web UI端口號設置一個masters文件。

Flink引入Zookeeper的目的主要是讓JobManager實現高可用(leader選舉)
Flink使用Zookeeper在所有運行的JobManager實例中進行分布式調度的協調。Zookeeper在Flink中是一個獨立的服務,它能夠通過leader選舉和輕量級的一致性狀態存儲來提供高度可靠的分布式協調器
Master File(masters)
為了啟動一個HA-cluster,需要在conf/masters中配置masters。
masters文件:masters文件包含所有的hosts,每個host啟動都JobManager,並且指定綁定的Web UI端口號:
jobManagerAddress1:webUIPort1
[...]
jobManagerAddressX:webUIPortX
1
2
3
配置文件flink-conf.yaml

為了啟動一個HA-Cluster,需要在conf/flink-conf.yaml添加如下配置參數:

Recovery mode(必須的):recovery.mode: zookeeper
zookeeper quorum(必須的):recovery.zookeeper.quorum: address1:2181,...
Zookeeper root(推薦的):Flink在Zookeeper中的root節點,下面放置所有需要協調的數據recovery.zookeeper.path.root: /flink
1
2
3
      如果你運行多個Flink HA集群,那么你必須手工配置每個Flink集群使用獨立的root節點

State backend and storage directory(必須的):JobManager元數據在statebackend保持並且僅僅在Zookeeper中存儲,目前在HA模式中,僅支持filesystem。
state.backend: filesystem
state.backend.fs.checkpointdir:hdfs://namenode-host:port/flink-checkpoints
recovery.zookeeper.storageDir: hdfs:///recovery
recovery.zookeeper.storageDir指定的路徑中存儲了所有的元數據,用來恢復失敗的JobManager
1
2
3
4
5
2.2. 兩個JobManager的Standalone模式下的集群

conf/flink-conf.yaml文件

配置恢復模式和Zookeeper quorum

[root@h001 conf]# vim flink-conf.yaml
recovery.mode: zookeeper
recovery.zookeeper.quorum: h002:2181,h003:2181,h004:2181
recovery.zookeeper.path.root: /flink
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://h001:9000/flink/checkpoints
recovery.zookeeper.storageDir: hdfs://h001:9000/flink/recovery
1
2
3
4
5
6
7
在Hadoop文件系統創建文件夾

[root@h001 conf]# hadoop fs -mkdir -p /flink/checkpoints
[root@h001 conf]# hadoop fs -mkdir -p /flink/recovery
[root@h001 conf]# hadoop fs -chown -R hdfs:supergroup /flink/
1
2
3
配置conf/masters文件

[root@h001 conf]# vim masters
h001:8081
h002:8081
1
2
3
配置conf/zoo.cfg文件,添加Zookeeper集群節點

[root@h001 conf]# vim zoo.cfg
server.1=h002:2888:3888
server.2=h003:2888:3888
server.3=h004:2888:3888
1
2
3
4
啟動Zookeeper集群

[root@h001 flink-1.2.0]# bin/start-zookeeper-quorum.sh
1
啟動Flink集群

[root@h001 flink-1.2.0]# bin/start-cluster.sh
1
經過測試kill掉其中一個jobmanager可切換主備。

2.2. YARN集群模式高可用

        當運行一個高可用YARN集群時,我們不需要運行多個JobManager(ApplicationMaster)實例,只需要運行一個實例,如果失敗了通過YARN來進行重啟
        Flink部署在Yarn上,僅作為yarn上“多租戶”的一個service而存在。Flink在yarn中容器的概念分為2種:

用於啟動JobManager(AM)的容器
用於啟動TaskManager的容器
1
2
通過yarn-session.sh –help來看下啟動Flink On Yarn的參數信息

        其中-n代表taskmanager的容器數量,而不是taskmanager+jobmanager的容器數量
在配置HA前,先通過-q看一下我的yarn集群的資源情況:

      從圖中可以看出,我配置的每個NodeManager的內存是2048MB(yarn-site.xml),每個NodeManager的vcores數量是2。所以,當前yarn集群中可用內存總量為6144,總cores是6

2.2.1. FLINK ON YARN HA 配置

配置准備

   在配置Flink On Yarn之前,必須保證hdfs和yarn都已經開啟,可以通過HADOOPHOME/sbin/start−all.sh啟動hdfs和yarn配置(yarn−site.xml)此配置需要在HADOOPHOME/sbin/start−all.sh啟動hdfs和yarn配置(yarn−site.xml)此配置需要在HADOOP_CONF_DIR 的yarn-site.xml添加

[root@h001 ~]# cd /usr/bigdata/hadoop/etc/hadoop/
[root@h001 hadoop]# vim yarn-site.xml
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
</property>
1
2
3
4
5
6
此配置代表application master在重啟時,嘗試的最大次數

[root@h001 hadoop]# clush -v -w h[002-005] --copy yarn-site.xml --dest /usr/bigdata/hadoop/etc/hadoop/
1
配置(flink-conf.yaml),此參數需要在$FLINK_HOME/conf 的flink-conf.yaml中配置

[root@h001 conf]# vim flink-conf.yaml
yarn.application-attempts: 10
1
2
      此參數代表Flink Job(yarn中稱為application)在Jobmanager(或者叫Application Master)恢復時,允許重啟的最大次數。
      注意,Flink On Yarn環境中,當Jobmanager(ApplicationMaster)失敗時,yarn會嘗試重啟JobManager(AM),重啟后,會重新啟動Flink的Job(application)。因此,yarn.application-attempts的設置不應該超過yarn.resourcemanager.am.max-attemps

[root@h001 conf]# clush -v -w h[002-005] --copy flink-conf.yaml --dest /usr/bigdata/flink-1.5.1/conf/
1
配置zookeeper信息
      雖然flink-on-yarn cluster HA依賴於Yarn自己的集群機制,但是Flink Job在恢復時,需要依賴檢查點產生的快照,而這些快照雖然配置在hdfs,但是其元數據信息保存在zookeeper中,所以我們還要配置zookeeper的HA信息。其中,recovery.zookeeper.path.namespace也可以在啟動Flink on Yarn時通過-z參數覆蓋。
      在yarn模式下,jobmanager.rpc.address不需要指定,因為哪一個容器作為jobManager由Yarn決定,而不由Flink配置決定;taskmanager.tmp.dirs也不需要指定,這個參數將被yarn的tmp參數指定,默認就是/tmp目錄下,保存一些用於上傳到ResourceManager的jar或lib文件。parrallelism.default也不需要指定,因為在啟動yarn時,通過-s指定每個taskmanager的slots數量。

完整的Flink配置信息如下:

root@h001 conf]# vim flink-conf.yaml
env.java.home: /usr/java/jdk1.8.0_111
recovery.mode: zookeeper
recovery.zookeeper.quorum: h002:2181,h003:2181,h004:2181
recovery.zookeeper.path.root: /flink
recovery.zookeeper.path.namespace: /cluster_yarn
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://h001:9000/flink/checkpoints
recovery.zookeeper.storageDir: hdfs://h001:9000/flink/recovery
taskmanager.network.numberOfBuffers: 64000
fs.hdfs.hadoopconf: /usr/bigdata/hadoop/etc/Hadoop
1
2
3
4
5
6
7
8
9
10
11
以上的yarn HA配置可在Standalone集群模式下進一步添加幾個參數即可完成。

2.2.2.啟動FLINK YARN SESSION

在YARN上啟動一個Flink主要有兩種方式:

(1)、啟動一個YARN session(Start a long-running Flink cluster on YARN)
(2)、直接在YARN上提交運行Flink作業(Run a Flink job on YARN)
1
2
Flink YARN Session

啟動Flink Yarn Session有2種模式:

(1)、分離模式
(2)、客戶端模式
1
2
      通過-d指定分離模式,即客戶端在啟動Flink Yarn Session后,就不再屬於Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通過yarn application -kill 命令來停止
      這種模式下會啟動yarn session,並且會啟動Flink的兩個必要服務:JobManager和TaskManagers,然后你可以向集群提交作業。同一個Session中可以提交多個Flink作業。需要注意的是,這種模式下Hadoop的版本至少是2.2,而且必須安裝了HDFS(因為啟動YARN session的時候會向HDFS上提交相關的jar文件和配置文件)。我們可以通過./bin/yarn-session.sh腳本啟動YARN Session。
      在啟動的是可以指定TaskManager的個數以及內存(默認是1G),也可以指定JobManager的內存,但是JobManager的個數只能是一個。
采用客戶端模式來啟動Flink Yarn Session:

[root@h001 flink-1.5.1]# bin/yarn-session.sh -n 4 -jm 4096 -tm 8192 -s 2 -nm FlinkOnYarnSession -d -st
或者
./bin/yarn-session.sh -n 4 -tm 8192 -s 8
1
2
3
參數說明:

-n:--container 指YARN container分配的個數(即TaskManagers的個數)
-jm:--jobManagerMemory 指JobManager Containe的內存大小,單位為MB
-tm:--taskManagerMemory 指每個TaskManagerContainer的內存大小,單位為MB
-s :指每個TaskManager的slot個數
1
2
3
4
可以通過yarn的webUI查看一下當前啟動的Application

通過ApplicationMaster tracking一下Flink的WebUI
http://192.168.xxx.xxx:8088/proxy/application_1500340359200_0002/#/overview
提交作業
使用bin/flink腳本提交作業,同樣我們來看看這個腳本支持哪些參數:

[root@h001 flink-1.5.1]# bin/flink run
bin/flink run ./examples/batch/WordCount.jar \
--input hdfs:///user/test/LICENSE \
--output hdfs:///user/test/result.txt
1
2
3
4
后面相應的跟上參數提交作業即可。

Run a single Flink job on YARN

      上面的YARN session是在Hadoop YARN環境下啟動一個Flink cluster集群,里面的資源是可以共享給其他的Flink作業。我們還可以在YARN上啟動一個Flink作業。這里我們還是使用./bin/flink,但是不需要事先啟動YARN session:

[root@h001 flink-1.5.1]# bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar \
--input hdfs:///user/test/LICENSE \
--output hdfs:///user/test/result.txt
1
2
3
      上面的命令同樣會啟動一個類似於YARN session啟動的頁面。其中的-yn是指TaskManager的個數,必須指定。
---------------------
作者:獨行夏
來源:CSDN
原文:https://blog.csdn.net/u013368491/article/details/81610672
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM