44

Go Micro Client 源码分析

 5 years ago
source link: https://www.tuicool.com/articles/BrM7Zva
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

概述

Client 主要用来执行服务请求以及订阅、发布事件,是对 Broker、Transport 的一种封装,方便使用。

Init

初始化客户端函数

  1. 初始化连接池数量和连接池TTL
  2. 调用注入的opts函数列表
  3. 最后初始化连接池
// Init applies the given options to the client's options and, if they
// changed the pool size or the pool TTL, copies the new values into the
// connection pool while holding the pool lock. It always returns nil.
func (r *rpcClient) Init(opts ...Option) error {
    // remember the pool settings as they were before the options ran
    prevSize, prevTTL := r.opts.PoolSize, r.opts.PoolTTL

    for i := range opts {
        opts[i](&r.opts)
    }

    // pool settings untouched: nothing to propagate
    if prevSize == r.opts.PoolSize && prevTTL == r.opts.PoolTTL {
        return nil
    }

    // the pool stores its TTL as whole seconds
    r.pool.Lock()
    r.pool.size = r.opts.PoolSize
    r.pool.ttl = int64(r.opts.PoolTTL.Seconds())
    r.pool.Unlock()

    return nil
}

Call

Call是Client接口中最主要的方法,其中用到的Selector在之前的《Go Micro Selector 源码分析》中已有介绍。调用流程如下:

  1. Client调用Call方法
  2. Call方法调用selector组件的Select方法,获取next函数
  3. call匿名函数中调用next函数(默认为CacheSelector,随机获取服务列表中的节点,参见 Go Micro Selector 源码分析),返回node
  4. 以grpcClient为例,调用grpcClient.call
  5. call函数中获取conn,然后Invoke调用服务端函数
// Call sends req to one node of the target service and decodes the reply
// into rsp, retrying up to callOpts.Retries additional times.
//
// The client's default call options are used as the base and opts are
// applied on top. If ctx carries no deadline, the call is bounded by
// callOpts.RequestTimeout; otherwise the remaining time until the deadline
// is written into callOpts as the request timeout.
//
// It returns nil on the first successful attempt, a 408 error if ctx is done
// before or while waiting for an attempt, the error from callOpts.Retry if
// that fails, or the error of the last attempt otherwise.
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
    // copy out the options and apply the per-call overrides
    callOpts := g.opts.CallOptions
    for _, opt := range opts {
        opt(&callOpts)
    }
    // obtain the next function used to pick a node for each attempt
    next, err := g.next(req, callOpts)
    if err != nil {
        return err
    }

    // check the context deadline
    d, ok := ctx.Deadline()
    if !ok {
        // no deadline: create one from the request timeout. The cancel func
        // must be kept and called, otherwise the derived context and its
        // timer are only released once the timeout expires.
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout)
        defer cancel()
    } else {
        // a deadline exists: use the time left as the request timeout
        opt := client.WithRequestTimeout(time.Until(d))
        opt(&callOpts)
    }

    // should we noop right here?
    select {
    case <-ctx.Done():
        return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
    default:
    }

    // take a copy of the call function; it is used in the goroutines below
    gcall := g.call

    // wrap the call in reverse
    for i := len(callOpts.CallWrappers); i > 0; i-- {
        gcall = callOpts.CallWrappers[i-1](gcall)
    }

    // call runs a single attempt: backoff, node selection, then the request
    call := func(i int) error {
        // call backoff first. Someone may want an initial start delay
        t, err := callOpts.Backoff(ctx, req, i)
        if err != nil {
            return errors.InternalServerError("go.micro.client", err.Error())
        }

        // only sleep if greater than 0
        if t.Seconds() > 0 {
            time.Sleep(t)
        }

        // select next node
        node, err := next()
        if err != nil && err == selector.ErrNotFound {
            return errors.NotFound("go.micro.client", err.Error())
        } else if err != nil {
            return errors.InternalServerError("go.micro.client", err.Error())
        }

        // perform the actual request against the selected node and report
        // the outcome to the selector
        err = gcall(ctx, node, req, rsp, callOpts)
        g.opts.Selector.Mark(req.Service(), node, err)
        return err
    }

    // buffered for every possible attempt so a goroutine can always deliver
    // its result even after Call has returned
    ch := make(chan error, callOpts.Retries+1)
    var gerr error
    // retry loop
    for i := 0; i <= callOpts.Retries; i++ {
        go func(i int) {
            // run the attempt and deliver its result on the channel
            ch <- call(i)
        }(i)

        select {
        case <-ctx.Done():
            return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
        case err := <-ch:
            // if the call succeeded lets bail early
            if err == nil {
                return nil
            }

            // ask the retry policy whether another attempt should be made
            retry, rerr := callOpts.Retry(ctx, req, i, err)
            if rerr != nil {
                return rerr
            }

            if !retry {
                return err
            }

            gerr = err
        }
    }

    return gerr
}

Stream

Stream跟call的逻辑几乎是一样的,不过stream调用的是rpc_client.stream函数。这边就不过多的分析了

// Stream opens a stream to one node of the requested service, retrying up to
// callOpts.Retries additional times.
//
// The client's default call options are used as the base and opts are
// applied on top. It returns the stream of the first successful attempt, a
// timeout error if ctx is done before or while waiting for an attempt, the
// error from callOpts.Retry if that fails, or the error of the last attempt
// otherwise.
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
    // start from the client's call options and apply the per-call overrides
    callOpts := r.opts.CallOptions
    for i := range opts {
        opts[i](&callOpts)
    }

    next, err := r.next(request, callOpts)
    if err != nil {
        return nil, err
    }

    // bail out right away when the context is already done
    select {
    case <-ctx.Done():
        return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
    default:
    }

    // result carries the outcome of one attempt across the channel
    type result struct {
        stream Stream
        err    error
    }

    // attempt runs a single try: backoff, node selection, stream setup
    attempt := func(n int) (Stream, error) {
        // backoff comes first so an initial start delay is possible
        delay, err := callOpts.Backoff(ctx, request, n)
        if err != nil {
            return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
        }
        if delay > 0 {
            time.Sleep(delay)
        }

        node, err := next()
        if err != nil {
            if err == selector.ErrNotFound {
                return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
            }
            return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
        }

        // open the stream and report the outcome to the selector
        s, err := r.stream(ctx, node, request, callOpts)
        r.opts.Selector.Mark(request.Service(), node, err)
        return s, err
    }

    // one buffer slot per possible attempt
    results := make(chan result, callOpts.Retries+1)
    var lastErr error

    for try := 0; try <= callOpts.Retries; try++ {
        go func(n int) {
            s, err := attempt(n)
            results <- result{stream: s, err: err}
        }(try)

        select {
        case <-ctx.Done():
            return nil, errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
        case res := <-results:
            // success: hand the stream back immediately
            if res.err == nil {
                return res.stream, nil
            }

            // let the retry policy decide whether to try again
            again, rerr := callOpts.Retry(ctx, request, try, res.err)
            if rerr != nil {
                return nil, rerr
            }
            if !again {
                return nil, res.err
            }

            lastErr = res.err
        }
    }

    return nil, lastErr
}

Publish

Client中的Publish主要是调用broker中的publish:r.opts.Broker.Publish

在client的Publish函数中,先获取topic并准备好body,最后调用broker的Publish。

// Publish encodes msg with the codec registered for its content type and
// hands the result to the broker.
//
// The broker headers are the metadata found in ctx plus "Content-Type",
// "Micro-Topic" and "Micro-Id" (a fresh UUID). The broker topic is
// msg.Topic() unless an exchange is set through opts or the MICRO_PROXY
// environment variable, in which case the exchange is used; MICRO_PROXY
// takes precedence over opts.
//
// It returns an internal server error if no codec can be created or the
// message cannot be encoded, and otherwise whatever Broker.Publish returns.
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
    options := PublishOptions{
        Context: context.Background(),
    }
    for _, o := range opts {
        o(&options)
    }

    // Build the headers in a map owned by this call instead of writing into
    // the one returned by FromContext.
    // NOTE(review): FromContext is not visible here; if it returns the map
    // stored in ctx, writing into it would mutate caller-owned state and race
    // with concurrent publishes sharing the same ctx.
    md := make(map[string]string)
    if ctxMD, ok := metadata.FromContext(ctx); ok {
        for k, v := range ctxMD {
            md[k] = v
        }
    }

    id := uuid.New().String()
    md["Content-Type"] = msg.ContentType()
    md["Micro-Topic"] = msg.Topic()
    md["Micro-Id"] = id

    // set the topic
    topic := msg.Topic()

    // get proxy
    if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
        options.Exchange = prx
    }

    // get the exchange
    if len(options.Exchange) > 0 {
        topic = options.Exchange
    }

    // encode message body
    cf, err := r.newCodec(msg.ContentType())
    if err != nil {
        return errors.InternalServerError("go.micro.client", err.Error())
    }
    b := &buffer{bytes.NewBuffer(nil)}
    if err := cf(b).Write(&codec.Message{
        Target: topic,
        Type:   codec.Event,
        Header: map[string]string{
            "Micro-Id":    id,
            "Micro-Topic": msg.Topic(),
        },
    }, msg.Payload()); err != nil {
        return errors.InternalServerError("go.micro.client", err.Error())
    }

    // connect the broker only on the first publish of this client; the
    // outcome of Connect is not inspected here
    r.once.Do(func() {
        r.opts.Broker.Connect()
    })

    return r.opts.Broker.Publish(topic, &broker.Message{
        Header: md,
        Body:   b.Bytes(),
    })
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK