開源工作流引擎 Workflow Core 的研究和使用教程
一,工作流對象和使用前說明
為了避免歧義,事先約定。
工作流有很多節點組成,一個節點成為步驟點(Step)。
1,IWorkflow / IWorkflowBuilder
Workflow Core 中,用於構建工作流的類繼承 IWorkflow
,代表一條有任務規則的工作流,可以表示工作流任務的開始或者 Do() 方法,或工作流分支獲取其它方法。
IWorkflow 有兩個同名接口:
public interface IWorkflow<TData>
where TData : new()
{
string Id { get; }
int Version { get; }
void Build(IWorkflowBuilder<TData> builder);
}
public interface IWorkflow : IWorkflow<object>
{
}
Id:此工作流的唯一標識符;
Version:此工作流的版本。
void Build
:在此方法內構建工作流。
工作流運作過程中,可以傳遞數據。有兩種傳遞方法:使用泛型,從運行工作流時就要傳入;使用 object 簡單類型,由單獨的步驟產生並且傳遞給下一個節點。
IWorkflowBuilder 是工作流對象,構建一個具有邏輯規則的工作流。可以構建復雜的、具有循環、判斷的工作流規則,或者並行或者異步處理工作流任務。
一個簡單的工作流規則:
public class DeferSampleWorkflow : IWorkflow
{
public string Id => "DeferSampleWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith(context =>
{
// 開始工作流任務
Console.WriteLine("Workflow started");
return ExecutionResult.Next();
})
.Then<SleepStep>()
.Input(step => step.Period, data => TimeSpan.FromSeconds(20))
.Then(context =>
{
Console.WriteLine("workflow complete");
return ExecutionResult.Next();
});
}
}
2,EndWorkflow
此對象表示當前工作流任務已經結束,可以表示主工作流或者工作流分支任務的完成。
/// Ends the workflow and marks it as complete
IStepBuilder<TData, TStepBody> EndWorkflow();
因為工作流是可以出現分支的,每個工作流各自獨立工作,每個分支都有其生命周期。
3,容器
ForEach
、While
、If
、When
、Schedule
、Recur
是步驟容器。都返回IContainerStepBuilder<TData, Schedule, TStepBody>
Parallel、Saga是步驟的容器,都返回 IStepBuilder<TData, Sequence>
。
ForEach、While、If、When、Schedule、Recur 返回類型的接口:
public interface IContainerStepBuilder<TData, TStepBody, TReturnStep>
where TStepBody : IStepBody
where TReturnStep : IStepBody
{
/// The block of steps to execute
IStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);
Parallel、Saga :
/// Execute multiple blocks of steps in parallel
IParallelStepBuilder<TData, Sequence> Parallel();
/// Execute a sequence of steps in a container
IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
也就是說,ForEach、While、If、When、Schedule、Recur 是真正的容器。
按照我的理解,繼承了 IContainerStepBuilder
的,是一個容器,一個流程下的一個步驟/容器;因為 Workflow Core 作者對接口的命名很明顯表達了 This a container
。
因為里面包含了一組操作,可以說是一個步驟里面包含了一個流程,這個流程由一系列操作組成,它是線性的,是順序的。里面是一條工作流(Workflow)。
而 Parllel、Saga,相當於步驟點的容器。
更直觀的理解是電路,繼承 IContainerStepBuilder 的是串聯設備的容器,是順序的;
Parllel 是並聯電路/設備的一個容器,它既是一個開關,使得一條電路變成多條並流的電路,又包含了這些電路的電器。里面可以產生多條工作流,是多分支的、不同步的、獨立的。
從實現接口上看,ForEach、While、If、When、Schedule、Recur、Parllel 都實現了 Do()
方法,而 Saga 沒有實現。
關於 Saga,后面說明。
4,工作流的步驟點
實現接口如下:
IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, ActionStepBody> StartWith(Action<IStepExecutionContext> body);
IEnumerable<WorkflowStep> GetUpstreamSteps(int id);
IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
方法名稱 | 說明 |
---|---|
StartWith | 任務的開始,必須調用此方法 |
GetUpstreamSteps | 獲取上一個步驟(StepBody)的ID |
UseDefaultErrorBehavior | 不詳 |
StepBody 是一個節點,IStepBuilder 構建一個節點,只有通過 StartWith,才能開始一個工作流、一個分支、異步任務等。
UseDefaultErrorBehavior
筆者沒有使用到,不敢瞎說。貌似與事務有關,當一個步驟點發生異常時,可以終止、重試等。
二,IStepBuilder 節點
IStepBuilder 表示一個節點,或者說一個容器,里面可以含有其它操作,例如並行、異步、循環等。
1,設置屬性的方法
Name:設置此步驟點的名稱;
id:步驟點的唯一標識符。
/// Specifies a display name for the step
IStepBuilder<TData, TStepBody> Name(string name);
/// Specifies a custom Id to reference this step
IStepBuilder<TData, TStepBody> Id(string id);
2,設置數據
前面說到,工作流每個步驟點傳遞數據有兩種方式。
TData(泛型) 是工作流中,隨着流傳的數據,這個對象會在整個工作流程生存。
例如 Mydata
class RecurSampleWorkflow : IWorkflow<MyData>
{
public string Id => "recur-sample";
public int Version => 1;
public void Build(IWorkflowBuilder<MyData> builder)
{
...
}
}
public class MyData
{
public int Counter { get; set; }
}
3,Input / Output
為當前步驟點(StepBody)設置數據,亦可為 TData 設置數據。
兩類數據:每個步驟點都可以擁有很多字段、屬性和方法等;工作流流轉 TData。
Input、Output 是設置這些數據的具體方法。
IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value);
IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, IStepExecutionContext, TInput>> value);
IStepBuilder<TData, TStepBody> Input(Action<TStepBody, TData> action);
IStepBuilder<TData, TStepBody> Output<TOutput>(Expression<Func<TData, TOutput>> dataProperty, Expression<Func<TStepBody, object>> value);
三,工作流節點的邏輯和操作
容器操作
1,Saga
用於在容器中執行一系列操作。
/// Execute a sequence of steps in a container
IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
雖然注釋說明 “用於在容器中執行一系列操作”,但實際上它不是一個真正的”容器“。
因為它沒有繼承 IContainerStepBuilder
,也沒有實現 Do()
。
但是它返回的 Sequence
實現了ContainerStepBody
。
如果說真正的容器相當於一條長河流中的一個湖泊(可以容納和儲水),而 Saga 可能只是某一段河流的命名,而不是具體的湖泊。
或者說 static void Main(string[] args)
里面的代碼太多了,新建一個方法體,把部分代碼放進去。總不能把所有代碼寫在一個方法里吧?那么創建一個類,把代碼分成多個部分,放到不同方法中,增強可讀性。本質還是沒有變。
Saga 可以用來處理事務,進行重試或回滾等操作。后面說明。
普通節點
1,Then
用於創建下一個節點,創建一個普通節點。可以是主工作流的節點(最外層)、或者作為循環、條件節點里的節點、作為節點中節點的節點。
IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, TStep> Then<TStep>(IStepBuilder<TData, TStep> newStep) where TStep : IStepBody;
IStepBuilder<TData, InlineStepBody> Then(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);
2,Attach
Then 作為普通節點,按順序執行。操作對象是類型、StepBody。
Attach 也是普通節點,無特殊意義,通過 id 來指定要執行 StepBody 。可以作為流程控制的跳轉。
相當於 goto 語句。
/// Specify the next step in the workflow by Id
IStepBuilder<TData, TStepBody> Attach(string id);
事件
1,WaitFor
用於定義事件,將當前節點作為事件節點,然后在后台掛起,工作流會接着執行下一個節點。在工作流停止前,可以通過指定 標識符(Id) 觸發事件。在一個工作流中,每個事件的標識符都是唯一的。
IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, IStepExecutionContext, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
條件體和循環體
1,End
意思應該是結束一個節點的運行。
如果在 When 中使用,相當於 break。
IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;
使用例子
builder
.StartWith<RandomOutput>(x => x.Name("Random Step"))
.When(0)
.Then<TaskA>()
.Then<TaskB>()
.End<RandomOutput>("Random Step")
.When(1)
.Then<TaskC>()
.Then<TaskD>()
.End<RandomOutput>("Random Step");
2,CancelCondition
在一個條件下過早地取消此步驟的執行。
應該相當於 contiune。
/// Prematurely cancel the execution of this step on a condition
IStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);
節點的異步或多線程
1,Delay
延遲執行,使得當前節點延時執行。並非是阻塞當前的工作流運行。Delay 跟在節點后面,使得這個節點延時運行。可以理解成異步,工作流不會等待此節點執行完畢,會直接執行下一個節點/步驟。
/// Wait for a specified period
IStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);
2,Schedule
預定執行。將當前節點設置一個時間,將在一段時間后執行。Schedule 不會阻塞工作流。
Schedule 是非阻塞的,工作流不會等待Schedule執行完畢,會直接執行下一個節點/步驟。
/// Schedule a block of steps to execute in parallel sometime in the future
IContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);
例子
builder
.StartWith(context => Console.WriteLine("Hello"))
.Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
.StartWith(context => Console.WriteLine("Doing scheduled tasks"))
)
.Then(context => Console.WriteLine("Doing normal tasks"));
3,Recur
用於重復執行某個節點,直至條件不符。
Recur 是非阻塞的,工作流不會等待 Rezur 執行完畢,會直接執行下一個節點/步驟。
/// Schedule a block of steps to execute in parallel sometime in the future at a recurring interval
IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);
用於事務的操作
相當於數據庫中的事務,流程中某些步驟發生異常時的時候執行某些操作。
例如:
builder
.StartWith(context => Console.WriteLine("Begin"))
.Saga(saga => saga
.StartWith<Task1>()
.CompensateWith<UndoTask1>()
.Then<Task2>()
.CompensateWith<UndoTask2>()
.Then<Task3>()
.CompensateWith<UndoTask3>()
)
.OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
.Then(context => Console.WriteLine("End"));
1,CompensateWith
如果此步驟引發未處理的異常,則撤消步驟;如果發生異常,則執行。
可以作為節點的 B計划。當節點執行任務沒有問題時, CompensateWith 不會運行;如果節點發生錯誤,就會按一定要求執行 CompensateWith 。
/// Undo step if unhandled exception is thrown by this step
IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);
2,CompensateWithSequence
如果此步驟引發未處理的異常,則撤消步驟;如果發生異常,則執行。與 CompensateWith 的區別是,傳入參數前者是 Func,后者是 Action。
CompensateWith
的內部實現了 CompensateWith
,是對 CompensateWith
的封裝。
/// Undo step if unhandled exception is thrown by this step
IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);
3,OnError
用於事務操作,表示發生錯誤時如果回滾、設置時間等。一般與 Saga 一起使用。
OnError 是阻塞的。
/// Configure the behavior when this step throws an unhandled exception
IStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
OnError 可以捕獲一個容器內,某個節點的異常,並執行回滾操作。如果直接在節點上使用而不是容器,可以發生回滾,然后執行下個節點。如果作用於容器,那么可以讓容器進行重新運行,等一系列操作。
OnError 可以與 When、While 等節點容器一起使用,但他們本身帶有循環功能,使用事務會讓代碼邏輯變得奇怪。
Saga 沒有條件判斷、沒有循環,本身就是一個簡單的袋子,是節點的容器。因此使用 Saga 作為事務操作的容器,十分適合,進行回滾、重試等一系列操作。
四,條件或開關
迭代
1,ForEach
迭代,也可以說是循環。內部使用 IEnumerable 來實現。
與 C# 中 Foreach 的區別是,C# 中是用來迭代數據;
而工作流中 ForEach 用來判斷元素個數,標識應該循環多少次。
ForEach 是阻塞的。
/// Execute a block of steps, once for each item in a collection in a parallel foreach
IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);
示例
builder
.StartWith<SayHello>()
.ForEach(data => new List<int>() { 1, 2, 3, 4 })
.Do(x => x
.StartWith<DisplayContext>()
.Input(step => step.Item, (data, context) => context.Item)
.Then<DoSomething>())
.Then<SayGoodbye>();
最終會循環5次。
條件判斷
1,When
條件判斷,條件是否真。
When 是阻塞的。
When 可以捕獲上一個節點流轉的數據(非 TData)。
/// Configure an outcome for this step, then wire it to another step
[Obsolete]
IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null);
/// Configure an outcome for this step, then wire it to a sequence
IContainerStepBuilder<TData, When, OutcomeSwitch> When(Expression<Func<TData, object>> outcomeValue, string label = null);
前一個方法例如
When(0)
,會捕獲 return ExecutionResult.Outcome(value);
的值,判斷是否相等。但是這種方式已經過時。
需要使用表達式來判斷。例如
.When(data => 1)
.When(data => data.value==1)
2,While
條件判斷,條件是否真。與When有區別,When可以捕獲 ExecutionResult.Outcome(value);
。
While 是阻塞的。
/// Repeat a block of steps until a condition becomes true
IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);
示例
builder
.StartWith<SayHello>()
.While(data => data.Counter < 3)
.Do(x => x
.StartWith<DoSomething>()
.Then<IncrementStep>()
.Input(step => step.Value1, data => data.Counter)
.Output(data => data.Counter, step => step.Value2))
.Then<SayGoodbye>();
3,If
條件判斷,是否符合條件。
If是阻塞的。
/// Execute a block of steps if a condition is true
IContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);
When、While、If的區別是,When、While 是條件是否為真,If是表達式是否為真。
實質上,是語言上的區別,與代碼邏輯無關。
真假用 When/While,條件判斷、表達式判斷用 If 。
節點並發
1,Parallel
並行任務。作為容器,可以在里面設置多組任務,這些任務將會同時、並發運行。
Parallel 是阻塞的。
/// Execute multiple blocks of steps in parallel
IParallelStepBuilder<TData, Sequence> Parallel();
示例:
.StartWith<SayHello>()
.Parallel()
.Do(then =>
then.StartWith<PrintMessage>()
.Input(step => step.Message, data => "Item 1.1")
.Then<PrintMessage>()
.Input(step => step.Message, data => "Item 1.2"))
.Do(then =>
then.StartWith<PrintMessage>()
.Input(step => step.Message, data => "Item 2.1")
.Then<PrintMessage>()
.Input(step => step.Message, data => "Item 2.2")
.Then<PrintMessage>()
.Input(step => step.Message, data => "Item 2.3"))
.Do(then =>
then.StartWith<PrintMessage>()
.Input(step => step.Message, data => "Item 3.1")
.Then<PrintMessage>()
.Input(step => step.Message, data => "Item 3.2"))
.Join()
.Then<SayGoodbye>();
有三個 Do,代表三個並行任務。三個 Do 是並行的,Do 內的代碼,會按順序執行。
Paeallel 的 Do:
public interface IParallelStepBuilder<TData, TStepBody>
where TStepBody : IStepBody
{
IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder);
IStepBuilder<TData, Sequence> Join();
}
比起 ForEach、When、While、If,除了有 Do,還有 Join 方法。
對於其它節點類型來說,Do直接構建節點。
對於Parallel來說,Do收集任務,最終需要Join來構建節點和運行任務。
五,其它
寫得長不好看,其它內容壓縮一下。
數據傳遞和依賴注入
Workflow Core 支持對每個步驟點進行依賴注入。
支持數據持久化
Workflow Core 支持將構建的工作流存儲到數據庫中,以便以后再次調用。
支持 Sql Server、Mysql、SQLite、PostgreSQL、Redis、MongoDB、AWS、Azure、
Elasticsearch、RabbitMQ... ....
支持動態調用和動態生成工作流
你可以通過 C# 代碼構建工作流,或者通過 Json、Yaml 動態構建工作流。
可以利用可視化設計器,將邏輯和任務生成配置文件,然后動態傳遞,使用 Workflow Core 動態創建工作流。
篇幅有限,不再贅述。
有興趣請關注 Workflow Core:https://github.com/danielgerlag/workflow-core