JobManager協調每個flink應用的部署,它負責執行定時任務和資源管理。
每一個Flink集群都有一個jobManager, 如果jobManager出現問題之后,將不能提交新的任務和運行新任務失敗,這樣會造成單點失敗,所以需要構建高可用的JobMangager。
類似zookeeper一樣,構建好了高可用的jobManager之后,如果其中一個出現問題之后,其他可用的jobManager將會接管任務,變為leader。不會造成flink的任務執行失敗。可以在單機版和集群版構建jobManager。
下面開始構建一個單機版flink的JobManger高可用HA版。
首先需要設置SSH免密登錄,因為啟動的時候程序會通過遠程登錄訪問並且啟動程序。
執行命令,就可以免密登錄自己的機器了。如果不進行免密登錄的話,那么啟動的hadoop的時候會報 "start port 22 connection refused"。
ssh-keygen -t rsa ssh-copy-id -i ~/.ssh/id_rsa.pub huangqingshi@localhost
接下來在官網上下載hadoop的binary文件,然后開始解壓,我下載的版本為hadoop-3.1.3版本。安裝Hadoop的目的是用hadoop存儲flink的JobManager高可用的元數據信息。
我安裝的是hadoop的單機版,可以構建hadoop集群版。接下來進行hadoop的配置。
配置etc/hadoop/coresite.xml,指定namenode的hdfs協議文件系統的通信地址及臨時文件目錄。
<configuration> <property> <!--指定namenode的hdfs協議文件系統的通信地址--> <name>fs.defaultFS</name> <value>hdfs://127.0.0.1:9000</value> </property> <property> <!--指定hadoop集群存儲臨時文件的目錄--> <name>hadoop.tmp.dir</name> <value>/tmp/hadoop/tmp</value> </property> </configuration>
配置etc/hadoop/hdfs-site.xml, 設置元數據的存放位置,數據塊的存放位置,DFS監聽端口。
<configuration> <property> <!--namenode 節點數據(即元數據)的存放位置,可以指定多個目錄實現容錯,多個目錄用逗號分隔--> <name>dfs.namenode.name.dir</name> <value>/tmp/hadoop/namenode/data</value> </property> <property> <!--datanode 節點數據(即數據塊)的存放位置--> <name>dfs.datanode.data.dir</name> <value>/tmp/hadoop/datanode/data</value> </property> <property> <!--手動設置DFS監聽端口--> <name>dfs.http.address</name> <value>127.0.0.1:50070</value> </property> </configuration>
配置etc/hadoop/yarn-site.xml,配置NodeManager上運行的附屬服務以及resourceManager主機名。
<configuration> <!-- Site specific YARN configuration properties --> <property> <!--配置NodeManger上運行的附屬服務。需要配置成mapreduce_shuffle后才可以在Yarn上運行MapReduce程序--> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <!--resourcemanager 的主機名--> <name>yarn.resourcemanager.hostname</name> <value>localhost</value> </property> </configuration>
配置etc/hadoop/mapred-site.xml,指定mapreduce作業運行在yarn上。
<property> <!--指定mapreduce作業運行在yarn上--> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
需要執行nameNode的format操作,不執行直接啟動會報“NameNode is not formatted.”。
bin/hdfs namenode -format
接下來啟動hadoop,如果成功的話,可以訪問如下URL:
http://localhost:8088/ 查看構成cluster的節點
http://localhost:8042/node 查看node的相關信息。
以上說明hadoop單機版搭建完成。
接下來需要下載一個flink的hadoop插件,要不然flink啟動的時候會報錯的。
把下載的插件放到flink文件的lib文件夾中。
配置一下flink文件夾的conf/flink-conf.yaml。指定HA高可用模式為zookeeper,元數據存儲路徑用於恢復master,zookeeper用於flink的 checkpoint 以及 leader 選舉。最后一條為zookeeper單機或集群的地址。
high-availability: zookeeper
high-availability.storageDir: hdfs://127.0.0.1:9000/flink/ha
high-availability.zookeeper.quorum: localhost:2181
其他的采用默認配置,比如JobManager的最大堆內存為1G,每一個TaskManager提供一個task slot,執行串行的任務。
接下來配置flink的 conf/masters 用於啟動兩個主節點JobManager。
localhost:8081
localhost:8082
配置flink的 conf/slaver 用於配置三個從節點TaskManager。
localhost
localhost
localhost
進入zookeeper路徑並且啟動zookeeper
bin/zkServer.sh start
進入flink路徑並啟動flink。
bin/start-cluster.sh conf/flink-conf.yaml
啟動截圖說明啟動了兩個節點的HA集群。
執行jps,兩個JobManager節點和三個TaskManager節點:
瀏覽器訪問 http://localhost:8081 和 http://localhost:8082,查看里邊的日志,搜索granted leadership的說明是主JobManager,如下圖。8082端口說明為主JobMaster
一個JobManager, 里邊有三個TaskManager,兩個JobManager共享這三個TaskManager:
接下來我們來驗證一下集群的HA功能,我們已經知道8082為主JobManager,然后我們找到它的PID,使用如下命令:
ps -ef | grep StandaloneSession
我們將其kill掉,執行命令kill -9 51963,此時在訪問localhost:8082 就不能訪問了。localhost:8081 還可以訪問,還可以提供服務。接下來咱們重新 啟動flink的JobManager 8082 端口。
bin/jobmanager.sh start localhost 8082
此時8081已經成為leader了,繼續提供高可用的HA了。
好了,到此就算搭建完成了。