5

gRPC开发过程中遇到的问题记录

 2 years ago
source link: https://zhang.ge/5168.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
Jager · 8月17日 · 2022年python 168次已读

最近我在改造手头负责的一个 HTTP Restful API 服务,集成对 gRPC 协议(在鹅厂叫 tRPC,是 gRPC 的本土衍生版本)的支持。由于是从 0 学习这个协议,因此也踩了不少坑,这里用一篇文章来记录下,希望可以帮助到同样从 0 学习 gRPC 的朋友。

列表参数赋值

老的 Rest 接口中,大量用到了列表参数,看了下 pb 协议,给这个参数定义如下:

message Foo {
repeated string Bar = 1;

在 Python RPC 客户端调用测试时,按照经验,传参如下:

req = pb.Foo()
req.Bar = ["a", "b"]

结果赋值就报错了:

AttributeError: Assignment not allowed to repeated field "Bar" in protocol message object.

经过一翻搜索,最终发现 Python 客户端应该如下赋值列表参数:

req = pb.Foo()
bar_list = ["a", "b"]
req.Bar.extend(bar_list)

这样就可以正常赋值了,但是总觉得这个设计太绕了,直接赋值不行么?总体来说,gRPC 还是偏编译型语言的设计的,在 Python 这种解释型语言里面则有点格格不入。

定义二维数组

无独有偶,老接口里面除了上面的单一列表参数之外,还有二维数组作为参数的接口,比如在生成表格图片的时候,pandas 数据里面就有大量二维数组数据,比如:

[["a", "b"], ["c", "d"]]

可以看到这个值并没有键名,因此我第一时间想用 pb 的嵌套参数来赋值,发现玩不转,比如:

message subArg {
repeated string arg1 = 1;
repeated string arg2 = 2;
message Arg {
subArg arg = 1;

解析时,会将 arg1、arg2 作为键名传递过去:

"arg1": ["a", "b"],
"arg2": ["c", "d"]

准确来说 Python 中是没有数组类型的,只有列表(list)和元组(tuple), 不过数组可以用 numpy 库来定义,所以这个场景其实也可以用 numpy 来搞定,不过经过我多番搜索,最终找到一个更简单应对这种复杂参数的解决办法:google.protobuf.Value

import "google/protobuf/struct.proto";
message Foo {
repeated google.protobuf.Value array = 1;

这样定义就解决了,google.protobuf.Value 是个好东西,他包含并兼容以下属性:

"boolValue": false,
"listValue": {},
"nullValue": 0,
"numberValue": 0,
"stringValue": "",
"structValue": {}

也就是说,对于复杂参数,只要属性是 bool、列表、数值型、字符串、结构体他都能支持。

定义空字典

在老的接口中,为了更加灵活,我设计了一些拓展参数,类似于 kwargs,你不传他就是空字典,你需要定义额外的参数就通过传入 {"foo": "bar"} 这类值来实现额外参数支持。

结果在 gRPC 场景中就寄了,pb 要定义字典,就必须用如下嵌套的方式:

message SubFoo {
string arg1 = 1;
string arg2 = 2;
message Foo {
SubFoo arg = 1;

这样才能实现 arg = {"arg1": "xxx", "arg2": "yyy"} 的赋值,但是要定义成空字典这样的拓展参数有没有办法呢?

经过一番研究折腾,发现还是有答案的!其实还是用 google.protobuf.Value 来解决!因此,只要遇到非固定值的参数,基本都可以用 google.protobuf.Value 来满足!

在我进入这个项目的单元测试编写阶段时,我发现在 Python 客户端里面来定义老接口的 pb 参数实在是太太太太复杂了,因为老的 RestAPI 就是一个 JSON,有时候多层嵌套,要多复杂有多复杂,这个在 PB 里面定义简直要了老命,明明就是一个 JSON 可以解决的问题,PB 里面得绕几次才能完成赋值,极大的阻碍了我写单元测试的速度!

于是,我想了一个办法,我直接复制老 API 接口的单元测试里面的 JSON 参数,然后通过同时遍历 pb 协议对象里面的参数字典以及 JSON,来实现自动赋值,最开始我写了一个自动转换的函数来做这个逻辑,结果发现有太多要考虑的地方,健壮性不太行。

最终,经过研究折腾,发现官方本身就提供了一个JSON 和 PB 互转的方法,非常好用!

Contains routines for printing protocol messages in JSON format.
Simple usage example:
# Create a proto object and serialize it to a json format string. message = my_proto_pb2.MyMessage(foo=’bar’) json_string = json_format.MessageToJson(message)
# Parse a json format string to proto object. message = json_format.Parse(json_string, my_proto_pb2.MyMessage())
exception google.protobuf.json_format.Error
Top-level module error for json_format.
with_traceback()
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

这里简单贴一个例子,方便上手:

先定义一个 pb 协议

import "google/protobuf/struct.proto";
message MyRequest {
string a = 1;
repeated string b = 2;
repeated google.protobuf.Value c = 3;

此时客户端赋值通过 Parse 可以快速将 JSON 参数转换为 pb 参数:

import json
from google.protobuf.json_format import Parse, MessageToJson
# 其他代码略..
proxy = rpc.MyClientProxyImpl()
# 定义请求参数
payload = {
"a": "a",
"b": ["a","b"],
"c": [["a", "b"], ["c", "d"]]
payload = json.dumps(payload)
# 直接用 Parse 结合 JSON 和 pb 协议对象就能完成赋值,ignore_unknown_fields 表示忽略未知字段,即遇到 pb 未定义的参数直接跳过而不是报错
req = Parse(payload, pb.MyRequest(), ignore_unknown_fields=True)
# 其他代码略...
ret = proxy.MyFunc(ctx, req, options)
# 用 MessageToJson 可以将 RPC 的返回结果转成 JSON,不过这里需要先序列化再反序列化一次才能正常展示:
print(f"请求参数:{payload}, 响应内容:{json.dumps(json.loads(MessageToJson(ret)))}")

所以,通过自带的 Parse, MessageToJson 方法,我们可以快速将 JSON 转成 pb 协议,也可以将 RPC 结果快速转成 JSON,这个在 Python 这种解释型语言中非常实用!

新服务开发并测试完毕,准备部署到我们的 K8S 集群,发现了一个问题:之前直接用 K8S 的 nodeport 就能快速访问基于 overlay 网络部署的老的 RestAPI 服务,但是用 RPC 之后,其实会将应用的 IP 和端口注册到服务发现中心,如果继续使用 overlay 模式的话,注册到服务发现中心的 IP 就是 K8S 的 overlay 私有 IP,那其他不在同一个 K8S 里面的服务是无法访问到这个私有 IP+端口的。

这里解决办法有两个:

  • 将 overlay 模式改成 floatingIP,即容器会分配一个 underlay IP,直接将整个网平打平,这样整个内网都能访问到这个服务了;
  • 服务改成 Host 网络模式启动,直接绑定 K8S 计算节点网络就能互访。

两个方案都有缺点,第一个是会造成 IP 地址浪费,第二个则会存在端口冲突问题。

最终,我选择了第二个方案,改成 Host 网络模式+随机端口的方式启动,就能完美解决端口冲突问题了。

目前我们的 RPC 框架对随机端口注册到服务发现中心并不完善,因此我这边写了一个 shell 脚本来解决了这个问题:原理是在启动 RPC 服务端之前,脚本先随机一个本地可以用的端口,然后修改 RPC 启动端口并启动。脚本内容如下:

#!/bin/bash
# ******************************************************
# Author : Jager
# Last modified : 2020-11-18 10:00
# Filename : get_random_port.sh
# Description : 获取系统可用的随机端口
# ******************************************************
# 在指定范围内随机数字
# get_range_number STARTNUM ENDNUM
get_range_number()
min=$1
max=$(($2-$min+1))
num=$(cat /dev/urandom | head -n 10 | cksum | awk -F ' ' '{print $1}')
echo $(($num%$max+$min))
# 检查端口是否被系统占用(包括随机端口)
# check_port <PORT>
# 没有被占用: return 0
# 被占用或者参数为空: return 1
check_port()
if [[ -z $1 ]];then
return 1
if ! awk -F '[: ]+' '{print strtonum("0x"$4)}' /proc/net/tcp | grep -wq $1;then
return 0
return 1
# 获取可用随机端口
# get_random_port STARTPORT ENDPORT RETRY_TIMES
get_random_port()
round=0
start_port=${1:-10001}
end_port=${2:-19999}
retry_times=${3:-10000}
while ! check_port ${rand_port};do
export rand_port=$(get_range_number ${start_port} ${end_port})
let round+=1
if [[ ${round} -ge ${retry_times} ]];then
echo "${retry_times} retries, no ports available, export port 0"
export rand_port=0
# 设置起始端口范围和最大重试次数
START_PORT=10001
END_PORT=19999
RETRY_TIMES=10000
# 生成可用的随机端口
get_random_port ${START_PORT} ${END_PORT} ${RETRY_TIMES}
echo 启动的随机端口为:${rand_port}
# 修改配置文件
export config_file=conf/${CONFIG_FILE:-trpc_python.yaml}
if [[ -f $config_file ]];then
sed -i "s/port: .*/port: $rand_port/g" $config_file
echo "指定的配置文件:$config_file 不存在,请检查!"
exit 1
# 启动服务
exec python3 trpc_main.py -c ${config_file}

终于在解决上述问题之后,我手头的这个服务目前也并行支持了 RPC 协议,让服务形态更加丰富。折腾期间大大小小其实踩了不少坑,本文主要记录了一些比较典型、通用的问题,后续如果还有相关经验会继续补充进来。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK