
Dapr Workflow Building Block .NET Demo - 张善友

 1 year ago
source link: https://www.cnblogs.com/shanyou/p/17132707.html

Dapr Workflow Building Block .NET Demo

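The standout feature of Dapr 1.10 is the release of the workflow building block. Although it is still in the alpha stage, it lets us start planning workflows in our applications early and makes it easier to write complex distributed applications on top of Dapr. Dapr Workflow lets you build long-running, durable processes or data flows that span multiple applications. It can be combined with the other Dapr API building blocks. For example, a workflow can call another service through service invocation, trigger a binding, or retrieve a secret, which lets you orchestrate and build complex application scenarios.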

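The proposal for the Dapr workflow building block was made back in May 2022. It planned to build a workflow engine into the Dapr runtime, and the community responded very positively. The proposal (https://github.com/dapr/dapr/issues/4576) is fairly long; 敖小剑 has published a Chinese translation: https://cn.dapr.io/post/202206-dapr-workflow-proposal/. The alpha release delivers the following: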

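  • A built-in workflow engine is provided as an initial preview feature. This built-in engine is based on the Durable Task Framework (DTF) from Azure, which is written in .NET and widely used across Azure infrastructure. The Dapr team is abstracting this practice into the workflow building block, implemented on top of the Dapr Actor building block. The beta release is expected to add Logic Apps as a workflow engine, and more workflow engines can of course be integrated into the Dapr Workflow building block.
  • The API supports three workflow management methods (start, stop, get status)
  • The API is released as alpha
  • API support for the built-in engine
  • Authoring SDK support: .NET
  • Management API SDK support: .NET
  • Documentation that explains in detail how the Dapr Workflow building block works; it is well worth a close read: https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/
  • Code samples (.NET), included in the quickstart documentation.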
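
The three management operations listed above (start, stop, get status) might map onto the sidecar's HTTP API roughly as follows. This is only a sketch: the `v1.0-alpha1` paths and the component name `dapr` are assumptions recalled from the 1.10 alpha API reference and may differ in other releases, `workflow_urls` is a hypothetical helper, and the port is whatever `dapr run` assigns (3500 is the usual default).

```python
from urllib.parse import quote


def workflow_urls(workflow_name, instance_id, port=3500, component="dapr"):
    """Build the (method, URL) pairs for the three alpha management calls.

    Assumption: endpoint layout of the 1.10 alpha workflow API; check the
    API reference of your Dapr version before relying on these paths.
    """
    base = f"http://localhost:{port}/v1.0-alpha1/workflows/{component}"
    name = quote(workflow_name, safe="")
    inst = quote(instance_id, safe="")
    return {
        "start": ("POST", f"{base}/{name}/{inst}/start"),
        "terminate": ("POST", f"{base}/{inst}/terminate"),
        "status": ("GET", f"{base}/{name}/{inst}"),
    }


if __name__ == "__main__":
    urls = workflow_urls("OrderProcessingWorkflow", "49caa2d7")
    for operation, (method, url) in urls.items():
        print(operation, method, url)
```

The helper only builds URLs and sends nothing, so it can be tried without a running sidecar.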

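The Dapr workflow building block is currently alpha, so it naturally has many limitations and is not recommended for production. Wait at least until the beta release before considering it for production; components used in production are normally stable releases. Even so, the Dapr Workflow building block shows real potential. The Dapr Actor building block is used mostly in the .NET community and rarely in Java, Go, and other languages; combining the workflow building block with the Actor building block should take Dapr up a level and widen its lead over similar frameworks. We often say that "technology is always overestimated in the short term and underestimated in the long term." Looking at this saying again: technology is overestimated in the short term because of pressing short-term needs, and underestimated in the long term because its implicit requirements far outnumber the actual business requirements.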

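As a developer, you need to weigh the long-term hidden costs and trial-and-error costs of the technology area you are about to commit to. You also cannot just skim a technology area; only sustained, in-depth research and study gives you a broader technical perspective. As a development framework for the cloud-native era, Dapr is worth that investment.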

Dapr's components-contrib repository also has a workflows directory, https://github.com/dapr/components-contrib/tree/master/workflows, which contains a component implementation for Temporal (https://github.com/temporalio/temporal), a distributed scheduling framework.

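You can now try Dapr Workflow with the .NET SDK. The Dapr Workflow quickstart documentation includes a .NET sample that creates a simple console application to demonstrate Dapr's workflow programming model and workflow management API.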


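The console application order-processor starts and manages the lifecycle of a workflow that stores and retrieves data in a state store. The workflow OrderProcessingWorkflow consists of four workflow activities, or tasks: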

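  • NotifyActivity: uses a logger to print messages throughout the workflow
  • ReserveInventoryActivity: checks the state store to make sure there is enough inventory for the purchase
  • ProcessPaymentActivity: processes and authorizes the payment
  • UpdateInventoryActivity: removes the requested items from the state store and updates the store with the new remaining inventory value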
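
To make the order of the four activities concrete, here is a minimal plain-Python simulation of the flow. It is not the Dapr .NET SDK and not the quickstart's code: the `OrderPayload` fields, the function names, and the dictionary standing in for the state store are all hypothetical, and the payment step always succeeds.

```python
from dataclasses import dataclass


@dataclass
class OrderPayload:
    name: str        # item being ordered, e.g. "Cars"
    total_cost: int  # price of the whole order
    quantity: int    # number of items requested


def notify(log, message):
    """NotifyActivity stand-in: record a message instead of using a logger."""
    log.append(message)


def reserve_inventory(store, order):
    """ReserveInventoryActivity stand-in: is there enough stock?"""
    return store.get(order.name, 0) >= order.quantity


def process_payment(log, order_id, order):
    """ProcessPaymentActivity stand-in: always authorizes in this sketch."""
    notify(log, f"Payment for request ID '{order_id}' processed successfully")
    return True


def update_inventory(store, order):
    """UpdateInventoryActivity stand-in: subtract the purchased quantity."""
    store[order.name] -= order.quantity
    return store[order.name]


def order_processing_workflow(store, log, order_id, order):
    """Run the four activities in the order the article describes."""
    notify(log, f"Received order {order_id} for {order.quantity} {order.name} at ${order.total_cost}")
    if not reserve_inventory(store, order):
        notify(log, f"Insufficient inventory for {order.name}")
        return "Failed"
    process_payment(log, order_id, order)
    remaining = update_inventory(store, order)
    notify(log, f"There are now: {remaining} {order.name} left in stock")
    notify(log, f"Order {order_id} has completed!")
    return "Completed"


if __name__ == "__main__":
    store, log = {"Cars": 100}, []
    status = order_processing_workflow(store, log, "49caa2d7", OrderPayload("Cars", 15000, 10))
    print("\n".join(log))
    print("Workflow Status:", status)
```

A real workflow engine additionally persists progress between activities so that a restarted process can resume; this sketch deliberately leaves that out.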

The sample code is available in the quickstarts repository: https://github.com/dapr/quickstarts.git. Clone it locally and change into the sample directory order-processor:

cd workflows/csharp/sdk/order-processor

In a terminal, start the order processor application alongside a Dapr sidecar with dapr run --app-id order-processor dotnet run. Below is the output of one complete run:

❯❯ order-processor git:(master)[docker-desktop] 15:05 dapr run --app-id order-processor dotnet run
Starting Dapr with id order-processor. HTTP Port: 1599. gRPC Port: 1600
Checking if Dapr sidecar is listening on HTTP port 1599
time="2023-02-18T15:05:30.4699685+08:00" level=info msg="starting Dapr Runtime -- version 1.10.0 -- commit 029ec8cb7a1c88ec5d222bc2b0d1d53541217f19" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.470968+08:00" level=info msg="log level set to: info" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.4739685+08:00" level=info msg="metrics server started on :1601/" app_id=order-processor instance=geffzhang-laptop scope=dapr.metrics type=log ver=1.10.0
time="2023-02-18T15:05:30.4950736+08:00" level=info msg="Resiliency configuration loaded." app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.5000719+08:00" level=info msg="standalone mode configured" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.5000719+08:00" level=info msg="app id: order-processor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.5010762+08:00" level=info msg="mTLS is disabled. Skipping certificate request and tls validation" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.5030734+08:00" level=info msg="Dapr trace sampler initialized: DaprTraceSampler(P=1.000000)" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.5862994+08:00" level=info msg="local service entry announced: order-processor -> 192.168.1.6:1602" app_id=order-processor component="mdns (nameResolution/v1)" instance=geffzhang-laptop scope=dapr.contrib type=log ver=1.10.0
time="2023-02-18T15:05:30.5878674+08:00" level=info msg="Initialized name resolution to mdns" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.5892504+08:00" level=info msg="loading components" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6287701+08:00" level=info msg="component loaded. name: pubsub, type: pubsub.redis/v1" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6291486+08:00" level=info msg="waiting for all outstanding components to be processed" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6400323+08:00" level=info msg="detected actor state store: statestore" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6400323+08:00" level=info msg="component loaded. name: statestore, type: state.redis/v1" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6400323+08:00" level=info msg="all outstanding components processed" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6405641+08:00" level=info msg="gRPC proxy enabled" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6426739+08:00" level=info msg="gRPC server listening on TCP address: :1600" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.grpc.api type=log ver=1.10.0
time="2023-02-18T15:05:30.6431919+08:00" level=info msg="Enabled gRPC tracing middleware" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.grpc.api type=log ver=1.10.0
time="2023-02-18T15:05:30.6437283+08:00" level=info msg="Enabled gRPC metrics middleware" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.grpc.api type=log ver=1.10.0
time="2023-02-18T15:05:30.6491391+08:00" level=info msg="configuring workflow engine gRPC endpoint" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
time="2023-02-18T15:05:30.6506026+08:00" level=info msg="API gRPC server is running on port 1600" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6570537+08:00" level=info msg="enabled metrics http middleware" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.http type=log ver=1.10.0
time="2023-02-18T15:05:30.6570537+08:00" level=info msg="enabled tracing http middleware" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.http type=log ver=1.10.0
time="2023-02-18T15:05:30.6575707+08:00" level=info msg="HTTP server listening on TCP address: :1599" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.http type=log ver=1.10.0
time="2023-02-18T15:05:30.6580969+08:00" level=info msg="http server is running on port 1599" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6586808+08:00" level=info msg="The request body size parameter is: 4" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6592257+08:00" level=info msg="gRPC server listening on TCP address: :1602" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.6921456+08:00" level=info msg="dapr initialized. Status: Running. Init Elapsed 192ms" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:30.7694178+08:00" level=info msg="placement tables updated, version: 0" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor.internal.placement type=log ver=1.10.0
Checking if Dapr sidecar is listening on GRPC port 1600
Dapr sidecar is up and running.
Updating metadata for app command: dotnet run
You're up and running! Both Dapr and your app logs will appear here.

== APP == info: Microsoft.DurableTask[1]
== APP ==       Durable Task worker is connecting to sidecar at localhost:1600.
== APP == info: Microsoft.Hosting.Lifetime[0]
== APP ==       Application started. Press Ctrl+C to shut down.
== APP == info: Microsoft.Hosting.Lifetime[0]
== APP ==       Hosting environment: Production
== APP == info: Microsoft.Hosting.Lifetime[0]
== APP ==       Content root path: C:\Users\zsygz\Documents\GitHub\daprquickstarts\workflows\csharp\sdk\order-processor
== APP == Starting workflow 49caa2d7 purchasing 10 Cars
== APP == info: Microsoft.DurableTask.Client.Grpc.GrpcDurableTaskClient[40]
== APP ==       Scheduling new OrderProcessingWorkflow orchestration with instance ID '49caa2d7' and 47 bytes of input data.
time="2023-02-18T15:05:44.2802197+08:00" level=info msg="Error processing operation DaprBuiltInActorNotFoundRetries. Retrying in 1s…" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
== APP == info: Microsoft.DurableTask[4]
== APP ==       Sidecar work-item streaming connection established.
time="2023-02-18T15:05:44.3005331+08:00" level=info msg="work item stream established by user-agent: [grpc-dotnet/2.50.0 (.NET 6.0.13; CLR 6.0.13; net6.0; windows; x64)]" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
time="2023-02-18T15:05:44.3005331+08:00" level=info msg="worker started with backend dapr.actors/v1-alpha" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
time="2023-02-18T15:05:44.3010653+08:00" level=info msg="workflow engine started" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
time="2023-02-18T15:05:47.0119252+08:00" level=info msg="placement tables updated, version: 1" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor.internal.placement type=log ver=1.10.0
time="2023-02-18T15:05:47.2813299+08:00" level=info msg="Recovered processing operation DaprBuiltInActorNotFoundRetries." app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime type=log ver=1.10.0
time="2023-02-18T15:05:47.3016215+08:00" level=info msg="49caa2d7: starting new 'OrderProcessingWorkflow' instance." app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
time="2023-02-18T15:05:47.3980173+08:00" level=info msg="49caa2d7#0: loading activity state" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
time="2023-02-18T15:05:47.4054132+08:00" level=info msg="reminder \"start-6af4e78b\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:05:47.4055923+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.workflow||49caa2d7||start-6af4e78b. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
== APP == info: WorkflowConsoleApp.Activities.NotifyActivity[0]
== APP ==       Received order 49caa2d7 for 10 Cars at $15000
time="2023-02-18T15:05:47.4244248+08:00" level=info msg="reminder \"run-activity\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:05:47.4249645+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.activity||49caa2d7#0||run-activity. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:05:47.4401806+08:00" level=info msg="49caa2d7#1: loading activity state" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
time="2023-02-18T15:05:47.4493286+08:00" level=info msg="reminder \"new-event-21a81b86\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:05:47.4496796+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.workflow||49caa2d7||new-event-21a81b86. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
== APP == info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
== APP ==       Reserving inventory for order 49caa2d7 of 10 Cars
== APP == info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
== APP ==       There are: 100, Cars available for purchase
== APP == Your workflow has started. Here is the status of the workflow: Running
time="2023-02-18T15:05:49.4896663+08:00" level=info msg="reminder \"run-activity\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:05:49.491597+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.activity||49caa2d7#1||run-activity. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:05:49.5078761+08:00" level=info msg="49caa2d7#2: loading activity state" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
time="2023-02-18T15:05:49.5208857+08:00" level=info msg="reminder \"new-event-b34a7d49\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:05:49.5276731+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.workflow||49caa2d7||new-event-b34a7d49. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
== APP == info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
== APP ==       Processing payment: 49caa2d7 for 10 Cars at $15000
== APP == info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
== APP ==       Payment for request ID '49caa2d7' processed successfully
time="2023-02-18T15:05:56.5412886+08:00" level=info msg="reminder \"run-activity\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:05:56.5418687+08:00" level=info msg="49caa2d7#3: loading activity state" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
time="2023-02-18T15:05:56.5423941+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.activity||49caa2d7#2||run-activity. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:05:56.551159+08:00" level=info msg="reminder \"new-event-ef938587\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:05:56.5518012+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.workflow||49caa2d7||new-event-ef938587. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
== APP == info: WorkflowConsoleApp.Activities.UpdateInventoryActivity[0]
== APP ==       Checking Inventory for: Order# 49caa2d7 for 10 Cars
== APP == info: WorkflowConsoleApp.Activities.UpdateInventoryActivity[0]
== APP ==       There are now: 90 Cars left in stock
time="2023-02-18T15:06:01.5721469+08:00" level=info msg="reminder \"run-activity\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:06:01.5727943+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.activity||49caa2d7#3||run-activity. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:06:01.5741849+08:00" level=info msg="49caa2d7#4: loading activity state" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
== APP == info: WorkflowConsoleApp.Activities.NotifyActivity[0]
== APP ==       Order 49caa2d7 has completed!
time="2023-02-18T15:06:01.5835833+08:00" level=info msg="reminder \"new-event-d423d421\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:06:01.5896839+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.workflow||49caa2d7||new-event-d423d421. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:06:01.5919168+08:00" level=info msg="reminder \"run-activity\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:06:01.597233+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.activity||49caa2d7#4||run-activity. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:06:01.6004245+08:00" level=info msg="49caa2d7: 'OrderProcessingWorkflow' completed with a COMPLETED status." app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
time="2023-02-18T15:06:01.6048974+08:00" level=info msg="reminder \"new-event-61f53bff\" was canceled by the actor" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
time="2023-02-18T15:06:01.6054844+08:00" level=info msg="Found reminder with key: dapr.internal.wfengine.workflow||49caa2d7||new-event-61f53bff. Deleting reminder" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.actor type=log ver=1.10.0
== APP == Workflow Status: Completed
time="2023-02-18T15:06:03.434236+08:00" level=info msg="work item stream closed" app_id=order-processor instance=geffzhang-laptop scope=dapr.runtime.wfengine type=log ver=1.10.0
Exited App successfully

terminated signal received: shutting down
Exited Dapr successfully

Roughly, here is what happens:

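  1. A unique order ID is generated for the workflow (49caa2d7 in the example above), and the workflow is started.
  2. The workflow activity NotifyActivity sends a notification saying that an order for 10 cars has been received.
  3. The workflow activity ReserveInventoryActivity checks the inventory data, determines whether the ordered items can be supplied, and responds with the number of cars in stock.
  4. The workflow starts and notifies you of its status.
  5. The workflow activity ProcessPaymentActivity begins processing the payment for order 49caa2d7 and confirms whether it succeeded.
  6. After the order is processed, the workflow activity UpdateInventoryActivity updates the inventory with the currently available cars.
  7. The workflow activity NotifyActivity sends a notification saying that order 49caa2d7 has completed.
  8. The workflow terminates upon completion.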
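
These steps are easier to follow in the mixed console output if the application lines are separated from the sidecar's own log lines. The sketch below assumes only what the output above shows: application lines carry the `== APP ==` prefix and .NET logger headers start with `info: `. The function names are hypothetical, and the sample sidecar line is abbreviated.

```python
APP_PREFIX = "== APP == "


def app_lines(output):
    """Keep application lines only, dropping sidecar (time=...) log lines."""
    kept = []
    for line in output.splitlines():
        if line.startswith(APP_PREFIX):
            kept.append(line[len(APP_PREFIX):].strip())
    return kept


def activity_messages(output):
    """Also drop logger category headers such as 'info: Some.Category[0]'."""
    return [line for line in app_lines(output) if not line.startswith("info: ")]


if __name__ == "__main__":
    sample = "\n".join([
        "== APP == info: WorkflowConsoleApp.Activities.NotifyActivity[0]",
        "== APP ==       Received order 49caa2d7 for 10 Cars at $15000",
        'time="2023-02-18T15:05:47.4244248+08:00" level=info msg="(abbreviated sidecar line)"',
        "== APP == Workflow Status: Completed",
    ])
    for message in activity_messages(sample):
        print(message)
```

Saving the terminal output to a file and passing its contents to `activity_messages` should leave just the messages that correspond to the numbered steps.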
