1

Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式 - 小庄

 1 year ago
source link: https://www.cnblogs.com/xiaozhuang/p/16832868.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

大家好,我是失业在家,正在找工作的博主Jerry,找工作之余,总结和整理以前的项目经验,动手写了个洋葱架构(整洁架构)示例解决方案 OnionArch。其目的是为了更好的实现基于DDD(领域驱动分析)和命令查询职责分离(CQRS)的洋葱架构。

OnionArch 是用来实现单个微服务的。它提供了Grpc接口和Dapr Side Car进行交互,通过Dapr来实现微服务之间的接口调用、事件发布订阅等微服务特性。但是,Dapr官方文档上只有Go语言的Grpc的微服务调用示例,没有事件发布和订阅示例,更没有基于Grpc通讯用.Net实现的事件订阅和发布示例。

一、实现目标

为了方便大家写代码,本文旨在介绍如何通过Dapr实现.Net Grpc服务之间的发布和订阅,并采用与WebApi类似的事件订阅方式。

如果是Dapr Side Car通过Web Api和微服务引用交互,在WebApi中实现事件订阅非常简单,只要在Action 上增加“[Topic("pubsub", "TestTopic")]” Attribute即可,可如果Dapr是通过Grpc和Grpc服务交互就不能这样写了。

为了保持WebApi和Grpc事件订阅代码的一致性,本文就是要在Grpc通讯的情况下实现如下写法来订阅并处理事件。

        [Topic("pubsub", "TestTopic")]
        public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context)
        {
            string message = "TestTopicEvent" + request.EventData.Name;
            Console.WriteLine(message);
            return Task.FromResult(new HelloReply
            {
                Message = message
            });
        }

二、实现方案

Dapr实现.Net Grpc服务之间的发布和订阅,根据官方文档,需要重写AppCallback.AppCallbackBase Grpc类的ListTopicSubscriptions方法和OnTopicEvent方法,ListTopicSubscriptions是给Dapr调用获取该微服务已订阅的事件,OnTopicEvent给Dapr调用以触发事件到达处理逻辑。但是这样就需要在AppCallback.AppCallbackBase实现类中硬编码已订阅的事件和事件处理逻辑。显然不符合我们的实现目标。

参考Dapr SDK中关于WebApi 订阅查询接口“http://localhost:<appPort>/dapr/subscribe”的实现代码,可以在AppCallback.AppCallbackBase实现类的ListTopicSubscriptions方法中,采用相同的方式,在Grpc方法中查询Topic Attribute的方式来搜索已订阅的事件。这样就不用在ListTopicSubscriptions中硬编码已订阅的事件了。

为了避免在OnTopicEvent方法中应编码事件处理逻辑,就需要在接收到事件触发后动态调用Grpc方法。理论上,只要有proto文件就可以动态调用Grpc方法,而proto文件本来就在项目中。但是,我没找到.Net动态调用Grpc方法的相关资料,不知道大家有没有?

我这里采用了另一种方式,根据我上一篇关于.Net 7.0 RC gRPC JSON 转码为 Swagger/OpenAPI文档。Grpc方法可以增加一个转码为Json的WebApi调用。这样就可以在OnTopicEvent方法中接收到事件触发后,通过HttpClient post到对应的WebApi地址,曲线实现动态调用Grpc方法。是不是有点脱裤子放屁的感觉?

三、代码实现

我的解决方案如下,GrpcServiceA发布事件,GrpcServiceB接收事件并处理。

590-20221027165201080-877032728.png

实现事件发布

GrpcServiceA发布事件比较简单,和WebApi的方式是一样一样的。

 public async override Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
        {
            //await _daprClient.SaveStateAsync("statestore", "testKey", request.Name);
            EventData eventData = new EventData() { Id = 6, Name = request.Name, Description = "Looking for a job" };
            await _daprClient.PublishEventAsync<EventData>("pubsub", "TestTopic", eventData);
            return new HelloReply
            {
                Message = "Hello" + request.Name
            };
        }
_daprClient怎么来的?我参考Dapr .Net SDK的代码,给IGrpcServerBuilder 增加了扩展方法:

ContractedBlock.gifExpandedBlockStart.gif

View Code

然后就可以这样把DaprClient依赖注入到服务中。

builder.Services.AddGrpc().AddJsonTranscoding().AddDapr();

实现事件订阅

根据上述实现方案,GrpcServiceB接收事件并处理有点复杂,参考我Grpc接口转码Json的的内容,在要接收事件的Grpc方法上增加转码Json WebApi 配置。

 rpc TestTopicEvent (TestTopicEventRequest) returns (HelloReply){
    option (google.api.http) = {
      post: "/v1/greeter/testtopicevent",
      body: "eventData"
    };
  }

增加google.api.http选项,,可以通过post eventData 数据到地址“/v1/greeter/testtopicevent”调用该Grpc方法。然后实现该Grpc接口。

 [Topic("pubsub", "TestTopic")]
        public override Task<HelloReply> TestTopicEvent(TestTopicEventRequest request, ServerCallContext context)
        {
            string message = "TestTopicEvent" + request.EventData.Name;
            Console.WriteLine(message);
            return Task.FromResult(new HelloReply
            {
                Message = message
            });
        }

我重用了Dapr .Net SDK 的Topic Attribute来标记该Grpc的实现接口,这样就可以搜索所有带Topic Attribute的Grpc方法来获取已经订阅的事件。

接下来才是重头戏,重写AppCallback.AppCallbackBase Grpc接口类的ListTopicSubscriptions方法和OnTopicEvent方法

 public async override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(Empty request, ServerCallContext context)
        {
            var result = new ListTopicSubscriptionsResponse();

            var subcriptions = _endpointDataSource.GetDaprSubscriptions(_loggerFactory);
            foreach (var subscription in subcriptions)
            {
                TopicSubscription subscr = new TopicSubscription()
                {
                    PubsubName = subscription.PubsubName,
                    Topic = subscription.Topic,
                    Routes = new TopicRoutes()
                };
                subscr.Routes.Default = subscription.Route;
                result.Subscriptions.Add(subscr);
            }
            return result;
        }

该方法返回所有已订阅的事件和对应的WebApi Url,将事件对应的WebApi地址放入subscr.Routes.Default中。

其中_endpointDataSource.GetDaprSubscriptions 方法参考了Dapr .Net SDK的实现。

public static List<Subscription> GetDaprSubscriptions(this EndpointDataSource dataSource, ILoggerFactory loggerFactory, SubscribeOptions options = null)
        {
            var logger = loggerFactory.CreateLogger("DaprTopicSubscription");
            var subscriptions = dataSource.Endpoints
                .OfType<RouteEndpoint>()
                .Where(e => e.Metadata.GetOrderedMetadata<ITopicMetadata>().Any(t => t.Name != null)) // only endpoints which have TopicAttribute with not null Name.
                .SelectMany(e =>
                {
                    var topicMetadata = e.Metadata.GetOrderedMetadata<ITopicMetadata>();
                    var originalTopicMetadata = e.Metadata.GetOrderedMetadata<IOriginalTopicMetadata>();

                    var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>();

                    for (int i = 0; i < topicMetadata.Count(); i++)
                    {
                        subs.Add((topicMetadata[i].PubsubName,
                            topicMetadata[i].Name,
                            (topicMetadata[i] as IDeadLetterTopicMetadata)?.DeadLetterTopic,
                            (topicMetadata[i] as IRawTopicMetadata)?.EnableRawPayload,
                            topicMetadata[i].Match,
                            topicMetadata[i].Priority,
                            originalTopicMetadata.Where(m => (topicMetadata[i] as IOwnedOriginalTopicMetadata)?.OwnedMetadatas?.Any(o => o.Equals(m.Id)) == true || string.IsNullOrEmpty(m.Id))
                                                 .GroupBy(c => c.Name)
                                                 .ToDictionary(m => m.Key, m => m.Select(c => c.Value).Distinct().ToArray()),
                            (topicMetadata[i] as IOwnedOriginalTopicMetadata)?.MetadataSeparator,
                            e.RoutePattern));
                    }

                    return subs;
                })
                .Distinct()
                .GroupBy(e => new { e.PubsubName, e.Name })
                .Select(e => e.OrderBy(e => e.Priority))
                .Select(e =>
                {
                    var first = e.First();
                    var rawPayload = e.Any(e => e.EnableRawPayload.GetValueOrDefault());
                    var metadataSeparator = e.FirstOrDefault(e => !string.IsNullOrEmpty(e.MetadataSeparator)).MetadataSeparator ?? ",";
                    var rules = e.Where(e => !string.IsNullOrEmpty(e.Match)).ToList();
                    var defaultRoutes = e.Where(e => string.IsNullOrEmpty(e.Match)).Select(e => RoutePatternToString(e.RoutePattern)).ToList();
                    //var defaultRoute = defaultRoutes.FirstOrDefault();
                    var defaultRoute = defaultRoutes.LastOrDefault();

                    //multiple identical names. use comma separation.
                    var metadata = new Metadata(e.SelectMany(c => c.OriginalTopicMetadata).GroupBy(c => c.Key).ToDictionary(c => c.Key, c => string.Join(metadataSeparator, c.SelectMany(c => c.Value).Distinct())));
                    if (rawPayload || options?.EnableRawPayload is true)
                    {
                        metadata.Add(Metadata.RawPayload, "true");
                    }

                    if (logger != null)
                    {
                        if (defaultRoutes.Count > 1)
                        {
                            logger.LogError("A default subscription to topic {name} on pubsub {pubsub} already exists.", first.Name, first.PubsubName);
                        }

                        var duplicatePriorities = rules.GroupBy(e => e.Priority)
                          .Where(g => g.Count() > 1)
                          .ToDictionary(x => x.Key, y => y.Count());

                        foreach (var entry in duplicatePriorities)
                        {
                            logger.LogError("A subscription to topic {name} on pubsub {pubsub} has duplicate priorities for {priority}: found {count} occurrences.", first.Name, first.PubsubName, entry.Key, entry.Value);
                        }
                    }

                    var subscription = new Subscription()
                    {
                        Topic = first.Name,
                        PubsubName = first.PubsubName,
                        Metadata = metadata.Count > 0 ? metadata : null,
                    };

                    if (first.DeadLetterTopic != null)
                    {
                        subscription.DeadLetterTopic = first.DeadLetterTopic;
                    }

                    // Use the V2 routing rules structure
                    if (rules.Count > 0)
                    {
                        subscription.Routes = new Routes
                        {
                            Rules = rules.Select(e => new Rule
                            {
                                Match = e.Match,
                                Path = RoutePatternToString(e.RoutePattern),
                            }).ToList(),
                            Default = defaultRoute,
                        };
                    }
                    // Use the V1 structure for backward compatibility.
                    else
                    {
                        subscription.Route = defaultRoute;
                    }

                    return subscription;
                })
                .OrderBy(e => (e.PubsubName, e.Topic));

            return subscriptions.ToList();
        }

        private static string RoutePatternToString(RoutePattern routePattern)
        {
            return string.Join("/", routePattern.PathSegments
                                    .Select(segment => string.Concat(segment.Parts.Cast<RoutePatternLiteralPart>()
                                    .Select(part => part.Content))));
        }

注意标红的哪一行是我唯一改动的地方,因为Grpc接口增加了Web Api配置后会返回两个Route,一个是原始Grpc的,一个是WebApi的,我们需要后面那个。

接着重写OnTopicEvent方法

public async override Task<TopicEventResponse> OnTopicEvent(TopicEventRequest request, ServerCallContext context)
        {
            TopicEventResponse topicResponse = new TopicEventResponse();
            string payloadString = request.Data.ToStringUtf8();
            Console.WriteLine("OnTopicEvent Data:" + payloadString);
            
            HttpContent postContent = new StringContent(payloadString, new MediaTypeWithQualityHeaderValue("application/json"));
            var response = await _httpClient4TopicEvent.PostAsync("http://" + context.Host + "/" + request.Path, postContent);
            string responseContent = await response.Content.ReadAsStringAsync();
            Console.WriteLine(responseContent);
            if (response.IsSuccessStatusCode)
            {
                Console.WriteLine("OnTopicEvent Invoke Success.");
                topicResponse.Status = TopicEventResponseStatus.Success;
            }
            else
            {
                Console.WriteLine("OnTopicEvent Invoke Error.");
                topicResponse.Status = TopicEventResponseStatus.Drop;
            }
            return topicResponse;
        }

这里简单处理了事件触发的返回参数TopicEventResponse ,未处理重试的情况。request.path是在ListTopicSubscriptions方法中返回给Dapr的事件对应的WebApi调用地址。

参数_httpClient4TopicEvent是这样注入的:

builder.Services.AddHttpClient("HttpClient4TopicEvent", httpClient =>
{
    httpClient.DefaultRequestVersion = HttpVersion.Version20;
    httpClient.DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrHigher;
    httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
}
);

因为Grpc是基于Http2.0及以上版本的,所以需要修改HtttpClient默认配置,不然无法访基于Http2.0的WebApi。

然后将我们的AppCallback.AppCallbackBase实现类DaprAppCallbackService Map到GrpcService即可。

app.MapGrpcService<DaprAppCallbackService>();

四、实现效果

分别通过Dapr运行ServiceA和ServiceB微服务,注意指定--app-protocol协议为Grpc,我这里还使用了.Net 热重载技术。

dapr run --app-protocol grpc --app-id serviceA --app-port 5002 --dapr-grpc-port 50002 -- dotnet watch run --launch-profile https

dapr run --app-protocol grpc --app-id serviceB --app-port 5003 --dapr-grpc-port 50003 -- dotnet watch run --launch-profile https

在ServiceA中发布事件

590-20221028115451012-1928947598.png

在ServiceB中查看已订阅的事件和接收到的事件触发

590-20221028115522731-1873844616.png

五、找工作

▪ 博主有15年以上的软件技术实施经验(Technical Leader),专注于微服务(Dapr)和云原生(K8s)软件架构设计、专注于 .Net Core\Java开发和Devops构建发布。
▪ 博主10年以上的软件交付管理经验(Project Manager & Product Ower),致力于敏捷(Scrum)项目管理、软件产品业务需求分析和原型设计。
▪ 博主熟练配置和使用 Microsoft Azure云。
▪ 博主为人诚恳,积极乐观,工作认真负责。 

我家在广州,也可以去深圳工作。做架构师、产品经理、项目经理都可以。有工作机会推荐的朋友可以加我微信 15920128707,微信名字叫Jerry。

本文源代码在这里:iamxiaozhuang/TestDaprGrpcSubscripber (github.com) 大家可以随便取用。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK