開源工作流引擎 Workflow Core 的研究和使用教程


開源工作流引擎 Workflow Core 的研究和使用教程

一,工作流對象和使用前說明

為了避免歧義,事先約定。

工作流有很多節點組成,一個節點成為步驟點(Step)。

1,IWorkflow / IWorkflowBuilder

Workflow Core 中,用於構建工作流的類繼承 IWorkflow,代表一條有任務規則的工作流,可以表示工作流任務的開始或者 Do() 方法,或工作流分支獲取其它方法。

IWorkflow 有兩個同名接口:

    public interface IWorkflow<TData>
        where TData : new()
    {
        string Id { get; }
        int Version { get; }
        void Build(IWorkflowBuilder<TData> builder);
    }

    public interface IWorkflow : IWorkflow<object>
    {
    }

Id:此工作流的唯一標識符;

Version:此工作流的版本。

void Build:在此方法內構建工作流。

工作流運作過程中,可以傳遞數據。有兩種傳遞方法:使用泛型,從運行工作流時就要傳入;使用 object 簡單類型,由單獨的步驟產生並且傳遞給下一個節點。

IWorkflowBuilder 是工作流對象,構建一個具有邏輯規則的工作流。可以構建復雜的、具有循環、判斷的工作流規則,或者並行或者異步處理工作流任務。

一個簡單的工作流規則:

    public class DeferSampleWorkflow : IWorkflow
    {
        public string Id => "DeferSampleWorkflow";
            
        public int Version => 1;
            
        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith(context =>
                {
                    // 開始工作流任務
                    Console.WriteLine("Workflow started");
                    return ExecutionResult.Next();
                })
                .Then<SleepStep>()
                    .Input(step => step.Period, data => TimeSpan.FromSeconds(20))
                .Then(context =>
                {
                    Console.WriteLine("workflow complete");
                    return ExecutionResult.Next();
                });
        }
    }

2,EndWorkflow

此對象表示當前工作流任務已經結束,可以表示主工作流或者工作流分支任務的完成。

        /// Ends the workflow and marks it as complete
        IStepBuilder<TData, TStepBody> EndWorkflow();

因為工作流是可以出現分支的,每個工作流各自獨立工作,每個分支都有其生命周期。

3,容器

ForEachWhileIfWhenScheduleRecur步驟容器。都返回IContainerStepBuilder<TData, Schedule, TStepBody>

Parallel、Saga是步驟的容器,都返回 IStepBuilder<TData, Sequence>

ForEach、While、If、When、Schedule、Recur 返回類型的接口:

    public interface IContainerStepBuilder<TData, TStepBody, TReturnStep>
        where TStepBody : IStepBody
        where TReturnStep : IStepBody
    {
        /// The block of steps to execute
        IStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);

Parallel、Saga :

        /// Execute multiple blocks of steps in parallel
        IParallelStepBuilder<TData, Sequence> Parallel();

        /// Execute a sequence of steps in a container
        IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);

也就是說,ForEach、While、If、When、Schedule、Recur 是真正的容器。

按照我的理解,繼承了 IContainerStepBuilder的,是一個容器,一個流程下的一個步驟/容器;因為 Workflow Core 作者對接口的命名很明顯表達了 This a container

因為里面包含了一組操作,可以說是一個步驟里面包含了一個流程,這個流程由一系列操作組成,它是線性的,是順序的里面是一條工作流(Workflow)。

而 Parllel、Saga,相當於步驟點的容器。

更直觀的理解是電路,繼承 IContainerStepBuilder 的是串聯設備的容器,是順序的;

Parllel 是並聯電路/設備的一個容器,它既是一個開關,使得一條電路變成多條並流的電路,又包含了這些電路的電器。里面可以產生多條工作流,是多分支的、不同步的、獨立的。

1

從實現接口上看,ForEach、While、If、When、Schedule、Recur、Parllel 都實現了 Do() 方法,而 Saga 沒有實現。

關於 Saga,后面說明。

4,工作流的步驟點

實現接口如下:

IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

        IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body);

        IStepBuilder<TData, ActionStepBody> StartWith(Action<IStepExecutionContext> body);

        IEnumerable<WorkflowStep> GetUpstreamSteps(int id);

        IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
方法名稱 說明
StartWith 任務的開始,必須調用此方法
GetUpstreamSteps 獲取上一個步驟(StepBody)的ID
UseDefaultErrorBehavior 不詳

StepBody 是一個節點,IStepBuilder 構建一個節點,只有通過 StartWith,才能開始一個工作流、一個分支、異步任務等。

UseDefaultErrorBehavior筆者沒有使用到,不敢瞎說。貌似與事務有關,當一個步驟點發生異常時,可以終止、重試等。

二,IStepBuilder 節點

IStepBuilder 表示一個節點,或者說一個容器,里面可以含有其它操作,例如並行、異步、循環等。

1,設置屬性的方法

Name:設置此步驟點的名稱;
id:步驟點的唯一標識符。

        /// Specifies a display name for the step
        IStepBuilder<TData, TStepBody> Name(string name);

        /// Specifies a custom Id to reference this step
        IStepBuilder<TData, TStepBody> Id(string id);

2,設置數據

前面說到,工作流每個步驟點傳遞數據有兩種方式。

TData(泛型) 是工作流中,隨着流傳的數據,這個對象會在整個工作流程生存。

例如 Mydata

 class RecurSampleWorkflow : IWorkflow<MyData>
    {
        public string Id => "recur-sample";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyData> builder)
        {
        ...
        }
    }
 public class MyData
    {
        public int Counter { get; set; }
    }

3,Input / Output

為當前步驟點(StepBody)設置數據,亦可為 TData 設置數據。

兩類數據:每個步驟點都可以擁有很多字段、屬性和方法等;工作流流轉 TData。

Input、Output 是設置這些數據的具體方法。

        IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value);

        IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, IStepExecutionContext, TInput>> value);

        IStepBuilder<TData, TStepBody> Input(Action<TStepBody, TData> action);

        IStepBuilder<TData, TStepBody> Output<TOutput>(Expression<Func<TData, TOutput>> dataProperty, Expression<Func<TStepBody, object>> value);

三,工作流節點的邏輯和操作

容器操作

1,Saga

用於在容器中執行一系列操作。

    /// Execute a sequence of steps in a container
    IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);

雖然注釋說明 “用於在容器中執行一系列操作”,但實際上它不是一個真正的”容器“。

因為它沒有繼承 IContainerStepBuilder,也沒有實現 Do()

但是它返回的 Sequence 實現了ContainerStepBody

如果說真正的容器相當於一條長河流中的一個湖泊(可以容納和儲水),而 Saga 可能只是某一段河流的命名,而不是具體的湖泊。

或者說 static void Main(string[] args)里面的代碼太多了,新建一個方法體,把部分代碼放進去。總不能把所有代碼寫在一個方法里吧?那么創建一個類,把代碼分成多個部分,放到不同方法中,增強可讀性。本質還是沒有變。

Saga 可以用來處理事務,進行重試或回滾等操作。后面說明。

普通節點

1,Then

用於創建下一個節點,創建一個普通節點。可以是主工作流的節點(最外層)、或者作為循環、條件節點里的節點、作為節點中節點的節點。

 IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

        IStepBuilder<TData, TStep> Then<TStep>(IStepBuilder<TData, TStep> newStep) where TStep : IStepBody;

        IStepBuilder<TData, InlineStepBody> Then(Func<IStepExecutionContext, ExecutionResult> body);

        IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);

2,Attach

Then 作為普通節點,按順序執行。操作對象是類型、StepBody。

Attach 也是普通節點,無特殊意義,通過 id 來指定要執行 StepBody 。可以作為流程控制的跳轉。

相當於 goto 語句。

        /// Specify the next step in the workflow by Id
        IStepBuilder<TData, TStepBody> Attach(string id);

事件

1,WaitFor

用於定義事件,將當前節點作為事件節點,然后在后台掛起,工作流會接着執行下一個節點。在工作流停止前,可以通過指定 標識符(Id) 觸發事件。在一個工作流中,每個事件的標識符都是唯一的。

        IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);


        IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, IStepExecutionContext, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);

條件體和循環體

1,End

意思應該是結束一個節點的運行。

如果在 When 中使用,相當於 break。

        IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;

使用例子

            builder
                .StartWith<RandomOutput>(x => x.Name("Random Step"))
                    .When(0)
                        .Then<TaskA>()
                        .Then<TaskB>()                        
                        .End<RandomOutput>("Random Step")
                    .When(1)
                        .Then<TaskC>()
                        .Then<TaskD>()
                        .End<RandomOutput>("Random Step");

2,CancelCondition

在一個條件下過早地取消此步驟的執行。

應該相當於 contiune。

        /// Prematurely cancel the execution of this step on a condition
        IStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);

節點的異步或多線程

1,Delay

延遲執行,使得當前節點延時執行。並非是阻塞當前的工作流運行。Delay 跟在節點后面,使得這個節點延時運行。可以理解成異步,工作流不會等待此節點執行完畢,會直接執行下一個節點/步驟。

        /// Wait for a specified period
        IStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);

2,Schedule

預定執行。將當前節點設置一個時間,將在一段時間后執行。Schedule 不會阻塞工作流。

Schedule 是非阻塞的,工作流不會等待Schedule執行完畢,會直接執行下一個節點/步驟。

        /// Schedule a block of steps to execute in parallel sometime in the future
        IContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);

例子

            builder
                .StartWith(context => Console.WriteLine("Hello"))
                .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
                    .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
                )
                .Then(context => Console.WriteLine("Doing normal tasks"));

3,Recur

用於重復執行某個節點,直至條件不符。

Recur 是非阻塞的,工作流不會等待 Rezur 執行完畢,會直接執行下一個節點/步驟。

        /// Schedule a block of steps to execute in parallel sometime in the future at a recurring interval
        IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);

用於事務的操作

相當於數據庫中的事務,流程中某些步驟發生異常時的時候執行某些操作。

例如:

        builder
            .StartWith(context => Console.WriteLine("Begin"))
            .Saga(saga => saga
                .StartWith<Task1>()
                    .CompensateWith<UndoTask1>()
                .Then<Task2>()
                    .CompensateWith<UndoTask2>()
                .Then<Task3>()
                    .CompensateWith<UndoTask3>()
            )
                .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
            .Then(context => Console.WriteLine("End"));

1,CompensateWith

如果此步驟引發未處理的異常,則撤消步驟;如果發生異常,則執行。

可以作為節點的 B計划。當節點執行任務沒有問題時, CompensateWith 不會運行;如果節點發生錯誤,就會按一定要求執行 CompensateWith 。

        /// Undo step if unhandled exception is thrown by this step
        IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;

        IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);

        IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);

2,CompensateWithSequence

如果此步驟引發未處理的異常,則撤消步驟;如果發生異常,則執行。與 CompensateWith 的區別是,傳入參數前者是 Func,后者是 Action。

CompensateWith 的內部實現了 CompensateWith,是對 CompensateWith 的封裝。

        /// Undo step if unhandled exception is thrown by this step
        IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);

3,OnError

用於事務操作,表示發生錯誤時如果回滾、設置時間等。一般與 Saga 一起使用。

OnError 是阻塞的。

        /// Configure the behavior when this step throws an unhandled exception
        IStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);

OnError 可以捕獲一個容器內,某個節點的異常,並執行回滾操作。如果直接在節點上使用而不是容器,可以發生回滾,然后執行下個節點。如果作用於容器,那么可以讓容器進行重新運行,等一系列操作。

OnError 可以與 When、While 等節點容器一起使用,但他們本身帶有循環功能,使用事務會讓代碼邏輯變得奇怪。

Saga 沒有條件判斷、沒有循環,本身就是一個簡單的袋子,是節點的容器。因此使用 Saga 作為事務操作的容器,十分適合,進行回滾、重試等一系列操作。

四,條件或開關

迭代

1,ForEach

迭代,也可以說是循環。內部使用 IEnumerable 來實現。

與 C# 中 Foreach 的區別是,C# 中是用來迭代數據;

而工作流中 ForEach 用來判斷元素個數,標識應該循環多少次。

ForEach 是阻塞的。

        /// Execute a block of steps, once for each item in a collection in a parallel foreach
        IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);

示例

            builder
                .StartWith<SayHello>()
                .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                    .Do(x => x
                        .StartWith<DisplayContext>()
                            .Input(step => step.Item, (data, context) => context.Item)
                        .Then<DoSomething>())
                .Then<SayGoodbye>();

最終會循環5次。

條件判斷

1,When

條件判斷,條件是否真。

When 是阻塞的。

When 可以捕獲上一個節點流轉的數據(非 TData)。

        /// Configure an outcome for this step, then wire it to another step
        [Obsolete]
        IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null);
        
        
        /// Configure an outcome for this step, then wire it to a sequence
        IContainerStepBuilder<TData, When, OutcomeSwitch> When(Expression<Func<TData, object>> outcomeValue, string label = null);

前一個方法例如

When(0),會捕獲 return ExecutionResult.Outcome(value); 的值,判斷是否相等。但是這種方式已經過時。

需要使用表達式來判斷。例如

.When(data => 1)
.When(data => data.value==1)

2,While

條件判斷,條件是否真。與When有區別,When可以捕獲 ExecutionResult.Outcome(value);

While 是阻塞的。

        /// Repeat a block of steps until a condition becomes true
        IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);

示例

            builder
                .StartWith<SayHello>()
                .While(data => data.Counter < 3)
                    .Do(x => x
                        .StartWith<DoSomething>()
                        .Then<IncrementStep>()
                            .Input(step => step.Value1, data => data.Counter)
                            .Output(data => data.Counter, step => step.Value2))
                .Then<SayGoodbye>();

3,If

條件判斷,是否符合條件。

If是阻塞的。

        /// Execute a block of steps if a condition is true
        IContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);

When、While、If的區別是,When、While 是條件是否為真,If是表達式是否為真。

實質上,是語言上的區別,與代碼邏輯無關。

真假用 When/While,條件判斷、表達式判斷用 If 。

節點並發

1,Parallel

並行任務。作為容器,可以在里面設置多組任務,這些任務將會同時、並發運行。

Parallel 是阻塞的。

        /// Execute multiple blocks of steps in parallel
        IParallelStepBuilder<TData, Sequence> Parallel();

示例:

                .StartWith<SayHello>()
                .Parallel()
                    .Do(then => 
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.2"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.2")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.3"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.2"))
                .Join()
                .Then<SayGoodbye>();

有三個 Do,代表三個並行任務。三個 Do 是並行的,Do 內的代碼,會按順序執行。

Paeallel 的 Do:

    public interface IParallelStepBuilder<TData, TStepBody>
        where TStepBody : IStepBody
    {
        IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder);
        IStepBuilder<TData, Sequence> Join();
    }

比起 ForEach、When、While、If,除了有 Do,還有 Join 方法。

對於其它節點類型來說,Do直接構建節點。

對於Parallel來說,Do收集任務,最終需要Join來構建節點和運行任務。

五,其它

寫得長不好看,其它內容壓縮一下。

數據傳遞和依賴注入

Workflow Core 支持對每個步驟點進行依賴注入。

1565439224(1)

支持數據持久化

Workflow Core 支持將構建的工作流存儲到數據庫中,以便以后再次調用。

支持 Sql Server、Mysql、SQLite、PostgreSQL、Redis、MongoDB、AWS、Azure、

Elasticsearch、RabbitMQ... ....

支持動態調用和動態生成工作流

你可以通過 C# 代碼構建工作流,或者通過 Json、Yaml 動態構建工作流。

可以利用可視化設計器,將邏輯和任務生成配置文件,然后動態傳遞,使用 Workflow Core 動態創建工作流。

篇幅有限,不再贅述。

有興趣請關注 Workflow Core:https://github.com/danielgerlag/workflow-core


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM