flink on yarn


為什么要用yarn?

如果不用yarn.假設有10個job運行在flink集群上,如果有一個出問題.發生了OOM,最后導致taskmanager掛掉.那么jobmanager會調度任務到其他的taskmanager上面.最后是連鎖反應,會造成所有的taskmanager都掛掉.集群掛掉.

所以要用yarn.

flink on yarn 有兩種形式

  • yarn per job
  • yarn session

yarn per job.

意思就是,一個任務一個集群.如果你這個任務出問題了,那就你自己掛掉,其他集群會活的好好的.

但是缺點是,每個任務都要一個jobmanager.都要單獨的內存.

如果一個jobmanager占用1g內存.那么要是有50個任務就會占用50g內存.浪費.

適合執行時間長的作業

yarn session

因為上一個的缺點,jobmanager浪費內存.因此有了這個模式.

意思就是我在yarn中啟動一個集群,然后不重要的任務,都放在這里面運行,或者在這里面測試.

相當於一個沙箱.

  • 適合規模小,執行時間短的作業
  • 離線處理
  • 共享資源
  • 可用增加taskmanager 和終止空閑的taskmanager

yarn-per-job啟動參數

./flink run \
-m yarn-cluster 
-yn 1 
-yjm 1024 
-ytm 4096 
-ynm FlinkOnYarnSession-MemberLogInfoProducer
-c com.igg.flink.tool.member.rabbitmq.producer.MqMemberProducer 
/home/test_gjm/igg-flink-tool/igg-flink-tool-1.0.0-SNAPSHOT.jar
參數名	含義
-m	固定為yarn-cluster
-yjm	指定JobManager所在的Container內存。單位:MB
-ytm	每一個TaskManager Container的內存,單位MB。
-ys	每一個TaskManager中slots的數量。
-ynm	YARN中application的名稱。
-c	指定Job對應的jar包中主函數所在類名。

說明

這里只有一個taskmanager.

假設你的任務並行度是5.那么 -ys 就是5 .

假設你的任務需要8G內存.那么 -ytm 就是 8192

-yjm 固定為 1024

例子

/data/flink-1.10.1/bin/flink run \
-m yarn-cluster \
-yjm 1024 \
-ytm 18432 \
-ys 3 \
-p 6 \
-yD env.java.opts="-XX:+UseG1GC" \
-ynm huadan_active_job \
-c com.xxxxxx.FlinkHuadanActiveJob \
/data/hadoop/data/xxxxx-analysis-1.0-SNAPSHOT.jar \

使用 G1 收集器
啟動 2 個 tm ,每個3slot,18G 內存.

Options for yarn-cluster mode:
     -d,--detached                        If present, runs the job in detached
                                          mode
     -m,--jobmanager <arg>                Address of the JobManager (master) to
                                          which to connect. Use this flag to
                                          connect to a different JobManager than
                                          the one specified in the
                                          configuration.
     -yat,--yarnapplicationType <arg>     Set a custom application type for the
                                          application on YARN
     -yD <property=value>                 use value for given property
     -yd,--yarndetached                   If present, runs the job in detached
                                          mode (deprecated; use non-YARN
                                          specific option instead)
     -yh,--yarnhelp                       Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with
                                          optional unit (default: MB)
     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN
                                          application
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with
                                          optional unit (default: MB)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode
     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                          sub-paths for high availability mode

yarn session 啟動參數

先啟動 yarn-session

例子:

#!/bin/bash

/data/flink-1.10.1/bin/yarn-session.sh \
-s 4 \
-jm 1g \
-tm 8g \
-d \
-ynm yarn-flink

yarn-session.sh -n 5 -jm 1024 -tm 2048 -s 2 -d
參數解釋:
// -jm 1024 表示jobmanager 1024M內存
// -tm 1024表示taskmanager 1024M內存
// -s  每一個TaskManager上的slots數量。
//-d 任務后台運行
//-nm,--name YARN上為一個自定義的應用設置一個名字


意思:
啟動一個集群.有5個taskmanager,每個taskmanager 內存2G,2個slot.
但是在開始的時候,是沒有的,在你提交任務的時候,就會創建taskmanager.
比如你提交一個 3並行度的任務,就會創建出來2個taskmanager,一共有4個slot.剩余1個.
你再啟動一個2並行度的任務,那么就會再啟動1個taskmanager,一共 4個slot,還剩1個slot.
 
因此在yarn-session中,taskmanager的slot不要設置的過多.
n可以大一點,后面可以擴充.

提交到yarn-session

/data/flink-1.10.1/bin/flink run \
-yid {application id} \
-p 3 \
-yD env.java.opts="-XX:+UseG1GC" \
-c com.xxxxxxx.FlinkBatchTimeLengthJob \
/data/hadoop/data/xxxxxxx.jar \

Usage:
   Optional
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -nm,--name                      Set a custom name for the application on YARN
     -at,--applicationType           Set a custom application type on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM