38

gRPC 流式调用 | Beck's Blog

 4 years ago
source link: http://beckjin.com/2019/10/26/grpc-streaming/?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

gRPC 使用 Protocol buffers 作为接口定义语言(IDL)来描述服务接口和输入输出消息的结构,目前支持 4 种定义服务方法类型:

类型 说明
简单 RPC 客户端传入一个请求对象,服务端返回一个结果对象
客户端流式 RPC 客户端传入多个请求对象,服务端返回一个结果对象
服务端流式 RPC 客户端传入一个请求对象,服务端返回多个结果对象
双向流式 RPC 客户端传入多个请求对象,服务端返回多个结果对象

RPC 定义

简单 RPC:一般这种方式使用较多,如下:定义 SayHello 方法,输入 HelloRequest,返回 HelloResponse

1
2
3
4
5
6
7
8
9
10
11
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
string greeting = 1;
}

message HelloResponse {
string reply = 1;
}

而流式 RPC 定义与 简单 RPC 的区别只是在请求或返回参数前增加了 stream 关键词,如下:

1
2
3
4
5
6
7
8
service HelloService {
// 客户端流式 RPC
rpc SayHello1 (stream HelloRequest) returns (HelloResponse);
// 服务端流式 RPC
rpc SayHello2 (HelloRequest) returns (stream HelloResponse);
// 双向流式 RPC
rpc SayHello3 (stream HelloRequest) returns (stream HelloResponse);
}

gRPC 能支持流式调用本质是因为 gRPC 通信是基于 HTTP/2 实现的,HTTP/2 具有流的概念,流是为了实现 HTTP/2 的多路复用。流是服务器和客户端在 HTTP/2 连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程。

在 gRPC 中接收消息大小限制参数 grpc.max_receive_message_length 默认是 4M ,如果大于该值,则会提示:Received message larger than max (xxxxxx vs. 4194304),当然我们可以修改默认值解决问题,但如果默认值支持过大对服务器资源也是一种消耗,这时候其实应该考虑使用流式调用,有效将数据进行分批处理,提高性能。

这里主要介绍一下双向流式 RPC(客户端和服务端流式 RPC 类似),完整代码请 前往这里查看 。双向流模拟功能是客户端流式输入文件路径,服务端针对每个文件每次最多读取 1M 的数据返回,客户端拿到数据后生成新文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";

package GrpcStream;

service StreamTest {
// 双向流程 RPC
rpc BidirectionalStream(stream BidirectionalStreamRequest) returns (stream BidirectionalStreamResponse) {}
}

message BidirectionalStreamRequest {
// 文件路径
string file_path = 1;
}
message BidirectionalStreamResponse {
// 文件路径
string file_path = 1;
// 数据
bytes data = 2;
}

这里是基于 .NET Core 3.0 使用 gRPC,可以通过 VS 预置的 gRPC 服务 模板来创建服务端,创建后将默认的 porto 文件替换成上面的内容。

服务端代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public override async Task BidirectionalStream(IAsyncStreamReader<BidirectionalStreamRequest> requestStream, IServerStreamWriter<BidirectionalStreamResponse> responseStream, ServerCallContext context)
{
var i = 0;
// 监听客户端数据输入
while (await requestStream.MoveNext())
{
// 打印次数
Console.WriteLine(i++);
using var fs = File.Open(requestStream.Current.FilePath, FileMode.Open);
var leftSize = fs.Length;
// 1M
var buff = new byte[1048576];
while (leftSize > 0)
{
var len = await fs.ReadAsync(buff);
leftSize -= len;
Console.WriteLine($"response {requestStream.Current.FilePath} {len} bytes");
// 流式返回数据
await responseStream.WriteAsync(new BidirectionalStreamResponse
{
FilePath = requestStream.Current.FilePath,
Data = ByteString.CopyFrom(buff, 0, len)
});
}
}
}

客户端代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 测试文件,key 是已存在的文件,value 是需要生成的文件
static readonly Dictionary<string, string> fileDic = new Dictionary<string, string>()
{
{@"d:\dapr\daprd_windows_amd64.zip", @"d:\dapr\daprd_windows_amd64_new.zip" },
{@"d:\dapr\injector_windows_amd64.zip", @"d:\dapr\injector_windows_amd64_new.zip" },
};
static StreamTest.StreamTestClient client;

static async Task Main(string[] args)
{
// 连接 gRPC 服务
var channel = GrpcChannel.ForAddress("https://localhost:5001");
client = new StreamTest.StreamTestClient(channel);
await BidirectionalStreamTestAsync();
Console.ReadKey();
}

static async Task BidirectionalStreamTestAsync()
{
using var call = client.BidirectionalStream();
var responseTask = Task.Run(async () =>
{
// 接收返回值
var iterator = call.ResponseStream;
// 监听服务端数据返回
while (await iterator.MoveNext())
{
Console.WriteLine($"write to new file {fileDic[iterator.Current.FilePath]} {iterator.Current.Data.Length} bytes");
// 写入新文件
using var fs = new FileStream(fileDic[iterator.Current.FilePath], FileMode.Append);
iterator.Current.Data.WriteTo(fs);
}
});

var rand = new Random();
foreach (var item in fileDic)
{
// 流式输入
await call.RequestStream.WriteAsync(new BidirectionalStreamRequest
{
FilePath = item.Key
});
await Task.Delay(rand.Next(200));
}
await call.RequestStream.CompleteAsync();
await responseTask;
}

执行结果:

result

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK