Flink On Yarn的两种模式:
- Session模式
- Cluster模式
版本准备:
- CentOS 7.8 ( 本人是三台,主节点名为:pmaster,备用节点名为:pnode1 ,子节点:pnode2 )
- Zookeeper 3.6.3
- Hadoop 2.10.1
- Flink 1.12
直接去华为镜像站下载就好了!
检查集群环境
- 关闭防火墙 service iptables stop,关闭防火墙自启动 chkconfig iptables off
- 配置各个节点之间的映射关系(vim /etc/hosts),并将文件同步到其余节点 scp xxx node1:`pwd`
- 每台节点动态ip改为静态ip(云服务器不需要配置)
- 配置主节点和备用节点向所有节点的免密登录
- 每台节点的Java环境配置
- 每台节点的时间进行同步
Zookeeper搭建
1、上传解压安装包
2、每台节点配置环境变量: vim /etc/profile
编辑,插入以下内容:
export ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin export PATH=$PATH:${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin
保存退出,source /etc/profile
3、编辑 ${ZOOKEEPER_HOME}/conf/zoo.cfg (需要将zoo_sample.cfg 复制一份为 cp zoo_sample.cfg zoo.cfg)
添加如下代码(pmaster,pnode1,pnode2 对应节点名):
dataDir=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin/data server.0=pmaster:2888:3888 server.1=pnode1:2888:3888 server.2=pnode2 :2888:3888
4、cd 到/usr/local/zookeeper/apache-zookeeper-3.6.3-bin/data 文件下
vim myid ,每台节点配置不同数值 数值依次对应到server.0,server.1,server.2中的0,1,2
5、配置日志的目录,编辑 ${ZOOKEEPER_HOME}/bin/zkEnv.sh
修改:ZOO_LOG_DIR=/data/zookeeper/logs
5、将zookeeper文件拷贝到其他服务器,修改对应的data下的myid(每台服务器的myid的数值必须不同)
6、启动zookeeper
每台服务器 执行 zkServer.sh start
jps查看进程 要有QuorumPeerMain
每台服务器 执行 zkServer.sh status,要有leader节点和follower节点 即分布式zk搭建成功
Hadoop HA 的搭建
1、上传解压安装包 (注:flink on yarn的hadoop版本最好在2.4.1以上 )
2、每台节点配置环境变量:vim /etc/profile
编辑,新增以下内容:
export HADOOP_HOME=/usr/local/hadoop/hadoop-2.10.1 export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin
保存退出,source /etc/profile
3、进入: ${HADOOP_HOME}/etc/hadoop 目录下,所有的更改都在此目录下
编辑 slaves 文件(配置子节点):
删除原有的内容,新增 pmaster,pnode1,pnode2
编辑 hadoop-env.sh 文件 (配置现有的JAVA环境变量)
export JAVA_HOME=/usr/local/java/jdk1.8.0_linux_111 export JAVA_HOME=${JAVA_HOME}
编辑 core-site.xml 文件:
<configuration> <!--集群模式HA下,不能显示的指定的master节点、、名称需要和hdfs-site.xml中一致 --> <property> <name>fs.defaultFS</name> <value>hdfs://icluster</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/usr/local/hadoop/hadoop-2.10.1/tmp</value> </property> <property> <name>fs.trash.interval</name> <value>1440</value> </property> <!--指定zk地址--> <property> <name>ha.zookeeper.quorum</name> <value>pmaster:2181,pnode1:2181,pnode2:2181</value> </property> </configuration>
编辑 hdfs-site.xml 文件:
<configuration> <!-- 指定 namenode 元数据存储的路径 --> <property> <name>dfs.namenode.name.dir</name> <value>/data/hadooplogs/data/namenode</value> </property> <!-- 指定 datanode 元据存储的路径 --> <property> <name>dfs.datanode.data.dir</name> <value>/data/hadooplogs/data/datanode</value> </property> <!-- 设置副本个数 主要根据集群来进行设置 最多不要超过datanode的个数 一般为3 --> <property> <name>dfs.replication</name> <value>1</value> </property> <!-- 开启权限验证 则外部程序无法访问hdfs --> <property> <name>dfs.permissions</name> <value>true</value> </property> <!-- 开启WebHDFS功能(基于REST的接口服务) --> <property> <name>dfs.webhdfs.enabled</name> <value>true</value> </property> <!-- //////////////以下为HDFS HA的配置////////////// --> <!-- 指定hdfs的nameservices名称为 icluster --> <property> <name>dfs.nameservices</name> <value>icluster</value> </property> <!-- 指定cluster的两个namenode的名称分别为nn1,nn2 --> <property> <name>dfs.ha.namenodes.icluster</name> <value>nn1,nn2</value> </property> <!-- 配置nn1,nn2的rpc通信端口 --> <property> <name>dfs.namenode.rpc-address.icluster.nn1</name> <value>pmaster:8020</value> </property> <property> <name>dfs.namenode.rpc-address.icluster.nn2</name> <value>pnode1:8020</value> </property> <!-- 配置nn1,nn2的http通信端口 --> <property> <name>dfs.namenode.http-address.icluster.nn1</name> <value>pmaster:50070</value> </property> <property> <name>dfs.namenode.http-address.icluster.nn2</name> <value>pnode1:50070</value> </property> <!-- 指定namenode元数据存储在journalnode中的路径 --> <property> <name>dfs.namenode.shared.edits.dir</name> <value>qjournal://pmaster:8485;pnode1:8485;pnode2:8485/icluster</value> </property> <!-- 指定journalnode日志文件存储的路径 --> <property> <name>dfs.journalnode.edits.dir</name> <value>/data/hadooplogs/data/journal</value> </property> <!-- 指定HDFS客户端连接active namenode的java类 --> <property> <name>dfs.client.failover.proxy.provider.ibuscluster</name> <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value> </property> <!-- 配置隔离机制为ssh --> <property> <name>dfs.ha.fencing.methods</name> <value>sshfence</value> </property> <!-- 指定秘钥的位置 生成的秘钥位置要和这个一致 --> <property> <name>dfs.ha.fencing.ssh.private-key-files</name> <value>/root/.ssh/id_rsa</value> </property> <!-- 开启自动故障转移 namenode挂的时候能自动进行转移 --> <property> <name>dfs.ha.automatic-failover.enabled</name> <value>true</value> </property> </configuration>
编辑 yarn-site.xml 文件:
<configuration> <!-- Site specific YARN configuration properties --> <!-- NodeManager上运行的附属服务,需配置成mapreduce_shuffle才可运行MapReduce程序 --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!-- 开启日志聚合功能 --> <property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property> <!-- 配置nodemanager的日志目录 --> <property> <name>yarn.nodemanager.log-dirs</name> <values>/data/yarnlogs</values> </property> <!-- 配置日志删除时间为7天,-1为禁用,单位为秒 --> <property> <name>yarn.log-aggregation.retain-seconds</name> <value>604800</value> </property> <!-- 配置nodemanager可用的资源内存 --> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>15360</value> </property> <!-- 单个任务可申请的最少物理内存量,默认是1024(MB),如果一个任务申请的物理内存量少于该值,则该对应的值改为这个数 --> <property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>256</value> </property> <!-- 单个任务可申请的最多物理内存量,默认是8192(MB) --> <property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>10240</value> </property> <!-- 任务每使用1MB物理内存,最多可使用虚拟内存量,默认是2.1 --> <property> <name>yarn.nodemanager.vmem-pmem-ratio</name> <value>2.1</value> </property> <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true --> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <!-- 是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true --> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> <!-- 配置nodemanafer的vcore --> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>6</value> </property> <!--///////////////// 开启YARN HA////////////////// --> <property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> </property> <!-- 启用自动故障转移 --> <property> <name>yarn.resourcemanager.ha.automatic-failover.enabled</name> <value>true</value> </property> <!-- 指定YARN HA的名称 --> <property> <name>yarn.resourcemanager.cluster-id</name> <value>yarncluster</value> </property> <!-- 指定两个resourcemanager的名称 --> <property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value> </property> <!-- 配置rm1,rm2的主机 --> <property> <name>yarn.resourcemanager.hostname.rm1</name> <value>pmaster</value> </property> <property> <name>yarn.resourcemanager.hostname.rm2</name> <value>pnode1</value> </property> <!-- 配置YARN的http端口 --> <property> <name>yarn.resourcemanager.webapp.address.rm1</name> <value>pmaster:8088</value> </property> <property> <name>yarn.resourcemanager.webapp.address.rm2</name> <value>pnode1:8088</value> </property> <!-- 配置zookeeper的地址 --> <property> <name>yarn.resourcemanager.zk-address</name> <value>pmaster:2181,pnode1:2181,pnode2:2181</value> </property> <!-- 配置zookeeper的存储位置 --> <property> <name>yarn.resourcemanager.zk-state-store.parent-path</name> <value>/rmstore</value> </property> <!-- 开启yarn resourcemanager restart --> <property> <name>yarn.resourcemanager.recovery.enabled</name> <value>true</value> </property> <!-- 配置resourcemanager的状态存储到zookeeper中 --> <property> <name>yarn.resourcemanager.store.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value> </property> <!-- 开启yarn nodemanager restart --> <property> <name>yarn.nodemanager.recovery.enabled</name> <value>true</value> </property> <!-- 配置nodemanager IPC的通信端口 --> <property> <name>yarn.nodemanager.address</name> <value>0.0.0.0:45454</value> </property> </configuration>
4、Hadoop日志配置
编辑 hadoop-env.sh
#新增hadoop输出日志 export HADOOP_LOG_DIR=/usr/local/soft/hadooplogs
分离 resourcemanager日志
编辑:yarn-env.sh
修改 resourcemanager日志路径: export YARN_CONF_DIR = /data/yarnlogs
配置 nodemanager 日志:
编辑 yarn-site.xml
修改:
<!-- 配置nodemanager的日志目录 -->
<property>
<name>yarn.nodemanager.log-dirs</name>
<values>/usr/local/soft/yarnlogs</values>
</property>
4、将更改后的文件拷贝至其他节点
5、启动Hadoop(前提是启动了zk)
- 在所有节点上启动 JournalNode 命令hadoop-daemon.sh start journalnode
- 启动完成后 jps查看进程 ,所有节点会出现JournalNode进程
- 启动namenode
- 在任意一台namenode节点上先进行初始化hdfs namenode -format
- 初始化完成 会出现success
- 在当前节点上 启动namenode,命令hadoop-daemon.sh start namenode
- 命名执行成功后 jps 查看进程 会有NameNode进程
- 在另外一台主节点上执行同步 命令:hdfs namenode -bootstrapStandby
- 格式化ZK 在已经启动的namenode上面执行
- 命令:hdfs zkfc -formatZK (第一次需要格式化)
- 启动hdfs集群,在启动了namenode的节点上执行
- 命令start-dfs.sh
- Jps查看进程本地打开 hdfs网址 端口号 50070 两个主节点的hdfs都打开,一个是active,一个是standby 。kill掉active的那个namenode,另一个会替换上来。重新启动kill掉的namenode 命令:hadoop-daemon.sh start namenode
- 主节点上要有namenode、DFSZKFailoverController进程
- 子节点要有datanode进程
- 如果启动出现问题或者更改了配置:
- 更改配置后需要将文件同步至所有节点
- 删除所有节点 存放元数据的目录 也就是配置的 data目录
- 重新按照以上步骤进行操作
- 启动yarn
- 任意一台主节点上执行命令:start-yarn.sh
- 另一台主节点上执行:yarn-daemon.sh start resourcemanager
- Jps查看进程
- 主节点要有ResourceManager
- 从节点要有 NodeManager
- 端口号 8088 查看web ui
Flink1.12 搭建
1、上传解压安装包
2、每台节点配置环境变量: vim /etc/profile
编辑,插入以下内容:
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export HADOOP_CLASSPATH=$HADOOP_COMMON_HOME/lib:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/* export FLINK_HOME=/usr/local/flink/flink-1.12.0 export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${FLINK_HOME}/bin
保存退出,source /etc/profile
3、配置 ${FLINK_HOME}/conf/flink-conf.yaml 文件
high-availability: zookeeper high-availability.storageDir: hdfs://ibuscluster/flink/ha/ high-availability.zookeeper.quorum: pmaster:2181,pnode1:2181,pnode2:2181 high-availability.zookeeper.path.root: /flink #设置ck的状态后端 state.backend: filesystem state.checkpoints.dir: hdfs://icluster/flink/checkpoints #设置默认的savepoint的保存位置 state.savepoints.dir: hdfs://icluster/flink/savepoints # 集群名称不能写错 jobmanager.archive.fs.dir: hdfs://icluster/flink/completed-jobs/ historyserver.archive.fs.dir: hdfs://icluster/flink/completed-jobs/
4、 编辑zoo.cfg文件
server.0=pmaster:2888:3888 server.1=pnode1:2888:3888 server.2=pnode2 :2888:3888
5、将flink文件拷贝至其它节点
6、遇到的问题
如果在没有配置环境变量的情况下直接运行 flink on yarn的cluster模式,会报以上错误,因为并没有绑定hadoop类,所以需要执行export HADOOP_CLASSPATH=`hadoop classpath` 或者 添加环境变量
7、测试
cluster模式:
命令flink run -m yarn-cluster ./examples/batch/WordCount.jar
session模式:
命令 yarn-session.sh -tm 1024 -jm 1024 -s 3 -d
能够进入到webui
直接在web界面进行操作: