flink1.12 on yarn的部署


Flink On Yarn的兩種模式:

  • Session模式
  • Cluster模式

版本准備:

  • CentOS 7.8 ( 本人是三台,主節點名為:pmaster,備用節點名為:pnode1 ,子節點:pnode2 )
  • Zookeeper 3.6.3
  • Hadoop 2.10.1
  • Flink 1.12

   直接去華為鏡像站下載就好了!

檢查集群環境

  • 關閉防火牆 service iptables stop,關閉防火牆自啟動 chkconfig iptables off
  • 配置各個節點之間的映射關系(vim /etc/hosts),並將文件同步到其余節點 scp xxx node1:`pwd`
  • 每台節點動態ip改為靜態ip(雲服務器不需要配置)
  • 配置主節點和備用節點向所有節點的免密登錄
  • 每台節點的Java環境配置
  • 每台節點的時間進行同步

Zookeeper搭建

  1、上傳解壓安裝包

  2、每台節點配置環境變量: vim /etc/profile

    編輯,插入以下內容:

export ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin
export PATH=$PATH:${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin

    保存退出,source /etc/profile

  3、編輯 ${ZOOKEEPER_HOME}/conf/zoo.cfg (需要將zoo_sample.cfg 復制一份為 cp zoo_sample.cfg zoo.cfg

    添加如下代碼(pmaster,pnode1,pnode2 對應節點名):

dataDir=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin/data
server.0=pmaster:2888:3888
server.1=pnode1:2888:3888
server.2=pnode2 :2888:3888

  4、cd /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/data 文件下

    vim myid ,每台節點配置不同數值 數值依次對應到server.0server.1server.2中的0,1,2

  5、配置日志的目錄,編輯 ${ZOOKEEPER_HOME}/bin/zkEnv.sh

    修改:ZOO_LOG_DIR=/data/zookeeper/logs

  5、zookeeper文件拷貝到其他服務器,修改對應的data下的myid(每台服務器的myid的數值必須不同)

  6、啟動zookeeper

     每台服務器 執行 zkServer.sh start

    jps查看進程 要有QuorumPeerMain

    每台服務器 執行 zkServer.sh status,要有leader節點和follower節點 即分布式zk搭建成功

Hadoop  HA 的搭建

   1、上傳解壓安裝包 (注:flink on yarn的hadoop版本最好在2.4.1以上 )

   2、每台節點配置環境變量:vim /etc/profile

    編輯,新增以下內容:

export HADOOP_HOME=/usr/local/hadoop/hadoop-2.10.1
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin

    保存退出,source /etc/profile 

  3、進入: ${HADOOP_HOME}/etc/hadoop 目錄下,所有的更改都在此目錄下

     編輯 slaves 文件(配置子節點):

      刪除原有的內容,新增 pmaster,pnode1,pnode2 

    編輯 hadoop-env.sh 文件 (配置現有的JAVA環境變量)

export JAVA_HOME=/usr/local/java/jdk1.8.0_linux_111
export JAVA_HOME=${JAVA_HOME}

    編輯 core-site.xml 文件:

<configuration>

<!--集群模式HA下,不能顯示的指定的master節點、、名稱需要和hdfs-site.xml中一致 -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://icluster</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/local/hadoop/hadoop-2.10.1/tmp</value>
  </property>
  <property>
    <name>fs.trash.interval</name>
    <value>1440</value>
  </property>
<!--指定zk地址-->
  <property>
        <name>ha.zookeeper.quorum</name>
        <value>pmaster:2181,pnode1:2181,pnode2:2181</value>
  </property>

</configuration>

    編輯 hdfs-site.xml 文件:

<configuration>

<!-- 指定 namenode 元數據存儲的路徑 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>/data/hadooplogs/data/namenode</value>
</property>

<!-- 指定 datanode 元據存儲的路徑 -->
<property>
<name>dfs.datanode.data.dir</name>
<value>/data/hadooplogs/data/datanode</value>
</property>

<!-- 設置副本個數 主要根據集群來進行設置 最多不要超過datanode的個數 一般為3  -->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>

<!-- 開啟權限驗證 則外部程序無法訪問hdfs -->
<property>
<name>dfs.permissions</name>
<value>true</value>
</property>

<!-- 開啟WebHDFS功能(基於REST的接口服務) -->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>

<!-- //////////////以下為HDFS HA的配置////////////// -->
<!-- 指定hdfs的nameservices名稱為 icluster -->
<property>
<name>dfs.nameservices</name>
<value>icluster</value>
</property>

<!-- 指定cluster的兩個namenode的名稱分別為nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.icluster</name>
<value>nn1,nn2</value>
</property>

<!-- 配置nn1,nn2的rpc通信端口 -->
<property>
<name>dfs.namenode.rpc-address.icluster.nn1</name>
<value>pmaster:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.icluster.nn2</name>
<value>pnode1:8020</value>
</property>

<!-- 配置nn1,nn2的http通信端口 -->
<property>
<name>dfs.namenode.http-address.icluster.nn1</name>
<value>pmaster:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.icluster.nn2</name>
<value>pnode1:50070</value>
</property>

<!-- 指定namenode元數據存儲在journalnode中的路徑 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://pmaster:8485;pnode1:8485;pnode2:8485/icluster</value>
</property>

<!-- 指定journalnode日志文件存儲的路徑 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data/hadooplogs/data/journal</value>
</property>

<!-- 指定HDFS客戶端連接active namenode的java類 -->
<property>
<name>dfs.client.failover.proxy.provider.ibuscluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- 配置隔離機制為ssh -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>

<!-- 指定秘鑰的位置 生成的秘鑰位置要和這個一致 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>

<!-- 開啟自動故障轉移 namenode掛的時候能自動進行轉移 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

</configuration>

    編輯 yarn-site.xml 文件:

<configuration>
<!-- Site specific YARN configuration properties -->

<!-- NodeManager上運行的附屬服務,需配置成mapreduce_shuffle才可運行MapReduce程序 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- 開啟日志聚合功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 配置nodemanager的日志目錄 -->
<property>
<name>yarn.nodemanager.log-dirs</name>
<values>/data/yarnlogs</values>
</property>

<!-- 配置日志刪除時間為7天,-1為禁用,單位為秒 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>

<!-- 配置nodemanager可用的資源內存 -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>15360</value>
</property>

<!-- 單個任務可申請的最少物理內存量,默認是1024(MB),如果一個任務申請的物理內存量少於該值,則該對應的值改為這個數 -->
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>256</value>
</property>

<!-- 單個任務可申請的最多物理內存量,默認是8192(MB) -->
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>10240</value>
</property>

<!-- 任務每使用1MB物理內存,最多可使用虛擬內存量,默認是2.1  -->
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>

<!--是否啟動一個線程檢查每個任務正使用的物理內存量,如果任務超出分配值,則直接將其殺掉,默認是true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>

<!-- 是否啟動一個線程檢查每個任務正使用的虛擬內存量,如果任務超出分配值,則直接將其殺掉,默認是true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

<!-- 配置nodemanafer的vcore -->
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>6</value>
</property>


<!--///////////////// 開啟YARN HA////////////////// -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>

<!-- 啟用自動故障轉移 -->
<property>
<name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

<!-- 指定YARN HA的名稱 -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yarncluster</value>
</property>

<!-- 指定兩個resourcemanager的名稱 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>

<!-- 配置rm1,rm2的主機 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>pmaster</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>pnode1</value>
</property>

<!-- 配置YARN的http端口 -->
<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>pmaster:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>pnode1:8088</value>
</property>

<!-- 配置zookeeper的地址 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>pmaster:2181,pnode1:2181,pnode2:2181</value>
</property>

<!-- 配置zookeeper的存儲位置 -->
<property>
<name>yarn.resourcemanager.zk-state-store.parent-path</name>
<value>/rmstore</value>
</property>

<!-- 開啟yarn resourcemanager restart -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>

<!-- 配置resourcemanager的狀態存儲到zookeeper中 -->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

<!-- 開啟yarn nodemanager restart -->
<property>
<name>yarn.nodemanager.recovery.enabled</name>
<value>true</value>
</property>

<!-- 配置nodemanager IPC的通信端口 -->
<property>
<name>yarn.nodemanager.address</name>
<value>0.0.0.0:45454</value>
</property>

</configuration>

  4、Hadoop日志配置

    編輯 hadoop-env.sh

#新增hadoop輸出日志
export HADOOP_LOG_DIR=/usr/local/soft/hadooplogs 

分離 resourcemanager日志
  編輯:yarn-env.sh
  修改 resourcemanager日志路徑: export YARN_CONF_DIR = /data/yarnlogs

配置 nodemanager 日志:
  編輯 yarn-site.xml
  修改:
  <!-- 配置nodemanager的日志目錄 -->
  <property>
  <name>yarn.nodemanager.log-dirs</name>
  <values>/usr/local/soft/yarnlogs</values>
  </property>

  4、將更改后的文件拷貝至其他節點

  5、啟動Hadoop(前提是啟動了zk)  

  • 在所有節點上啟動 JournalNode 命令hadoop-daemon.sh start journalnode
    • 啟動完成后 jps查看進程 ,所有節點會出現JournalNode進程
  • 啟動namenode
    • 在任意一台namenode節點上先進行初始化hdfs namenode -format
    • 初始化完成 會出現success
    • 在當前節點上 啟動namenode命令hadoop-daemon.sh start namenode
    • 命名執行成功后 jps 查看進程 會有NameNode進程
    • 在另外一台主節點上執行同步 命令:hdfs namenode -bootstrapStandby
  • 格式化ZK   在已經啟動的namenode上面執行
    • 命令:hdfs zkfc -formatZK (第一次需要格式化)
  • 啟動hdfs集群,在啟動了namenode的節點上執行
    • 命令start-dfs.sh
  • Jps查看進程本地打開 hdfs網址 端口號 50070 兩個主節點的hdfs都打開,一個是active,一個是standby killactive的那個namenode,另一個會替換上來。重新啟動kill掉的namenode 命令:hadoop-daemon.sh start namenode
    • 主節點上要有namenodeDFSZKFailoverController進程
    • 子節點要有datanode進程
  • 如果啟動出現問題或者更改了配置:
    • 更改配置后需要將文件同步至所有節點
    • 刪除所有節點 存放元數據的目錄 也就是配置的 data目錄
    • 重新按照以上步驟進行操作
  • 啟動yarn
    • 任意一台主節點上執行命令:start-yarn.sh
    • 另一台主節點上執行:yarn-daemon.sh start resourcemanager
  • Jps查看進程
    • 主節點要有ResourceManager
    • 從節點要有 NodeManager
    • 端口號 8088 查看web ui

 Flink1.12 搭建

  1、上傳解壓安裝包

  2、每台節點配置環境變量: vim /etc/profile

    編輯,插入以下內容:

export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=$HADOOP_COMMON_HOME/lib:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/*
export FLINK_HOME=/usr/local/flink/flink-1.12.0
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${FLINK_HOME}/bin

    保存退出,source /etc/profile

  3、配置 ${FLINK_HOME}/conf/flink-conf.yaml 文件

high-availability: zookeeper
high-availability.storageDir: hdfs://ibuscluster/flink/ha/
high-availability.zookeeper.quorum: pmaster:2181,pnode1:2181,pnode2:2181
high-availability.zookeeper.path.root: /flink
#設置ck的狀態后端
state.backend: filesystem
state.checkpoints.dir: hdfs://icluster/flink/checkpoints
#設置默認的savepoint的保存位置
state.savepoints.dir: hdfs://icluster/flink/savepoints
# 集群名稱不能寫錯
jobmanager.archive.fs.dir: hdfs://icluster/flink/completed-jobs/
historyserver.archive.fs.dir: hdfs://icluster/flink/completed-jobs/

  4、 編輯zoo.cfg文件

server.0=pmaster:2888:3888
server.1=pnode1:2888:3888
server.2=pnode2 :2888:3888

  5、將flink文件拷貝至其它節點

  6、遇到的問題

     

     如果在沒有配置環境變量的情況下直接運行 flink on yarncluster模式,會報以上錯誤,因為並沒有綁定hadoop類,所以需要執行export HADOOP_CLASSPATH=`hadoop classpath` 或者 添加環境變量

    

  7、測試

    cluster模式:

      命令flink run -m yarn-cluster  ./examples/batch/WordCount.jar 

      

     session模式:

      命令 yarn-session.sh -tm 1024 -jm 1024 -s 3 -d

      能夠進入到webui

      

       直接在web界面進行操作:

      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM