Using the Flink REST API in a project


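Scenario: Flink 1.14.0, operating a standalone Flink cluster through the REST API. My experience is limited, and the current project uses Flink in a fairly basic way.

The official REST API documentation is honestly a mess. The parameter and response examples and the request paths (placeholders are written with a colon) were clearly not written by someone who builds web projects.

Only a few important and commonly used endpoints are recorded here. The Flink web UI lets you perform all kinds of operations directly, and anything available there is also supported by the REST API; the UI is just rather technical.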

Official documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/

Operations on a single job

1. Submit a job

POST    /jars/<jarName>/run

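Pitfalls: (1) The jar only needs to sit under the web.upload.dir configured in flink-conf.yaml. jarName is the jar's full file name, including the extension. (The official docs call this path parameter the jar ID.)

(2) The POST parameters must be sent as a JSON string in the request body, not as form fields. Annoying, and the official docs do not make it clear.

(3) The supported parameters include:

allowNonRestoredState:   whether to skip savepoint state that cannot be restored and keep running the job

savepointPath:           the savepoint to restore from; providing it means the job is being restarted, omitting it means a fresh run

entryClass:              fully qualified name of the main class

parallelism:             parallelism of the job

programArgs:             arguments passed to the main function

(4) Even when the job fails to start, the response is still JSON. On failure it contains an errors property; on success it returns the jobid, as shown below.

Example response: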

{"jobid":"b41ec6f1e7f7eb91f49db17e4483fa42"}
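To make the points above concrete, here is a minimal Python sketch of the submit call, using only the standard library. The base URL, the jar name, and the entry class in the usage note are placeholders invented for illustration, not values from my project. The body keys are the parameters listed above (the Flink docs spell the first one allowNonRestoredState).

```python
import json
import urllib.error
import urllib.request
from urllib.parse import quote

# Assumption: the JobManager REST endpoint lives here (8081 is Flink's default REST port).
BASE_URL = "http://localhost:8081"


def build_run_request(jar_name, entry_class, parallelism=None, program_args=None,
                      savepoint_path=None, allow_non_restored_state=None):
    """Build (without sending) the POST /jars/<jarName>/run request with a JSON body."""
    body = {"entryClass": entry_class}
    # Only include the optional parameters that were actually supplied.
    if parallelism is not None:
        body["parallelism"] = parallelism
    if program_args is not None:
        body["programArgs"] = program_args
    if savepoint_path is not None:
        body["savepointPath"] = savepoint_path
    if allow_non_restored_state is not None:
        body["allowNonRestoredState"] = allow_non_restored_state
    url = f"{BASE_URL}/jars/{quote(jar_name, safe='')}/run"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_run_response(payload):
    """Return the job id, or raise if the response carries an 'errors' property."""
    if "errors" in payload:
        raise RuntimeError("; ".join(str(e) for e in payload["errors"]))
    return payload["jobid"]


def submit(request):
    """Send the request; the response is JSON both on success and on failure."""
    try:
        with urllib.request.urlopen(request) as resp:
            return parse_run_response(json.load(resp))
    except urllib.error.HTTPError as err:
        # urllib raises on 4xx/5xx, but the error body can still be read as JSON.
        return parse_run_response(json.load(err))
```

Usage would look like submit(build_run_request("my-job.jar", "com.example.Main", parallelism=2)), which returns the job id string or raises with the messages from errors.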

2. Query the job's details

GET /jobs/<jobId>

Example response:

{
    "jid":"0dca134dbbe841d617f3593dfe5f8c2e",
    "name":"insert-into_default_catalog.default_database.sink220325101420191tL84",
    "isStoppable":false,
    "state":"RUNNING",
    "start-time":1648179253170,
    "end-time":-1,
    "duration":277303407,
    "maxParallelism":-1,
    "now":1648456556577,
    "timestamps":{
        "INITIALIZING":1648179253170,
        "CANCELED":0,
        "RECONCILING":0,
        "SUSPENDED":0,
        "FAILING":0,
        "RESTARTING":0,
        "RUNNING":1648179253198,
        "FINISHED":0,
        "FAILED":0,
        "CREATED":1648179253177,
        "CANCELLING":0
    },
    "vertices":[
        {
            "id":"cbc357ccb763df2852fee8c4fc7d55f2",
            "name":"Source: KafkaSource-default_catalog.default_database.source220325101420192qXKx -> Calc(select=[operator, sourceHost, Reinterpret(TO_TIMESTAMP(timestamp, _UTF-16LE'yyyy-MM-dd HH:mm:ss')) AS ts], where=[(weight > 2)])",
            "maxParallelism":128,
            "parallelism":1,
            "status":"RUNNING",
            "start-time":1648179253253,
            "end-time":-1,
            "duration":277303324,
            "tasks":{
                "SCHEDULED":0,
                "DEPLOYING":0,
                "FINISHED":0,
                "CANCELING":0,
                "CREATED":0,
                "RUNNING":1,
                "CANCELED":0,
                "FAILED":0,
                "RECONCILING":0,
                "INITIALIZING":0
            },
            "metrics":{
                "read-bytes":42814532,
                "read-bytes-complete":true,
                "write-bytes":178520,
                "write-bytes-complete":true,
                "read-records":0,
                "read-records-complete":true,
                "write-records":1886,
                "write-records-complete":true
            }
        },
        {
            "id":"9dd63673dd41ea021b896d5203f3ba7c",
            "name":"LocalWindowAggregate(groupBy=[operator, sourceHost], window=[TUMBLE(time_col=[ts], size=[2 h])], select=[operator, sourceHost, COUNT(*) AS count1$0, slice_end('w$) AS $slice_end])",
            "maxParallelism":128,
            "parallelism":1,
            "status":"RUNNING",
            "start-time":1648179253259,
            "end-time":-1,
            "duration":277303318,
            "tasks":{
                "SCHEDULED":0,
                "DEPLOYING":0,
                "FINISHED":0,
                "CANCELING":0,
                "CREATED":0,
                "RUNNING":1,
                "CANCELED":0,
                "FAILED":0,
                "RECONCILING":0,
                "INITIALIZING":0
            },
            "metrics":{
                "read-bytes":194183,
                "read-bytes-complete":true,
                "write-bytes":129496,
                "write-bytes-complete":true,
                "read-records":1886,
                "read-records-complete":true,
                "write-records":1021,
                "write-records-complete":true
            }
        },
        {
            "id":"e883208d19e3c34f8aaf2a3168a63337",
            "name":"GlobalWindowAggregate(groupBy=[operator, sourceHost], window=[TUMBLE(slice_end=[$slice_end], size=[2 h])], select=[operator, sourceHost, COUNT(count1$0) AS metric_count, start('w$) AS window_start, end('w$) AS window_end]) -> Calc(select=[UUID() AS id, _UTF-16LE'220325101420191HqBW' AS rule_key, _UTF-16LE'B3-測試高級規則' AS rule_name, 10 AS metric_threshold, 0 AS audit_status, 0 AS audit_comment_num, window_start, window_end, operator, sourceHost, metric_count], where=[(metric_count > 10)]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.sink220325101420191tL84], fields=[id, rule_key, rule_name, metric_threshold, audit_status, audit_comment_num, window_start, window_end, operator, sourceHost, metric_count])",
            "maxParallelism":128,
            "parallelism":1,
            "status":"RUNNING",
            "start-time":1648179253264,
            "end-time":-1,
            "duration":277303313,
            "tasks":{
                "SCHEDULED":0,
                "DEPLOYING":0,
                "FINISHED":0,
                "CANCELING":0,
                "CREATED":0,
                "RUNNING":1,
                "CANCELED":0,
                "FAILED":0,
                "RECONCILING":0,
                "INITIALIZING":0
            },
            "metrics":{
                "read-bytes":145094,
                "read-bytes-complete":true,
                "write-bytes":0,
                "write-bytes-complete":true,
                "read-records":1021,
                "read-records-complete":true,
                "write-records":0,
                "write-records-complete":true
            }
        }
    ],
    "status-counts":{
        "SCHEDULED":0,
        "DEPLOYING":0,
        "FINISHED":0,
        "CANCELING":0,
        "CREATED":0,
        "RUNNING":3,
        "CANCELED":0,
        "FAILED":0,
        "RECONCILING":0,
        "INITIALIZING":0
    },
    "plan":{
        "jid":"0dca134dbbe841d617f3593dfe5f8c2e",
        "name":"insert-into_default_catalog.default_database.sink220325101420191tL84",
        "type":"STREAMING",
        "nodes":[
            {
                "id":"e883208d19e3c34f8aaf2a3168a63337",
                "parallelism":1,
                "operator":"",
                "operator_strategy":"",
                "description":"GlobalWindowAggregate(groupBy=[operator, sourceHost], window=[TUMBLE(slice_end=[$slice_end], size=[2 h])], select=[operator, sourceHost, COUNT(count1$0) AS metric_count, start('w$) AS window_start, end('w$) AS window_end]) -> Calc(select=[UUID() AS id, _UTF-16LE'220325101420191HqBW' AS rule_key, _UTF-16LE'B3-測試高級規則' AS rule_name, 10 AS metric_threshold, 0 AS audit_status, 0 AS audit_comment_num, window_start, window_end, operator, sourceHost, metric_count], where=[(metric_count > 10)]) -> NotNullEnforcer(fields=[id]) -> Sink: Sink(table=[default_catalog.default_database.sink220325101420191tL84], fields=[id, rule_key, rule_name, metric_threshold, audit_status, audit_comment_num, window_start, window_end, operator, sourceHost, metric_count])",
                "inputs":[
                    {
                        "num":0,
                        "id":"9dd63673dd41ea021b896d5203f3ba7c",
                        "ship_strategy":"HASH",
                        "exchange":"pipelined_bounded"
                    }
                ],
                "optimizer_properties":{

                }
            },
            {
                "id":"9dd63673dd41ea021b896d5203f3ba7c",
                "parallelism":1,
                "operator":"",
                "operator_strategy":"",
                "description":"LocalWindowAggregate(groupBy=[operator, sourceHost], window=[TUMBLE(time_col=[ts], size=[2 h])], select=[operator, sourceHost, COUNT(*) AS count1$0, slice_end('w$) AS $slice_end])",
                "inputs":[
                    {
                        "num":0,
                        "id":"cbc357ccb763df2852fee8c4fc7d55f2",
                        "ship_strategy":"FORWARD",
                        "exchange":"pipelined_bounded"
                    }
                ],
                "optimizer_properties":{

                }
            },
            {
                "id":"cbc357ccb763df2852fee8c4fc7d55f2",
                "parallelism":1,
                "operator":"",
                "operator_strategy":"",
                "description":"Source: KafkaSource-default_catalog.default_database.source220325101420192qXKx -> Calc(select=[operator, sourceHost, Reinterpret(TO_TIMESTAMP(timestamp, _UTF-16LE'yyyy-MM-dd HH:mm:ss')) AS ts], where=[(weight > 2)])",
                "optimizer_properties":{

                }
            }
        ]
    }
} 
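The detail response is large, and in practice I mostly care about the job state and the status of each vertex. A small sketch of how that could be pulled out in Python follows; the function name is made up for illustration, and the field names are the ones visible in the response above.

```python
def summarize_job(detail):
    """Reduce a GET /jobs/<jobId> response to the job state and per-vertex status."""
    return {
        "jid": detail["jid"],
        "state": detail["state"],
        # One (vertex id, status, parallelism) tuple per vertex in the job graph.
        "vertices": [
            (v["id"], v["status"], v["parallelism"])
            for v in detail.get("vertices", [])
        ],
    }


def all_vertices_running(detail):
    """True when the job has at least one vertex and every vertex reports RUNNING."""
    vertices = detail.get("vertices", [])
    return bool(vertices) and all(v["status"] == "RUNNING" for v in vertices)
```

Fed the response above, summarize_job would report state RUNNING with three vertices.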

3. Cancel or stop a job

POST /jobs/<jobId>/stop

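Another awkward endpoint. The parameters again go in the request body rather than in the URL; the official docs describe that body as a JSON object. Of the endpoints covered here, this is the one that lets you specify where the savepoint is written.

Parameters:

drain: a boolean. When true, MAX_WATERMARK is emitted before the savepoint is taken and the pipeline is stopped. I usually pass true, but note that the Flink documentation advises against draining if you plan to resume the job from the savepoint later, because it can lead to incorrect results after the restart.

targetDirectory: where the savepoint is written. Use an absolute path, for example /opt/flink-1.14.0/aaa. If omitted, the value of state.savepoints.dir from the Flink configuration is used. I rely on the default path and let Flink generate it.

Response: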

{"request-id":"a3554aaa2c59e95b3053c43fcb5570d5"}

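This endpoint has to be used together with another one. The stop may not finish immediately, so it first returns a request-id; use that id afterwards to query the result of the stop, which includes the savepoint location. The client should save that location.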
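A rough Python sketch of the stop call, assuming the body is sent as JSON the way the official docs describe it. The base URL is a placeholder, and drain defaults to False here only so the caller has to opt in explicitly.

```python
import json
import urllib.request

# Assumption: the JobManager REST endpoint lives here (8081 is Flink's default REST port).
BASE_URL = "http://localhost:8081"


def build_stop_request(job_id, drain=False, target_directory=None):
    """Build (without sending) the POST /jobs/<jobId>/stop request."""
    body = {"drain": drain}
    # Leave targetDirectory out to fall back to state.savepoints.dir.
    if target_directory is not None:
        body["targetDirectory"] = target_directory
    return urllib.request.Request(
        f"{BASE_URL}/jobs/{job_id}/stop",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_stop(request):
    """Send the stop request and return the request-id used to query the result."""
    with urllib.request.urlopen(request) as resp:
        return json.load(resp)["request-id"]
```

Calling send_stop(build_stop_request(job_id)) returns the request-id that the next endpoint needs.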

4. Query the result of cancelling/stopping a job

GET   /jobs/<jobId>/savepoints/<requestId>

Example response:

{
    "status":{
        "id":"COMPLETED"
    },
    "operation":{
        "location":"file:/opt/flink-1.14.0/flink-savepoints/savepoint-4c46ac-310609cbea53"
    }
}

Pass this savepoint location the next time you restart the job.
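Since the stop is asynchronous, the client has to poll this endpoint until the status is COMPLETED. Below is a sketch of such a loop in Python. The fetch function is injected so the loop can be exercised without a cluster. The IN_PROGRESS status and the failure-cause key are taken from my reading of the official docs, not from the response above, so treat them as assumptions. The base URL is a placeholder.

```python
import json
import time
import urllib.request

# Assumption: the JobManager REST endpoint lives here (8081 is Flink's default REST port).
BASE_URL = "http://localhost:8081"


def http_fetcher(job_id, request_id):
    """Return a zero-argument function that GETs /jobs/<jobId>/savepoints/<requestId>."""
    url = f"{BASE_URL}/jobs/{job_id}/savepoints/{request_id}"

    def fetch():
        with urllib.request.urlopen(url) as resp:
            return json.load(resp)

    return fetch


def wait_for_savepoint(fetch_status, attempts=30, delay_seconds=1.0):
    """Poll until status.id is COMPLETED, then return operation.location."""
    for _ in range(attempts):
        payload = fetch_status()
        if payload["status"]["id"] == "COMPLETED":
            operation = payload.get("operation") or {}
            # Assumption: a failed savepoint reports a 'failure-cause' instead of a location.
            if "failure-cause" in operation:
                raise RuntimeError(str(operation["failure-cause"]))
            return operation["location"]
        time.sleep(delay_seconds)
    raise TimeoutError("savepoint did not complete within the allowed attempts")
```

A real call would be wait_for_savepoint(http_fetcher(job_id, request_id)); the returned location is what gets stored for the next restart.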

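5. Restart a job

Restarting a job uses the same endpoint as run. The only difference is that you additionally supply a value for the savepointPath parameter.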
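As a small illustration, the restart body can be derived from the original run body plus the savepoint query result. This is only a sketch; the helper name and the sample entry class in the usage note are hypothetical.

```python
import json


def restart_body(original_body, savepoint_result):
    """Return the JSON body for a restart: the original run body plus savepointPath."""
    location = savepoint_result["operation"]["location"]
    body = dict(original_body)  # copy so the caller's dict is not modified
    body["savepointPath"] = location
    return json.dumps(body)
```

For example, restart_body({"entryClass": "com.example.Main"}, result) yields a body that can be posted to /jars/<jarName>/run exactly like a first-time submit.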

6. Query the checkpoint statistics of a job

GET         /jobs/<jobId>/checkpoints

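Example response (JSON). It includes the location of the job's most recent successfully completed checkpoint, the directory that holds its _metadata file: latest.completed.external_path

If the cluster goes down unexpectedly and this file still exists, it can be used to restart the job.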

{
  "counts":{
    "restored":0,
    "total":25616,
    "in_progress":0,
    "completed":25616,
    "failed":0
  },
  "summary":{
    "state_size":{
      "min":5911,"max":6375,"avg":6033,"p50":6027.0,"p90":6027.0,"p95":6027.0,"p99":6027.0,"p999":6027.0
    },
    "end_to_end_duration":{
      "min":3,"max":548,"avg":6,"p50":5.0,"p90":6.0,"p95":9.0,"p99":12.0,"p999":44.999000000001615
    },
    "alignment_buffered":{
      "min":0,"max":0,"avg":0,"p50":0.0,"p90":0.0,"p95":0.0,"p99":0.0,"p999":0.0
    },
    "processed_data":{
      "min":0,"max":0,"avg":0,"p50":0.0,"p90":0.0,"p95":0.0,"p99":0.0,"p999":0.0
    },
    "persisted_data":{
      "min":0,"max":0,"avg":0,"p50":0.0,"p90":0.0,"p95":0.0,"p99":0.0,"p999":0.0
    }
  },
  "latest":{
    "completed":{
      "@class":"completed",
      "id":25616,
      "status":"COMPLETED",
      "is_savepoint":false,
      "trigger_timestamp":1648188076643,
      "latest_ack_timestamp":1648188076648,
      "state_size":6027,
      "end_to_end_duration":5,
      "alignment_buffered":0,
      "processed_data":0,
      "persisted_data":0,
      "num_subtasks":3,
      "num_acknowledged_subtasks":3,
      "checkpoint_type":"CHECKPOINT",
      "tasks":{},
      "external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25616",
      "discarded":false
    },
    "savepoint":null,
    "failed":null,
    "restored":null
  },
  "history":[
    {"@class":"completed","id":25616,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648188076643,"latest_ack_timestamp":1648188076648,"state_size":6027,"end_to_end_duration":5,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25616","discarded":false},
    {"@class":"completed","id":25615,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648188016643,"latest_ack_timestamp":1648188016649,"state_size":6027,"end_to_end_duration":6,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25615","discarded":true},
    {"@class":"completed","id":25614,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187956643,"latest_ack_timestamp":1648187956649,"state_size":6027,"end_to_end_duration":6,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25614","discarded":true},
    {"@class":"completed","id":25613,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187896643,"latest_ack_timestamp":1648187896656,"state_size":6027,"end_to_end_duration":13,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25613","discarded":true},
    {"@class":"completed","id":25612,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187836643,"latest_ack_timestamp":1648187836648,"state_size":6027,"end_to_end_duration":5,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25612","discarded":true},
    {"@class":"completed","id":25611,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187776643,"latest_ack_timestamp":1648187776648,"state_size":6027,"end_to_end_duration":5,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25611","discarded":true},
    {"@class":"completed","id":25610,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187716643,"latest_ack_timestamp":1648187716649,"state_size":6027,"end_to_end_duration":6,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25610","discarded":true},
    {"@class":"completed","id":25609,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187656643,"latest_ack_timestamp":1648187656649,"state_size":6027,"end_to_end_duration":6,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25609","discarded":true},
    {"@class":"completed","id":25608,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187596643,"latest_ack_timestamp":1648187596648,"state_size":6027,"end_to_end_duration":5,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25608","discarded":true},
    {"@class":"completed","id":25607,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1648187536643,"latest_ack_timestamp":1648187536648,"state_size":6027,"end_to_end_duration":5,"alignment_buffered":0,"processed_data":0,"persisted_data":0,"num_subtasks":3,"num_acknowledged_subtasks":3,"checkpoint_type":"CHECKPOINT","tasks":{},"external_path":"file:/opt/hikvision/idatafusion/flink-1.14.0/local-checkpoints/3560f8db528b3fb640e2b5a7db89af2d/chk-25607","discarded":true}
  ]
}
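The only field I really need from this response is latest.completed.external_path. A minimal sketch of extracting it in Python follows; the function name is made up, and it returns None when no checkpoint has completed yet or no external path is reported.

```python
def latest_checkpoint_path(stats):
    """Return latest.completed.external_path from GET /jobs/<jobId>/checkpoints, or None."""
    latest = stats.get("latest") or {}
    completed = latest.get("completed")
    if not completed:
        # No checkpoint has completed yet ("completed" is null or missing).
        return None
    return completed.get("external_path")
```

The returned path can then be passed as savepointPath when the job is run again.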

7. Query the checkpoint configuration of a job

GET         /jobs/<jobId>/checkpoints/config

Example response (JSON):

{
  "mode":"exactly_once",
  "interval":60000,
  "timeout":600000,
  "min_pause":0,
  "max_concurrent":1,
  "externalization":{
  "enabled":true,
  "delete_on_cancellation":false
  },
  "state_backend":"HashMapStateBackend",
  "checkpoint_storage":"FileSystemCheckpointStorage",
  "unaligned_checkpoints":false,
  "tolerable_failed_checkpoints":0,
  "aligned_checkpoint_timeout":0
}
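One thing this response can answer is whether a checkpoint will still be on disk after the job is cancelled, which matters if you want to restart from it. A small sketch, reading only the externalization block shown above; the function name is hypothetical.

```python
def checkpoints_survive_cancel(config):
    """True when externalized checkpoints are enabled and kept on cancellation."""
    ext = config.get("externalization") or {}
    return bool(ext.get("enabled")) and not ext.get("delete_on_cancellation", True)


def interval_seconds(config):
    """Checkpoint interval converted from milliseconds to seconds."""
    return config["interval"] / 1000
```

With the configuration above, checkpoints are retained on cancel and the interval works out to 60 seconds.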

Querying all jobs

8. Query the id and status of all jobs

GET /jobs

Example response:

{
   "jobs":[
      {"id":"0dca134dbbe841d617f3593dfe5f8c2e","status":"RUNNING"},
      {"id":"f21e864681dcfb375bd2fbc556a82f51","status":"RUNNING"},
      {"id":"14e8d5660be66388ed5de63bed2a9ab2","status":"RUNNING"}
   ]
}

9. Query the overview of all jobs

Returns additional information on top of what /jobs provides.

GET /jobs/overview

Example response:

{
"jobs":[
{"jid":"0dca134dbbe841d617f3593dfe5f8c2e","name":"insert-into_default_catalog.default_database.sink220325101420191tL84","state":"RUNNING","start-time":1648179253170,"end-time":-1,"duration":275491671,"last-modification":1648179253452,"tasks":{"total":3,"created":0,"scheduled":0,"deploying":0,"running":3,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0,"initializing":0}},
{"jid":"f21e864681dcfb375bd2fbc556a82f51","name":"insert-into_default_catalog.default_database.sink220324174733642qwvs","state":"RUNNING","start-time":1648115953079,"end-time":-1,"duration":338791762,"last-modification":1648115953234,"tasks":{"total":3,"created":0,"scheduled":0,"deploying":0,"running":3,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0,"initializing":0}},
{"jid":"14e8d5660be66388ed5de63bed2a9ab2","name":"insert-into_default_catalog.default_database.sink220316115755871zzvU","state":"RUNNING","start-time":1648115678791,"end-time":-1,"duration":339066050,"last-modification":1648115679087,"tasks":{"total":3,"created":0,"scheduled":0,"deploying":0,"running":3,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0,"initializing":0}}
]
}

Useful fields: the job's id, name, and state

 "jid":"0dca134dbbe841d617f3593dfe5f8c2e",
 "name":"insert-into_default_catalog.default_database.sink220325101420191tL84",
 "state":"RUNNING"
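A short Python sketch of picking those three fields out of the overview response, for instance to find a running job by name. The function names are made up for illustration.

```python
def jobs_by_state(overview, state="RUNNING"):
    """Map jid -> name for every job in GET /jobs/overview with the given state."""
    return {
        job["jid"]: job["name"]
        for job in overview.get("jobs", [])
        if job["state"] == state
    }


def find_job_id(overview, name):
    """Return the jid of the first job with this exact name, or None if absent."""
    for job in overview.get("jobs", []):
        if job["name"] == name:
            return job["jid"]
    return None
```

With the response above, jobs_by_state returns three entries, one per running job.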

 

