Flink on yarn


1、准備

准備集群

  • Zookeeper集群
  • Hadoop集群

准備flink jar包

官網地址:https://flink.apache.org/downloads.html

flink-1.8之后沒有集成hadoop,需要下載對應的hadoop jar包

1.8之前:

1.8之后:

需要下載對應hadoop的組件(然后放入flink的lib目錄下)

配置Hadoop的環境變量

配置flink配置文件

置jobmanager地址

jobmanager.rpc.address: bigdata-03

其他自己看情況配置

配置master和slaves

2、Flink yarn

Session Cluster模式

(1) 啟動hadoop集群(略)

(2)需要自己自定義配置的話,可以使用來查看參數:

bin/yarn-session.sh –help
Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
                                     as typing Ctrl + C.
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

 

yarn-session的參數介紹
  -n : 指定TaskManager的數量;
  -d: 以分離模式運行;
  -id:指定yarn的任務ID;
  -j:Flink jar文件的路徑;
  -jm:JobManager容器的內存(默認值:MB);
  -nl:為YARN應用程序指定YARN節點標簽;
  -nm:在YARN上為應用程序設置自定義名稱;
  -q:顯示可用的YARN資源(內存,內核);
  -qu:指定YARN隊列;
  -s:指定TaskManager中slot的數量;
  -st:以流模式啟動Flink;
  -tm:每個TaskManager容器的內存(默認值:MB);
  -z:命名空間,用於為高可用性模式創建Zookeeper子路徑;

 

(3)啟動yarn-session

./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
其中:
-n(--container):TaskManager的數量。
-s(--slots):	每個TaskManager的slot數量,默認一個slot一個core,默認每個taskmanager的slot的個數為1,有時可以多一些taskmanager,做冗余。
-jm:JobManager的內存(單位MB)。
-tm:每個taskmanager的內存(單位MB)。
-nm:yarn 的appName(現在yarn的ui上的名字)。 
-d:后台執行。

(4)執行程序:

./flink run -c com.duoduo.WordCount  wordcount-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777

(5)yarn上查看

(6)取消yarn-session

yarn application --kill application_1577588252906_0001

Per Job Cluster模式

(1)啟動hadoop集群(略)

(2)不啟動yarn-session,直接執行job

./flink run –m yarn-cluster -c com.atguigu.WordCount  WordCount-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777

3、遇到的問題

3.1 java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties

2020-05-21 16:09:11,255 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli   - Error while running the Flink session.
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:71)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:56)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:42)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:529)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
Caused by: java.lang.ClassNotFoundException: com.sun.jersey.core.util.FeaturesAndProperties
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 26 more

------------------------------------------------------------
 The program finished with the following exception:

java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:71)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:56)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:42)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:529)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
Caused by: java.lang.ClassNotFoundException: com.sun.jersey.core.util.FeaturesAndProperties
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 26 more

 

解決:

我的hadoop是2.73,放入flink-shaded-hadoop-2-uber-2.7.7-10.0.jar依舊報上面錯誤,換成高一個版本就好了flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

3.2 在啟動日志中發現如下錯誤

org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1076)
        at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1030)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:957)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 5 more

 

可以參考以下Flink官網的提示https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html

解決辦法就是在環境變量中增加

export HADOOP_CLASSPATH=`hadoop classpath`

3.3 Couldn't deploy Yarn session cluster

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:380)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:548)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of requested virtual cores per node 10 exceeds the maximum number of virtual cores 7 available in the YarnCluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
    at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:292)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
    at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:373)
    ... 7 more

 

解決:

yarn.containers.vcores設置了虛擬的cores=7,taskManager的slot我設置了10個,資源不夠
方法一:要么調大vcores參數

方法二:減少slot個數taskmanager.numberOfTaskSlots: 5

參考地址:https://www.jianshu.com/p/1b05202c4fb6
參考地址:https://blog.csdn.net/joseph25/article/details/88878350


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM