Source: https://developer.aliyun.com/article/712704
Summary: This article covers five ways of submitting Flink jobs. Mastering the various submission methods helps improve day-to-day development and operations efficiency.
Author: Zhou Kaibo (Baoniu)
1. Environment
Earlier sessions covered setting up the Flink development environment and deploying and running applications; this session focuses on Flink client operations and is mostly hands-on. It is based on community Flink 1.7.2, running on macOS, with Google Chrome as the browser. For preparing the development environment and deploying the cluster, refer to the earlier material on setting up the development environment and configuring, deploying, and running applications.
2. Overview
As shown in the figure below, Flink provides a rich set of client operations for submitting and interacting with jobs: the Flink command line, the Scala Shell, the SQL Client, the RESTful API, and the Web UI. The most important is the command line, followed by the SQL Client for submitting SQL jobs and the Scala Shell for submitting Table API jobs. Flink also provides a RESTful service that can be called over HTTP, and jobs can be submitted through the Web UI as well.
Under the bin directory of the Flink installation you can find flink, start-scala-shell.sh, sql-client.sh, and other files; these are the entry points for client operations.
3. Flink Client Operations
3.1 Flink Command Line
The Flink command line has many options. Run flink -h to see the full list:
flink-1.7.2 bin/flink -h
To see the options of a particular command, such as run, enter:
flink-1.7.2 bin/flink run -h
This article covers the common operations; for more detail, see the official Flink command-line documentation.
3.1.1 Standalone
First, start a standalone cluster:
flink-1.7.2 bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.
Open http://127.0.0.1:8081 to see the web UI.
Run
Run a job, using the TopSpeedWindowing example that ships with Flink:
flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 5e20cb6b0f357591171dfcca2eea09de
Once running, the job has a default parallelism of 1:
Click "Task Manager" on the left, then "Stdout", to see the output log:
Or check the *.out file under the local log directory:
List
List the jobs:
flink-1.7.2 bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
24.03.2019 10:14:06 : 5e20cb6b0f357591171dfcca2eea09de : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
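When scripting around the CLI (for example, to feed a JobID into a later stop or cancel call), the JobID can be pulled out of the list output with a little text parsing. A minimal sketch in Python using the sample output above; the helper name and regex are our own, not part of Flink:

```python
import re

# Sample `flink list` output (from the run above); in a real script this
# would come from subprocess.run(["bin/flink", "list", ...]).stdout.
output = """Waiting for response...
------------------ Running/Restarting Jobs -------------------
24.03.2019 10:14:06 : 5e20cb6b0f357591171dfcca2eea09de : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs."""

def running_job_ids(listing: str) -> list:
    """Extract the 32-hex-char JobIDs of jobs reported as RUNNING."""
    return re.findall(r":\s([0-9a-f]{32})\s:.*\(RUNNING\)", listing)

print(running_job_ids(output))  # ['5e20cb6b0f357591171dfcca2eea09de']
```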
Stop
Stop a job. Use -m to specify the host and port of the JobManager:
flink-1.7.2 bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb
Stopping job d67420e52bd051fae2fddbaa79e046bb.
------------------------------------------------------------
 The program finished with the following exception:
org.apache.flink.util.FlinkException: Could not stop the job d67420e52bd051fae2fddbaa79e046bb.
	at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:554)
	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
	at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:547)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1062)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at org.apache.flink.client.program.rest.RestClusterClient.stop(RestClusterClient.java:392)
	at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:552)
	... 9 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:380)
	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:364)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
The log shows that the stop command failed. For a job to be stoppable, all of its sources must be stoppable, i.e. implement the StoppableFunction interface:

/**
 * Functions that need to be stoppable must implement this interface,
 * e.g. the sources of streaming jobs. The stop() method is called when
 * the job receives a STOP signal. On receiving it, the source must stop
 * emitting new data and shut down gracefully.
 */
@PublicEvolving
public interface StoppableFunction {
    /**
     * Stops the source. Unlike cancel(), this is a request for the source
     * to stop gracefully. Pending data can still be emitted; the source
     * does not have to halt immediately.
     */
    void stop();
}
Cancel
Cancel a job. If state.savepoints.dir is configured in conf/flink-conf.yaml, a savepoint is taken on cancellation; otherwise none is taken.
flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de
Cancelling job 5e20cb6b0f357591171dfcca2eea09de.
Cancelled job 5e20cb6b0f357591171dfcca2eea09de.
You can also explicitly specify a savepoint directory when cancelling:
flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint 29da945b99dea6547c3fbafd57ed8759
Cancelling job 29da945b99dea6547c3fbafd57ed8759 with savepoint to /tmp/savepoint.
Cancelled job 29da945b99dea6547c3fbafd57ed8759. Savepoint stored in file:/tmp/savepoint/savepoint-29da94-88299bacafb7.
flink-1.7.2 ll /tmp/savepoint/savepoint-29da94-88299bacafb7
total 32K
-rw-r--r-- 1 baoniu 29K Mar 24 10:33 _metadata
The differences between cancelling and stopping a (streaming) job:
- cancel(): the cancel() method of each operator is invoked immediately, to cancel the operators as soon as possible. If operators do not stop after the cancel() call, Flink starts periodically interrupting the operator threads until all of them stop.
- stop(): a more graceful way to end a running streaming job. It is available only for jobs whose sources all implement the StoppableFunction interface. When a stop is requested, every source receives a stop() call, and the job finishes only once all sources have shut down cleanly. This lets the job finish processing all in-flight data.
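The stop contract can be illustrated with a small self-contained sketch (plain Python, not the Flink API; DemoSource and its field names are invented for illustration): a stop request is observed between records, so the source drains gracefully instead of being interrupted mid-record as with cancel:

```python
class DemoSource:
    """Toy source illustrating the stop()-vs-cancel contract.
    Not Flink API -- the names here are invented for this sketch."""

    def __init__(self, data):
        self.data = data
        self.stopped = False   # set by stop(), checked between records
        self.emitted = []

    def stop(self):
        # Graceful: the source finishes the record in flight, then exits.
        self.stopped = True

    def run(self, stop_after=None):
        for i, record in enumerate(self.data):
            if self.stopped:
                break            # exit cleanly at a record boundary
            self.emitted.append(record)
            if stop_after is not None and i + 1 == stop_after:
                self.stop()      # simulate a STOP signal arriving mid-run

src = DemoSource(["a", "b", "c", "d"])
src.run(stop_after=2)
print(src.emitted)  # ['a', 'b'] -- the in-flight record completed, then the loop ended
```

A cancel(), by contrast, would interrupt the worker thread regardless of where it is in the loop.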
Savepoint
Trigger a savepoint:
flink-1.7.2 bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint
Triggering savepoint for job ec53edcfaeb96b2a5dadbfbe5ff62bbb.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-ec53ed-84b00ce500ee
You can resume your program from this savepoint with the run command.
Note the differences between savepoints and checkpoints (see the documentation for details):
- Checkpoints are incremental, so each one is fast and small. Once enabled in the program they are triggered automatically, without user involvement, and are used automatically for job failover; the user does not need to specify them.
- Savepoints are full snapshots, so each one takes longer and is larger, and must be triggered by the user. Savepoints are typically used for program version upgrades (see the documentation), bug fixes, A/B tests, and similar scenarios, and must be specified explicitly.
Use the -s option to start from a given savepoint:
flink-1.7.2 bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
In the JobManager log you can see entries like:
2019-03-28 10:30:53,957 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 790d7b98db6f6af55d04aec1d773852d from savepoint /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ()
2019-03-28 10:30:53,959 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 790d7b98db6f6af55d04aec1d773852d to 2.
2019-03-28 10:30:53,959 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 1 @ 0 for 790d7b98db6f6af55d04aec1d773852d.
Modify
Modify the parallelism of a job.
To make the demo easier, edit conf/flink-conf.yaml to change the number of task slots from the default 1 to 4, and configure a savepoint directory. (Passing -s with a savepoint path to the modify command may be buggy in this version: the option is reported as unrecognized.)
taskmanager.numberOfTaskSlots: 4
state.savepoints.dir: file:///tmp/savepoint
Restart the cluster for the changes to take effect, then start the job again:
flink-1.7.2 bin/stop-cluster.sh && bin/start-cluster.sh
Stopping taskexecutor daemon (pid: 53139) on host zkb-MBP.local.
Stopping standalonesession daemon (pid: 52723) on host zkb-MBP.local.
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.
flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 7752ea7b0e7303c780de9d86a5ded3fa
The web UI now shows 4 task slots, and the job's default parallelism is 1.
Use the modify command to change the parallelism, first to 4 and then to 3; note that each modify triggers a savepoint:
flink-1.7.2 bin/flink modify -p 4 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 4.
flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x 3 baoniu 96 Jun 17 09:05 savepoint-7752ea-00c05b015836/
flink-1.7.2 bin/flink modify -p 3 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 3.
flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x 3 baoniu 96 Jun 17 09:08 savepoint-7752ea-449b131b2bd4/
The JobManager log shows:
2019-06-17 09:05:11,179 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Starting job 7752ea7b0e7303c780de9d86a5ded3fa from savepoint file:/tmp/savepoint/savepoint-790d7b-3581698f007e ()
2019-06-17 09:05:11,182 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Reset the checkpoint ID of job 7752ea7b0e7303c780de9d86a5ded3fa to 3.
2019-06-17 09:05:11,182 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 2 @ 0 for 7752ea7b0e7303c780de9d86a5ded3fa.
2019-06-17 09:05:11,184 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - No master state to restore
2019-06-17 09:05:11,184 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job CarTopSpeedWindowingExample (7752ea7b0e7303c780de9d86a5ded3fa) switched from state RUNNING to SUSPENDING.
org.apache.flink.util.FlinkException: Job is being rescaled.
Info
The info command shows the execution plan (StreamGraph) of a Flink job:
flink-1.7.2 bin/flink info examples/streaming/TopSpeedWindowing.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------
Copy the JSON output and paste it into http://flink.apache.org/visualizer/
You can then compare it against the physical plan of the actually running job:
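The plan JSON can also be inspected programmatically rather than visually. A minimal sketch that summarizes each node and its predecessors, using an abbreviated copy of the plan printed above (the summarize helper is our own, not a Flink utility):

```python
import json

# Abbreviated execution plan, trimmed from the `bin/flink info` output above.
plan = json.loads("""{"nodes":[
 {"id":1,"type":"Source: Custom Source","parallelism":1},
 {"id":2,"type":"Timestamps/Watermarks","parallelism":1,
  "predecessors":[{"id":1,"ship_strategy":"FORWARD"}]},
 {"id":4,"type":"Window(...)","parallelism":1,
  "predecessors":[{"id":2,"ship_strategy":"HASH"}]},
 {"id":5,"type":"Sink: Print to Std. Out","parallelism":1,
  "predecessors":[{"id":4,"ship_strategy":"FORWARD"}]}]}""")

def summarize(plan: dict) -> list:
    """One line per node: 'id type (parallelism=p) <- predecessor ids'."""
    lines = []
    for node in plan["nodes"]:
        preds = ",".join(str(p["id"]) for p in node.get("predecessors", []))
        arrow = f" <- {preds}" if preds else ""
        lines.append(f'{node["id"]} {node["type"]} (parallelism={node["parallelism"]}){arrow}')
    return lines

for line in summarize(plan):
    print(line)
```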
3.1.2 Yarn per-job
Single job, attach mode
Attach mode is the default: the client waits until the program finishes before exiting.
- Use -m yarn-cluster to select Yarn mode.
- Yarn shows the application name as Flink session cluster; this batch WordCount job moves to FINISHED when it completes.
- The client sees the result output.
[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]
$echo $HADOOP_CONF_DIR
/etc/hadoop/conf/
[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]
$./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
2019-06-17 09:15:24,511 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:15:24,690 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:15:24,690 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:15:24,907 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4}
2019-06-17 09:15:25,430 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-17 09:15:25,438 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-06-17 09:15:36,239 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0724
2019-06-17 09:15:36,276 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0724
2019-06-17 09:15:36,276 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-17 09:15:36,281 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-17 09:15:40,426 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
... ...
(would,2)
(wrong,1)
(you,1)
Program execution finished
Job with JobID 8bfe7568cb5c3254af30cbbd9cd5971e has finished.
Job Runtime: 9371 ms
Accumulator Results:
- 2bed2c5506e9237fb85625416a1bc508 (java.util.ArrayList) [170 elements]
If we run a streaming job in attach mode, the client waits and does not exit; you can try it with:
./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
Single job, detached mode
- In detached mode, the client exits as soon as the job is submitted.
- Yarn shows the application as Flink per-job cluster.
$./bin/flink run -yd -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
2019-06-18 09:21:59,247 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-18 09:21:59,428 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-18 09:21:59,428 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-18 09:21:59,940 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4}
2019-06-18 09:22:00,427 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-18 09:22:00,436 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-06-18 09:22:12,113 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0729
2019-06-18 09:22:12,151 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0729
2019-06-18 09:22:12,151 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-18 09:22:12,155 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-18 09:22:16,275 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-06-18 09:22:16,275 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1532332183347_0729
Please also note that the temporary files of the YARN session in the home directory will not be removed.
Job has been submitted with JobID e61b9945c33c300906ad50a9a11f36df
3.1.3 Yarn session
Starting a session
./bin/yarn-session.sh -tm 2048 -s 3
This starts a Yarn session cluster in which each TaskManager has 2 GB of memory and 3 slots. (Note: the -n parameter has no effect.)
flink-1.7.2 ./bin/yarn-session.sh -tm 2048 -s 3
2019-06-17 09:21:50,177 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-06-17 09:21:50,179 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-06-17 09:21:50,180 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-06-17 09:21:50,180 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2019-06-17 09:21:50,644 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-06-17 09:21:50,746 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to baoniu (auth:SIMPLE)
2019-06-17 09:21:50,848 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:21:51,148 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, numberTaskManagers=1, slotsPerTaskManager=3}
2019-06-17 09:21:51,588 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-17 09:21:51,596 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-06-17 09:22:03,304 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0726
2019-06-17 09:22:03,336 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1532332183347_0726
2019-06-17 09:22:03,336 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-06-17 09:22:03,340 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-06-17 09:22:07,722 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-06-17 09:22:08,050 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on z07.sqa.net:37109 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://z07.sqa.net:37109
By default the client stays attached and does not exit:
- You can exit with Ctrl+C and later reconnect with ./bin/yarn-session.sh -id application_1532332183347_0726;
- or start the session with -d for detached mode.
Yarn shows the application as Flink session cluster.
- A properties file is written to the local temporary directory (on some machines, /tmp):
flink-1.7.2 cat /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu
#Generated YARN properties file
#Mon Jun 17 09:22:08 CST 2019
parallelism=3
dynamicPropertiesString=
applicationID=application_1532332183347_0726
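Tooling that wraps the session workflow often reads this properties file to recover the applicationID (e.g. to pass to yarn-session.sh -id). A minimal parsing sketch in Python; the parse_yarn_properties helper is our own, not a Flink utility:

```python
# Contents of the .yarn-properties file written by yarn-session.sh, as
# shown above. In a real script this would be read from the temp directory.
contents = """#Generated YARN properties file
#Mon Jun 17 09:22:08 CST 2019
parallelism=3
dynamicPropertiesString=
applicationID=application_1532332183347_0726"""

def parse_yarn_properties(text: str) -> dict:
    """Parse simple key=value properties, skipping comments and blanks."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # comment or blank line
        key, _, value = line.partition("=")
        props[key] = value
    return props

props = parse_yarn_properties(contents)
print(props["applicationID"])  # application_1532332183347_0726
```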
Submitting a job
./bin/flink run ./examples/batch/WordCount.jar
The job is submitted to the session we just started, based on the contents of /tmp/.yarn-properties-admin:
flink-1.7.2 ./bin/flink run ./examples/batch/WordCount.jar
2019-06-17 09:26:42,767 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu.
2019-06-17 09:26:42,767 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu.
2019-06-17 09:26:43,058 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 3
2019-06-17 09:26:43,058 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 3
YARN properties set default parallelism to 3
2019-06-17 09:26:43,097 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:26:43,229 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:26:43,229 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:26:43,327 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'z05c07216.sqa.zth.tbsite.net' and port '37109' from supplied application id 'application_1532332183347_0726'
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
... ...
(wrong,1)
(you,1)
Program execution finished
Job with JobID ad9b0f1feed6d0bf6ba4e0f18b1e65ef has finished.
Job Runtime: 9152 ms
Accumulator Results:
- fd07c75d503d0d9a99e4f27dd153114c (java.util.ArrayList) [170 elements]
After the job finishes, the TaskManager's resources are released.
Submitting to a specific session
Use the -yid option to submit to a specific session:
$./bin/flink run -d -p 30 -m yarn-cluster -yid application_1532332183347_0708 ./examples/streaming/TopSpeedWindowing.jar
2019-03-24 12:36:33,668 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-03-24 12:36:33,773 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-24 12:36:33,773 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-24 12:36:33,837 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'z05c05218.sqa.zth.tbsite.net' and port '60783' from supplied application id 'application_1532332183347_0708'
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 58d5049ebbf28d515159f2f88563f5fd
Note the differences between a Blink session and a Flink session:
- In a Flink session, the -n parameter has no effect, and TaskManagers are not started in advance;
- In a Blink session, -n specifies how many TaskManagers to start, and they are started up front.
3.2 Scala Shell
Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/scala_shell.html
3.2.1 Deploy
Local
$bin/start-scala-shell.sh local
Starting Flink Shell:
Starting local Flink cluster (host: localhost, port: 8081).
Connecting to Flink cluster (host: localhost, port: 8081).
... ...
scala>
Notes on running jobs:
- Batch jobs have a built-in benv variable; print() writes results to the console;
- Streaming jobs have a built-in senv variable; submit with senv.execute("job name"). DataStream output is printed to the console only in local mode.
Remote
First start a yarn session cluster:
$./bin/yarn-session.sh -tm 2048 -s 3
2019-03-25 09:52:16,341 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-25 09:52:16,342 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-25 09:52:16,342 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-03-25 09:52:16,343 INFO org.apache.flink.configuration.GlobalConfiguration
… ...
Flink JobManager is now running on z054.sqa.net:28665 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://z054.sqa.net:28665
Start the Scala shell and connect to the JobManager:
$bin/start-scala-shell.sh remote z054.sqa.net 28665
Starting Flink Shell:
Connecting to Flink cluster (host: z054.sqa.net, port: 28665).
... ...
scala>
Yarn
$./bin/start-scala-shell.sh yarn -n 2 -jm 1024 -s 2 -tm 1024 -nm flink-yarn
Starting Flink Shell:
2019-03-25 09:47:44,695 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-25 09:47:44,697 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-03-25 09:47:44,698 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2019-03-25 09:47:44,717 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-admin.
2019-03-25 09:47:45,041 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-03-25 09:47:45,098 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-25 09:47:45,266 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-25 09:47:45,275 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2019-03-25 09:47:45,357 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
2019-03-25 09:47:45,711 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-03-25 09:47:45,718 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/home/admin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-25 09:47:46,514 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1532332183347_0710
2019-03-25 09:47:46,534 INFO org.apache.hadoop.yarn