簡介:本文主要介紹netflix conductor的基本概念和主要運行機制。
作者 | 夜陽
來源 | 阿里技術公眾號
本文主要介紹netflix conductor的基本概念和主要運行機制。
一 簡介
netflix conductor是基於JAVA語言編寫的開源流程引擎,用於架構基於微服務的流程。它具備如下特性:
- 允許創建復雜的業務流程,流程中每個獨立的任務都是由一個微服務所實現。
- 基於JSON DSL 創建工作流,對任務的執行進行編排。
- 工作流在執行的過程中可見、可追溯。
- 提供暫停、恢復、重啟等多種控制模型。
- 提供一種簡單的方式來最大限度重用微服務。
- 擁有擴展到百萬流程並發運行的服務能力。
- 通過隊列服務實現客戶端與服務端的分離。
- 支持 HTTP 或其他RPC協議進行數據傳送
二 基本概念
1 Task
Task是最小執行單元,承載了一段執行邏輯,如發送HTTP請求等。
- System Task:被conductor服務執行,這些任務的執行與引擎在同一個JVM中。
- Worker Task:被worker服務執行,執行與引擎隔離開,worker通過隊列獲取任務后,執行並更新結果狀態到引擎。Worker的實現是跨語言的,其使用Http協議與Server通信。
conductor提供了若干內置SystemTask:
-
功能性Task:
- HTTP:發送http請求
- JSON_JQ_TRANSFORM:jq命令執行,一般用戶json的轉換,具體可見jq官方文檔
- KAFKA_PUBLISH: 發布kafka消息
-
流程控制Task:
- SWITCH(原Decision):條件判斷分支,類似於代碼中的switch case
- FORK:啟動並行分支,用於調度並行任務
- JOIN:匯總並行分支,用於匯總並行任務
- DO_WHILE:循環,類似於代碼中的do while
- WAIT:一直在運行中,直到外部時間觸發更新節點狀態,可用於等待外部操作
- SUB_WORKFLOW:子流程,執行其他的流程
- TERMINATE:結束流程,以指定輸出提前結束流程,可以與SWITCH節點配合使用,類似代碼中的提前return語句
-
自定義Task:
- 對於System Task,Conductor提供了WorkflowSystemTask 抽象類,可以自定義擴展實現。
- 對於Worker Task,可以實現conductor的client Worker接口實現執行邏輯。
2 Workflow
- Workflow由一系列需要執行的Task組成,conductor采用json來描述Task的流轉關系。
- 除基本的順序流程外,借助內置的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE任務,還能實現分支、並行、循環、提前結束等流程控制。
3 Input&Output
Task的輸入是一種映射,其作為工作流實例化的一部分或某些其他Task的輸出。允許將來自工作流或其他Task的輸入/輸出作為隨后執行的Task的輸入。
- Task有自己的輸入和輸出,輸入輸出都是jsonobject類型。
- Task可以引用其他Task的輸入輸出,使用${taskxxx.output}的方式引用。引用語法為json-path,除最基礎的${taskxxx.output}的值解析方式外,還支持其他復雜操作,如過濾等,具體見json-path語法。
- 啟動Workflow時可以傳入流程的輸入數據,Task可以通過${workflow.input}的方式引用。
Task實現原子操作的處理以及流程控制操作,Workflow定義描述Task的流轉關系,Task引用Workflow或者其它Task的輸入輸出。通過這些機制,conductor實現了JSON DSL對流程的描述。
三 整體架構
- Orchestrator: 負責流程的流轉調度工作;
- Management/Execution Service: 提供流程、任務的管理更新等操作;
- TaskQueues: 任務隊列,Orchestrator解析出來的待執行Task會放到隊列中;
- Worker: 任務執行worker,從TaskQueues中獲取任務,通過Execution Service更新任務狀態與結果數據;
- Database: 元數據&運行時數據庫,用於保存運行時的Workflow、Task等狀態信息,以及流程任務定義的等原信息;
- Index: 索引數據庫,用於存儲執行歷史;
四 運行模型
1 Task狀態轉移
- SCHEDULED:待調度,task放到隊列中還沒有被poll出來執行時的狀態
- IN_PROGRESS:執行中,被poll出來執行但還沒有完成時的狀態
- COMPLETED:執行完成
- FAILED:執行失敗
-
CANCELLED:被中止時為此狀態,一般出現在兩種情況:
- 手動中止流程時,正在運行中的task會被置為此狀態;
- 多個fork分支,當某個分支的task失敗時,其它分支中正在運行的task會被置為此狀態;
任務的執行(同步的系統任務除外)都會先添加到任務隊列中,是典型的生產者消費者模式。
- 任務隊列,是一個帶有延遲、優先級功能的隊列;
- 每種類型的Task是一個單獨的隊列,此外,如果配置了domain、isolationGroup,還會拆分成多個隊列實現執行隔離;
- decider service是生產者,其根據流程配置與當前執行情況,解析出可執行的task后,添加到隊列;
- 任務執行器(SystemTaskWorker、Worker)是消費者,其長輪詢對應的隊列,從隊列中獲取任務執行;
隊列接口可插拔,conductor提供了Dynomite 、MySQL、PostgreSQL的實現。
3 核心功能實現機制
conductor調度的核心是decider service,其根據當前流程運行的狀態,解析出將要執行的任務列表,將任務入隊交給worker執行。
decide主要流程簡化如下,詳細代碼見WorkflowExecutor.java的decide方法:
最主要的觸發時機:
- 新啟動執行時,會觸發decide操作
- 系統任務執行完成時,會觸發decide操作
- Workder任務通過ExecutionService更新任務狀態時,會觸發decide操作
流程控制節點的實現機制
1)Task & TaskMapper
對於每一個Task來說,都有Task和TaskMapper兩部分:
- Task:任務的執行邏輯代碼,它的作用是Task的執行
- TaskMapper:任務的映射邏輯代碼,它通過Task的定義配置、當前實例的執行狀態等信息,返回實際需要執行的Task列表
對於一般的任務來說,TaskMapper返回的是就是Task本身,補充一些執行實例的狀態信息。但是對於控制節點來說,會有不同的邏輯。
2)條件分支(SWITCH)的實現機制
SWITCH用於根據條件判斷,執行不同的分支。
實際上,該節點的Task不做任何操作,TaskMapper根據分支條件,判斷出要走的分之后,返回對應分支的第一個Task。
SwitchTaskMapper.java getMappedTasks方法關鍵代碼:
3)並行(FORK)的實現機制
FORK用於開啟多個並行分支。
實際上,該節點的Task不做任何操作,TaskMapper返回所有並行分支的第一個Task。
ForkJoinTaskMapper.java getMappedTasks關鍵代碼:
總的來說,分支(SWITCH)、並行(FORK)節點本身沒有執行邏輯,其通過TaskMapper返回到實際要執行的Task,然后交給Decider Service處理。
重試的實現機制
重試和其延遲時間設置,都是借助任務隊列的功能實現的。
重試:將任務重新添加到任務隊列
重試的延遲時間:添加到任務隊列時設置延遲時間,延遲時間過后,任務才能在隊列中被poll出來執行
五 完整性保障機制
由於調度過程中可能會出現因機器重啟、網絡異常、JVM崩潰等偶發情況,這些會導致的decide過程意外終止,流程執行不完整,展現出如流程一直運行中(實際已經沒有在調度),或者其它狀態錯誤等異常現象。
1 WorkflowReconciler
針對這種情況,conductor有一個WorkflowReconciler,會定期嘗試decide所有正在運行中的流程,修復流程執行的一致性。此外,它還有一個作用是校驗流程超時時間。
2 decideQueue
那么WorkflowReconciler是如何獲取到當前運行中的流程呢,答案是decideQueue。
decideQueue和任務隊列相同,也是一個具有延遲功能的隊列,其存放的是正在執行中的流程的實例id。在任務開始執行時(包括新啟動執行、重試執行、恢復執行、重跑執行等),會將實例id push到decideQueue中;在執行結束(成功、失敗)時,會從decideQueue中刪除實例id。
3 ExecutionLockService
WorkflowReconciler會定期嘗試decide所有正在運行中的流程用於超時判斷、維護流程一致性。但是流程本身正常執行也會觸發decide,如果同一個執行同時觸發兩個decide,可能會導致狀態混亂,執行卡住等問題。
conductor采用了鎖來解決這個問題,其提供了單機LocalOnlyLock(基於信號量實現)、redis分布式鎖(基於redission實現)、zookeeper分布式鎖三種實現。
decide方法中最開始會嘗試獲取鎖,如果獲取失敗則直接返回。通過鎖來保障不會對同一個流程實例並發執行decide。
由於鎖是可配置的,可能會導致一個誤區:單台機器的話不用配置鎖。其實單機也是需要配置鎖的,因為WorkflowReconciler和流程正常執行會產生沖突,可能會導致偶發的流程狀態混亂問題。
原文鏈接
本文為阿里雲原創內容,未經允許不得轉載。