来源于 https://developer.aliyun.com/article/712704
简介: 本文主要分享 Flink 的 5 种任务提交的方式。熟练掌握各种任务提交方式,有利于提高我们日常的开发和运维效率。
作者:周凯波(宝牛)
1.环境说明
在前面几期的课程里面讲过了 Flink 开发环境的搭建和应用的部署以及运行,今天的课程主要是讲 Flink 的客户端操作。本次讲解以实际操作为主。这次课程是基于社区的 Flink 1.7.2 版本,操作系统是 Mac 系统,浏览器是 Google Chrome 浏览器。有关开发环境的准备和集群的部署,请参考「开发环境搭建和应用的配置、部署及运行」的内容。
2.课程概要
如下图所示,Flink 提供了丰富的客户端操作来提交任务和与任务进行交互,包括 Flink 命令行,Scala Shell,SQL Client,Restful API 和 Web。Flink 首先提供的最重要的是命令行,其次是 SQL Client 用于提交 SQL 任务的运行,还有就是 Scala Shell 提交 Table API 的任务。同时,Flink 也提供了Restful 服务,用户可以通过 http 方式进行调用。此外,还有 Web 的方式可以提交任务。
在 Flink 安装目录的 bin 目录下面可以看到有 flink, start-scala-shell.sh 和 sql-client.sh 等文件,这些都是客户端操作的入口。
3.Flink 客户端操作
3.1 Flink 命令行
Flink 的命令行参数很多,输入 flink - h 能看到完整的说明:
flink-1.7.2 bin/flink -h
如果想看某一个命令的参数,比如 Run 命令,输入:
flink-1.7.2 bin/flink run -h
本文主要讲解常见的一些操作,更详细的文档请参考: Flink 命令行官方文档。
3.1.1 Standalone
首先启动一个 Standalone 的集群:
flink-1.7.2 bin/start-cluster.sh Starting cluster. Starting standalonesession daemon on host zkb-MBP.local. Starting taskexecutor daemon on host zkb-MBP.local.
打开 http://127.0.0.1:8081 能看到 Web 界面。
Run
运行任务,以 Flink 自带的例子 TopSpeedWindowing 为例:
flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar Starting execution of program Executing TopSpeedWindowing example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. Job has been submitted with JobID 5e20cb6b0f357591171dfcca2eea09de
运行起来后默认是 1 个并发:
点左侧「Task Manager」,然后点「Stdout」能看到输出日志:
或者查看本地 Log 目录下的 *.out 文件:
List
查看任务列表:
flink-1.7.2 bin/flink list -m 127.0.0.1:8081 Waiting for response... ------------------ Running/Restarting Jobs ------------------- 24.03.2019 10:14:06 : 5e20cb6b0f357591171dfcca2eea09de : CarTopSpeedWindowingExample (RUNNING) -------------------------------------------------------------- No scheduled jobs.
Stop
停止任务。通过 -m 来指定要停止的 JobManager 的主机地址和端口。
flink-1.7.2 bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb Stopping job d67420e52bd051fae2fddbaa79e046bb. ------------------------------------------------------------ The program finished with the following exception: org.apache.flink.util.FlinkException: Could not stop the job d67420e52bd051fae2fddbaa79e046bb. at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:554) at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985) at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:547) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1062) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.] at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.client.program.rest.RestClusterClient.stop(RestClusterClient.java:392) at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:552) ... 9 more Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:380) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:364) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
从日志里面能看出 Stop 命令执行失败了。一个 Job 能够被 Stop 要求所有的 Source 都是可以 Stoppable 的,即实现了 StoppableFunction 接口。
/** * 需要能 stoppable 的函数必须实现这个接口,例如流式任务的 source。 * stop() 方法在任务收到 STOP 信号的时候调用。 * source 在接收到这个信号后,必须停止发送新的数据且优雅的停止。 */ @PublicEvolving public interface StoppableFunction { /** * 停止 source。与 cancel() 不同的是,这是一个让 source 优雅停止的请求。 * 等待中的数据可以继续发送出去,不需要立即停止。 */ void stop(); }
Cancel
取消任务。如果在 conf/flink-conf.yaml 里面配置了 state.savepoints.dir,会保存 Savepoint,否则不会保存 Savepoint。
flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de Cancelling job 5e20cb6b0f357591171dfcca2eea09de. Cancelled job 5e20cb6b0f357591171dfcca2eea09de.
也可以在停止的时候显示指定 Savepoint 目录。
flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint 29da945b99dea6547c3fbafd57ed8759 Cancelling job 29da945b99dea6547c3fbafd57ed8759 with savepoint to /tmp/savepoint. Cancelled job 29da945b99dea6547c3fbafd57ed8759. Savepoint stored in file:/tmp/savepoint/savepoint-29da94-88299bacafb7. flink-1.7.2 ll /tmp/savepoint/savepoint-29da94-88299bacafb7 total 32K -rw-r--r-- 1 baoniu 29K Mar 24 10:33 _metadata
取消和停止(流作业)的区别如下:
- cancel() 调用,立即调用作业算子的 cancel() 方法,以尽快取消它们。如果算子在接到 cancel() 调用后没有停止,Flink 将开始定期中断算子线程的执行,直到所有算子停止为止。
- stop() 调用,是更优雅的停止正在运行流作业的方式。stop() 仅适用于 Source 实现了 StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 Source 都将接收 stop() 方法调用。直到所有 Source 正常关闭时,作业才会正常结束。这种方式,使作业正常处理完所有作业。
Savepoint
触发 Savepoint。
flink-1.7.2 bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint Triggering savepoint for job ec53edcfaeb96b2a5dadbfbe5ff62bbb. Waiting for response... Savepoint completed. Path: file:/tmp/savepoint/savepoint-ec53ed-84b00ce500ee You can resume your program from this savepoint with the run command.
说明:Savepoint 和 Checkpoint 的区别(详见文档):
- Checkpoint 是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;Checkpoint 是作业 failover 的时候自动使用,不需要用户指定。
- Savepoint 是全量做的,每次的时间较长,数据量较大,需要用户主动去触发。Savepoint 一般用于程序的版本更新(详见文档),Bug 修复,A/B Test 等场景,需要用户指定。
通过 -s 参数从指定的 Savepoint 启动:
flink-1.7.2 bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./examples/streaming/TopSpeedWindowing.jar Starting execution of program Executing TopSpeedWindowing example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path.
查看 JobManager 的日志,能够看到类似这样的 Log:
2019-03-28 10:30:53,957 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 790d7b98db6f6af55d04aec1d773852d from savepoint /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 () 2019-03-28 10:30:53,959 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 790d7b98db6f6af55d04aec1d773852d to 2. 2019-03-28 10:30:53,959 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 1 @ 0 for 790d7b98db6f6af55d04aec1d773852d.
Modify
修改任务并行度。
为了方便演示,我们修改 conf/flink-conf.yaml 将 Task Slot 数从默认的 1 改为 4,并配置 Savepoint 目录。(Modify 参数后面接 -s 指定 Savepoint 路径当前版本可能有 Bug,提示无法识别)
taskmanager.numberOfTaskSlots: 4 state.savepoints.dir: file:///tmp/savepoint
修改参数后需要重启集群生效,然后再启动任务:
flink-1.7.2 bin/stop-cluster.sh && bin/start-cluster.sh Stopping taskexecutor daemon (pid: 53139) on host zkb-MBP.local. Stopping standalonesession daemon (pid: 52723) on host zkb-MBP.local. Starting cluster. Starting standalonesession daemon on host zkb-MBP.local. Starting taskexecutor daemon on host zkb-MBP.local. flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar Starting execution of program Executing TopSpeedWindowing example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. Job has been submitted with JobID 7752ea7b0e7303c780de9d86a5ded3fa
从页面上能看到 Task Slot 变为了 4,这时候任务的默认并发度是 1。
通过 Modify 命令依次将并发度修改为 4 和 3,可以看到每次 Modify 命令都会触发一次 Savepoint。
flink-1.7.2 bin/flink modify -p 4 7752ea7b0e7303c780de9d86a5ded3fa Modify job 7752ea7b0e7303c780de9d86a5ded3fa. Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 4. flink-1.7.2 ll /tmp/savepoint total 0 drwxr-xr-x 3 baoniu 96 Jun 17 09:05 savepoint-7752ea-00c05b015836/ flink-1.7.2 bin/flink modify -p 3 7752ea7b0e7303c780de9d86a5ded3fa Modify job 7752ea7b0e7303c780de9d86a5ded3fa. Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 3. flink-1.7.2 ll /tmp/savepoint total 0 drwxr-xr-x 3 baoniu 96 Jun 17 09:08 savepoint-7752ea-449b131b2bd4/
查看 JobManager 的日志,可以看到:
2019-06-17 09:05:11,179 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 7752ea7b0e7303c780de9d86a5ded3fa from savepoint file:/tmp/savepoint/savepoint-790d7b-3581698f007e () 2019-06-17 09:05:11,182 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 7752ea7b0e7303c780de9d86a5ded3fa to 3. 2019-06-17 09:05:11,182 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 2 @ 0 for 7752ea7b0e7303c780de9d86a5ded3fa. 2019-06-17 09:05:11,184 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore 2019-06-17 09:05:11,184 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job CarTopSpeedWindowingExample (7752ea7b0e7303c780de9d86a5ded3fa) switched from state RUNNING to SUSPENDING. org.apache.flink.util.FlinkException: Job is being rescaled.
Info
Info 命令是用来查看 Flink 任务的执行计划(StreamGraph)的。
flink-1.7.2 bin/flink info examples/streaming/TopSpeedWindowing.jar ----------------------- Execution Plan ----------------------- {"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]} --------------------------------------------------------------
拷贝输出的 Json 内容,粘贴到这个网站:http://flink.apache.org/visualizer/
可以和实际运行的物理执行计划对比:
3.1.2 Yarn per-job
单任务 Attach 模式
默认是 Attach 模式,即客户端会一直等待直到程序结束才会退出。
- 通过 -m yarn-cluster 指定 Yarn 模式
- Yarn 上显示名字为 Flink session cluster,这个 Batch 的 Wordcount 任务运行完会 FINISHED。
- 客户端能看到结果输出
[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2] $echo $HADOOP_CONF_DIR /etc/hadoop/conf/ [admin@z17.sqa.zth /home/admin/flink/flink-1.7.2] $./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar 2019-06-17 09:15:24,511 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050 2019-06-17 09:15:24,690 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2019-06-17 09:15:24,690 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2019-06-17 09:15:24,907 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4} 2019-06-17 09:15:25,430 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 2019-06-17 09:15:25,438 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them. 2019-06-17 09:15:36,239 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0724 2019-06-17 09:15:36,276 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0724 2019-06-17 09:15:36,276 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated 2019-06-17 09:15:36,281 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED 2019-06-17 09:15:40,426 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully. Starting execution of program Executing WordCount example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. (a,5) (action,1) (after,1) (against,1) (all,2) ... ... (would,2) (wrong,1) (you,1) Program execution finished Job with JobID 8bfe7568cb5c3254af30cbbd9cd5971e has finished. Job Runtime: 9371 ms Accumulator Results: - 2bed2c5506e9237fb85625416a1bc508 (java.util.ArrayList) [170 elements]
如果我们以 Attach 模式运行 Streaming 的任务,客户端会一直等待不退出,可以运行以下的例子试验下:
./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
单任务 Detached 模式
- 由于是 Detached 模式,客户端提交完任务就退出了
- Yarn 上显示为 Flink per-job cluster
$./bin/flink run -yd -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar 2019-06-18 09:21:59,247 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050 2019-06-18 09:21:59,428 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2019-06-18 09:21:59,428 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2019-06-18 09:21:59,940 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4} 2019-06-18 09:22:00,427 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 2019-06-18 09:22:00,436 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them. ^@2019-06-18 09:22:12,113 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0729 2019-06-18 09:22:12,151 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0729 2019-06-18 09:22:12,151 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated 2019-06-18 09:22:12,155 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED 2019-06-18 09:22:16,275 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully. 2019-06-18 09:22:16,275 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it: yarn application -kill application_1532332183347_0729 Please also note that the temporary files of the YARN session in the home directory will not be removed. Job has been submitted with JobID e61b9945c33c300906ad50a9a11f36df
3.1.3 Yarn session
启动 Session
./bin/yarn-session.sh -tm 2048 -s 3
表示启动一个 Yarn session 集群,每个 TM 的内存是 2 G,每个 TM 有 3 个 Slot。(注意:-n 参数不生效)
flink-1.7.2 ./bin/yarn-session.sh -tm 2048 -s 3 2019-06-17 09:21:50,177 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost 2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123 2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m 2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m 2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4 2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint 2019-06-17 09:21:50,180 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1 2019-06-17 09:21:50,180 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081 2019-06-17 09:21:50,644 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2019-06-17 09:21:50,746 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to baoniu (auth:SIMPLE) 2019-06-17 09:21:50,848 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050 2019-06-17 09:21:51,148 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, numberTaskManagers=1, slotsPerTaskManager=3} 2019-06-17 09:21:51,588 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 2019-06-17 09:21:51,596 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them. ^@2019-06-17 09:22:03,304 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0726 2019-06-17 09:22:03,336 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0726 2019-06-17 09:22:03,336 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated 2019-06-17 09:22:03,340 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED 2019-06-17 09:22:07,722 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully. 2019-06-17 09:22:08,050 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started. Flink JobManager is now running on z07.sqa.net:37109 with leader id 00000000-0000-0000-0000-000000000000. JobManager Web Interface: http://z07.sqa.net:37109
客户端默认是 Attach 模式,不会退出:
- 可以 ctrl + c 退出,然后再通过 ./bin/yarn-session.sh -id application_1532332183347_0726 连上来;
- 或者启动的时候用 -d 则为 detached 模式
Yarn 上显示为 Flink session cluster;
- 在本机的临时目录(有些机器是 /tmp 目录)下会生成一个文件:
flink-1.7.2 cat /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu #Generated YARN properties file #Mon Jun 17 09:22:08 CST 2019 parallelism=3 dynamicPropertiesString= applicationID=application_1532332183347_0726
提交任务
./bin/flink run ./examples/batch/WordCount.jar
将会根据 /tmp/.yarn-properties-admin 文件内容提交到了刚启动的 Session。
flink-1.7.2 ./bin/flink run ./examples/batch/WordCount.jar 2019-06-17 09:26:42,767 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu. 2019-06-17 09:26:42,767 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu. 2019-06-17 09:26:43,058 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 3 2019-06-17 09:26:43,058 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 3 YARN properties set default parallelism to 3 2019-06-17 09:26:43,097 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050 2019-06-17 09:26:43,229 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2019-06-17 09:26:43,229 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2019-06-17 09:26:43,327 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'z05c07216.sqa.zth.tbsite.net' and port '37109' from supplied application id 'application_1532332183347_0726' Starting execution of program Executing WordCount example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. ^@(a,5) (action,1) (after,1) (against,1) (all,2) (and,12) ... ... (wrong,1) (you,1) Program execution finished Job with JobID ad9b0f1feed6d0bf6ba4e0f18b1e65ef has finished. Job Runtime: 9152 ms Accumulator Results: - fd07c75d503d0d9a99e4f27dd153114c (java.util.ArrayList) [170 elements]
运行结束后 TM 的资源会释放。
提交到指定的 Session
通过 -yid 参数来提交到指定的 Session。
$./bin/flink run -d -p 30 -m yarn-cluster -yid application_1532332183347_0708 ./examples/streaming/TopSpeedWindowing.jar 2019-03-24 12:36:33,668 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050 2019-03-24 12:36:33,773 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2019-03-24 12:36:33,773 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2019-03-24 12:36:33,837 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'z05c05218.sqa.zth.tbsite.net' and port '60783' from supplied application id 'application_1532332183347_0708' Starting execution of program Executing TopSpeedWindowing example with default input data set. Use --input to specify file input. Printing result to stdout. Use --output to specify output path. Job has been submitted with JobID 58d5049ebbf28d515159f2f88563f5fd
注:Blink版本 的 Session 与 Flink 的 Session 的区别:
- Flink 的 session -n 参数不生效,而且不会提前启动 TM;
- Blink 的 session 可以通过 -n 指定启动多少个 TM,而且 TM 会提前起来;
3.2 Scala Shell
官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/scala_shell.html
3.2.1 Deploy
Local
$bin/start-scala-shell.sh local Starting Flink Shell: Starting local Flink cluster (host: localhost, port: 8081). Connecting to Flink cluster (host: localhost, port: 8081). ... ... scala>
任务运行说明:
- Batch 任务内置了 benv 变量,通过 print() 将结果输出到控制台;
- Streaming 任务内置了 senv 变量,通过 senv.execute("job name") 来提交任务,且 Datastream 的输出只有在 Local 模式下打印到控制台;
Remote
先启动一个 yarn session cluster:
$./bin/yarn-session.sh -tm 2048 -s 3 2019-03-25 09:52:16,341 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost 2019-03-25 09:52:16,342 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123 2019-03-25 09:52:16,342 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m 2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m 2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4 2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1 2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint 2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration … ... Flink JobManager is now running on z054.sqa.net:28665 with leader id 00000000-0000-0000-0000-000000000000. JobManager Web Interface: http://z054.sqa.net:28665
启动 scala shell,连到 jm:
$bin/start-scala-shell.sh remote z054.sqa.net 28665 Starting Flink Shell: Connecting to Flink cluster (host: z054.sqa.net, port: 28665). ... ... scala>
Yarn
$./bin/start-scala-shell.sh yarn -n 2 -jm 1024 -s 2 -tm 1024 -nm flink-yarn Starting Flink Shell: 2019-03-25 09:47:44,695 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost 2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123 2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m 2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m 2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4 2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1 2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint 2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081 2019-03-25 09:47:44,717 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-admin. 2019-03-25 09:47:45,041 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050 2019-03-25 09:47:45,098 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2019-03-25 09:47:45,266 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2019-03-25 09:47:45,275 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored. 2019-03-25 09:47:45,357 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2} 2019-03-25 09:47:45,711 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 2019-03-25 09:47:45,718 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/home/admin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them. 2019-03-25 09:47:46,514 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0710 2019-03-25 09:47:46,534 INFO org.apache.hadoop.yarn