7

开源轻量级工作流WorkflowCore介绍 - 寻找无名的特质

 2 years ago
source link: https://www.cnblogs.com/zhenl/p/16495977.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

在.Net Framework环境下,我们使用Windows Workflow Foundation(WF)作为项目的工作流引擎,可是.Net Core已经不支持WF了,需要为基于.Net Core的项目选择新的工作流引擎。基本要求如下:

  • 轻量级,部署和使用都很简单。
  • 有相当数量的用户,往往使用的人越多,产品也就越可靠,遇到问题也容易找到解决办法。
  • 支持使用配置文件定义工作流,而不仅仅是使用代码定义。

符合上述要求的开源项目有几个,这里介绍开源项目WorkflowCore,项目地址:https://github.com/danielgerlag/workflow-core。
本文的示例可以从github下载:https://github.com/zhenl/ZL.WorflowCoreDemo

简单的控制台项目

首先,使用Visual Studio创建一个.Net Core的控制台项目,在NuGet管理器中引入下面程序包:

  • WorkflowCore
  • Microsoft.Extensions.DependencyInjection
  • Microsoft.Extensions.Logging

然后,创建两个工作流的步骤:

using WorkflowCore.Interface;
using WorkflowCore.Models;

namespace WorkflowCoreTest
{
    public class HelloWorld : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("你好");
            return ExecutionResult.Next();
        }
    }
}

using WorkflowCore.Interface;
using WorkflowCore.Models;

namespace WorkflowCoreTest
{
    public class GoodbyeWorld : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("再见");
            return ExecutionResult.Next();
        }
    }
}

接下来使用这两个步骤定义一个工作流:

using WorkflowCore.Interface;

namespace WorkflowCoreTest
{
    public class HelloWorldWorkflow : IWorkflow
    {
        public string Id => "HelloWorld";
        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith<HelloWorld>()
                .Then<GoodbyeWorld>();
        }
    }
}

最后,在主程序中,创建WorkflowHost,注册并运行工作流,代码如下:

using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading;
using WorkflowCore.Interface;

namespace WorkflowCoreTest
{
    class Program
    {
        static void Main(string[] args)
        {
            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<HelloWorldWorkflow>();
            host.Start();

            host.StartWorkflow("HelloWorld", 1, null);
            Console.ReadLine();
            host.Stop();
        }

        private static IServiceProvider ConfigureServices()
        {
            //setup dependency injection
            IServiceCollection services = new ServiceCollection();
            services.AddLogging();
            services.AddWorkflow();
                        
            var serviceProvider = services.BuildServiceProvider();

            return serviceProvider;
        }
    }
}

简单的工作流就完成了。

WorkflowHost

上一节通过一个简单的控制台例子介绍了WorkflowCore工作流的定义和运行过程,从例子中可以看到,工作流是运行在WorkflowHost实例中的,再看一下代码:

static void Main(string[] args)
        {
            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
                       
            host.RegisterWorkflow<HelloWorldWorkflow>();
            host.Start();
            host.StartWorkflow("HelloWorld", 1, null);
            
            Console.ReadLine();
            host.Stop();
        }

WorkflowHost的工作过程是这样的,首先需要获取WorkflowHost的实例,然后注册工作流,这里可以注册多个工作流,接下来,启动host,然后可以启动工作流,这里可以启动多个工作流实例,最后,关闭host。

我们需要对WorkflowHost有进一步的了解,第一个问题,每次使用serviceProvider.GetService()获得的host是否是同一对象?为了回答这个问题,我们增加一些代码:

            var host = serviceProvider.GetService<IWorkflowHost>();
            var host1 = serviceProvider.GetService<IWorkflowHost>();

            Console.WriteLine(host == host1);

我们获取两个host变量比较一下看是否指向相同的对象,结果是True,也就是使用serviceProvider.GetService<IWorkflowHost()获得的是相同的对象。

第二个问题,调用host.Stop是否会影响正在执行的流程?
我们修改一下代码,启动流程实例后,马上执行host.Stop():

            host.RegisterWorkflow<HelloWorldWorkflow>();
            host.Start();
            host.StartWorkflow("HelloWorld", 1, null);
            host.Stop();
            Console.ReadLine();
            

我们发现,没有输出结果,也就是host.Stop()终止了所有流程。
第三个问题,host中启动的流程是否在同一线程运行?
我们启动多个流程,看一下输出结果:

            host.RegisterWorkflow<HelloWorldWorkflow>();
            host.Start();
            host.StartWorkflow("HelloWorld", 1, null);
            host.StartWorkflow("HelloWorld", 1, null);
            host.StartWorkflow("HelloWorld", 1, null);
            host.Stop();
            Console.ReadLine();

4131-20220719214427203-1597588565.png

说明每个流程是一个独立的线程,并行执行。

下一步我们需要了解流程的参数传递。

流程的数据对象和数据传递

我们已经知道了如何使用Fluent API定义流程和如何注册流程,现在我们需要了解如何定义流程需要处理的数据,和如何进行数据传递。这里举一个最简单的例子来说明。在前面的例子中,我们输出“你好”和“再见”,现在扩展这个需求,流程启动后,等待用户输入名字,然后输出“你好,<输入的名字>”和“<输入的名字>,再见”。为了完成这个需求,需要:

  • 定义一个数据结构用来保存输入的名字
  • 将这个数据结构与流程关联起来
  • 修改流程,让流程等待用户输入
  • 将用户输入的变量传递给流程
    首先我们定义一个简单的类,用来保存输入的名字:
namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class MyNameClass
    {
        public string MyName { get; set; }
    }
}

然后,修改流程的定义:

using System;

using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "HelloWithNameWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            builder
                .StartWith(context => ExecutionResult.Next())
                .WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
                    .Output(data => data.MyName, step => step.EventData)
                .Then<HelloWithName>()
                    .Input(step => step.Name, data => data.MyName)
                .Then<GoodbyeWithName>()
                    .Input(step => step.Name, data => data.MyName);
        }
    }
}

这里,流程声明为 IWorkflow,说明流程使用这个类存储数据,在流程定义中,可以使用data操作相关的数据对象,比如: .Input(step => step.Name, data => data.MyName) 就是将流程数据中的MyName传递给步骤中的Name(step.Name)。

这段代码中还使用WaitFor定义了一个事件,这个事件的输出是将事件接收的外部参数(step.EventData)传递给流程的MyName属性。

还需要修改两个步骤,增加名称字段:

using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;


namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
{
    public class HelloWithName : StepBody
    {
        public string Name { get; set; }
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("你好," + Name);
            return ExecutionResult.Next();
        }
    }
}

using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;


namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
{
    public class GoodbyeWithName : StepBody
    {
        public string Name { get; set; }
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine(Name + ",再见");
            return ExecutionResult.Next();
        }
    }
}

下面是流程注册和运行的代码:

using System;

using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "HelloWithNameWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            builder
                .StartWith(context => ExecutionResult.Next())
                .WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
                    .Output(data => data.MyName, step => step.EventData)
                .Then<HelloWithName>()
                    .Input(step => step.Name, data => data.MyName)
                .Then<GoodbyeWithName>()
                    .Input(step => step.Name, data => data.MyName);
        }
    }
}
using System;
using System.Collections.Generic;

using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using WorkflowCore.Interface;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class FlowRun
    {
        public static void Run()
        {
            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            
            host.RegisterWorkflow<HelloWithNameWorkflow, MyNameClass>();
            host.Start();

            var initialData = new MyNameClass();
            var workflowId = host.StartWorkflow("HelloWithNameWorkflow", 1, initialData).Result;
            
            Console.WriteLine("输入名字");
            string value = Console.ReadLine();
            host.PublishEvent("MyEvent", workflowId, value);

            Console.ReadLine();
            host.Stop();
        }

        private static IServiceProvider ConfigureServices()
        {
            //setup dependency injection
            IServiceCollection services = new ServiceCollection();
            services.AddLogging();
            services.AddWorkflow();

            var serviceProvider = services.BuildServiceProvider();

            return serviceProvider;
        }
    }
}

我们也可以使用字典作为数据对象,流程的定义如下:

using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class HelloWithNameWorkflowDynamic : IWorkflow<Dictionary<string,string>>
    {
        public string Id => "HelloWithNameWorkflowDynamic";
        public int Version => 1;

        public void Build(IWorkflowBuilder<Dictionary<string, string>> builder)
        {
            builder
                .StartWith(context => ExecutionResult.Next())
                .WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
                    .Output((step,data)=>data.Add("Name",(string)step.EventData))
                .Then<HelloWithName>()
                    .Input(step => step.Name, data => data["Name"])
                .Then<GoodbyeWithName>()
                    .Input(step => step.Name, data => data["Name"]);
        }
    }
}

这里没有使用自定义的类,而是使用了字典Dictionary<string, string>,流程的运行代码如下:

IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            
            host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>();
            host.Start();

            var initialData = new Dictionary<string,string>();
            var workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData).Result;
            
            Console.WriteLine("输入名字");
            string value = Console.ReadLine();
            host.PublishEvent("MyEvent", workflowId, value);

            
            Console.ReadLine();
            foreach (var key in initialData.Keys)
            {
                Console.WriteLine(key + ":" + initialData[key]);
            }
            Console.ReadLine();
            host.Stop();

采用JSON格式定义流程

WorkflowCore 支持采用JSON或者YAML格式定义流程,使用时通过使用IDefintionLoader加载流程来替代RegisterWorkflow。我们仍然通过简单的例子来说明。在我们现有的工程中已经定义了几个简单的流程步骤,我们用JSON格式将这几个步骤组成简单的工作流。

首先,在现有的解决方案中增加一个.Net Core的控制台项目,名称为ZL.WorkflowCoreDemo.Json,使用NuGet引入WorkflowCore,Microsoft.Extensions.Logging,还有WorkflowCore.DSL,然后,我们在项目中增加一个json文件,将文件的属性“复制到输出目录”修改为“始终复制”:

4131-20220720080545309-131295196.png

在json文件中定义流程:

{
  "Id": "HelloWorld",
  "Version": 1,
  "Steps": [
    {
      "Id": "Hello",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
      "NextStepId": "Bye"
    },
    {
      "Id": "Bye",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
      
    }
  ]
}

Json定义格式符合WorkflowCore的DSL,这里不进行DSL的详细介绍,我们重点关注流程如何定义,加载和运行。
我们可以将前面项目中的代码拷贝过来进行修改,首先修改下面的函数:

private static IServiceProvider ConfigureServices()
        {
            //setup dependency injection
            IServiceCollection services = new ServiceCollection();
            services.AddLogging();
            services.AddWorkflow();
            //这是新增加的服务
            services.AddWorkflowDSL();

            var serviceProvider = services.BuildServiceProvider();

            return serviceProvider;
        }

ConfigureServices新增加了services.AddWorkflowDSL();
在主函数中,使用IDefintionLoader加载JSON格式的流程定义:

static void Main(string[] args)
        {
            IServiceProvider serviceProvider = ConfigureServices();

            var loader = serviceProvider.GetService<IDefinitionLoader>();

            var json = System.IO.File.ReadAllText("myflow.json");
            loader.LoadDefinition(json, Deserializers.Json);
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.Start();
            host.StartWorkflow("HelloWorld", 1, null);
            
            Console.ReadLine();
            host.Stop();
        }

现在,流程可以运行了。

在研究过程中发现了一个坑,可能需要注意。在这个例子中我们使用了前面项目定义的流程步骤,如果在本项目中定义流程步骤,会出现找不到相应动态库的错误,不知道是否是一个缺陷。

JSON格式(DSL)定义流程与使用Fluent API定义流程的比较

前面我们分别讨论了使用Fluent API定义流程和使用JSON格式定义流程,按照以前的使用经验,感觉这两种定义方式应该可以互相转换,互相代替,但在实际应用中发现并不是如此,两种方式都有不能被替代的功能。

使用Fluent API可以使用Lambda 表达式定义步骤

我们可以在流程中直接使用Lambda表达式定义步骤,而不需要定义类,比如:

public class HelloWorldWorkflow : IWorkflow
{
    public string Id => "HelloWorld";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder
            .StartWith(context =>
            {
                Console.WriteLine("你好");
                return ExecutionResult.Next();
            })
            .Then(context =>
            {
                Console.WriteLine("再见");
                return ExecutionResult.Next();
            });
    }
}

这种方式无法使用JSON等格式实现。

采用JSON等DSL格式可以方便地定义步骤间的跳转

采用JSON等DSL格式时,每个步骤有明示的ID,步骤转移通过ID标识进行,这样可以很方便地进行步骤间的跳转。而采用Fluent API则没有这么灵活,我们看以下的定义:

{
  "Id": "HelloWorld",
  "Version": 1,
  "Steps": [
    {
      "Id": "Hello",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
      "NextStepId": "Bye"
    },
    {
      "Id": "Bye",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
      "NextStepId": "Hello"
    }
  ]
}

步骤“Hello”执行完成后,执行"Bye",“Bye”执行完又回到“Hello”,如此循环。但在Fluent API中就没有这么方便,必须使用循环或者其它的方式。而这种跳转方式在实际应用中非常常见,最常见的场景就是审批流程中的提交/驳回,提交-驳回过程可以形成多次循环,这种流程模式,采用带有步骤标记的跳转很容易实现。

流程数据类的局限性

流程相关的数据类和流程步骤中的属性在理论上是没有限制的,我们可以使用复杂的数据类型,比如Dictionary<string,string>或者具有复杂层次的数据类,但在研究中我们发现由于JSON DSL定义的限制,我们无法实现复杂数据结构的数据传递。使用Fluent API定义的流程中,可以使用Lamdba 表达式,但在JSON DSL中没找到更好的方法。

下面的代码展示通过Lamdba表达式实现两个Dictionary<string,string>之间的数据传递,但在DSL中没有对应的方式:

                    .Output((step, data)=> {
                        var dic = step.EventData as Dictionary<string, object>;
                        foreach (var key in dic.Keys)
                        {
                            if (data.MyDic.ContainsKey(key)) data.MyDic[key] = dic[key];
                            else data.MyDic.Add(key, dic[key]);
                        }

而在实际应用中,我们需要使用流程定义文件而不是写死的代码来定义流程,这样在流程修改时,就不需要修改代码和重新编译部署。这个限制是WorkflowCore在实际项目中落地的一个主要障碍。

工作流持久化与恢复

WorkflowCore提供了几乎针对流行数据库的各种持久化方式,支持SqlServer、Sqlite等关系数据库,也支持MongoDb、Redis等非关系数据库。缺省使用的是在内存中保存流程数据,但在实际应用中,必须将流程数据持久化以保证系统的可靠性。当系统因为计划内或者意外原因出现异常后,正在执行的流程应该能够在断点处恢复并继续执行。我们改造一下第一部分的例子,增加持久化设置,并模拟流程中断和恢复过程。

首先,我们需要使用NuGet引入SqlServer持久化Provider:WorkflowCore.Persistence.SqlServer,当然也可以使用其它类型的数据存储。

然后,修改ConfigureServices,将services.AddWorkflow()修改为:

services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=True;", true, true));

最后修改一下执行代码,增加流程Id输入和恢复代码:

IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            
            host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>();
            host.Start();

            var initialData = new Dictionary<string,string>();

            Console.WriteLine("请输入需要恢复的流程编号,如执行新流程直接回车:");
            string workflowId = Console.ReadLine();
            
            if (string.IsNullOrEmpty(workflowId))
            {
                workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData).Result;
                Console.WriteLine(workflowId);
            }
            else
            {
                host.ResumeWorkflow(workflowId);
            }
              

            
            Console.WriteLine("输入名字");
            string value = Console.ReadLine();
            host.PublishEvent("MyEvent", workflowId, value);

下面,我们模拟中断-恢复过程。首先,运行程序,不输入流程id,直接按回车,会生成新的流程,并输出流程Id,拷贝这个流程ID,并退出程序:

4131-20220719214825014-951610615.png

再次执行程序,输入或粘贴上一次生成的流程编号,可以继续执行流程:

4131-20220719214834324-289203090.png

我们已经创建简单的工作流,并可以在控制台环境运行,现在我们可以为工作流创建简单的单元测试,这里我们使用xUnit作为测试框架。

在ZL.WorkflowCoreDemo解决方案中增加一个xUnit测试项目,命名为ZL.WorkflowCoreDemo.Test,创建好的项目中已经包含xunit和xunit.runner.visualstudio。我们还需要使用NuGet引入其它的框架,首先要引入FluentAssertions,这个框架结合xUnit,可以让 我们在测试中使用Should断言。还需要引入WorkflowCore和WorkflowCore.Testing以及我们需要测试的项目。这里我们测试最简单的HelloWorldWorkflow。

接下来编写测试代码,测试类需要继承WorkflowTest<流程类,流程相关的数据类>,由于HelloWorldWorkflow没有相关的数据类,我们使用dynamic代替,类的定义如下:

using System;
using Xunit;
using WorkflowCore.Testing;
using ZL.WorflowCoreDemo.Basic;
using WorkflowCore.Models;
using System.Threading;
using FluentAssertions;

namespace ZL.WorkflowCoreDemo.Test
{
    public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
    {
        public DemoUnitTest()
        {
            Setup();
        }

        [Fact]
        public void Test1()
        {
            dynamic data = new { };
            var workflowId = StartWorkflow(data);
            WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));

            WorkflowStatus status = GetStatus(workflowId);
            status.Should().Be(WorkflowStatus.Complete);
            UnhandledStepErrors.Count.Should().Be(0);
           
        }

    }
}

需要注意的是在测试类的构造函数中调用Setup(),用来初始化流程引擎。

现在我们可以在测试资源管理器中运行测试项目,如果一切顺利的化,结果是这样的:

4131-20220720080456498-65395879.png

但有时候理想和现实总是有些差距,我在执行时遇到了如下的异常:

4131-20220720080513496-115327756.png

通过研究发现我引用的WorkflowCore是最新的3.1.2版本,而WorkflowCore.Testing的版本是2.2,应该是版本不一致造成的问题,WorkflowCore和WorkflowCore.Testing的更新不同步。这时,开源项目的好处就体现出来了,通过查看代码,改写测试类如下:

using System;
using Xunit;
using WorkflowCore.Testing;
using ZL.WorflowCoreDemo.Basic;
using WorkflowCore.Models;
using System.Threading;
using FluentAssertions;

namespace ZL.WorkflowCoreDemo.Test
{
    public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
    {
        public DemoUnitTest()
        {
            Setup();
        }

        [Fact]
        public void Test1()
        {
            dynamic data = new { };
            var workflowId = StartWorkflow(data);
            WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));

            WorkflowStatus status = GetStatus(workflowId);
            status.Should().Be(WorkflowStatus.Complete);
            UnhandledStepErrors.Count.Should().Be(0);
           
        }

        protected new WorkflowStatus GetStatus(string workflowId)
        {
            var instance = PersistenceProvider.GetWorkflowInstance(workflowId).Result;
            return instance.Status;
        }

        protected new void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut)
        {
            var status = GetStatus(workflowId);
            var counter = 0;
            while ((status == WorkflowStatus.Runnable) && (counter < (timeOut.TotalMilliseconds / 100)))
            {
                Thread.Sleep(100);
                counter++;
                status = GetStatus(workflowId);
            }
        }
    }
}

再次运行,测试通过了。

Activity Workers

前面提到了使用WaitFor暂停工作流,等待人工输入后发布事件重新激活流程,今天介绍另一种方式,使用WorkflowCore的Activity,它的作用就是等待数据输入,数据输入完成后,工作流继续执行。下面是简单的例子:

using WorkflowCore.Interface;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ActivityWorker
{
    public class MyActivityWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "MyActivityWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {

            builder
                .StartWith<HelloWithName>().Input(data => data.Name, step => step.MyName)
                    .Activity("activity-1", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result)
                    .Then<GoodbyeWithName>()
                        .Input(step => step.Name, data => data.MyName)
                    .Activity("activity-2", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result)
                     .Then<HelloWithName>().Input(step => step.Name, data => data.MyName)
                    .Then<GoodbyeWithName>()
                        .Input(step => step.Name, data => data.MyName);
        }
    }
}

这个例子很简单,使用了我们前面定义的两个步骤,HelloWithName和GoodbyeWithName,Activity在这里就是接收外部输入的Name。流程的运行代码如下:

IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<MyActivityWorkflow, MyNameClass>();

            host.Start();

            var myClass = new MyNameClass { MyName = "张三" };

            host.StartWorkflow("MyActivityWorkflow", 1, myClass);

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;

            if (activity != null)
            {
                Console.WriteLine("输入名字");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);
            }

            activity = host.GetPendingActivity("activity-2", "worker2", TimeSpan.FromMinutes(1)).Result;

            if (activity != null)
            {
                Console.WriteLine("输入名字");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);
            }

            Console.ReadLine();
            host.Stop();

工作流启动后,需要通过host.GetPendingActivity获取Activity,获取成功,就从外部获取数据,然后使用host.SubmitActivitySuccess提交数据。

WaitFor vs Activity

使用WorkflowCore获取外部数据时,有两种方法可以让流程等待外部数据,一是使用WaitFor注册一个事件,外部数据输入完成后,通过PublishEvent返回流程;另一种是使用Activity,注册一个人工活动,执行到这个活动时,工作流等待,外部代码通过GetPendingActivity获取相应的Activity,通过SubmitActivitySuccess提交数据。看起来两种都可以完成外部数据输入的任务,但实际中发现GetPendingActivity无法获取是哪一个工作流实例的活动,如果有两个实例同时运行,就没有办法分清除向哪个流程提报数据:

            var id1=host.StartWorkflow("MyActivityWorkflow", 1, myClass).Result;
            var id2 = host.StartWorkflow("MyActivityWorkflow", 1, myClass).Result;

             //上面两个实例中有相同的activity-1,无法知道这里获取的是哪一个实例的活动,         
            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;

WairFor事件发布时有工作流实例ID传入:

host.PublishEvent("MyEvent", workflowId, value);

没有上面的缺陷。

使用ForEach并行执行多个流程

如果需要同时执行多个过程相同的而输入不同的流程,可以使用ForEach控制语句,一定要注意,这里的ForEach不是循环,不是一个流程执行完再执行另一个流程,我们仍然使用前面定义的简单的步骤来组织ForEach示例流程,代码如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.Paralle
{
    public class ParalleWorkflow : IWorkflow
    {
        public string Id => "ParalleWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
            .StartWith(context => { Console.WriteLine("开始"); ExecutionResult.Next(); })
            .ForEach(data => new List<string>() { "张三", "李四", "王五", "赵六" })
                .Do(x => x
                    .StartWith<HelloWithName>()
                        .Input(step => step.Name, (data, context) => context.Item as string)
                    .Then<GoodbyeWithName>()
                        .Input(step => step.Name, (data, context) => context.Item as string)
                    )
            .Then(context => { Console.WriteLine("结束"); ExecutionResult.Next(); });
        }
    }
}

在这个例子里,我们没有定义相关的数据类,需要输入的人名作为ForEach中的循环变量,这些变量保存在context中,输入到相应的环节中。执行代码如下:

            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<ParalleWorkflow>();

            host.Start();
            host.StartWorkflow("ParalleWorkflow", 1, null);


            Console.ReadLine();
            host.Stop();

Parallel并行执行多个流程

前面我们提到了使用ForEach执行并行流程,这些流程的执行过程相同,不同的只是输入的参数。如果需要并行执行多个不同的流程,需要使用Parallel,示例代码如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;

namespace ZL.WorflowCoreDemo.Paralle
{
    public class ParallePathWorkflow : IWorkflow
    {
        public string Id => "ParallePathWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
            .StartWith(context => { Console.WriteLine("开始"); ExecutionResult.Next(); })
            .Parallel()
                .Do(then =>
                    then.StartWith(context=>{ Console.WriteLine("分支一开始"); ExecutionResult.Next(); })
                        .Then(context => { Console.WriteLine("分支一结束"); ExecutionResult.Next(); }))
                .Do(then =>
                    then.StartWith(context => { Console.WriteLine("分支二开始"); ExecutionResult.Next(); })
                        .Then(context => { Console.WriteLine("分支二结束"); ExecutionResult.Next(); }))
                .Do(then =>
                    then.StartWith(context => { Console.WriteLine("分支二开始"); ExecutionResult.Next(); })
                        .Then(context => { Console.WriteLine("分支二结束"); ExecutionResult.Next(); }))
            .Join()
            .Then(context => { Console.WriteLine("结束"); ExecutionResult.Next(); });
        }
    }
}

为了说明分支语句的构成,这个流程没有使用关联的数据类,也没有使用类定义步骤,全部使用Lambda表达式。Parallel的结构是分支的开始是Parallel(),结束是Join(),每个分支在Do语句中表示。流程的运行代码如下:

IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<ParallePathWorkflow>();
            host.Start();
            host.StartWorkflow("ParallePathWorkflow", 1, null);
            Console.ReadLine();
            host.Stop();

While循环

While循环会重复执行某些步骤,直到条件得到满足再继续执行下面的流程。使用While循环可以实现审批流程中的“提交/驳回”,如果审批没有通过,驳回重新输入,直到审批通过或者驳回次数到达上限。这里举一个简单的例子说明使用方法,结合前面提到的Activity,可以实现对输入进行判断,如果输入不满足要求,就重新输入。流程定义如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class WhileWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "WhileWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            builder
                .StartWith<HelloWithName>()
                    .Input(step => step.Name, data => data.MyName)
                .While(data => data.MyName.Length < 3)
                    .Do(x => x
                        .StartWith(context=> { Console.WriteLine("输入小于3个字符"); ExecutionResult.Next(); })
                        .Activity("activity-1", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result))
                .Then<GoodbyeWithName>()
                   .Input(step => step.Name, data => data.MyName);
        }
    }
}

流程运行的代码如下:

            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<WhileWorkflow, MyNameClass>();

            host.Start();

            var myClass = new MyNameClass { MyName = "张三" };

            host.StartWorkflow("WhileWorkflow", 1, myClass);

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;

            
            while (activity != null)
            {
                Console.WriteLine("输入大于3个字符的名字结束,小于3个字符的名字继续");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);
                activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
            }
                        
            Console.ReadLine();
            host.Stop();

If判断比较简单,根据流程关联的数据对象中的值进行判断,如果条件满足执行相应的分支。需要注意的是没有else相关语句,如果需要实现相关逻辑,需要再次进行一次条件相反的判断。下面是简单的例子,仍然使用前面定义的数据类和步骤,输入采用Activity:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class IfWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "IfWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            builder
                .StartWith(context=> ExecutionResult.Next())
                .Activity("activity-1", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result)    
                .If(data => data.MyName.Length < 3)
                    .Do(then=>then
                        .StartWith(context => { Console.WriteLine("输入小于3个字符"); ExecutionResult.Next(); }))
                .If(data => data.MyName.Length >= 3)
                    .Do(then => then
                        .StartWith(context => { Console.WriteLine("输入大于等于3个字符"); ExecutionResult.Next(); }))
                .Then<GoodbyeWithName>()
                   .Input(step => step.Name, data => data.MyName);
        }
    }
}

流程的运行代码如下:

            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<IfWorkflow, MyNameClass>();

            host.Start();

            var myClass = new MyNameClass { MyName = "张三" };

            host.StartWorkflow("IfWorkflow", 1, myClass);

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;


            if (activity != null)
            {
                Console.WriteLine("输入名字");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);
                
            }

            Console.ReadLine();
            host.Stop();

条件分支Decision Branches

Decision Branches有点类似于switch语句,可以为每个条件创建一个分支,这些分支相对独立,根据不同的条件选择执行。如果使用Fluent API,可以使用CreateBranch方法创建分支,然后在流程中使用分支。为了说明问题,我们改造前面的If流程,使用Decision Branches实现相同的功能,流程定义的代码如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class DecisionWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "DecisionWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            var branch1 = builder.CreateBranch()
                .StartWith(context => { Console.WriteLine("输入小于3个字符"); ExecutionResult.Next(); });
            var branch2 = builder.CreateBranch()
                .StartWith(context => { Console.WriteLine("输入大于等于3个字符"); ExecutionResult.Next(); });

            builder
                .StartWith(context => ExecutionResult.Next())
                .Activity("activity-1", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result)
                .Decide(data => data.MyName.Length)
                     .Branch((data, outcome) => data.MyName.Length<3, branch1)
                     .Branch((data, outcome) => data.MyName.Length >= 3, branch2)
                .Then<GoodbyeWithName>()
                   .Input(step => step.Name, data => data.MyName);
        }
    }
}

流程执行定义的代码如下:

            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();
            host.RegisterWorkflow<DecisionWorkflow, MyNameClass>();

            host.Start();

            var myClass = new MyNameClass { MyName = "张三" };

            host.StartWorkflow("DecisionWorkflow", 1, myClass);

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;


            if (activity != null)
            {
                Console.WriteLine("输入名字");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);
                
            }

            Console.ReadLine();
            host.Stop();

使用Schedule执行定时任务

WorkflowCore 提供了定时执行后台任务的功能,使用Schedule可以定义异步执行的任务,在工作流的后台执行。示例代码如下:

using System;
using WorkflowCore.Interface;


namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class ScheduleWorkflow : IWorkflow
    {
        public string Id => "ScheduleWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith(context => Console.WriteLine("开始"))
                    .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
                    .StartWith(context => Console.WriteLine("后台工作")))
                .Then(context => Console.WriteLine("前台工作"));
        }
    }
}

在上面的代码中,工作流开始后,定义了一个Schedule,这个任务在延时5秒后,启动一个后台流程。流程的执行代码如下:

           IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();

            host.RegisterWorkflow<ScheduleWorkflow>();
            host.Start();

            
            var workflowId = host.StartWorkflow("ScheduleWorkflow", 1, null).Result;

            Console.ReadLine();
            host.Stop();

流程的执行代码与前面的例子基本类似,执行结果如下:
4131-20220719215344986-868943856.png

执行时,前台任务完成5秒后,后台工作才执行。

使用Recur执行重复的后台任务

前面介绍的Schedule可以启动一个后台的定时任务,这个任务只执行一次。如果需要执行多次固定间隔的任务,可以使用Recur,当条件满足时任务不再执行。Recur的定义与Schedule类似,只是多了条件判断输入,流程定义的代码如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class RecurWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "RecurWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder)
        {
            builder
                .StartWith(context => Console.WriteLine("开始"))
                    .Recur(data => TimeSpan.FromSeconds(5),data=>data.MyName.Length>5).Do(recur => recur
                    .StartWith<HelloWithName>()
                    .Input(step => step.Name, data => data.MyName))
                .Then(context => Console.WriteLine("前台工作"))
                .Activity("activity-1", (data) => data.MyName)
                        .Output(data => data.MyName, step => step.Result);
        }
    }
}

这流程稍微复杂一点,我们增加了使用Activity的输入,目的是看一下前台的输入等待是否会影响后台的进程运行,还有就是前台输入的数据,能否正确传递到后台,流程的运行代码如下:

            IServiceProvider serviceProvider = ConfigureServices();
            var host = serviceProvider.GetService<IWorkflowHost>();

            host.RegisterWorkflow<RecurWorkflow,MyNameClass>();
            host.Start();

            var myClass = new MyNameClass { MyName = "张三" };

            var workflowId = host.StartWorkflow("RecurWorkflow", 1, myClass).Result;

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;


            if (activity != null)
            {
                Console.WriteLine("输入名字");
                string value = Console.ReadLine();
                host.SubmitActivitySuccess(activity.Token, value);

            }

            Console.ReadLine();
            host.Stop();

运行效果如下:
4131-20220719215449476-1991820541.png

可以看出,前台需要的输入等待并没有影响后台的执行,我们输入一个新名字后:

4131-20220719215457536-300295254.png

集成Elasticsearch

WorkflowCore 自身的查询功能很弱,不过它提供了Elasticsearch的plugin,可以使用Elasticsearch对流程进行索引和查询。不太方便的地方是必须要安装Elasticsearch。这里先简单介绍一下Elasticsearch,它是基于Lucene的搜索服务器,提供了分布式多用户的全文检索引擎,基于RESTful web接口。网上关于Elasticsearch的资料很多,可以自行搜索。

如果希望使用Elasticsearch索引工作流,需要在项目中安装WorkflowCore.Providers.Elasticsearch,使用NuGet安装这个插件,然后在services中进行设置:

using Nest;
...
services.AddWorkflow(cfg =>
{
    ...
    cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://localhost:9200")), "index_name");
});

在代码中,通过依赖注入引入ISearchIndex,使用Search方法进行搜索:

Search(string terms, int skip, int take, params SearchFilter[] filters)

检索的范围包括流程的定义、描述、状态等。如果流程相关的自定义数据类需要检索,数据类需要实现ISearchable接口。

WorkflowCore启动的流程多线程的方式运行,如果流程中出现的异常不会抛出到主程序,很多情况下感觉流程莫名奇妙地结束了。为了避免这种情况,需要显示地声明流程步骤的异常处理。如果使用Fluent API定义流程,可以在流程后附加OnError处理异常,但我们更希望对异常进行集中处理和记录,这时可以使用WorkflowHost服务的OnStepError事件。定义如下:

 var host = serviceProvider.GetService<IWorkflowHost>();
 host.OnStepError += Host_OnStepError;

异常处理代码可以写在Host_OnStepError中:

private static void Host_OnStepError(WorkflowCore.Models.WorkflowInstance workflow, WorkflowCore.Models.WorkflowStep step, Exception exception)
        {
            
        }

实际使用中的问题

到这里,我们介绍了WorkflowCore的使用,下面谈一下这个项目在实际使用时遇到一些问题。

  • 轻量级,部署和使用都很简单。项目本身满足这个条件,但对流程相关的查询功能很弱,如果需要增强,需要Elasticsearch的支持。部署和使用Elasticsearch带来了额外的工作量。
  • WorkflowCore支持使用JSON格式定义工作流,然而从功能上要弱于使用Fluent API定义的工作流,因为不具备解析Lambda表达式的能力
  • 参数传递功能相对较弱,无法传递复杂对象。
    上述问题是我们在实际中遇到的,希望对大家有所帮助。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK