一、Yarn的簡介
ResourceManager
ResourceManager 負責整個集群的資源管理和分配,是一個全局的資源管理系統。 NodeManager 以心跳的方式向 ResourceManager 匯報資源使用情況(目前主要是 CPU 和內存的使用情況)。RM 只接受 NM 的資源回報信息,對於具體的資源處理則交給 NM 自己處理。
NodeManager
NodeManager 是每個節點上的資源和任務管理器,它是管理這台機器的代理,負責該節點程序的運行,以及該節點資源的管理和監控。YARN 集群每個節點都運行一個NodeManager。
NodeManager 定時向 ResourceManager 匯報本節點資源(CPU、內存)的使用情況和Container 的運行狀態。當 ResourceManager 宕機時 NodeManager 自動連接 RM 備用節點。
NodeManager 接收並處理來自 ApplicationMaster 的 Container 啟動、停止等各種請求。
ApplicationMaster
負責與 RM 調度器協商以獲取資源(用 Container 表示)。
將得到的任務進一步分配給內部的任務(資源的二次分配)。
與 NM 通信以啟動/停止任務。
監控所有任務運行狀態,並在任務運行失敗時重新為任務申請資源以重啟任務
Flink on yarn 集群啟動步驟 :
- 用戶向YARN中提交應用程序,其中包括ApplicationMaster程序、啟動ApplicationMaster的命令、用戶程序等。
- ResourceManager為該應用程序分配第一個Container,並與對應的Node-Manager通信,要求它在這個Container中啟動應用程序的ApplicationMaster。
- ApplicationMaster首先向ResourceManager注冊,這樣用戶可以直接通過ResourceManager查看應用程序的運行狀態,然后它將為各個任務申請資源,並監控它的運行狀態,直到運行結束,即重復步驟4~7。
- ApplicationMaster采用輪詢的方式通過RPC協議向ResourceManager申請和領取資源。
- 一旦ApplicationMaster申請到資源后,便與對應的NodeManager通信,要求它啟動任務。
- NodeManager為任務設置好運行環境(包括環境變量、JAR包、二進制程序等)后,將任務啟動命令寫到一個腳本中,並通過運行該腳本啟動任務。
- 各個任務通過某個RPC協議向ApplicationMaster匯報自己的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務。 在應用程序運行過程中,用戶可隨時通過RPC向ApplicationMaster查詢應用程序的當前運行狀態。
- 應用程序運行完成后,ApplicationMaster向ResourceManager注銷並關閉自己。
Flink on YARN 有兩種模式:Session模式和Per-Job模式。
在Session模式中多個 JobManager 共享 Dispatcher 和 YarnResourceManager。在這種模式下,需要先向 YARN 申請資源,初始化一個常駐服務在 YARN 上,后續提交的Job都將運行在這個Session上:
而Per-Job模式則相反,一個 JobManager 獨享 Dispatcher 和 YarnResourceManager。也就是說每提交一個Job都新建一個Session,不同Job之間的資源是隔離的,不會互相影響:
二、啟動命令
1.我們開啟一個YARN session:
./bin/yarn-session.sh -n 4 -tm 8192 -s 8
上面命令啟動了4個TaskManager,每個TaskManager內存為8G且占用了8個核(是每個TaskManager,默認是1個核)。在啟動YARN session的時候會加載conf/flink-config.yaml配置文件,我們可以根據自己的需求去修改里面的相關參數。
2.YARN session啟動之后就可以使用bin/flink來啟動提交作業:
./bin/flink run -c com.demo.wangzhiwu.WordCount $DEMO_DIR/target/flink-demo-1.0.SNAPSHOT.jar --port 9000
3.上面的YARN session是在Hadoop YARN環境下啟動一個Flink cluster集群,里面的資源是可以共享給其他的Flink作業,是Flink on YARN Session模式。我們還可以在YARN上啟動一個Flink作業,這里我們還是使用./bin/flink,但是不需要事先啟動YARN session,這就是Flink on YARN Per-Job模式:
./bin/flink run -m yarn-cluster -c com.demo.wangzhiwu.WordCount -ys 2 ./examples/batch/WordCount.jar --input hdfs://user/hadoop/input.txt --output hdfs://user/hadoop/output.txt
該命令同樣會啟動一個類似於YARN session啟動的頁面。
4.后台運行 yarn session:
如果你不希望flink yarn client一直運行,也可以啟動一個后台運行的yarn session。使用這個參數:-d 或者 --detached。在這種情況下,flink yarn client將會只提交任務到集群然后關閉自己。注意:在這種情況下,無法使用flink停止yarn session,必須使用yarn工具來停止yarn session。
yarn application -kill $applicationId
5.日志文件查看:
在某種情況下,flink yarn session 部署失敗是由於它自身的原因,用戶必須依賴於yarn的日志來進行分析。最有用的就是yarn log aggregation 。啟動它,用戶必須在yarn-site.xml文件中設置yarn.log-aggregation-enable 屬性為true。一旦啟用了,用戶可以通過下面的命令來查看一個失敗的yarn session的所有詳細日志。
yarn logs -applicationId $applicationId
三、flink啟動參數
1 參數必選 : -n,--container <arg> 分配多少個yarn容器 (=taskmanager的數量) 2 參數可選 : -D <arg> 動態屬性 -d,--detached 獨立運行 -jm,--jobManagerMemory <arg> JobManager的內存 [in MB] -nm,--name 在YARN上為一個自定義的應用設置一個名字 -q,--query 顯示yarn中可用的資源 (內存, cpu核數) -qu,--queue <arg> 指定YARN隊列. -s,--slots <arg> 每個TaskManager使用的slots數量 -tm,--taskManagerMemory <arg> 每個TaskManager的內存 [in MB] -z,--zookeeperNamespace <arg> 針對HA模式在zookeeper上創建NameSpace -id,--applicationId <yarnAppId> YARN集群上的任務id,附着到一個后台運行的yarn session中 3 run [OPTIONS] <jar-file> <arguments> run操作參數: -c,--class <classname> 如果沒有在jar包中指定入口類,則需要在這里通過這個參數指定 -m,--jobmanager <host:port> 指定需要連接的jobmanager(主節點)地址,使用這個參數可以指定一個不同於配置文件中的jobmanager -p,--parallelism <parallelism> 指定程序的並行度。可以覆蓋配置文件中的默認值。 4 啟動一個新的yarn-session,它們都有一個y或者yarn的前綴 例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar 連接指定host和port的jobmanager: ./bin/flink run -m SparkMaster:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 啟動一個新的yarn-session: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1 5 注意:命令行的選項也可以使用./bin/flink 工具獲得。 6 Action "run" compiles and runs a program. Syntax: run [OPTIONS] <jar-file> <arguments> "run" action options: -c,--class <classname> Class with the program entry point ("main" method or "getPlan()" method. Only needed if the JAR file does not specify the class in its manifest. -C,--classpath <url> Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}. -d,--detached If present, runs the job in detached mode -n,--allowNonRestoredState Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered. -p,--parallelism <parallelism> The parallelism with which to run the program. Optional flag to override the default value specified in the configuration. -q,--sysoutLogging If present, suppress logging output to standard out. -s,--fromSavepoint <savepointPath> Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537). 7 Options for yarn-cluster mode: -d,--detached If present, runs the job in detached mode -m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. -yD <property=value> use value for given property -yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead) -yh,--yarnhelp Help for the Yarn session CLI. -yid,--yarnapplicationId <arg> Attach to running YARN session -yj,--yarnjar <arg> Path to Flink jar file -yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) -yn,--yarncontainer <arg> Number of YARN container to allocate (=Number of Task Managers) -ynl,--yarnnodeLabel <arg> Specify YARN node label for the YARN application -ynm,--yarnname <arg> Set a custom name for the application on YARN -yq,--yarnquery Display available YARN resources (memory, cores) -yqu,--yarnqueue <arg> Specify YARN queue. -ys,--yarnslots <arg> Number of slots per TaskManager -yst,--yarnstreaming Start Flink in streaming mode -yt,--yarnship <arg> Ship files in the specified directory (t for transfer) -ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) -yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode