flink on yarn
為什么要用yarn?
如果不用yarn.假設有10個job運行在flink集群上,如果有一個出問題.發生了OOM,最后導致taskmanager掛掉.那么jobmanager會調度任務到其他的taskmanager上面.最后是連鎖反應,會造成所有的taskmanager都掛掉.集群掛掉.
所以要用yarn.
flink on yarn 有兩種形式
- yarn per job
- yarn session
yarn per job.
意思就是,一個任務一個集群.如果你這個任務出問題了,那就你自己掛掉,其他集群會活的好好的.
但是缺點是,每個任務都要一個jobmanager.都要單獨的內存.
如果一個jobmanager占用1g內存.那么要是有50個任務就會占用50g內存.浪費.
適合執行時間長的作業
yarn session
因為上一個的缺點,jobmanager浪費內存.因此有了這個模式.
意思就是我在yarn中啟動一個集群,然后不重要的任務,都放在這里面運行,或者在這里面測試.
相當於一個沙箱.
- 適合規模小,執行時間短的作業
- 離線處理
- 共享資源
- 可用增加taskmanager 和終止空閑的taskmanager
yarn-per-job啟動參數
./flink run \
-m yarn-cluster
-yn 1
-yjm 1024
-ytm 4096
-ynm FlinkOnYarnSession-MemberLogInfoProducer
-c com.igg.flink.tool.member.rabbitmq.producer.MqMemberProducer
/home/test_gjm/igg-flink-tool/igg-flink-tool-1.0.0-SNAPSHOT.jar
參數名 含義
-m 固定為yarn-cluster
-yjm 指定JobManager所在的Container內存。單位:MB
-ytm 每一個TaskManager Container的內存,單位MB。
-ys 每一個TaskManager中slots的數量。
-ynm YARN中application的名稱。
-c 指定Job對應的jar包中主函數所在類名。
說明
這里只有一個taskmanager.
假設你的任務並行度是5.那么 -ys 就是5 .
假設你的任務需要8G內存.那么 -ytm 就是 8192
-yjm 固定為 1024
例子
/data/flink-1.10.1/bin/flink run \
-m yarn-cluster \
-yjm 1024 \
-ytm 18432 \
-ys 3 \
-p 6 \
-yD env.java.opts="-XX:+UseG1GC" \
-ynm huadan_active_job \
-c com.xxxxxx.FlinkHuadanActiveJob \
/data/hadoop/data/xxxxx-analysis-1.0-SNAPSHOT.jar \
使用 G1 收集器
啟動 2 個 tm ,每個3slot,18G 內存.
Options for yarn-cluster mode:
-d,--detached If present, runs the job in detached
mode
-m,--jobmanager <arg> Address of the JobManager (master) to
which to connect. Use this flag to
connect to a different JobManager than
the one specified in the
configuration.
-yat,--yarnapplicationType <arg> Set a custom application type for the
application on YARN
-yD <property=value> use value for given property
-yd,--yarndetached If present, runs the job in detached
mode (deprecated; use non-YARN
specific option instead)
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with
optional unit (default: MB)
-ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN
application
-ynm,--yarnname <arg> Set a custom name for the application
on YARN
-yq,--yarnquery Display available YARN resources
(memory, cores)
-yqu,--yarnqueue <arg> Specify YARN queue.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
yarn session 啟動參數
先啟動 yarn-session
例子:
#!/bin/bash
/data/flink-1.10.1/bin/yarn-session.sh \
-s 4 \
-jm 1g \
-tm 8g \
-d \
-ynm yarn-flink
yarn-session.sh -n 5 -jm 1024 -tm 2048 -s 2 -d
參數解釋:
// -jm 1024 表示jobmanager 1024M內存
// -tm 1024表示taskmanager 1024M內存
// -s 每一個TaskManager上的slots數量。
//-d 任務后台運行
//-nm,--name YARN上為一個自定義的應用設置一個名字
意思:
啟動一個集群.有5個taskmanager,每個taskmanager 內存2G,2個slot.
但是在開始的時候,是沒有的,在你提交任務的時候,就會創建taskmanager.
比如你提交一個 3並行度的任務,就會創建出來2個taskmanager,一共有4個slot.剩余1個.
你再啟動一個2並行度的任務,那么就會再啟動1個taskmanager,一共 4個slot,還剩1個slot.
因此在yarn-session中,taskmanager的slot不要設置的過多.
n可以大一點,后面可以擴充.
提交到yarn-session
/data/flink-1.10.1/bin/flink run \
-yid {application id} \
-p 3 \
-yD env.java.opts="-XX:+UseG1GC" \
-c com.xxxxxxx.FlinkBatchTimeLengthJob \
/data/hadoop/data/xxxxxxx.jar \
Usage:
Optional
-D <arg> Dynamic properties
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-nm,--name Set a custom name for the application on YARN
-at,--applicationType Set a custom application type on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for HA mode