Flink On Yarn的兩種模式:
- Session模式
- Cluster模式
版本准備:
- CentOS 7.8 ( 本人是三台,主節點名為:pmaster,備用節點名為:pnode1 ,子節點:pnode2 )
- Zookeeper 3.6.3
- Hadoop 2.10.1
- Flink 1.12
直接去華為鏡像站下載就好了!
檢查集群環境
- 關閉防火牆 service iptables stop,關閉防火牆自啟動 chkconfig iptables off
- 配置各個節點之間的映射關系(vim /etc/hosts),並將文件同步到其余節點 scp xxx node1:`pwd`
- 每台節點動態ip改為靜態ip(雲服務器不需要配置)
- 配置主節點和備用節點向所有節點的免密登錄
- 每台節點的Java環境配置
- 每台節點的時間進行同步
Zookeeper搭建
1、上傳解壓安裝包
2、每台節點配置環境變量: vim /etc/profile
編輯,插入以下內容:
export ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin export PATH=$PATH:${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin
保存退出,source /etc/profile
3、編輯 ${ZOOKEEPER_HOME}/conf/zoo.cfg (需要將zoo_sample.cfg 復制一份為 cp zoo_sample.cfg zoo.cfg)
添加如下代碼(pmaster,pnode1,pnode2 對應節點名):
dataDir=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin/data server.0=pmaster:2888:3888 server.1=pnode1:2888:3888 server.2=pnode2 :2888:3888
4、cd 到/usr/local/zookeeper/apache-zookeeper-3.6.3-bin/data 文件下
vim myid ,每台節點配置不同數值 數值依次對應到server.0,server.1,server.2中的0,1,2
5、配置日志的目錄,編輯 ${ZOOKEEPER_HOME}/bin/zkEnv.sh
修改:ZOO_LOG_DIR=/data/zookeeper/logs
5、將zookeeper文件拷貝到其他服務器,修改對應的data下的myid(每台服務器的myid的數值必須不同)
6、啟動zookeeper
每台服務器 執行 zkServer.sh start
jps查看進程 要有QuorumPeerMain
每台服務器 執行 zkServer.sh status,要有leader節點和follower節點 即分布式zk搭建成功
Hadoop HA 的搭建
1、上傳解壓安裝包 (注:flink on yarn的hadoop版本最好在2.4.1以上 )
2、每台節點配置環境變量:vim /etc/profile
編輯,新增以下內容:
export HADOOP_HOME=/usr/local/hadoop/hadoop-2.10.1 export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin
保存退出,source /etc/profile
3、進入: ${HADOOP_HOME}/etc/hadoop 目錄下,所有的更改都在此目錄下
編輯 slaves 文件(配置子節點):
刪除原有的內容,新增 pmaster,pnode1,pnode2
編輯 hadoop-env.sh 文件 (配置現有的JAVA環境變量)
export JAVA_HOME=/usr/local/java/jdk1.8.0_linux_111 export JAVA_HOME=${JAVA_HOME}
編輯 core-site.xml 文件:
<configuration> <!--集群模式HA下,不能顯示的指定的master節點、、名稱需要和hdfs-site.xml中一致 --> <property> <name>fs.defaultFS</name> <value>hdfs://icluster</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/usr/local/hadoop/hadoop-2.10.1/tmp</value> </property> <property> <name>fs.trash.interval</name> <value>1440</value> </property> <!--指定zk地址--> <property> <name>ha.zookeeper.quorum</name> <value>pmaster:2181,pnode1:2181,pnode2:2181</value> </property> </configuration>
編輯 hdfs-site.xml 文件:
<configuration> <!-- 指定 namenode 元數據存儲的路徑 --> <property> <name>dfs.namenode.name.dir</name> <value>/data/hadooplogs/data/namenode</value> </property> <!-- 指定 datanode 元據存儲的路徑 --> <property> <name>dfs.datanode.data.dir</name> <value>/data/hadooplogs/data/datanode</value> </property> <!-- 設置副本個數 主要根據集群來進行設置 最多不要超過datanode的個數 一般為3 --> <property> <name>dfs.replication</name> <value>1</value> </property> <!-- 開啟權限驗證 則外部程序無法訪問hdfs --> <property> <name>dfs.permissions</name> <value>true</value> </property> <!-- 開啟WebHDFS功能(基於REST的接口服務) --> <property> <name>dfs.webhdfs.enabled</name> <value>true</value> </property> <!-- //////////////以下為HDFS HA的配置////////////// --> <!-- 指定hdfs的nameservices名稱為 icluster --> <property> <name>dfs.nameservices</name> <value>icluster</value> </property> <!-- 指定cluster的兩個namenode的名稱分別為nn1,nn2 --> <property> <name>dfs.ha.namenodes.icluster</name> <value>nn1,nn2</value> </property> <!-- 配置nn1,nn2的rpc通信端口 --> <property> <name>dfs.namenode.rpc-address.icluster.nn1</name> <value>pmaster:8020</value> </property> <property> <name>dfs.namenode.rpc-address.icluster.nn2</name> <value>pnode1:8020</value> </property> <!-- 配置nn1,nn2的http通信端口 --> <property> <name>dfs.namenode.http-address.icluster.nn1</name> <value>pmaster:50070</value> </property> <property> <name>dfs.namenode.http-address.icluster.nn2</name> <value>pnode1:50070</value> </property> <!-- 指定namenode元數據存儲在journalnode中的路徑 --> <property> <name>dfs.namenode.shared.edits.dir</name> <value>qjournal://pmaster:8485;pnode1:8485;pnode2:8485/icluster</value> </property> <!-- 指定journalnode日志文件存儲的路徑 --> <property> <name>dfs.journalnode.edits.dir</name> <value>/data/hadooplogs/data/journal</value> </property> <!-- 指定HDFS客戶端連接active namenode的java類 --> <property> <name>dfs.client.failover.proxy.provider.ibuscluster</name> <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value> </property> <!-- 配置隔離機制為ssh --> <property> <name>dfs.ha.fencing.methods</name> <value>sshfence</value> </property> <!-- 指定秘鑰的位置 生成的秘鑰位置要和這個一致 --> <property> <name>dfs.ha.fencing.ssh.private-key-files</name> <value>/root/.ssh/id_rsa</value> </property> <!-- 開啟自動故障轉移 namenode掛的時候能自動進行轉移 --> <property> <name>dfs.ha.automatic-failover.enabled</name> <value>true</value> </property> </configuration>
編輯 yarn-site.xml 文件:
<configuration> <!-- Site specific YARN configuration properties --> <!-- NodeManager上運行的附屬服務,需配置成mapreduce_shuffle才可運行MapReduce程序 --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!-- 開啟日志聚合功能 --> <property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property> <!-- 配置nodemanager的日志目錄 --> <property> <name>yarn.nodemanager.log-dirs</name> <values>/data/yarnlogs</values> </property> <!-- 配置日志刪除時間為7天,-1為禁用,單位為秒 --> <property> <name>yarn.log-aggregation.retain-seconds</name> <value>604800</value> </property> <!-- 配置nodemanager可用的資源內存 --> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>15360</value> </property> <!-- 單個任務可申請的最少物理內存量,默認是1024(MB),如果一個任務申請的物理內存量少於該值,則該對應的值改為這個數 --> <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>256</value> </property> <!-- 單個任務可申請的最多物理內存量,默認是8192(MB) --> <property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>10240</value> </property> <!-- 任務每使用1MB物理內存,最多可使用虛擬內存量,默認是2.1 --> <property> <name>yarn.nodemanager.vmem-pmem-ratio</name> <value>2.1</value> </property> <!--是否啟動一個線程檢查每個任務正使用的物理內存量,如果任務超出分配值,則直接將其殺掉,默認是true --> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <!-- 是否啟動一個線程檢查每個任務正使用的虛擬內存量,如果任務超出分配值,則直接將其殺掉,默認是true --> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> <!-- 配置nodemanafer的vcore --> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>6</value> </property> <!--///////////////// 開啟YARN HA////////////////// --> <property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> </property> <!-- 啟用自動故障轉移 --> <property> <name>yarn.resourcemanager.ha.automatic-failover.enabled</name> <value>true</value> </property> <!-- 指定YARN HA的名稱 --> <property> <name>yarn.resourcemanager.cluster-id</name> <value>yarncluster</value> </property> <!-- 指定兩個resourcemanager的名稱 --> <property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value> </property> <!-- 配置rm1,rm2的主機 --> <property> <name>yarn.resourcemanager.hostname.rm1</name> <value>pmaster</value> </property> <property> <name>yarn.resourcemanager.hostname.rm2</name> <value>pnode1</value> </property> <!-- 配置YARN的http端口 --> <property> <name>yarn.resourcemanager.webapp.address.rm1</name> <value>pmaster:8088</value> </property> <property> <name>yarn.resourcemanager.webapp.address.rm2</name> <value>pnode1:8088</value> </property> <!-- 配置zookeeper的地址 --> <property> <name>yarn.resourcemanager.zk-address</name> <value>pmaster:2181,pnode1:2181,pnode2:2181</value> </property> <!-- 配置zookeeper的存儲位置 --> <property> <name>yarn.resourcemanager.zk-state-store.parent-path</name> <value>/rmstore</value> </property> <!-- 開啟yarn resourcemanager restart --> <property> <name>yarn.resourcemanager.recovery.enabled</name> <value>true</value> </property> <!-- 配置resourcemanager的狀態存儲到zookeeper中 --> <property> <name>yarn.resourcemanager.store.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value> </property> <!-- 開啟yarn nodemanager restart --> <property> <name>yarn.nodemanager.recovery.enabled</name> <value>true</value> </property> <!-- 配置nodemanager IPC的通信端口 --> <property> <name>yarn.nodemanager.address</name> <value>0.0.0.0:45454</value> </property> </configuration>
4、Hadoop日志配置
編輯 hadoop-env.sh
#新增hadoop輸出日志 export HADOOP_LOG_DIR=/usr/local/soft/hadooplogs
分離 resourcemanager日志
編輯:yarn-env.sh
修改 resourcemanager日志路徑: export YARN_CONF_DIR = /data/yarnlogs
配置 nodemanager 日志:
編輯 yarn-site.xml
修改:
<!-- 配置nodemanager的日志目錄 -->
<property>
<name>yarn.nodemanager.log-dirs</name>
<values>/usr/local/soft/yarnlogs</values>
</property>
4、將更改后的文件拷貝至其他節點
5、啟動Hadoop(前提是啟動了zk)
- 在所有節點上啟動 JournalNode 命令hadoop-daemon.sh start journalnode
- 啟動完成后 jps查看進程 ,所有節點會出現JournalNode進程
- 啟動namenode
- 在任意一台namenode節點上先進行初始化hdfs namenode -format
- 初始化完成 會出現success
- 在當前節點上 啟動namenode,命令hadoop-daemon.sh start namenode
- 命名執行成功后 jps 查看進程 會有NameNode進程
- 在另外一台主節點上執行同步 命令:hdfs namenode -bootstrapStandby
- 格式化ZK 在已經啟動的namenode上面執行
- 命令:hdfs zkfc -formatZK (第一次需要格式化)
- 啟動hdfs集群,在啟動了namenode的節點上執行
- 命令start-dfs.sh
- Jps查看進程本地打開 hdfs網址 端口號 50070 兩個主節點的hdfs都打開,一個是active,一個是standby 。kill掉active的那個namenode,另一個會替換上來。重新啟動kill掉的namenode 命令:hadoop-daemon.sh start namenode
- 主節點上要有namenode、DFSZKFailoverController進程
- 子節點要有datanode進程
- 如果啟動出現問題或者更改了配置:
- 更改配置后需要將文件同步至所有節點
- 刪除所有節點 存放元數據的目錄 也就是配置的 data目錄
- 重新按照以上步驟進行操作
- 啟動yarn
- 任意一台主節點上執行命令:start-yarn.sh
- 另一台主節點上執行:yarn-daemon.sh start resourcemanager
- Jps查看進程
- 主節點要有ResourceManager
- 從節點要有 NodeManager
- 端口號 8088 查看web ui
Flink1.12 搭建
1、上傳解壓安裝包
2、每台節點配置環境變量: vim /etc/profile
編輯,插入以下內容:
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export HADOOP_CLASSPATH=$HADOOP_COMMON_HOME/lib:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/* export FLINK_HOME=/usr/local/flink/flink-1.12.0 export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${FLINK_HOME}/bin
保存退出,source /etc/profile
3、配置 ${FLINK_HOME}/conf/flink-conf.yaml 文件
high-availability: zookeeper high-availability.storageDir: hdfs://ibuscluster/flink/ha/ high-availability.zookeeper.quorum: pmaster:2181,pnode1:2181,pnode2:2181 high-availability.zookeeper.path.root: /flink #設置ck的狀態后端 state.backend: filesystem state.checkpoints.dir: hdfs://icluster/flink/checkpoints #設置默認的savepoint的保存位置 state.savepoints.dir: hdfs://icluster/flink/savepoints # 集群名稱不能寫錯 jobmanager.archive.fs.dir: hdfs://icluster/flink/completed-jobs/ historyserver.archive.fs.dir: hdfs://icluster/flink/completed-jobs/
4、 編輯zoo.cfg文件
server.0=pmaster:2888:3888 server.1=pnode1:2888:3888 server.2=pnode2 :2888:3888
5、將flink文件拷貝至其它節點
6、遇到的問題
如果在沒有配置環境變量的情況下直接運行 flink on yarn的cluster模式,會報以上錯誤,因為並沒有綁定hadoop類,所以需要執行export HADOOP_CLASSPATH=`hadoop classpath` 或者 添加環境變量
7、測試
cluster模式:
命令flink run -m yarn-cluster ./examples/batch/WordCount.jar
session模式:
命令 yarn-session.sh -tm 1024 -jm 1024 -s 3 -d
能夠進入到webui
直接在web界面進行操作: