flink1.12 on yarn的部署


Flink On Yarn的两种模式:

  • Session模式
  • Cluster模式

版本准备:

  • CentOS 7.8 ( 本人是三台,主节点名为:pmaster,备用节点名为:pnode1 ,子节点:pnode2 )
  • Zookeeper 3.6.3
  • Hadoop 2.10.1
  • Flink 1.12

   直接去华为镜像站下载就好了!

检查集群环境

  • 关闭防火墙 service iptables stop,关闭防火墙自启动 chkconfig iptables off
  • 配置各个节点之间的映射关系(vim /etc/hosts),并将文件同步到其余节点 scp xxx node1:`pwd`
  • 每台节点动态ip改为静态ip(云服务器不需要配置)
  • 配置主节点和备用节点向所有节点的免密登录
  • 每台节点的Java环境配置
  • 每台节点的时间进行同步

Zookeeper搭建

  1、上传解压安装包

  2、每台节点配置环境变量: vim /etc/profile

    编辑,插入以下内容:

export ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin
export PATH=$PATH:${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin

    保存退出,source /etc/profile

  3、编辑 ${ZOOKEEPER_HOME}/conf/zoo.cfg (需要将zoo_sample.cfg 复制一份为 cp zoo_sample.cfg zoo.cfg

    添加如下代码(pmaster,pnode1,pnode2 对应节点名):

dataDir=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin/data
server.0=pmaster:2888:3888
server.1=pnode1:2888:3888
server.2=pnode2 :2888:3888

  4、cd /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/data 文件下

    vim myid ,每台节点配置不同数值 数值依次对应到server.0server.1server.2中的0,1,2

  5、配置日志的目录,编辑 ${ZOOKEEPER_HOME}/bin/zkEnv.sh

    修改:ZOO_LOG_DIR=/data/zookeeper/logs

  5、zookeeper文件拷贝到其他服务器,修改对应的data下的myid(每台服务器的myid的数值必须不同)

  6、启动zookeeper

     每台服务器 执行 zkServer.sh start

    jps查看进程 要有QuorumPeerMain

    每台服务器 执行 zkServer.sh status,要有leader节点和follower节点 即分布式zk搭建成功

Hadoop  HA 的搭建

   1、上传解压安装包 (注:flink on yarn的hadoop版本最好在2.4.1以上 )

   2、每台节点配置环境变量:vim /etc/profile

    编辑,新增以下内容:

export HADOOP_HOME=/usr/local/hadoop/hadoop-2.10.1
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin

    保存退出,source /etc/profile 

  3、进入: ${HADOOP_HOME}/etc/hadoop 目录下,所有的更改都在此目录下

     编辑 slaves 文件(配置子节点):

      删除原有的内容,新增 pmaster,pnode1,pnode2 

    编辑 hadoop-env.sh 文件 (配置现有的JAVA环境变量)

export JAVA_HOME=/usr/local/java/jdk1.8.0_linux_111
export JAVA_HOME=${JAVA_HOME}

    编辑 core-site.xml 文件:

<configuration>

<!--集群模式HA下,不能显示的指定的master节点、、名称需要和hdfs-site.xml中一致 -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://icluster</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/local/hadoop/hadoop-2.10.1/tmp</value>
  </property>
  <property>
    <name>fs.trash.interval</name>
    <value>1440</value>
  </property>
<!--指定zk地址-->
  <property>
        <name>ha.zookeeper.quorum</name>
        <value>pmaster:2181,pnode1:2181,pnode2:2181</value>
  </property>

</configuration>

    编辑 hdfs-site.xml 文件:

<configuration>

<!-- 指定 namenode 元数据存储的路径 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>/data/hadooplogs/data/namenode</value>
</property>

<!-- 指定 datanode 元据存储的路径 -->
<property>
<name>dfs.datanode.data.dir</name>
<value>/data/hadooplogs/data/datanode</value>
</property>

<!-- 设置副本个数 主要根据集群来进行设置 最多不要超过datanode的个数 一般为3  -->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>

<!-- 开启权限验证 则外部程序无法访问hdfs -->
<property>
<name>dfs.permissions</name>
<value>true</value>
</property>

<!-- 开启WebHDFS功能(基于REST的接口服务) -->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>

<!-- //////////////以下为HDFS HA的配置////////////// -->
<!-- 指定hdfs的nameservices名称为 icluster -->
<property>
<name>dfs.nameservices</name>
<value>icluster</value>
</property>

<!-- 指定cluster的两个namenode的名称分别为nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.icluster</name>
<value>nn1,nn2</value>
</property>

<!-- 配置nn1,nn2的rpc通信端口 -->
<property>
<name>dfs.namenode.rpc-address.icluster.nn1</name>
<value>pmaster:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.icluster.nn2</name>
<value>pnode1:8020</value>
</property>

<!-- 配置nn1,nn2的http通信端口 -->
<property>
<name>dfs.namenode.http-address.icluster.nn1</name>
<value>pmaster:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.icluster.nn2</name>
<value>pnode1:50070</value>
</property>

<!-- 指定namenode元数据存储在journalnode中的路径 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://pmaster:8485;pnode1:8485;pnode2:8485/icluster</value>
</property>

<!-- 指定journalnode日志文件存储的路径 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data/hadooplogs/data/journal</value>
</property>

<!-- 指定HDFS客户端连接active namenode的java类 -->
<property>
<name>dfs.client.failover.proxy.provider.ibuscluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- 配置隔离机制为ssh -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>

<!-- 指定秘钥的位置 生成的秘钥位置要和这个一致 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>

<!-- 开启自动故障转移 namenode挂的时候能自动进行转移 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

</configuration>

    编辑 yarn-site.xml 文件:

<configuration>
<!-- Site specific YARN configuration properties -->

<!-- NodeManager上运行的附属服务,需配置成mapreduce_shuffle才可运行MapReduce程序 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- 开启日志聚合功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 配置nodemanager的日志目录 -->
<property>
<name>yarn.nodemanager.log-dirs</name>
<values>/data/yarnlogs</values>
</property>

<!-- 配置日志删除时间为7天,-1为禁用,单位为秒 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>

<!-- 配置nodemanager可用的资源内存 -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>15360</value>
</property>

<!-- 单个任务可申请的最少物理内存量,默认是1024(MB),如果一个任务申请的物理内存量少于该值,则该对应的值改为这个数 -->
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>256</value>
</property>

<!-- 单个任务可申请的最多物理内存量,默认是8192(MB) -->
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>10240</value>
</property>

<!-- 任务每使用1MB物理内存,最多可使用虚拟内存量,默认是2.1  -->
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>

<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>

<!-- 是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

<!-- 配置nodemanafer的vcore -->
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>6</value>
</property>


<!--///////////////// 开启YARN HA////////////////// -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>

<!-- 启用自动故障转移 -->
<property>
<name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

<!-- 指定YARN HA的名称 -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yarncluster</value>
</property>

<!-- 指定两个resourcemanager的名称 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>

<!-- 配置rm1,rm2的主机 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>pmaster</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>pnode1</value>
</property>

<!-- 配置YARN的http端口 -->
<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>pmaster:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>pnode1:8088</value>
</property>

<!-- 配置zookeeper的地址 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>pmaster:2181,pnode1:2181,pnode2:2181</value>
</property>

<!-- 配置zookeeper的存储位置 -->
<property>
<name>yarn.resourcemanager.zk-state-store.parent-path</name>
<value>/rmstore</value>
</property>

<!-- 开启yarn resourcemanager restart -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>

<!-- 配置resourcemanager的状态存储到zookeeper中 -->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

<!-- 开启yarn nodemanager restart -->
<property>
<name>yarn.nodemanager.recovery.enabled</name>
<value>true</value>
</property>

<!-- 配置nodemanager IPC的通信端口 -->
<property>
<name>yarn.nodemanager.address</name>
<value>0.0.0.0:45454</value>
</property>

</configuration>

  4、Hadoop日志配置

    编辑 hadoop-env.sh

#新增hadoop输出日志
export HADOOP_LOG_DIR=/usr/local/soft/hadooplogs 

分离 resourcemanager日志
  编辑:yarn-env.sh
  修改 resourcemanager日志路径: export YARN_CONF_DIR = /data/yarnlogs

配置 nodemanager 日志:
  编辑 yarn-site.xml
  修改:
  <!-- 配置nodemanager的日志目录 -->
  <property>
  <name>yarn.nodemanager.log-dirs</name>
  <values>/usr/local/soft/yarnlogs</values>
  </property>

  4、将更改后的文件拷贝至其他节点

  5、启动Hadoop(前提是启动了zk)  

  • 在所有节点上启动 JournalNode 命令hadoop-daemon.sh start journalnode
    • 启动完成后 jps查看进程 ,所有节点会出现JournalNode进程
  • 启动namenode
    • 在任意一台namenode节点上先进行初始化hdfs namenode -format
    • 初始化完成 会出现success
    • 在当前节点上 启动namenode命令hadoop-daemon.sh start namenode
    • 命名执行成功后 jps 查看进程 会有NameNode进程
    • 在另外一台主节点上执行同步 命令:hdfs namenode -bootstrapStandby
  • 格式化ZK   在已经启动的namenode上面执行
    • 命令:hdfs zkfc -formatZK (第一次需要格式化)
  • 启动hdfs集群,在启动了namenode的节点上执行
    • 命令start-dfs.sh
  • Jps查看进程本地打开 hdfs网址 端口号 50070 两个主节点的hdfs都打开,一个是active,一个是standby killactive的那个namenode,另一个会替换上来。重新启动kill掉的namenode 命令:hadoop-daemon.sh start namenode
    • 主节点上要有namenodeDFSZKFailoverController进程
    • 子节点要有datanode进程
  • 如果启动出现问题或者更改了配置:
    • 更改配置后需要将文件同步至所有节点
    • 删除所有节点 存放元数据的目录 也就是配置的 data目录
    • 重新按照以上步骤进行操作
  • 启动yarn
    • 任意一台主节点上执行命令:start-yarn.sh
    • 另一台主节点上执行:yarn-daemon.sh start resourcemanager
  • Jps查看进程
    • 主节点要有ResourceManager
    • 从节点要有 NodeManager
    • 端口号 8088 查看web ui

 Flink1.12 搭建

  1、上传解压安装包

  2、每台节点配置环境变量: vim /etc/profile

    编辑,插入以下内容:

export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=$HADOOP_COMMON_HOME/lib:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/*
export FLINK_HOME=/usr/local/flink/flink-1.12.0
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${FLINK_HOME}/bin

    保存退出,source /etc/profile

  3、配置 ${FLINK_HOME}/conf/flink-conf.yaml 文件

high-availability: zookeeper
high-availability.storageDir: hdfs://ibuscluster/flink/ha/
high-availability.zookeeper.quorum: pmaster:2181,pnode1:2181,pnode2:2181
high-availability.zookeeper.path.root: /flink
#设置ck的状态后端
state.backend: filesystem
state.checkpoints.dir: hdfs://icluster/flink/checkpoints
#设置默认的savepoint的保存位置
state.savepoints.dir: hdfs://icluster/flink/savepoints
# 集群名称不能写错
jobmanager.archive.fs.dir: hdfs://icluster/flink/completed-jobs/
historyserver.archive.fs.dir: hdfs://icluster/flink/completed-jobs/

  4、 编辑zoo.cfg文件

server.0=pmaster:2888:3888
server.1=pnode1:2888:3888
server.2=pnode2 :2888:3888

  5、将flink文件拷贝至其它节点

  6、遇到的问题

     

     如果在没有配置环境变量的情况下直接运行 flink on yarncluster模式,会报以上错误,因为并没有绑定hadoop类,所以需要执行export HADOOP_CLASSPATH=`hadoop classpath` 或者 添加环境变量

    

  7、测试

    cluster模式:

      命令flink run -m yarn-cluster  ./examples/batch/WordCount.jar 

      

     session模式:

      命令 yarn-session.sh -tm 1024 -jm 1024 -s 3 -d

      能够进入到webui

      

       直接在web界面进行操作:

      


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM