conductor 系統任務


動態任務

參數:

dynamicTaskNameParam:來自任務輸入的參數的名稱,其值用於調度任務。 例如 如果參數的值為ABC,則調度的下一個任務類型為“ABC”。

Example

{
  "name": "user_task", "taskReferenceName": "t1", "inputParameters": { "files": "${workflow.input.files}", "taskToExecute": "${workflow.input.user_supplied_task}" }, "type": "DYNAMIC", "dynamicTaskNameParam": "taskToExecute" } 
如果以user_supplied_task的值(假設值為user_task_2)來啟動工作流,那么Conductor將在調度此動態任務時調度user_task_2。

決策任務:

決策任務與編程語言中的case ... switch語句相似。 該任務需要3個參數:
caseValueParam: 其值將用作開關的任務輸入中的參數名稱。
decisionCases: 映射其中key是caseValueParam的可能值,值是要執行的任務列表。
defaultCase:在決策案例中找不到匹配值時執行的任務列表(默認條件)

Example

{
  "name": "decide_task", "taskReferenceName": "decide1", "inputParameters": { "case_value_param": "${workflow.input.movieType}" }, "type": "DECISION", "caseValueParam": "case_value_param", "decisionCases": { "Show": [ { "name": "setup_episodes", "taskReferenceName": "se1", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" }, { "name": "generate_episode_artwork", "taskReferenceName": "ga", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" } ], "Movie": [ { "name": "setup_movie", "taskReferenceName": "sm", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" }, { "name": "generate_movie_artwork", "taskReferenceName": "gma", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" } ] } }

Fork任務:

fork用於調度並行的任務。
參數:
forkTasks:任務列表的列表。 每個子列表被安排並行執行。 然而,子列表中的任務是以串行方式安排的。

Example

{
  "forkTasks": [ [ { "name": "task11", "taskReferenceName": "t11" }, { "name": "task12", "taskReferenceName": "t12" } ], [ { "name": "task21", "taskReferenceName": "t21" }, { "name": "task22", "taskReferenceName": "t22" } ] ] } 
當執行時,task11和task21被調度為同時執行。


動態fork任務
動態fork與FORK_JOIN任務相同。 除了在運行時使用任務的輸入提供要分配的任務的列表。 當fork的任務數量不固定並且根據輸入而變化時很有用。
dynamicForkTasksParam:包含並行執行的工作流任務配置列表的參數名稱
dynamicForkTasksInputParamName: 參數的名字,此參數的值是一個映射,映射的key是fork任務的別名,value是fork任務的input

Example

{
  "dynamicTasks": "${taskA.output.dynamicTasksJSON}", "dynamicTasksInput": "${taskA.output.dynamicTasksInputJSON}", "type": "FORK_JOIN_DYNAMIC", "dynamicForkTasksParam": "dynamicTasks", "dynamicForkTasksInputParamName": "dynamicTasksInput" } 

假設taskA的輸出為:
{
  "dynamicTasksInputJSON": { "forkedTask1": { "width": 100, "height": 100, "params": { "recipe": "jpg" } }, "forkedTask2": { "width": 200, "height": 200, "params": { "recipe": "jpg" } } }, "dynamicTasksJSON": [ { "name": "encode_task", "taskReferenceName": "forkedTask1", "type": "SIMPLE" }, { "name": "encode_task", "taskReferenceName": "forkedTask2", "type": "SIMPLE" } ] }
執行時,動態fork任務將調度兩個並行任務,類型為“encode_task”,引用名稱為“forkedTask1”和“forkedTask2”,由_ dynamicTasksInputJSON_指定

join任務:

join任務用於等待fork任務產生的一個或多個任務的完成。
參數:
joinOn:任務參考名稱列表,其中JOIN將等待完成。

Example

{
    "joinOn": ["taskRef1", "taskRef3"] } 

Join Task Output

fork任務的輸出將是一個JSON對象,其中key是任務別名,值是fork任務的輸出。

 

子工作流任務:

Sub Workflow任務允許在另一個工作流中嵌套工作流。

參數:

subWorkflowParam:任務參考名稱列表,其中JOIN將等待完成。

Example

{
  "name": "sub_workflow_task", "taskReferenceName": "sub1", "inputParameters": { "requestId": "${workflow.input.requestId}", "file": "${encode.output.location}" }, "type": "SUB_WORKFLOW", "subWorkflowParam": { "name": "deployment_workflow", "version": 1 } } 
執行時,將使用兩個輸入參數requestId和file執行deployment_workflow。 完成生成的工作流程后,任務將被標記為已完成。 如果子工作流程終止或失敗,則將任務標記為失敗,並重新配置。


等待任務:

等待任務被實現為保持IN_PROGRESS狀態的門,除非由外部觸發器標記為COMPLETED或FAILED。 要使用等待任務,請將任務類型設置為WAIT

參數:無
外部觸發等待任務
任務資源端點可用於將任務的狀態更新為終止狀態。
Contrib模塊提供SQS集成,其中外部系統可以將消息放置在服務器偵聽的預配置隊列中。 消息到達時,它們被標記為COMPLETED或FAILED。

SQS隊列
可以使用以下API檢索服務器用於更新任務狀態的SQS隊列:
GET /queue

當更新任務的狀態時,消息需要符合以下規范:

     消息必須是有效的JSON字符串。
     消息JSON應包含一個名為externalId的密鑰,其值為包含以下密鑰的JSONized字符串:
         workflowId:工作流的ID
         taskRefName:應該更新的任務引用名稱。
     每個隊列代表一個特定的任務狀態,相應地標記任務。 例如 消息進入COMPLETED隊列將任務狀態標記為COMPLETED。
     使用消息更新任務的輸出。


示例SQS有效負載:

{ "some_key": "valuex", "externalId": "{\"taskRefName\":\"TASK_REFERENCE_NAME\",\"workflowId\":\"WORKFLOW_ID\"}" }


HTTP任務:

HTTP任務用於通過HTTP調用另一個微服務器。
該任務期望一個名為http_request的輸入參數作為任務輸入的一部分,具有以下詳細信息:
uri:服務的URI。 使用vipAddress時可以是部分或包含服務器地址。
method: HTTP method. One of the GET, PUT, POST, DELETE, OPTIONS, HEAD
accept: Accept header as required by server.
contentType:Content Type - 支持的類型是 text/plain, text/html and, application/json
headers:要與請求一起發送的其他http標頭的map。
body:Request body
vipAddress:使用基於發現的服務URL時。

HTTP 任務輸出:
response: 包含response的JSON主體(如果存在)
headers: Response Headers
statusCode: Integer status code

Example

Task Input payload using vipAddress

{
  "http_request": { "vipAddress": "examplevip-prod", "uri": "/", "method": "GET", "accept": "text/plain" } } 

Task Input using an absolute URL

{
  "http_request": { "uri": "http://example.com/", "method": "GET", "accept": "text/plain" } } 
如果請求無法完成或遠程服務器返回不成功的狀態代碼,任務將被標記為FAILED。


Event任務:

事件任務提供將事件(消息)發布到conductor或外部事件系統(如SQS)的能力。 事件任務對於為工作流和任務創建基於事件的依賴非常有用。
參數:
sink:生產的事件的合格名稱。 例如 conductor或sqs:sqs_queue_name

Example

{
    "sink": 'sqs:example_sqs_queue_name' } 
當使用conductor作為接收器生成事件時,事件名稱遵循以下結構:conductor:<workflow_name>:<task_reference_name>
對於SQS,使用隊列的名稱,而不是URI。 conductor根據名稱查找URI。

Supported Sinks

  • Conductor
  • SQS

事件任務輸入:

給予事件任務的輸入作為有效載荷提供給發布的消息。 例如 如果將消息放入SQS隊列(sink is sqs)中,則消息有效內容將作為任務的輸入。

時間任務輸出:

event_produced生成的事件的名稱。


 

 

 



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM