4

【Go】Golang实现gRPC的Proxy的原理

 2 years ago
source link: https://www.cnblogs.com/voipman/p/15352001.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

gRPC是Google开始的一个RPC服务框架, 是英文全名为Google Remote Procedure Call的简称。

广泛的应用在有RPC场景的业务系统中,一些架构中将gRPC请求都经过一个gRPC服务代理节点或网关,进行服务的权限限制,限流,服务调用监控,增加请求统计等等诸多功能。

如下以Golang和gRPC为例,简要分析gRPC的转发原理。

gRPC Proxy原理

基本原理如下

  • 基于TCP启动一个gRPC代理服务端
  • 拦截gRPC框架的服务,能将gRPC请求的服务拦截到转发代理的一个函数中执行。
  • 接收客户端的请求,处理业务指标后转发给服务端。
  • 接收服务端的响应,处理业务指标后转发给客户端。

基于如上原理描述,如下图所示,gRPC的客户端将所有的请求都发给gRPC Server Proxy,这个代理网关实现请求转发。

将gRPC Client的请求流转发到gRPC 服务实现的节点上。并将服务处理结果响应返回给客户端。

 在这个图中的转发需要回答如下几个问题

  • Proxy怎么知道哪些请求转发到哪些服务节点上,转发的依据是什么?
  • Proxy是否需要解析gRPC协议?
  • Proxy上没有服务的实现,该如何转发?

简化的gRPC服务处理流程

在回答如下问题之前,我们先简单的分析一下gRPC服务器的实现原理和流程。

  • 编写自己的服务实现,例子中以HelloWorld为例。
  • 把自己的服务实现HelloWorldServer注册到gRPC框架中
  • 创建一个TCP的服务端监听
  • 基于TCP监听启动一个gRPC服务
  • gRPC服务接收gRPC客户端的TCP请求
  • 解析gRPC的头部信息,找出服务名
  • 根据服务名找到第一步注册的服务和方法实现处理器handler
  • 处理函数执行
  • 返回处理结果

简化的注册服务处理器函数,启动gRPC服务,调用请求和执行数据流图如下所示:

详细的gRPC服务运行原理

第一步,定义和编写HelloWorld的IDL文件

syntax = "proto3";

package demoapi;


// HelloWorld Service
service HelloWorldService {
   rpc HelloWorld(HelloWorldRequest) returns (HelloWorldResponse){};
}

// Request message
message HelloWorldRequest {
   string  request = 1;
}

// Response message
message HelloWorldResponse {
   string respose = 1;
}

在这个简单的IDL中,定义了一个HelloWorldService的gRPC服务Service,这个服务中有一个HelloWorld方法Method。

第二步,编译IDL文件

将IDL的proto文件编译成helloworld.pb.go的gRPC代码文件。

生成的代码文件中,我们可以看到如下信息

// Hello World的客户端接口
type HelloWorldServiceClient interface {
    HelloWorld(ctx context.Context, in *HelloWorldRequest, opts ...grpc.CallOption) (*HelloWorldResponse, error)
}

// Hello World的服务端接口
type HelloWorldServiceServer interface {
    HelloWorld(context.Context, *HelloWorldRequest) (*HelloWorldResponse, error)
}

// HelloWorld的服务注册处理器函数Handler
func _HelloWorldService_HelloWorld_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloWorldRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(HelloWorldServiceServer).HelloWorld(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: "/demoapi.HelloWorldService/HelloWorld",
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(HelloWorldServiceServer).HelloWorld(ctx, req.(*HelloWorldRequest))
    }
    return interceptor(ctx, in, info, handler)
}

// gRPC服务注册的服务描述信息
// gRPC服务注册时,会建立以ServiceName为Key,Methods为Value的一个Map映射
// Methods中的Handler就是如上的服务处理Handler
var _HelloWorldService_serviceDesc = grpc.ServiceDesc{
    ServiceName: "demoapi.HelloWorldService",
    HandlerType: (*HelloWorldServiceServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "HelloWorld",
            Handler:    _HelloWorldService_HelloWorld_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "demoapi/HelloWorld.proto",
}

如上代码中有如下几个关键信息需要解释

  • 服务Service名称 demoapi.HelloWorldService,对应IDL文件的package包名.service服务名称
  • 方法Method名称 HelloWorld,对应IDL文件的rpc方法

第三步,注册HelloWorld服务到gRPC的服务映射中

  • grpc.ServiceDesc是 gRPC服务注册的服务描述信息。
  • gRPC服务注册时,会建立以ServiceName为Key,包装Methods为Value的一个Map映射m。
  • Methods中的Handler就是如上的服务处理Handler。

对应的注册代码如下

// 注册gRPC服务
func RegisterHelloWorldServiceServer(s *grpc.Server, srv HelloWorldServiceServer) {
    s.RegisterService(&_HelloWorldService_serviceDesc, srv)
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
       // ...
    m      map[string]*service // service name -> service info
}

// gRPC service.go的服务注册
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
    for i := range sd.Streams {
        d := &sd.Streams[i]
        srv.sd[d.StreamName] = d
    }
    s.m[sd.ServiceName] = srv
}

第四步,接收客户端gRPC请求并处理

在这一步中,会进行如下几个步骤和函数的调用,也会回答前面的第一个问题。

  • gRPC客户端通过TCP链接,连接到gRPC服务端
  • gRPC的Serve函数触发TCP的Accept函数调用,生成一个和客户端的网络连接
  • grpc框架代码执行handleRawConn方法,将这个网络连接设置打破gRPC的传输层,做为网络的读和写实现
  • 依次调用grpc流的handlerStream方法,用于处理gRPC数据流
  • 这个函数中会接收gRPC请求的头信息,并解析得到服务名 如第二步中的服务名 demoapi.HelloWorldService
  • 通过如下的服务名中的方法名HelloWorld,并在Method的map中找到这个方法的处理器函数Handler,并执行这个Handler函数,实现gRPC服务的调用
  • 最后将处理结果返回

 整体的数据流整理如下: 

我们发现在gRPC框架代码中的handleStream存在两类服务,一类是已知服务 knownService, 第二类是unknownService

这两个有什么区别呢?

已知服务 knownService就是gRPC服务端代码注册到gRPC框架中的服务,叫做已知服务,其他没有注册的服务叫做未知服务。

为什么我们要提到这个未知服务unknownService呢?着就是我们实现gRPC服务代码的关键所在,是前面问题三的答案,

要实现gRPC服务代理,我们在创建grpc服务grpc.NewServer时,传递一个未知服务的handler,将未知服务的处理进行接管,然后通过注册的这个Handler实现gRPC代理转发的逻辑。

基于如下描述,gRPC代理的原理如下图所示:

  • 创建grpc服务时,注册一个未知服务处理器Handler和一个自定义的编码Codec编码和解码,此处使用proto标准的Codec(回答前面第二个问题)
  • 这个handle给业务方预留一个director的接口,用于代理重定向转发的grpc连接获取,这样proxy就可以通过redirector得到gRPCServer的grpc连接。
  • proxy接收gRPC客户端的连接,并使用gRPC的RecvMsg方法,接收客户端的消息请求
  • proxy将接收到的gRPC客户端消息请求,通过SendHeader和SendMsg方法发送给gRPC服务端。
  • 同样的方法,RecvMsg接收gRPC服务端的响应消息,使用SendMsg发送给gRPC客户端。
  • 至此gRPC代码服务就完成了消息的转发功能,企业的限流,权限等功能可以通过转发的功能进行拦截处理。

gRPC Proxy的实现逻辑如下图所示:

 gRPC 代理服务的关键代码如下所示:

服务端到客户端的转发

// 转发服务端的数据流到客户端
func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
    ret := make(chan error, 1)
    go func() {
        f := &frame{}
        for i := 0; ; i++ {
            if err := src.RecvMsg(f); err != nil {
                ret <- err // this can be io.EOF which is happy case
                break
            }
            if err := dst.SendMsg(f); err != nil {
                ret <- err
                break
            }
        }
    }()
    return ret
}

客户端到服务端的转发

// 转发客户端的数据流到服务端
func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
    ret := make(chan error, 1)
    go func() {
        f := &frame{}
        for i := 0; ; i++ {
            if err := src.RecvMsg(f); err != nil {
                ret <- err // this can be io.EOF which is happy case
                break
            }
            if i == 0 {
                // This is a bit of a hack, but client to server headers are only readable after first client msg is
                // received but must be written to server stream before the first msg is flushed.
                // This is the only place to do it nicely.
                md, err := src.Header()
                if err != nil {
                    ret <- err
                    break
                }
                if err := dst.SendHeader(md); err != nil {
                    ret <- err
                    break
                }
            }
            if err := dst.SendMsg(f); err != nil {
                ret <- err
                break
            }
        }
    }()
    return ret
}

https://github.com/grpc/grpc

https://github.com/mwitkow/grpc-proxy

done。

祝玩的开心~


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK