10

gRPC 練習 - 簡單寫個聊天室

 2 years ago
source link: https://blog.darkthread.net/blog/grpc-chatroom-example/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

gRPC 練習 - 簡單寫個聊天室

2022-06-08 09:33 PM 0 545

隨時代演進,.NET 的 API 介面標準從 Remoting、Web Service、WCF 轉向 Web API,近幾年,主打 HTTP/2 (傳輸效率大勝 HTTP 1.1)、Protobuf 格式(輕量級二進位序列化,遠比 JSON 精簡有效率)、支援雙向傳輸、跨語言相容的 gRPC 異軍突起,成為效能優先 API 的開發首選,而從 .NET Core 3 開始,gRPC 還成為官方指定的 WCF 接班人。參考:為何我們為 WCF 開發人員建議 gRPC

gRPC 與 WebAPI 比較表 來源

Fig1_637902929601004999.png

抱著沒吃過豬肉也要看看豬走路的心情,我打算挑個題材練習,之前寫過 ASP.NET Core WebSocket 聊天室,這回就利用 gRPC 的雙向傳輸特性,寫個 gRPC 聊天室吧。

不得不說,微軟官方文件的 gRPC 教學寫得真好,跟著做完範例差不多算入門了。這裡跳過細節,整理重點:

  1. gRPC 分為伺服器端跟客戶端,伺服器有專屬的 grpc 專案型別(用dotnet new grpc建立專案),客戶端則用一般專案加入 NuGet Package: Grpc.Net.Client、Google.Protobuf、Grpc.Tools
  2. 在 Protos\svcName.proto 檔案定義 Protobuf 介面,.proto IDL 語法有點小複雜,但能支援集合、列舉、多重型別等應用,要熟悉得花點時間,微軟有份不錯的參考文件,讀完應可掌握十之八九
  3. 我定義的 chat.proto 長這樣,只有一個 Join 方法,參數跟回應都使用 Streaming 方式傳送,善用 gRPC 的雙向傳輸功能:
     syntax = "proto3";
    
     import "google/protobuf/timestamp.proto";
     option csharp_namespace = "grpc_chat";
    
     package chat;
    
     service Chatroom {
       rpc Join(stream SpeakRequest) returns (stream BroadcastMessage);
     }
    
     message SpeakRequest {
       string uid = 1;
       string name = 2;
       string message = 3;
     }
    
     message BroadcastMessage {
       string speaker = 1;
       google.protobuf.Timestamp time = 2;
       string message = 3;
     }
    
  4. 在 .csproj 加入以下設定 (GrpcServices 在伺服器端為 Server、客戶端為 Client)
    <ItemGroup>
      <Protobuf Include="Protos\filename.proto" GrpcServices="Client" />
    </ItemGroup>
    
    讓 Grpc.Tools 依據 .proto 產生對映的類別:
    Fig2_637902929602256169.png
  5. 伺服端實作 Services/SvcNameService.cs,繼承 SvcName.SvcNameBase (由步驟 3 自動產生) 並一一 override 方法介面實作邏輯
  6. 客戶端也要加入相同的 Protos\svcName.proto,先GrpcChannel.ForAddress(url)建立GrpcChannel,再new Chatroom.ChatroomClient(channel)生成客戶端物件,呼叫 Join() 方法取得一個雙向 Streaming 傳輸 AsyncDuplexStreamingCall 物件,透過 RequestStream 傳送 SpeakRequest、透過 ResponseStream 接收 BroadcastMessage,這部分稍稍複雜,官方文件有篇 Migrate WCF duplex services to gRPC 以訂閱股票報價為例,參考價值頗高。

這個簡單的聊天室分為伺服器端 grpc-chat-svc (Microsoft.NET.Sdk.Web/ASP.NET Core) 跟客戶端 grpc-chat-client (.NET 6 Console),專案結構很簡單:

Fig3_637902929602743065.png

兩支核心程式為 grpc-chat-svc/Services/ChatroomService.cs:

using Grpc.Core;
using grpc_chat_svc;
using grpc_chat;
using System.Collections.Concurrent;

namespace grpc_chat_svc.Services;

public class ChatroomService : Chatroom.ChatroomBase
{
    private readonly ILogger<ChatroomService> _logger;
    public ChatroomService(ILogger<ChatroomService> logger)
    {
        _logger = logger;
    }

    static ConcurrentDictionary<string, MsgChannel> channels =
        new ConcurrentDictionary<string, MsgChannel>();
    class MsgChannel
    {
        public string Name;
        public IServerStreamWriter<BroadcastMessage> Stream;
    }

    async Task Speak(string speaker, string message)
    {
        var msg = new BroadcastMessage
        {
            Speaker = speaker,
            Time = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(DateTime.UtcNow),
            Message = message
        };
        var lostUsers = new List<string>();
        foreach (var kv in channels.ToArray())
        {
            try
            {
                await kv.Value.Stream.WriteAsync(msg);
            }
            catch
            {
                lostUsers.Add(kv.Value.Name);
                channels.TryRemove(kv.Key, out _);
            }
        }
        if (lostUsers.Any())
        {
            await Speak("system", String.Join(", ", lostUsers.ToArray()) + " disconnected");
        }
    }

    public override async Task Join(IAsyncStreamReader<SpeakRequest> requestStream, 
            IServerStreamWriter<BroadcastMessage> responseStream, ServerCallContext context)
    {
        var clientIp = context.GetHttpContext().Connection.RemoteIpAddress!.ToString();
        //_logger.LogInformation($"{clientIp}/{context.Peer} connnected");
        string uid = string.Empty;
        string name = string.Empty;
        try
        {
            await foreach (var speakReq in requestStream.ReadAllAsync(context.CancellationToken))
            {
                uid = speakReq.Uid;
                name = speakReq.Name;
                var speaker = $"{name}@{clientIp}";
                var newMember = false;
                if (!channels.TryGetValue(uid, out var channel))
                {
                    _logger.LogInformation($"{uid}/{speaker}");
                    channel = new MsgChannel { Name = name, Stream = responseStream };
                    if (!channels.TryAdd(uid, channel))
                        throw new ApplicationException("Failed to join the chatroom");
                    newMember = true;
                }
                channel.Name = name;
                if (speakReq.Message == "/exit")
                    break;
                else if (newMember)
                    await Speak("system", $"{name} joined");
                else
                    await Speak(speaker, speakReq.Message);
            }
        }
        catch (Exception ex)
        {
            await Speak("system", $"{name} {ex.Message}");
        }
        //_logger.LogInformation($"{context.Peer} disconnected");
        await Speak($"system", $"{name} left");
        if (!string.IsNullOrEmpty(uid)) channels.TryRemove(uid, out _);
    }
}

與 grpc-chat-client/Program.cs 力求簡單,程式碼都壓在 100 行內。gRPC 程式庫走 async/await 風格,這部分我的經驗尚淺,處理得有點生硬,再請大家指點一二。

using System.Net.Sockets;
using Grpc.Core;
using Grpc.Net.Client;
using grpc_chat;

if (args.Length < 2)
{
    Console.WriteLine("Syntax: grpc-chat-client https://localhost:7042 userName");
    return;
}
var url = args[0];
var name = args[1];
Console.Clear();
using var channel = GrpcChannel.ForAddress(url);
var client = new Chatroom.ChatroomClient(channel);
var duplex = client.Join();
var uid = Guid.NewGuid().ToString();
var spkMsg = new SpeakRequest
{
    Uid = uid,
    Name = name,
    Message = "/join"
};
await duplex.RequestStream.WriteAsync(spkMsg);
int x = 0, y = 2;
void Print(string msg, ConsoleColor color = ConsoleColor.White) {
    Console.ForegroundColor = color;
    int origX = Console.CursorLeft, origY = Console.CursorTop;
    Console.SetCursorPosition(x, y);
    Console.WriteLine(msg);
    x = Console.CursorLeft;
    y = Console.CursorTop;
    Console.SetCursorPosition(origX, origY);
    Console.ResetColor();
};
var rcvTask = Task.Run(async () =>
{
    try
    {
        await foreach (var resp in duplex.ResponseStream.ReadAllAsync(CancellationToken.None))
        {
            Print($"{resp.Time.ToDateTime().ToLocalTime():HH:mm:ss} [{resp.Speaker}] {resp.Message}", 
                resp.Speaker == "system" ? ConsoleColor.Yellow : ConsoleColor.Cyan);
        }
    }
    catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable)
    {
        Print($"Connection broken", ConsoleColor.Magenta);
        Console.Clear();
        Environment.Exit(254);
    }
    catch (RpcException ex) {
        Print($"Error {ex.InnerException?.Message}", ConsoleColor.Magenta);
        Console.Clear();
        Environment.Exit(255);
    }
});


while (true)
{
    Console.SetCursorPosition(0, 0);
    Console.WriteLine(new String(' ', Console.WindowWidth) + new string('-', Console.WindowWidth));
    Console.SetCursorPosition(0, 0);
    var msg = Console.ReadLine();
    try
    {
        spkMsg.Message = msg;
        await duplex.RequestStream.WriteAsync(spkMsg);
        if (msg == "/exit") break;
    }
    catch (RpcException)
    {
        break;
    }
}
try { await duplex.RequestStream.CompleteAsync(); } catch { }
rcvTask.Wait();
Console.Clear();

最後來看執行效果:

Fig4_637902929604491807.gif

還蠻有模有樣的吧!

老規矩,範例專案已上傳到 Github,有興趣的同學可 Clone 回去玩看看。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK