Two ways to submit jobs in Flink on YARN mode


For YARN cluster setup, see "Hadoop fully distributed cluster setup".

 

With YARN managing resources, Flink jobs are submitted directly to the Hadoop cluster.

1. Start the Hadoop cluster, with YARN up and running. Make sure the HADOOP_HOME environment variable is set.

2. Interaction diagram of Flink on YARN

3. Flink on YARN supports two job-submission modes, with different resource-consumption profiles.
The first is yarn-session (Start a long-running Flink cluster on YARN). In this mode you first start a cluster and then submit jobs to it. The session requests a fixed block of resources from YARN up front, and that allocation never changes afterwards.
If the session's resources are exhausted, the next job cannot be submitted; it has to wait until one of the jobs running in YARN finishes and releases its resources before it can be submitted normally.
Because resources are capped by the session and cannot be exceeded, this mode suits fixed-size deployments or test environments.

The second is flink run, which submits a Flink job directly on YARN (Run a Flink job on YARN). The advantage here is that each task corresponds to its own job: every submitted job requests resources from YARN according to its own needs and holds them until it finishes, so it does not affect the next job's normal execution unless YARN has no resources left at all.
Production environments generally run jobs this way; it does require that the cluster has sufficient resources.
Note: the JobManager and the ApplicationMaster (AM) run in the same container. Once the container is created, the AM knows the JobManager's address. It generates a new Flink configuration file for the TaskManagers that are about to start, and uploads that file to HDFS as well.
In addition, the AM's container serves Flink's web interface. The ports YARN requests are ephemeral ports, so that users can start multiple Flink YARN sessions in parallel.

 

4. Deploying a Flink yarn-session

Configure conf/flink-conf.yaml in the Flink directory:

jobmanager.rpc.address: vmhome10.com

Configure slaves:

Add the TaskManager nodes' IPs or hostnames.

PS: this step has no effect for yarn-session; the configuration will be overridden.

4.1 Client mode

Run bin/yarn-session.sh directly to start a session. Its defaults are:
masterMemoryMB=1024
taskManagerMemoryMB=1024
numberTaskManagers=1
slotsPerTaskManager=1

yarn-session parameters:

bin/yarn-session.sh --help

Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
                                     as typing Ctrl + C.
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Start a session with 1 TaskManager, 1 GB of JobManager memory, and 1 GB of TaskManager memory:

yarn-session.sh -n 1 -jm 1024 -tm 1024
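As a sketch, a session can also be started with more of the options listed in the help output above; every flag below comes from that help text, while the session name and queue name are purely illustrative:

```shell
# Start a yarn-session with 2 TaskManagers, 4 slots each,
# 1 GB JobManager memory and 2 GB per TaskManager,
# named "dev-session" and placed on the "default" queue
# (name and queue are illustrative; this requires a running YARN cluster).
bin/yarn-session.sh -n 2 -s 4 -jm 1024 -tm 2048 -nm dev-session -qu default
```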

After the session starts, the URL of Flink's web UI is printed at the end of the output; you can also reach it through the ApplicationMaster link on YARN's web UI.


The host that runs yarn-session will run two processes: FlinkYarnSessionCli and YarnSessionClusterEntrypoint.

The host from which the yarn-session was submitted always runs FlinkYarnSessionCli. This process means that jobs can be submitted from this node on the command line, without needing the -m option.

The YarnSessionClusterEntrypoint process is the entry point of the yarn-session cluster; in practice it is the JobManager node, which is also YARN's ApplicationMaster node.

These two processes may end up on the same node.

 

By default the system uses the configuration in conf/flink-conf.yaml.
Flink on YARN overrides several of its parameters: jobmanager.rpc.address, because the JobManager's location in the cluster is not known in advance, is set to the AM's address; taskmanager.tmp.dirs is set to the temporary directories assigned by YARN;
and parallelism.default is also overridden if the number of slots is specified on the command line.
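For reference, these are the affected keys as they would appear in conf/flink-conf.yaml (the values shown here are illustrative, not the values YARN will substitute):

```yaml
# Keys in conf/flink-conf.yaml that Flink on YARN overrides at runtime
jobmanager.rpc.address: vmhome10.com   # replaced by the AM's address
taskmanager.tmp.dirs: /tmp/flink       # replaced by YARN-assigned temp dirs
parallelism.default: 1                 # overridden if -s is given on the command line
```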

Clicking ApplicationMaster on the YARN page links straight to Flink's web UI.


Submitting a job:

On the same host where the yarn-session was submitted:
bin/flink run ~/flink-demo-wordcount.jar

Note: other hosts cannot submit this way.

Jobs can also be submitted from Flink's web UI.
Multiple yarn-sessions can be started; each yarn-session corresponds to one JobManager, jobs are submitted to it as needed, and multiple Flink jobs can be submitted to the same session.

 

PS: to submit a job from another host, use the -m option on the command line to specify the ApplicationMaster host and port that the system assigned when the yarn-session started.

As in the screenshot below, after the yarn-session starts successfully it prints a host and port; this is the JobManager (which is also the ApplicationMaster).

 

With -m, a job can be submitted from any host in the cluster:

bin/flink run -m vmhome10.com:43258 examples/batch/WordCount.jar

 

Stopping a job:

Use the yarn application -kill command:

[hadoop@vmhome11 ~]$ yarn application -kill application_1565315695164_0001
19/08/09 11:55:24 INFO client.RMProxy: Connecting to ResourceManager at vmhome11.com/192.168.44.11:8032
Killing application application_1565315695164_0001
19/08/09 11:55:24 INFO impl.YarnClientImpl: Killed application application_1565315695164_0001


 

4.2 Running yarn-session in detached mode

yarn-session parameters:
  -n:  number of TaskManagers
  -d:  run in detached mode
  -id: YARN application ID to attach to
  -j:  path to the Flink jar file
  -jm: memory for the JobManager container (default unit: MB)
  -nl: YARN node label for the YARN application
  -nm: custom name for the application on YARN
  -q:  display available YARN resources (memory, cores)
  -qu: YARN queue to use
  -s:  number of slots per TaskManager
  -st: start Flink in streaming mode
  -tm: memory per TaskManager container (default unit: MB)
  -z:  namespace for the ZooKeeper sub-paths used in high-availability mode
Unlike client mode, detached mode cannot run multiple yarn-sessions at once: if you start more than one, the later sessions just sit in a waiting state. There can only be one JobManager, and multiple Flink jobs can be submitted to the same session.
To stop a Flink YARN application, use the yarn application -kill command. -d requests detached mode: after starting the Flink YARN session, the client is no longer part of the YARN cluster.
Start detached mode:
yarn-session.sh -n 2 -s 6 -jm 1024 -tm 1024 -nm test -d
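A minimal detached-mode workflow combining the flags above might look as follows (the application id is a placeholder for the one YARN prints at startup; a running YARN cluster is assumed):

```shell
# Start a detached session; the client exits once the cluster is up.
bin/yarn-session.sh -n 2 -s 6 -jm 1024 -tm 1024 -nm test -d

# Later, re-attach the CLI to the running session by its application id
# (placeholder id; use the one printed when the session started).
bin/yarn-session.sh -id application_XXXXXXXXXXXXX_XXXX

# A detached session keeps running until it is killed explicitly.
yarn application -kill application_XXXXXXXXXXXXX_XXXX
```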

5. Submitting with flink run (recommended)

yarn-session requires starting a cluster first and then submitting jobs to it.
Submitting directly with flink run is much simpler: no extra cluster needs to be started; just submit the job and the Flink job runs to completion.

Command: bin/flink run -m yarn-cluster examples/batch/WordCount.jar. Note the -m yarn-cluster option, which submits to the YARN cluster.

[hadoop@vmhome12 flink-1.8.0]$ bin/flink run -m yarn-cluster examples/batch/WordCount.jar
2019-08-09 15:05:05,200 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at vmhome11.com/192.168.44.11:8032
2019-08-09 15:05:05,359 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-08-09 15:05:05,359 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-08-09 15:05:05,552 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2019-08-09 15:05:05,573 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-08-09 15:05:06,147 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/opt/flink-1.8.0/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-08-09 15:05:07,858 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1565315695164_0005
2019-08-09 15:05:07,875 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1565315695164_0005
2019-08-09 15:05:07,875 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-08-09 15:05:07,877 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-08-09 15:05:12,915 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
Starting execution of program

On YARN's web monitoring page you can see:

 

6. Attaching to a yarn-session (running on a specified yarn-session)

You can pass -yid,--yarnapplicationId <arg> (Attach to running YARN session) to run a job on a specific yarn-session.

First start a yarn-session:

[hadoop@vmhome10 flink-1.8.0]$ bin/yarn-session.sh -nm vmhome10_test
2019-08-09 15:21:06,690 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, 192.168.44.10
2019-08-09 15:21:06,691 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-08-09 15:21:06,691 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-08-09 15:21:06,691 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-08-09 15:21:06,691 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-08-09 15:21:06,692 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-08-09 15:21:07,252 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-08-09 15:21:07,416 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to hadoop (auth:SIMPLE)
2019-08-09 15:21:07,502 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at vmhome11.com/192.168.44.11:8032
2019-08-09 15:21:07,756 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2019-08-09 15:21:07,780 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-08-09 15:21:08,157 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/opt/flink-1.8.0/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-08-09 15:21:09,717 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1565315695164_0006
2019-08-09 15:21:09,738 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1565315695164_0006
2019-08-09 15:21:09,739 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-08-09 15:21:09,746 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-08-09 15:21:14,603 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2019-08-09 15:21:15,004 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
Flink JobManager is now running on vmhome11.com:37392 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://vmhome11.com:37392


Attach to application id 0006 above and run a job on it (the WordCount example jar is used here for illustration):

[hadoop@vmhome12 flink-1.8.0]$ bin/flink run -yid application_1565315695164_0006 examples/batch/WordCount.jar

7. Errors when starting yarn-session or flink run -m yarn-cluster

If the error output contains the following, along with other messages about insufficient memory or virtual memory:
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143


This error occurs because YARN strictly checks that virtual-memory usage matches the configuration; when the server or VM cannot meet the configured memory requirements, this error may appear.
In a test environment you can disable the strict virtual-memory check. In production, you should instead analyze the configuration files and the server's actual virtual-memory situation.
Fix:
In etc/hadoop/yarn-site.xml, set the virtual-memory check property to false, as follows:
 

    <property>  
        <name>yarn.nodemanager.vmem-check-enabled</name>  
        <value>false</value>  
    </property>  
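If the error message mentions physical rather than virtual memory, the corresponding physical-memory check can be disabled the same way; yarn.nodemanager.pmem-check-enabled is a standard YARN NodeManager setting, but disabling it is likewise only advisable in test environments:

```xml
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
```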

 

 


