動態任務:
參數:
dynamicTaskNameParam:來自任務輸入的參數的名稱,其值用於調度任務。 例如 如果參數的值為ABC,則調度的下一個任務類型為“ABC”。
Example
{
"name": "user_task", "taskReferenceName": "t1", "inputParameters": { "files": "${workflow.input.files}", "taskToExecute": "${workflow.input.user_supplied_task}" }, "type": "DYNAMIC", "dynamicTaskNameParam": "taskToExecute" }
如果以user_supplied_task
的值(假設值為user_task_2
)來啟動工作流,那么Conductor將在調度此動態任務時調度user_task_2。
決策任務:
決策任務與編程語言中的case ... switch語句相似。 該任務需要3個參數:
caseValueParam: 其值將用作開關的任務輸入中的參數名稱。
decisionCases: 映射其中key是caseValueParam的可能值,值是要執行的任務列表。
defaultCase:在決策案例中找不到匹配值時執行的任務列表(默認條件)
Example
{
"name": "decide_task", "taskReferenceName": "decide1", "inputParameters": { "case_value_param": "${workflow.input.movieType}" }, "type": "DECISION", "caseValueParam": "case_value_param", "decisionCases": { "Show": [ { "name": "setup_episodes", "taskReferenceName": "se1", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" }, { "name": "generate_episode_artwork", "taskReferenceName": "ga", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" } ], "Movie": [ { "name": "setup_movie", "taskReferenceName": "sm", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" }, { "name": "generate_movie_artwork", "taskReferenceName": "gma", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" } ] } }
Fork任務:
fork用於調度並行的任務。
參數:
forkTasks:任務列表的列表。 每個子列表被安排並行執行。 然而,子列表中的任務是以串行方式安排的。
Example
{
"forkTasks": [ [ { "name": "task11", "taskReferenceName": "t11" }, { "name": "task12", "taskReferenceName": "t12" } ], [ { "name": "task21", "taskReferenceName": "t21" }, { "name": "task22", "taskReferenceName": "t22" } ] ] }
當執行時,task11和task21被調度為同時執行。
動態fork任務:
動態fork與FORK_JOIN任務相同。 除了在運行時使用任務的輸入提供要分配的任務的列表。 當fork的任務數量不固定並且根據輸入而變化時很有用。
dynamicForkTasksParam:包含並行執行的工作流任務配置列表的參數名稱
dynamicForkTasksInputParamName: 參數的名字,此參數的值是一個映射,映射的key是fork任務的別名,value是fork任務的input
Example
{
"dynamicTasks": "${taskA.output.dynamicTasksJSON}", "dynamicTasksInput": "${taskA.output.dynamicTasksInputJSON}", "type": "FORK_JOIN_DYNAMIC", "dynamicForkTasksParam": "dynamicTasks", "dynamicForkTasksInputParamName": "dynamicTasksInput" }
假設taskA的輸出為:
{
"dynamicTasksInputJSON": { "forkedTask1": { "width": 100, "height": 100, "params": { "recipe": "jpg" } }, "forkedTask2": { "width": 200, "height": 200, "params": { "recipe": "jpg" } } }, "dynamicTasksJSON": [ { "name": "encode_task", "taskReferenceName": "forkedTask1", "type": "SIMPLE" }, { "name": "encode_task", "taskReferenceName": "forkedTask2", "type": "SIMPLE" } ] }
執行時,動態fork任務將調度兩個並行任務,類型為“encode_task”,引用名稱為“forkedTask1”和“forkedTask2”,由_ dynamicTasksInputJSON_指定
join任務:
join任務用於等待fork任務產生的一個或多個任務的完成。
參數:
joinOn:任務參考名稱列表,其中JOIN將等待完成。
Example
{
"joinOn": ["taskRef1", "taskRef3"] }
Join Task Output
fork任務的輸出將是一個JSON對象,其中key是任務別名,值是fork任務的輸出。
子工作流任務:
Sub Workflow任務允許在另一個工作流中嵌套工作流。
參數:
subWorkflowParam:任務參考名稱列表,其中JOIN將等待完成。
Example
{
"name": "sub_workflow_task", "taskReferenceName": "sub1", "inputParameters": { "requestId": "${workflow.input.requestId}", "file": "${encode.output.location}" }, "type": "SUB_WORKFLOW", "subWorkflowParam": { "name": "deployment_workflow", "version": 1 } }
執行時,將使用兩個輸入參數requestId和file執行deployment_workflow。 完成生成的工作流程后,任務將被標記為已完成。 如果子工作流程終止或失敗,則將任務標記為失敗,並重新配置。
等待任務:
等待任務被實現為保持IN_PROGRESS狀態的門,除非由外部觸發器標記為COMPLETED或FAILED。 要使用等待任務,請將任務類型設置為WAIT
參數:無
外部觸發等待任務:
任務資源端點可用於將任務的狀態更新為終止狀態。
Contrib模塊提供SQS集成,其中外部系統可以將消息放置在服務器偵聽的預配置隊列中。 消息到達時,它們被標記為COMPLETED或FAILED。
SQS隊列
可以使用以下API檢索服務器用於更新任務狀態的SQS隊列:
GET /queue
當更新任務的狀態時,消息需要符合以下規范:
消息必須是有效的JSON字符串。
消息JSON應包含一個名為externalId的密鑰,其值為包含以下密鑰的JSONized字符串:
workflowId:工作流的ID
taskRefName:應該更新的任務引用名稱。
每個隊列代表一個特定的任務狀態,相應地標記任務。 例如 消息進入COMPLETED隊列將任務狀態標記為COMPLETED。
使用消息更新任務的輸出。
示例SQS有效負載:
{ "some_key": "valuex", "externalId": "{\"taskRefName\":\"TASK_REFERENCE_NAME\",\"workflowId\":\"WORKFLOW_ID\"}" }
HTTP任務:
HTTP任務用於通過HTTP調用另一個微服務器。
該任務期望一個名為http_request的輸入參數作為任務輸入的一部分,具有以下詳細信息:
uri:服務的URI。 使用vipAddress時可以是部分或包含服務器地址。
method: HTTP method. One of the GET, PUT, POST, DELETE, OPTIONS, HEAD
accept: Accept header as required by server.
contentType:Content Type - 支持的類型是 text/plain, text/html and, application/json
headers:要與請求一起發送的其他http標頭的map。
body:Request body
vipAddress:使用基於發現的服務URL時。
HTTP 任務輸出:
response: 包含response的JSON主體(如果存在)
headers: Response Headers
statusCode: Integer status code
Example
Task Input payload using vipAddress
{
"http_request": { "vipAddress": "examplevip-prod", "uri": "/", "method": "GET", "accept": "text/plain" } }
Task Input using an absolute URL
{
"http_request": { "uri": "http://example.com/", "method": "GET", "accept": "text/plain" } }
如果請求無法完成或遠程服務器返回不成功的狀態代碼,任務將被標記為FAILED。
Event任務:
事件任務提供將事件(消息)發布到conductor或外部事件系統(如SQS)的能力。 事件任務對於為工作流和任務創建基於事件的依賴非常有用。
參數:
sink:生產的事件的合格名稱。 例如 conductor或sqs:sqs_queue_name
Example
{
"sink": 'sqs:example_sqs_queue_name' }
當使用conductor作為接收器生成事件時,事件名稱遵循以下結構:conductor:<workflow_name>:<task_reference_name>
對於SQS,使用隊列的名稱,而不是URI。 conductor根據名稱查找URI。
Supported Sinks
- Conductor
- SQS
事件任務輸入:
給予事件任務的輸入作為有效載荷提供給發布的消息。 例如 如果將消息放入SQS隊列(sink is sqs)中,則消息有效內容將作為任務的輸入。
時間任務輸出:
event_produced生成的事件的名稱。