場景:flink1.14.0 通過restApi操作flink集群(standalone),能力有限,目前的項目使用Flink比較原始
官網的restApi文檔真的寫的一塌糊塗,入參和返回結果只給了JSON Schema,沒有具體示例,請求路徑中的占位符用冒號表示(如 /jobs/:jobid),明顯不是搞web項目的人寫的
這里只記錄幾個重要的,以及常用的,flink webUI上可以直接進行各種操作,只要這上面有的,restApi都支持,就是界面比較技術化
官網地址:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/
單個job的操作
1、提交一個job
POST /jars/<jarName>/run
排坑:(1)jar只要在flink-conf.yaml中的web.upload.dir下即可,jarName就是這個jar包的全名,包括后綴
(2)POST請求的參數要以JSON格式放在請求體(body)里,並設置Content-Type: application/json,不能用表單(form-urlencoded)方式提交,坑不坑爹,官網講的不清楚
(3)提供的入參包括:
allowNonRestoredState:布爾值,從savepoint恢復時,如果savepoint中有狀態無法映射到當前job的算子,是否允許跳過這部分狀態繼續運行這個job
savepointPath: 保存點路徑,提供了就表示從這個savepoint恢復狀態來啟動job(即重啟job),不提供就是run一個全新的job
entryClass: 主類的全名
parallelism: job的並行度
programArgs: 這個是main函數的入參
(4)即使啟動job失敗,返回仍然是一個json結果,失敗時包含errors屬性,成功則返回jobid如下
返回示例:
{"jobid":"b41ec6f1e7f7eb91f49db17e4483fa42"}
2、查詢這個job的detail
GET /jobs/<jobId>
返回示例
{ "jid":"0dca134dbbe841d617f3593dfe5f8c2e", "name":"insert-into_default_catalog.default_database.sink220325101420191tL84", "isStoppable":false, "state":"RUNNING", "start-time":1648179253170, "end-time":-1, "duration":277303407, "maxParallelism":-1, "now":1648456556577, "timestamps":{ "INITIALIZING":1648179253170, "CANCELED":0, "RECONCILING":0, "SUSPENDED":0, "FAILING":0, "RESTARTING":0, "RUNNING":1648179253198, "FINISHED":0, "FAILED":0, "CREATED":1648179253177, "CANCELLING":0 }, "vertices":[ { "id":"cbc357ccb763df2852fee8c4fc7d55f2", "name":"Source: KafkaSource-default_catalog.default_database.source220325101420192qXKx -> Calc(select=[operator, sourceHost, Reinterpret(TO_TIMESTAMP(timestamp, _UTF-16LE'yyyy-MM-dd HH:mm:ss')) AS ts], where=[(weight > 2)])", "maxParallelism":128, "parallelism":1, "status":"RUNNING", "start-time":1648179253253, "end-time":-1, "duration":277303324, "tasks":{ "SCHEDULED":0, "DEPLOYING":0, "FINISHED":0, "CANCELING":0, "CREATED":0, "RUNNING":1, "CANCELED":0, "FAILED":0, "RECONCILING":0, "INITIALIZING":0 }, "metrics":{ "read-bytes":42814532, "read-bytes-complete":true, "write-bytes":178520, "write-bytes-complete":true, "read-records":0, "read-records-complete":true, "write-records":1886, "write-records-complete":true } }, { "id":"9dd63673dd41ea021b896d5203f3ba7c", "name":"LocalWindowAggregate(groupBy=[operator, sourceHost], window=[TUMBLE(time_col=[ts], size=[2 h])], select=[operator, sourceHost, COUNT(*) AS count1$0, slice_end('w$) AS $slice_end])", "maxParallelism":128, "parallelism":1, "status":"RUNNING", "start-time":1648179253259, "end-time":-1, "duration":277303318, "tasks":{ "SCHEDULED":0, "DEPLOYING":0, "FINISHED":0, "CANCELING":0, "CREATED":0, "RUNNING":1, "CANCELED":0, "FAILED":0, "RECONCILING":0, "INITIALIZING":0 }, "metrics":{ "read-bytes":194183, "read-bytes-complete":true, "write-bytes":129496, "write-bytes-complete":true, "read-records":1886, "read-records-complete":true, "write-records":1021, "write-records-complete":true } 
}, { "id":"e883208d19e3c34f8aaf2a3168a63337", "name":"GlobalWindowAggregate(groupBy=[operator, sourceHost], window=[TUMBLE(slice_end=[$slice_end], size=[2 h])], select=[operator, sourceHost, COUNT(count1$0) AS metric_count, start('w$) AS window_start, end('w$) AS window_end]) -> Calc(select=[UUID() AS id, _UTF-16LE'220325101420191HqBW' AS rule_key, _UTF-16LE'B3-測試高級規則' AS rule_name, 10 AS metric_threshold, 0 AS audit_status, 0 AS audit_comment_num, window_start, window_end, operator, sourceHost, metric_count], where=[(metric_count > 10)]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.sink220325101420191tL84], fields=[id, rule_key, rule_name, metric_threshold, audit_status, audit_comment_num, window_start, window_end, operator, sourceHost, metric_count])", "maxParallelism":128, "parallelism":1, "status":"RUNNING", "start-time":1648179253264, "end-time":-1, "duration":277303313, "tasks":{ "SCHEDULED":0, "DEPLOYING":0, "FINISHED":0, "CANCELING":0, "CREATED":0, "RUNNING":1, "CANCELED":0, "FAILED":0, "RECONCILING":0, "INITIALIZING":0 }, "metrics":{ "read-bytes":145094, "read-bytes-complete":true, "write-bytes":0, "write-bytes-complete":true, "read-records":1021, "read-records-complete":true, "write-records":0, "write-records-complete":true } } ], "status-counts":{ "SCHEDULED":0, "DEPLOYING":0, "FINISHED":0, "CANCELING":0, "CREATED":0, "RUNNING":3, "CANCELED":0, "FAILED":0, "RECONCILING":0, "INITIALIZING":0 }, "plan":{ "jid":"0dca134dbbe841d617f3593dfe5f8c2e", "name":"insert-into_default_catalog.default_database.sink220325101420191tL84", "type":"STREAMING", "nodes":[ { "id":"e883208d19e3c34f8aaf2a3168a63337", "parallelism":1, "operator":"", "operator_strategy":"", "description":"GlobalWindowAggregate(groupBy=[operator, sourceHost], window=[TUMBLE(slice_end=[$slice_end], size=[2 h])], select=[operator, sourceHost, COUNT(count1$0) AS metric_count, start('w$) AS window_start, end('w$) AS window_end]) -> Calc(select=[UUID() AS id, _UTF-16LE'220325101420191HqBW' AS rule_key, _UTF-16LE'B3-測試高級規則' AS rule_name, 10 AS metric_threshold, 0 AS audit_status, 0 AS audit_comment_num, window_start, window_end, operator, sourceHost, metric_count], where=[(metric_count > 10)]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.sink220325101420191tL84], fields=[id, rule_key, rule_name, metric_threshold, audit_status, audit_comment_num, window_start, window_end, operator, sourceHost, metric_count])", "inputs":[ { "num":0, "id":"9dd63673dd41ea021b896d5203f3ba7c", "ship_strategy":"HASH", "exchange":"pipelined_bounded" } ], "optimizer_properties":{ } }, { "id":"9dd63673dd41ea021b896d5203f3ba7c", "parallelism":1, "operator":"", "operator_strategy":"", "description":"LocalWindowAggregate(groupBy=[operator, sourceHost], window=[TUMBLE(time_col=[ts], size=[2 h])], select=[operator, sourceHost, COUNT(*) AS count1$0, slice_end('w$) AS $slice_end])", "inputs":[ { "num":0, "id":"cbc357ccb763df2852fee8c4fc7d55f2", "ship_strategy":"FORWARD", "exchange":"pipelined_bounded" } ], "optimizer_properties":{ } }, { "id":"cbc357ccb763df2852fee8c4fc7d55f2", "parallelism":1, "operator":"", "operator_strategy":"", "description":"Source: KafkaSource-default_catalog.default_database.source220325101420192qXKx -> Calc(select=[operator, sourceHost, Reinterpret(TO_TIMESTAMP(timestamp, _UTF-16LE'yyyy-MM-dd HH:mm:ss')) AS ts], where=[(weight > 2)])", "optimizer_properties":{ } } ] } }
3、停止一個job(先做savepoint再停止;如果只想直接取消job、不做savepoint,用 PATCH /jobs/<jobId>?mode=cancel)
POST /jobs/<jobId>/stop
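這個接口的入參要以JSON格式放在請求體(body)里(Content-Type: application/json),可以通過它指定savepoint的位置。注意並不只有這個接口能指定savepoint位置:POST /jobs/<jobId>/savepoints(入參 target-directory、cancel-job)也可以觸發savepoint並指定位置,cancel-job為true時做完savepoint后會取消job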
入參:
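drain:一個bool值,為true時會在做savepoint、停止pipeline之前發送MAX_WATERMARK,觸發所有事件時間定時器(比如把還沒結束的窗口提前計算並輸出)。只有打算永久結束job時才給true;如果之后還要用這個savepoint重啟job,應該給false(或不傳),否則恢復后可能得到錯誤的結果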
targetDirectory:savepoint的位置,這里填寫一個絕對路徑的位置,如 /opt/flink-1.14.0/aaa,如果不指定,默認使用flink-conf里配置的state.savepoints.dir的值,我是使用默認路徑,讓flink自己生成
返回結果:
{"request-id":"a3554aaa2c59e95b3053c43fcb5570d5"}
這個需要配合另一個接口一起使用,因為stop接口可能不會立即結束,先返回一個request-id,后面用這個參數查詢stop的結果,會返回savepoints的位置,客戶端保存下來
4、查詢取消/停止job的結果
GET /jobs/<jobId>/savepoints/<requestId>
返回示例:
{ "status":{ "id":"COMPLETED" }, "operation":{ "location":"file:/opt/flink-1.14.0/flink-savepoints/savepoint-4c46ac-310609cbea53" } }
下次重啟job的時候,需要把這個location作為savepointPath參數帶上
5、重啟一個job
重啟一個job的接口和run的接口一樣(POST /jars/<jarName>/run),不同的是需要額外提供savepointPath參數,值就是上一步查詢到的savepoint位置
6、查詢某個job的checkpoints的統計信息
GET /jobs/<jobId>/checkpoints
返回示例:(json格式)其中 latest.completed.external_path 是這個job最近一次成功完成的checkpoint所在的目錄(目錄下有_metadata文件)
如果集群意外掛掉,並且這個目錄還在的話,可以把這個路徑作為savepointPath傳給run接口來重啟job
{
"counts":{
"restored":0,
"total":25616,
"in_progress":0,
"completed":25616,
"failed":0
},
"summary":{
"state_size":{
"min":5911,"max":6375,"avg":6033,"p50":6027.0,"p90":6027.0,"p95":6027.0,"p99":6027.0,"p999":6027.0
},
"end_to_end_duration":{
"min":3,"max":548,"avg":6,"p50":5.0,"p90":6.0,"p95":9.0,"p99":12.0,"p999":44.999000000001615
},
"alignment_buffered":{
"min":0,"max":0,"avg":0,"p50":0.0,"p90":0.0,"p95":0.0,"p99":0.0,"p999":0.0
},
"processed_data":{
"min":0,"max":0,"avg":0,"p50":0.0,"p90":0.0,"p95":0.0,"p99":0.0,"p999":0.0
},
"persisted_data":{
"min":0,"max":0,"avg":0,"p50":0.0,"p90":0.0,"p95":0.0,"p99":0.0,"p999":0.0
}
},
"latest":{
"completed":{
"@class":"completed",
"id":25616,
"status":"COMPLETED",
"is_savepoint":false,
"trigger_timestamp":1648188076643,
"latest_ack_timestamp":1648188076648,
"state_size":6027,
"end_to_end_duration":5,
"alignment_buffered":0,
"processed_data":0,
"persisted_data":0,
"num_subtasks":3,
"num_acknowledged_subtasks":3,
"checkpoint_type":"CHECKPOINT",
"tasks":{},
"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25616",
"discarded":false
},
"savepoint":null,
"failed":null,
"restored":null
},
"history":[
{"@class":"completed","id":25616,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648188076643,"latest_ack_timestamp":1648188076648,"state_size":6027,"end_to_end_duration":5,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25616","discarded":false},
{"@class":"completed","id":25615,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648188016643,"latest_ack_timestamp":1648188016649,"state_size":6027,"end_to_end_duration":6,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25615","discarded":true},
{"@class":"completed","id":25614,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187956643,"latest_ack_timestamp":1648187956649,"state_size":6027,"end_to_end_duration":6,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25614","discarded":true},
{"@class":"completed","id":25613,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187896643,"latest_ack_timestamp":1648187896656,"state_size":6027,"end_to_end_duration":13,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25613","discarded":true},
{"@class":"completed","id":25612,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187836643,"latest_ack_timestamp":1648187836648,"state_size":6027,"end_to_end_duration":5,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25612","discarded":true},
{"@class":"completed","id":25611,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187776643,"latest_ack_timestamp":1648187776648,"state_size":6027,"end_to_end_duration":5,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25611","discarded":true},
{"@class":"completed","id":25610,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187716643,"latest_ack_timestamp":1648187716649,"state_size":6027,"end_to_end_duration":6,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25610","discarded":true},
{"@class":"completed","id":25609,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187656643,"latest_ack_timestamp":1648187656649,"state_size":6027,"end_to_end_duration":6,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25609","discarded":true},
{"@class":"completed","id":25608,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187596643,"latest_ack_timestamp":1648187596648,"state_size":6027,"end_to_end_duration":5,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25608","discarded":true},
{"@class":"completed","id":25607,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187536643,"latest_ack_timestamp":1648187536648,"state_size":6027,"end_to_end_duration":5,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25607","discarded":true}
]
}
7、查詢某個job的checkpoints的配置信息
GET /jobs/<jobId>/checkpoints/config
返回示例:(json)
{
"mode":"exactly_once",
"interval":60000,
"timeout":600000,
"min_pause":0,
"max_concurrent":1,
"externalization":{
"enabled":true,
"delete_on_cancellation":false
},
"state_backend":"HashMapStateBackend",
"checkpoint_storage":"FileSystemCheckpointStorage",
"unaligned_checkpoints":false,
"tolerable_failed_checkpoints":0,
"aligned_checkpoint_timeout":0
}
查所有job
8、查詢所有job的id和狀態
GET /jobs
返回示例:
{
"jobs":[ {"id":"0dca134dbbe841d617f3593dfe5f8c2e","status":"RUNNING"}, {"id":"f21e864681dcfb375bd2fbc556a82f51","status":"RUNNING"}, {"id":"14e8d5660be66388ed5de63bed2a9ab2","status":"RUNNING"} ] }
9、查詢所有job的概覽信息
相比 /jobs,額外返回了job的name、start-time、end-time、duration、last-modification以及tasks的狀態統計等信息
GET /jobs/overview
返回示例:
{
"jobs":[
{"jid":"0dca134dbbe841d617f3593dfe5f8c2e","name":"insert-into_default_catalog.default_database.sink220325101420191tL84","state":"RUNNING","start-time":1648179253170,"end-time":-1,"duration":275491671,"last-modification":1648179253452,"tasks":{"total":3,"created":0,"scheduled":0,"deploying":0,"running":3,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0,"initializing":0}},
{"jid":"f21e864681dcfb375bd2fbc556a82f51","name":"insert-into_default_catalog.default_database.sink220324174733642qwvs","state":"RUNNING","start-time":1648115953079,"end-time":-1,"duration":338791762,"last-modification":1648115953234,"tasks":{"total":3,"created":0,"scheduled":0,"deploying":0,"running":3,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0,"initializing":0}},
{"jid":"14e8d5660be66388ed5de63bed2a9ab2","name":"insert-into_default_catalog.default_database.sink220316115755871zzvU","state":"RUNNING","start-time":1648115678791,"end-time":-1,"duration":339066050,"last-modification":1648115679087,"tasks":{"total":3,"created":0,"scheduled":0,"deploying":0,"running":3,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0,"initializing":0}}
]
}
有用的信息:job的id,name,state
"jid":"0dca134dbbe841d617f3593dfe5f8c2e", "name":"insert-into_default_catalog.default_database.sink220325101420191tL84", "state":"RUNNING"