https://netflix.github.io/conductor/
https://github.com/Netflix/conductor
編譯版: https://jcenter.bintray.com/com/netflix/conductor/
-----------------------------------------------------
Motivation
我們建立了Conductor,以幫助我們在Netflix上編制基於微服務的流程,具有以下特點:
- 允許創建復雜的流程/業務流程,其中單個任務由微服務器實現。
- 基於JSON DSL的藍圖(blueprint)定義了執行流程。
- 為這些流程提供可見性和可追溯性。
- 圍繞暫停,恢復,重新啟動等方式展現控制語義,從而實現更好的體驗。
- 允許更多地重用現有的微服務器,為onboarding提供更簡單的路徑。
- 用戶界面可視化流程。
- 能夠在需要時同步處理所有任務。
- 能夠擴展數以百萬計的同時運行的流程。
- 由客戶抽象的排隊服務支持。
- 能夠在HTTP或其他傳輸(如gRPC)上運行。
Why not peer to peer choreography?
通過對等任務編排,我們發現隨着業務需求和復雜性的增長難以擴展。 Pub / sub模式為最簡單的流程工作,但很快突出了與該方法相關的一些問題:
- 流程在多個應用程序的代碼中被“嵌入”。
- 通常,關於輸入/輸出,SLA等的緊耦合和假設,使其難以適應不斷變化的需求。
- 幾乎沒有辦法系統地回答“我們對過程X做了多少工作”?
API和存儲層可插拔,並提供與不同后台和隊列服務提供者合作的能力。
-----------------------------------------------------------------------------------------------
git clone git@github.com:Netflix/conductor.git
啟動server
方法1:進入server目錄
執行../gradlew server
此命令會自動下載gradle
此腳本最終執行的是如下命令
(java -Dorg.gradle.appname=gradlew -classpath /root/conductor/gradle/wrapper/gradle-wrapper.jar org.gradle.wrapper.GradleWrapperMain server)
方法2: 手動下載gradle工具,進入server目錄
gradle build
進入/root/conductor/server/build/libs 目錄下
java -jar conductor-server-VERSION-all.jar
實現此接口
https://github.com/Netflix/conductor/blob/dev/client/src/main/java/com/netflix/conductor/client/worker/Worker.java
例子:
https://github.com/Netflix/conductor/blob/dev/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java
https://github.com/Netflix/conductor/blob/dev/client/src/test/java/com/netflix/conductor/client/sample/SampleWorker.java
啟動ui:
cd ui gulp watch
訪問 http://localhost:3000/
運行模式:
conductor遵循基於RPC的通信模式,其中worker與server在單獨的機器上運行。 worker通過基於HTTP的端點與服務器通信,並使用輪詢模型來管理工作隊列。
worker是遠程系統,並通過HTTP(或任何支持的RPC機制)與conductor server進行通信。
任務隊列用於為worker安排任務。 我們在內部使用了dyno-queue,但是它可以很容易地與SQS或類似的pub-sub機制進行交換。
conductor重新持久化模塊使用Dynomite存儲狀態和元數據以及Elasticsearch用於索引后端。
請參閱擴展后端部分,以實現對不同數據庫的支持以進行存儲和索引。
要注冊並執行新工作流所需的步驟:
1. 定義工作流的任務定義。
2. 創建工作流定義
3. 創建定期輪詢調度任務的任務worker.
觸發工作流執行
POST /workflow/{name} { ... //json payload as workflow input }
輪詢任務
GET /tasks/poll/batch/{taskType}
更新任務狀態
POST /tasks
{
"outputData": { "encodeResult":"success", "location": "http://cdn.example.com/file/location.png" //any task specific output }, "status": "COMPLETED" }
工作流定義:
工作流使用以DSL為基礎的JSON定義的,包括一組要執行的任務,這些任務組成了工作流,任務是遠程機器上的控制任務(fork, conditional等等),
或應用程序任務(e.g. encode a file)
任務定義:
所有的任務必須在活動的工作流使用之前注冊.
一個任務可以被多個工作流重復使用,任務分成兩類:
- System Task
- Worker Task
System Tasks:
系統任務在Conductor服務器的JVM內執行,由Conductor管理,用於其執行和可擴展性。
DYNAMIC : 基於對任務的輸入表達式導出的工作任務,而不是靜態定義為blueprint的一部分
DECIDE:決策任務 - 實現案例...開關式fork
FORK:fork一組並行的任務。 每組計划並行執行
FORK_JOIN_DYNAMIC:FORK_JOIN_DYNAMIC與FORK類似,但不是並行執行blueprint中定義的任務集,基於此任務的輸入表達式生成並行任務
JOIN:完成FORK和FORK_JOIN_DYNAMIC。 用於合並更多的並行分支*
SUB_WORKFLOW: 將另一個工作流作為子工作流任務。 執行后,實例化子工作流程,等待完成
EVENT:在支持的事件系統(例如conductor,SQS)中產生事件
conductor提供了一個API來創建在與引擎相同的JVM中執行的用戶定義的任務。 有關詳細信息,請參閱WorkflowSystemTask界面。
Worker Tasks
worker tasks由應用程序實現,並在獨立於Conductor的環境中運行。 worker tasks可以用任何語言實現。 這些任務通過REST API端點與Conductor服務器進行通信,以輪詢任務並在執行后更新其狀態。
worker tasks由blueprint中的任務類型SIMPLE標識。