30

gRPC-go服务发现&负载均衡

 4 years ago
source link: https://studygolang.com/articles/29664
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

前言

以下示例基于 https://github.com/grpc/grpc-go v1.30.0,关于proto文件定义,服务生成参考 gRPC 官方文档中文版

client

grpc使用的是客户端负载均衡模式,每次新建连接的时候会根据负载均衡算法选出服务端的IP然后建立连接。现在grpc默认支持两种算法pick_first(第一次地址) 和 round_robin(轮询)

pick_first: pick_first每次都是尝试连接第一个地址,如果连接失败就会尝试下一个,直到连接成功为止,之后的RPC请求都会使用这个连接

round_robin: round_robin会对每个地址建立连接,之后的RPC请求会依次通过这些连接发送到后端

客户端新建一个连接

conn, err := grpc.Dial(
        fmt.Sprintf("%s:///%s", "game", baseService),
        grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
        grpc.WithInsecure(),
        //grpc.WithUnaryInterceptor(unaryClientInterceptor),
        //grpc.WithBlock(),
               //grpc.WithCompressor  Deprecated
    )

客户端每次发起请求都需要通过grpc.dail创建一个ClientConn,然后通过ClientConn.XXXX发送请求。

建立连接的各项参数:

grpc.WithInsecure :禁用传输认证,没有这个选项必须设置一种认证方式

grpc.WithCompressor: 在grpc.Dial参数中设置压缩的方式将要被废弃,推荐使用UseCompressor

grpc.UseCompressor(gzip.Name)
        conn, err := grpc.Dial(
              //...
        )

PS:压缩方式客户端应该和服务端对应

grpc.WithBlock(): grpc.Dial默认建立连接是异步的,加了这个参数后会等待所有连接建立成功后再返回

grpc.WithUnaryInterceptor: 一元拦截器,适用于普通rpc连接,相应的还有流拦截器。拦截器只有第一个生效,所以一般设置一个。拦截器是对请求的一次封装,客户端和服务端都可以设置拦截器,请求的发送/执行都是在拦截器内操作的,所以在请求的前后都可以嵌入用户自定义的代码,类似hook

//客户端拦截器
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    var credsConfigured bool
    for _, o := range opts {
        _, ok := o.(grpc.PerRPCCredsCallOption)
        if ok {
            credsConfigured = true
            break
        }
    }
    if !credsConfigured {
        opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
            AccessToken: fallbackToken,
        })))
    }
    start := time.Now()
    err := invoker(ctx, method, req, reply, cc, opts...)
    end := time.Now()
    logger("RPC: %s, start time: %s, end time: %s, err: %v", method, start.Format("Basic"), end.Format(time.RFC3339), err)
    return err
}

//服务端拦截器
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // authentication (token verification)
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, errMissingMetadata
    }
    if !valid(md["authorization"]) {
        return nil, errInvalidToken
    }
    m, err := handler(ctx, req)
    if err != nil {
        logger("RPC failed with error %v", err)
    }
    return m, err
}

grpc.WithDefaultServiceConfig: 旧的版本可以通过grpc.RoundRobin(),和grpc.WithBalancer()来设置负载均衡,这个版本grpc.RoundRobin()已经取消了,grpc.WithBalancer()和grpc. 也WithBalancerName()标记为废弃。

//service config example
{
  "loadBalancingConfig": [ { "round_robin": {} } ],
  "methodConfig": [
    {
      "name": [
        { "service": "foo", "method": "bar" },
        { "service": "baz" }
      ],
      "timeout": "1.0000000001s"
    }
  ]
}
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name))

可以这样设置BalancingPolicy

target: grpc.Dial: 的第一个参数,这个参数的主要作用的通过它来找到对应的服务端地址,target传入是一个字符串,统一格式为 scheme://authority/endpoint ,然后通过以下方式解析为Target struct

type Target struct {
    Scheme    string
    Authority string
    Endpoint  string
}

func parseTarget(target string) (ret resolver.Target) {
    var ok bool
    ret.Scheme, ret.Endpoint, ok = split2(target, "://")
    if !ok {
        return resolver.Target{Endpoint: target}
    }
    ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
    if !ok {
        return resolver.Target{Endpoint: target}
    }
    return ret
}

解析target的时候有以下几种情况:

  • 当前参数有没有直接设置resolverBuilder,如果设置了,直接设置Endpoint=target
  • 如果未直接设置resolverBuilder,则通过Scheme来找到resolverBuilder
  • 如果通过Scheme没有找到resolverBuilder,resolverBuilder为默认的dns builder,设置
    Endpoint=target

所以,真正获取IP地址是通过resolverBuilder这个接口

type Builder interface {
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    Scheme() string
}

Build(): 为给定目标创建一个新的resolver,当调用grpc.Dial()时执行。

Scheme(): 返回此resolver方案的名称

type Resolver interface {
    ResolveNow(ResolveNowOptions)
    Close()
}

ResolveNow(): 被 gRPC 调用,以尝试再次解析目标名称。只用于提示,可忽略该方法。

Close方法: 关闭resolver

下面我们看一个示例

func init() {
    resolver.Register(&exampleResolverBuilder{})  
/*
//注册的时候将Scheme => builder保存到m
func Register(b Builder) {
    m[b.Scheme()] = b
}
*/
}

const (
    exampleScheme      = "example"
    exampleServiceName = "lb.example.grpc.io"
)

var addrs = []string{"localhost:50051", "localhost:50052"}

type exampleResolverBuilder struct{}

func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    r := &exampleResolver{
        target: target,
        cc:     cc,
        addrsStore: map[string][]string{
            exampleServiceName: addrs,
        },
    }
    r.start()
    return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return exampleScheme }

type exampleResolver struct {
    target     resolver.Target
    cc         resolver.ClientConn
    addrsStore map[string][]string
}

func (r *exampleResolver) start() {
    addrStrs := r.addrsStore[r.target.Endpoint]
    addrs := make([]resolver.Address, len(addrStrs))
    for i, s := range addrStrs {
        addrs[i] = resolver.Address{Addr: s}
    }
    r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*exampleResolver) Close()                                  {}

func main() {
//...
roundrobinConn, err := grpc.Dial(
        // Target{Scheme:exampleScheme,Endpoint:exampleServiceName}
        fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
        grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
        grpc.WithInsecure(),
        grpc.WithBlock(),
    )
//...
}

grpc.Dial() 会调用Scheme=>builder 的Build() 方法,之后调用r.start()

r.cc.UpdateState(resolver.State{Addresses: addrs})

UpdateState()将addr更新到cc,也就是外部的连接中,供其他接口使用。

server

server相对来说启动比较简单,一般都会加拦截器来获取matedata或者去recover() panic,又或者打印一些日志

grpc.UseCompressor(gzip.Name)
        s := grpc.NewServer(grpc.UnaryInterceptor(unaryServerInterceptor))
//...

matedata: matedata是一个map[string][]string的结构,用来在客户端和服务器之间传输数据。其中的一个作用是可以传递分布式调用环境中的链路id,方便跟踪调试。另外也可以传一些业务相关的数据

客户端拦截器中设置metedata

md := metadata.Pairs("XXX_id",xxxID, "YYY_id", yyyID)
        mdOld, _ := metadata.FromIncomingContext(ctx)
        md = metadata.Join(mdOld, md)
        ctx = metadata.NewOutgoingContext(ctx, md)
          //...
       invoker(ctx, method, req, reply, cc, opts...)

服务端拦截器获取metadata

var xxxID,yyyID
    md, _ := metadata.FromIncomingContext(ctx)
    if arr := md["XXX_id"]; len(arr) > 0 {
        xxxID = arr[0]
    }
    if arr := md["YYY_id"]; len(arr) > 0 {
        yyyID = arr[0]
    }
        m, err := handler(ctx, req)
    if err != nil {
        logger("RPC failed with error %v", err)
    }

在server启动之后,需要将这个服务注册到etcd 。

用etcd3在编译的时候出现了和groc-go版本不兼容的问题

首先当前用的etcd 版本是 3.4.9,支持的grpc-go最高版本是v1.26.0,于是需要将grpc-go降级

replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

降级之后之前生成的proto.pb.go 又出现了错误,于是将protobuf降级

replace github.com/golang/protobuf => github.com/golang/protobuf v1.2.0

以上的问题网上其他人也遇到过,下面的这个不清楚是我本地环境有问题还是其他原因

报错原因是 google.golang.org/genproto这个包下面生成的proto.pb.go里面指定了protobuf1.4的版本变量,解决办法还是降级,版本号是在$GOPATH/pkg/mod/... 下面找到的

replace google.golang.org/genproto => google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8

关于etcd的内容之后再整理吧。

小结

结合etcd 的watch功能,很容易检测某一个路径节点的变化,如果,server端注册两个服务到etcd

key = /project/service/user/1 val = 127.0.0.1:9999

key = /project/service/user/2 val = 127.0.0.1:9998

在客户端,如果我们自定义了一个名叫example的resolverBuilder,

同时开启一个watch协程 ,监测/project/service下面的节点,动态维护Build()中addrsStore,这个时候我们设置addrsStore[user] = {127.0.0.1:9999,127.0.0.1:9998}。

然后在客户端grpc.Dai中令target = example:///user

那么在r.start()中就可以获取到 {127.0.0.1:9999,127.0.0.1:9998}(具体可以看上面示例中r.start()方法)

server注册的key,Build()中addrsStore中的key,以及target 后面的endPoint 的不同选择可以实现不通粒度的服务划分。

欢迎关注我们的微信公众号,每天学习Go知识

FveQFjN.jpg!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK