3

Ethereum以太坊源码分析(五)RPC源码分析

 2 years ago
source link: https://dinghaoli.github.io/2018/11/ETH_code_5/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Ethereum以太坊源码分析(五)RPC源码分析

calendar.png2018-11-23 | 阅读:次

本文参考:

Github - go-ethereum-code-analysis
[《分布式系统》第五章]


什么是RPC

远程过程调用(RPC), RPC将过程调用的通用编程抽象扩展到了分布式环境。一个调用过程可以像调用本地结点上的过程那样去调用一个远程结点上的过程。

大多数现代编程语言提供了把一个程序组织成一系列能彼此通信的模块的方法。模块之间的通信可以依靠模块间的过程调用,或者直接访问另外一个模块中的变量来实现。为了控制模块之间可能的交互,必须为每一个模块定义显式的接口,模块接口指定可供其他模块访问的过程和变量。实现后的模块就隐藏了除接口以外的所有信息。只要模块的接口保持相同,模块的实现就可以随意改变而不影响到模块的使用者。

关于RPC更具体内容可以参考我的一篇《分布式系统》读书笔记


RPC包的官方文档

在go-ethereum的rpc包内,专门有一个文件用于描述rpc包:go-ethereum/ethdb/interface.go

rpc包提供这样一种能力,可以通过网络或者其他I/O连接,可以访问对象被导出的方法。创建一个服务器实例之后,对象可以注册到服务器实上,然后可以让外界访问。导出的方法在遵循某些原则的情况下,可以被远程调用。 同时还支持发布/订阅模式。

符合以下标准的方法可用于远程访问

  • 对象必须被导出
  • 方法必须被导出
  • 方法返回0,1(响应或错误)或2(响应和错误)值
  • 方法参数必须被导出或是内置类型
  • 方法返回值必须被导出或是内置类型

举一个例子:

func (s *CalcService) Add(a, b int) (int, error)

当返回的error不等于nil的时候,返回的整型值被忽略,而error将被发送回客户端。反之整型的返回值被发送回客户端。

我们可以通过传递指针去使用可选参数(Optional arguments),比如:如果你想写一个addition,且值在一个有限的范围内(超出了就取mod),我们允许一个mod指针参数。

func (s *CalService) Add(a, b int, mod *int) (int, error)

RPC方法可以通过传两个integer作为前两个参数和一个null值作为第三个参数来调用。在这种情况下,mod参数会被设置为nil。

或者可以传递三个integer,在这种情况下,mod应当指向第三个参数。尽管可选参数是最后的参数,RPC包任然接收两个integer作为参数,而RPC包还是会传入nil作为RPC方法的mod

server提供了ServerCodec方法,这个方法接收一个ServerCodec实例。服务器会使用codec读取请求,处理请求,然后通过codec发送回应给客户端。server可以并发的执行请求。response的顺序可能和request的顺序不一致。codec(在这里可以理解为编码解码的协议,比如可以使用json、proto-buf等)

我们举一个例子,下面的server用的是 JSON codec:

type CalculatorService struct {}

 func (s *CalculatorService) Add(a, b int) int {
    return a + b
 }

 func (s *CalculatorService) Div(a, b int) (int, error) {
    if b == 0 {
        return 0, errors.New("divide by zero")
    }
    return a/b, nil
 }

 calculator := new(CalculatorService)
 server := NewServer()
 server.RegisterName("calculator", calculator")

 l, _ := net.ListenUnix("unix", &net.UnixAddr{Net: "unix", Name: "/tmp/calculator.sock"})
 for {
    // 接受请求
    c, _ := l.AcceptUnix()
    // 解析请求
    codec := v2.NewJSONCodec(c)
    // 并发处理请求
    go server.ServeCodec(codec)
 }

该软件包还通过使用subscriptions来支持发布订阅模式。被认为符合通知的方法必须满足以下条件:

  • 对象必须导出
  • 方法必须导出
  • 方法的第一个参数类型必须是context.Context
  • 方法参数必须被导出或内置类型
  • 方法必须返回元组(Subscription,错误)

举一个例子:

 func (s *BlockChainService) NewBlocks(ctx context.Context) (Subscription, error) {
    ...
 }

Subscriptions出现以下情况时,会被删除

  • 用户发送了一个取消订阅的请求
  • 创建订阅的连接被关闭。这种情况可能由客户端或者服务器触发。 服务器在写入出错或者是通知队列长度太大的时候会选择关闭连接。

包RPC的大致结构

网络协议channels和Json格式的请求和回应的编码和解码都是同时与服务端和客户端打交道的类。网络协议channels主要提供连接和数据传输的功能。json格式的编码和解码主要提供请求和回应的序列化和反序列化功能(Json -> Go的对象)。

ETH_5_1.png


源码解析

server.go

server.go主要实现了RPC服务端的核心逻辑。包括RPC方法的注册,读取请求,处理请求,发送回应等逻辑。server的核心数据结构是Server结构体。services字段是一个map,记录了所有注册的方法和类。run参数是用来控制Server的运行和停止的。codecs是一个set。用来存储所有的编码解码器,其实就是所有的连接。codecsMu是用来保护多线程访问codecs的锁。

services字段的value类型是service类型。 service代表了一个注册到Server的实例,是一个对象和方法的组合。 service字段的name代表了service的namespace, typ实例的类型, callbacks是实例的回调方法, subscriptions是实例的订阅方法。

// go-ethereum/rpc/types.go

// map用来记录个各种字段和对应的服务
type serviceRegistry map[string]*service // collection of services
// map用来记录个各种字段和对应的回调函数
type callbacks map[string]*callback      // collection of RPC callbacks
// map用来记录个各种字段和对应的订阅回调函数
type subscriptions map[string]*callback  // collection of subscription callbacks

// Server represents a RPC server
type Server struct {
    services serviceRegistry

    // run是用来控制Server的运行和停止
    run      int32
    // 保护多线程访问codecs
    codecsMu sync.Mutex
    // 春村所有的codecs协议的编码解码器
    codecs   mapset.Set
}

...

// callback is a method callback which was registered in the server
type callback struct {
    rcvr        reflect.Value  // receiver of method
    method      reflect.Method // callback
    argTypes    []reflect.Type // input argument types
    hasCtx      bool           // method's first argument is a context (not included in argTypes)
    errPos      int            // err return idx, of -1 when method cannot return error
    isSubscribe bool           // indication if the callback is a subscription
}

// service represents a registered object
type service struct {
    // 服务名字
    name          string        // name for service
    // 实例类型
    typ           reflect.Type  // receiver type
    // 实例的回调方法
    callbacks     callbacks     // registered handlers
    // 实例的订阅方法
    subscriptions subscriptions // available subscriptions/notifications
}

Server的创建,Server创建的时候通过调用server.RegisterName把自己的实例注册上来,提供一些RPC服务的元信息。

// go-ethereum/rpc/server.go

const MetadataApi = "rpc"

// CodecOption specifies which type of messages this codec supports
type CodecOption int

const (
    // OptionMethodInvocation is an indication that the codec supports RPC method calls
    OptionMethodInvocation CodecOption = 1 << iota

    // OptionSubscriptions is an indication that the codec suports RPC notifications
    OptionSubscriptions = 1 << iota // support pub sub
)

// NewServer will create a new server instance with no registered handlers.
func NewServer() *Server {
    // 创建一个server实例
    server := &Server{
        services: make(serviceRegistry),
        codecs:   mapset.NewSet(),
        run:      1,
    }

    // register a default service which will provide meta information about the RPC service such as the services and
    // methods it offers.
    // 存一个默认的service用来提供RPC服务的元信息
    // 我们可以看出rpcService里面存储的正是server自己
    rpcService := &RPCService{server}
    // 注册服务
    server.RegisterName(MetadataApi, rpcService)

    return server
}

// RPCService gives meta information about the server.
// e.g. gives information about the loaded modules.
type RPCService struct {
    server *Server
}

服务注册函数server.RegisterName,RegisterName方法会通过传入的参数来创建一个service对象,如过传入的rcvr实例没有找到任何合适的方法,那么会返回错误。 如果没有错误,就把创建的service实例加入serviceRegistry。

// go-ethereum/rpc/server.go

// RegisterName will create a service for the given rcvr type under the given name. When no methods on the given rcvr
// match the criteria to be either a RPC method or a subscription an error is returned. Otherwise a new service is
// created and added to the service collection this server instance serves.
func (s *Server) RegisterName(name string, rcvr interface{}) error {
    // 如果为空,创建新的serviceRegistry
    if s.services == nil {
        s.services = make(serviceRegistry)
    }

    // 创建一个新的sevice
    svc := new(service)
    // 从rcvr获取服务类型
    svc.typ = reflect.TypeOf(rcvr)
    // 从rcvr获取服务
    rcvrVal := reflect.ValueOf(rcvr)

    if name == "" {
        return fmt.Errorf("no service name for type %s", svc.typ.String())
    }
    // 如果实例的类名不是导出的(类名的首字母大写),就返回错误。
    if !isExported(reflect.Indirect(rcvrVal).Type().Name()) {
        return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
    }

    //通过反射信息找到合适的callbacks 和subscriptions方法
    methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)

    if len(methods) == 0 && len(subscriptions) == 0 {
        return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
    }
    //如果这个名字当前已经被注册过了,那么如果有同名的方法就用新的替代,否者直接插入。
    // already a previous service register under given name, merge methods/subscriptions
    if regsvc, present := s.services[name]; present {
        // 替换/更新callback
        for _, m := range methods {
            regsvc.callbacks[formatName(m.method.Name)] = m
        }
        // 替换/更新subscriptions
        for _, s := range subscriptions {
            regsvc.subscriptions[formatName(s.method.Name)] = s
        }
        return nil
    }

    svc.name = name
    svc.callbacks, svc.subscriptions = methods, subscriptions

    s.services[svc.name] = svc
    return nil
}

我们注意到通过反射信息找出合适的方法,suitableCallbacks,这个方法在utils.go里面。 这个方法会遍历这个类型的所有方法,找到适配RPC callback或者subscription callback类型标准的方法并返回。关于RPC的标准,请参考文档开头的RPC标准。这个方法总的来说就是对检查符合RPC方法,如果符合就放入callbacks和subscriptions当中返回。

// go-ethereum/rpc/utils.go

// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
// documentation for a summary of these criteria.
func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) {
    callbacks := make(callbacks)
    subscriptions := make(subscriptions)

METHODS:
    for m := 0; m < typ.NumMethod(); m++ {
        method := typ.Method(m)
        mtype := method.Type
        mname := formatName(method.Name)
        if method.PkgPath != "" { // method must be exported
            continue
        }

        var h callback
        // isPubSub能判断该方法第一个参数是否为context.Context类型,而且返回一个对(Subscription, error)
        h.isSubscribe = isPubSub(mtype)
        h.rcvr = rcvr
        h.method = method
        h.errPos = -1

        firstArg := 1
        numIn := mtype.NumIn()
        if numIn >= 2 && mtype.In(1) == contextType {
            h.hasCtx = true
            firstArg = 2
        }

        if h.isSubscribe {
            h.argTypes = make([]reflect.Type, numIn-firstArg) // skip rcvr type
            for i := firstArg; i < numIn; i++ {
                argType := mtype.In(i)
                if isExportedOrBuiltinType(argType) {
                    h.argTypes[i-firstArg] = argType
                } else {
                    continue METHODS
                }
            }

            subscriptions[mname] = &h
            continue METHODS
        }

        // determine method arguments, ignore first arg since it's the receiver type
        // Arguments must be exported or builtin types
        h.argTypes = make([]reflect.Type, numIn-firstArg)
        for i := firstArg; i < numIn; i++ {
            argType := mtype.In(i)
            if !isExportedOrBuiltinType(argType) {
                continue METHODS
            }
            h.argTypes[i-firstArg] = argType
        }

        // check that all returned values are exported or builtin types
        // 确认所有返回值都是内部类型
        for i := 0; i < mtype.NumOut(); i++ {
            if !isExportedOrBuiltinType(mtype.Out(i)) {
                continue METHODS
            }
        }

        // when a method returns an error it must be the last returned value
        h.errPos = -1
        for i := 0; i < mtype.NumOut(); i++ {
            if isErrorType(mtype.Out(i)) {
                h.errPos = i
                break
            }
        }

        if h.errPos >= 0 && h.errPos != mtype.NumOut()-1 {
            continue METHODS
        }

        switch mtype.NumOut() {
        case 0, 1, 2:
            if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error
                continue METHODS
            }
            callbacks[mname] = &h
        }
    }

    return callbacks, subscriptions
}

server启动和服务, server的启动和服务这里参考ipc.go中的一部分代码。可以看到每Accept()一个链接,就启动一个goroutine调用srv.ServeCodec来进行服务,这里也可以看出JsonCodec的功能,Codec类似于装饰器模式,在连接外面包了一层。Codec会放在后续来介绍,这里先简单了解一下。

// go-ethereum/rpc/ipc.go

// ServeListener accepts connections on l, serving JSON-RPC on them.
func (srv *Server) ServeListener(l net.Listener) error {
    for {
        conn, err := l.Accept()
        // 判断是否为暂时性错误
        if netutil.IsTemporaryError(err) {
            log.Warn("RPC accept error", "err", err)
            continue
        } else if err != nil {
            return err
        }
        // log记录
        log.Trace("Accepted connection", "addr", conn.RemoteAddr())
        // 开启一个goroutine进行服务
        go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
    }
}

ServeCodec这个方法很简单,提供了codec.Close的关闭功能。serveRequest的第二个参数singleShot是控制长连接还是短连接的参数,如果singleShot为真,那么处理完一个请求之后会退出。不过咱们的serveRequest方法是一个死循环,不遇到异常,或者客户端主动关闭,服务端是不会关闭的。所以rpc提供的是长连接的功能。

// go-ethereum/rpc/server.go

// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
// response back using the given codec. It will block until the codec is closed or the server is
// stopped. In either case the codec is closed.
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
    defer codec.Close()
    s.serveRequest(context.Background(), codec, false, options)
}

现在我们来看处理请求的核心方法。serveRequest这个方法就是Server主要的流程处理函数。从codec读取请求,找到对应的方法并调用,然后把回应写入codec。

部分标准库的代码可以参考网上的使用教程,sync.WaitGroup实现了一个信号量的功能,Done()是Add(-1)的别名。简单的来说,使用Add()添加计数,Done()减掉一个计数,计数不为0, 阻塞Wait()的运行。Context实现上下文管理。若singleShot参数为真,那么函数只会处理一次request,若singleShot为假,本函数可以一直处理request直到codec返回错误(一般来说这个错误是EOF),在这个情况下,他会并行的执行(goroutine)requests。

// go-ethereum/rpc/server.go

// serveRequest will reads requests from the codec, calls the RPC callback and
// writes the response to the given codec.
//
// If singleShot is true it will process a single request, otherwise it will handle
// requests until the codec returns an error when reading a request (in most cases
// an EOF). It executes requests in parallel when singleShot is false.
func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error {
    var pend sync.WaitGroup

    defer func() {
        // 调用recover函数将会捕获到当前的panic(如果有的话)
        if err := recover(); err != nil {
            const size = 64 << 10
            buf := make([]byte, size)
            buf = buf[:runtime.Stack(buf, false)]
            log.Error(string(buf))
        }
        s.codecsMu.Lock()
        s.codecs.Remove(codec)
        s.codecsMu.Unlock()
    }()

    //  ctx, cancel := context.WithCancel(context.Background())
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // if the codec supports notification include a notifier that callbacks can use
    // to send notification to clients. It is tied to the codec/connection. If the
    // connection is closed the notifier will stop and cancels all active subscriptions.
    if options&OptionSubscriptions == OptionSubscriptions {
        ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
    }
    s.codecsMu.Lock()
    if atomic.LoadInt32(&s.run) != 1 { // server stopped
        s.codecsMu.Unlock()
        return &shutdownError{}
    }
    s.codecs.Add(codec)
    s.codecsMu.Unlock()

    // test if the server is ordered to stop
    for atomic.LoadInt32(&s.run) == 1 {
        // 从codec中读取下一个request
        reqs, batch, err := s.readRequest(codec)
        if err != nil {
            // If a parsing error occurred, send an error
            if err.Error() != "EOF" {
                log.Debug(fmt.Sprintf("read error %v\n", err))
                codec.Write(codec.CreateErrorResponse(nil, err))
            }
            // Error or end of stream, wait for requests and tear down
            // 这里主要是考虑多线程处理的时候,如果出现错误需要等待所有的request处理完毕,再返回
            // 每启动一个go线程会调用pend.Add(1)。 
            // 处理完成后调用pend.Done()会减去1。当为0的时候,Wait()方法就会返回。
            pend.Wait()
            return nil
        }

        // check if server is ordered to shutdown and return an error
        // telling the client that his request failed.
        // 如果serve被要求关闭,则返回一个错误并告诉client他的request已经失败
        if atomic.LoadInt32(&s.run) != 1 {
            err = &shutdownError{}
            if batch {
                resps := make([]interface{}, len(reqs))
                for i, r := range reqs {
                    resps[i] = codec.CreateErrorResponse(&r.id, err)
                }
                codec.Write(resps)
            } else {
                codec.Write(codec.CreateErrorResponse(&reqs[0].id, err))
            }
            return nil
        }
        // If a single shot request is executing, run and return immediately
        // 如果只执行一次,那么执行完成后返回。
        if singleShot {
            if batch {
                s.execBatch(ctx, codec, reqs)
            } else {
                s.exec(ctx, codec, reqs[0])
            }
            return nil
        }
        // For multi-shot connections, start a goroutine to serve and loop back
        // singleShot为假,用信号量记录我们并行的数量。
        pend.Add(1)
        // 启动goroutine对请求进行服务。
        go func(reqs []*serverRequest, batch bool) {
            defer pend.Done()
            if batch {
                s.execBatch(ctx, codec, reqs)
            } else {
                s.exec(ctx, codec, reqs[0])
            }
        }(reqs, batch)
    }
    return nil
}

接下来我们看看rpcRequest的结构, 这个结构代表了原本的RPC请求的结构,我们通过调用codec.ReadRequestHeaders()可以得到。而之后,我们通过readReques()函数的处理我们可以获得我们想要的serverRequest。

// go-ethereum/rpc/types.go

// rpcRequest represents a raw incoming RPC request
type rpcRequest struct {
    service  string
    method   string
    id       interface{}
    isPubSub bool
    params   interface{}
    err      Error // invalid batch element
}

// serverRequest is an incoming request
type serverRequest struct {
    id            interface{}
    svcname       string
    callb         *callback
    args          []reflect.Value
    isUnsubscribe bool
    err           Error
}

readRequest() 从codec读取下一个/批原始rpcRequest并且转换成serverRequest,并且检查读取所有rpcRequest,处理分类放入[]*serverRequest当中。

// go-ethereum/rpc/server.go

// readRequest requests the next (batch) request from the codec. It will return the collection
// of requests, an indication if the request was a batch, the invalid request identifier and an
// error when the request could not be read/parsed.
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
    // 获取rpcRequest
    reqs, batch, err := codec.ReadRequestHeaders()
    if err != nil {
        return nil, batch, err
    }
    // 初始化serverRequest
    requests := make([]*serverRequest, len(reqs))

    // verify requests
    // 循环用于检验requests
    for i, r := range reqs {
        var ok bool
        var svc *service

        // 如果request不合法
        if r.err != nil {
            requests[i] = &serverRequest{id: r.id, err: r.err}
            continue
        }
        // 如果请求是解除订阅方面的请求,而且方法名称有_unsubscribe后缀。
        if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
            requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
            argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
            if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
                requests[i].args = args
            } else {
                requests[i].err = &invalidParamsError{err.Error()}
            }
            continue
        }
        // 如果没有注册这个服务,说明这个rpc方法不可用
        if svc, ok = s.services[r.service]; !ok { // rpc method isn't available
            requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
            continue
        }
        // 如果是发布和订阅模式。放入订阅方法和参数,args第一个元素为service.method的名字,远程调用方法的参数从第二个元素开始
        if r.isPubSub { // eth_subscribe, r.method contains the subscription method name
            if callb, ok := svc.subscriptions[r.method]; ok {
                requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
                if r.params != nil && len(callb.argTypes) > 0 {
                    argTypes := []reflect.Type{reflect.TypeOf("")}
                    argTypes = append(argTypes, callb.argTypes...)
                    if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
                        requests[i].args = args[1:] // first one is service.method name which isn't an actual argument
                    } else {
                        requests[i].err = &invalidParamsError{err.Error()}
                    }
                }
            } else {
                requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
            }
            continue
        }

        // 检查callback是否存在
        if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
            requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
            if r.params != nil && len(callb.argTypes) > 0 {
                // 获取args
                if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
                    requests[i].args = args
                } else {
                    requests[i].err = &invalidParamsError{err.Error()}
                }
            }
            continue
        }

        requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
    }

    return requests, batch, nil
}

我们现在已经获得了所有的Request并且已经转换成serverRequest的格式了,接下来我们需要去执行这些远程调用。exec和execBatch方法,他们都会调用s.handle方法对request进行处理。execBatch功能与exec几乎一致,只是execBatch用于一批request的处理而已。结构上大同小异,直接对比exec的结构来看即可。

// go-ethereum/rpc/server.go

// exec executes the given request and writes the result back using the codec.
func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
    var response interface{}
    var callback func()
    // request本身就有错,直接返回带有错误信息的response
    if req.err != nil {
        response = codec.CreateErrorResponse(&req.id, req.err)
    } else {
        // 调用s.handle处理请求
        response, callback = s.handle(ctx, codec, req)
    }
    // 把response通过codec进行编码,相当于转换成适当的格式返回给client
    if err := codec.Write(response); err != nil {
        log.Error(fmt.Sprintf("%v\n", err))
        codec.Close()
    }

    // when request was a subscribe request this allows these subscriptions to be actived
    // 当请求是一个订阅请求(subscribe request),那么需要启动subscriptions本身。
    if callback != nil {
        callback()
    }
}

// execBatch executes the given requests and writes the result back using the codec.
// It will only write the response back when the last request is processed.
func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*serverRequest) {
    responses := make([]interface{}, len(requests))
    var callbacks []func()
    for i, req := range requests {
        if req.err != nil {
            responses[i] = codec.CreateErrorResponse(&req.id, req.err)
        } else {
            var callback func()
            if responses[i], callback = s.handle(ctx, codec, req); callback != nil {
                callbacks = append(callbacks, callback)
            }
        }
    }

    if err := codec.Write(responses); err != nil {
        log.Error(fmt.Sprintf("%v\n", err))
        codec.Close()
    }

    // when request holds one of more subscribe requests this allows these subscriptions to be activated
    for _, c := range callbacks {
        c()
    }
}

现在我们来看handle方法,它执行一个request,然后返回response。该函数会根据请求的不同类型(Unsubscribe、Subscribe、Regular RPC)去对症下药,具体的分析请看下面的注释

// go-ethereum/rpc/server.go

// handle executes a request and returns the response from the callback.
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
    if req.err != nil {
        return codec.CreateErrorResponse(&req.id, req.err), nil
    }
    // 如果这是一个用于“取消订阅”的请求,第一个参数需要是subscription id
    if req.isUnsubscribe { // cancel subscription, first param must be the subscription id
        if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
            // NotifierFromContext(ctx)获取之前我们存入ctx的notifier
            notifier, supported := NotifierFromContext(ctx)
            // 如果他本身就不支持subscriptions(比如http)那就在response当中写入错误信息。
            if !supported { // interface doesn't support subscriptions (e.g. http)
                return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
            }
            // 获取subscription id
            subid := ID(req.args[0].String())
            // 用于在notifier中,取消该订阅
            if err := notifier.unsubscribe(subid); err != nil {
                return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
            }
            // 取消成功,吧成功的状态写入response
            return codec.CreateResponse(req.id, true), nil
        }
        return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
    }
    // 如果是一个订阅消息。 那么创建订阅。并激活订阅。
    if req.callb.isSubscribe {
        // createSubscription这个函数会调用subscription的callback并返回subscription id或者是错误
        subid, err := s.createSubscription(ctx, codec, req)
        if err != nil {
            return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
        }
        // 当subscription创建成功并且已经把subscription id发送给client后,激活subscription
        // active the subscription after the sub id was successfully sent to the client
        activateSub := func() {
            notifier, _ := NotifierFromContext(ctx)
            notifier.activate(subid, req.svcname)
        }

        return codec.CreateResponse(req.id, subid), activateSub
    }

    // 如果只是正常的RPC调用
    // regular RPC call, prepare arguments
    // 验证args是否与callback函数的args相匹配
    if len(req.args) != len(req.callb.argTypes) {
        rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
            req.svcname, serviceMethodSeparator, req.callb.method.Name,
            len(req.callb.argTypes), len(req.args))}
        return codec.CreateErrorResponse(&req.id, rpcErr), nil
    }

    arguments := []reflect.Value{req.callb.rcvr}
    if req.callb.hasCtx {
        arguments = append(arguments, reflect.ValueOf(ctx))
    }
    if len(req.args) > 0 {
        arguments = append(arguments, req.args...)
    }
    // 执行RPC方法,然后返回结果。
    // execute RPC method and return result
    reply := req.callb.method.Func.Call(arguments)
    // 无返回
    if len(reply) == 0 {
        return codec.CreateResponse(req.id, nil), nil
    }
    // 返回错误
    if req.callb.errPos >= 0 { // test if method returned an error
        if !reply[req.callb.errPos].IsNil() {
            e := reply[req.callb.errPos].Interface().(error)
            res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
            return res, nil
        }
    }
    // 正常返回
    return codec.CreateResponse(req.id, reply[0].Interface()), nil
}

subscription.go 发布订阅模式

在之前的server.go中就有出现了一些发布订阅模式的代码, 在这里集中阐述一下。

我们在serveRequest的代码中,就有这样的代码。

// go-ethereum/rpc/server.go

func (s *Server) serveRequest(ctx context.Context, codec ServerCodec, singleShot bool, options CodecOption) error {
    ...

    // 如果codec支持notification功能, 可以通过一个叫notifier的对象执行回调函数发送notification给客户端。
    // 他和codec/connection关系很紧密。 如果连接被关闭,那么notifier会关闭,并取消掉所有激活的订阅。
    // 
    // if the codec supports notification include a notifier that callbacks can use
    // to send notification to clients. It is tied to the codec/connection. If the
    // connection is closed the notifier will stop and cancels all active subscriptions.
    if options&OptionSubscriptions == OptionSubscriptions {
        ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
    }

    ...
}

在服务一个客户端连接时候,调用newNotifier方法创建了一个notifier对象存储到ctx中。可以观察到Notifier对象保存了codec的实例,也就是说Notifier对象保存了网络连接,用来在需要的时候发送数据。

// go-ethereum/rpc/subscription.go

// newNotifier creates a new notifier that can be used to send subscription
// notifications to the client.
func newNotifier(codec ServerCodec) *Notifier {
    return &Notifier{
        codec:    codec,
        active:   make(map[ID]*Subscription),
        inactive: make(map[ID]*Subscription),
        buffer:   make(map[ID][]interface{}),
    }
}

然后在handle方法中,我们处理一类特殊的方法,这种方法被标识为isSubscribe。调用createSubscription方法创建了了一个Subscription并调用notifier.activate方法存储到notifier的激活队列里面。代码里面有一个技巧。这个方法调用完成后并没有直接激活subscription,而是把激活部分的代码作为一个函数返回回去。然后在exec或者execBatch代码里面等待codec.CreateResponse(req.id, subid)这个response被发送给客户端之后被调用。避免客户端还没有收到subscription ID的时候就收到了subscription信息。

// go-ethereum/rpc/server.go

// handle executes a request and returns the response from the callback.
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
   
    ...

    // 如果是一个订阅消息。 那么创建订阅。并激活订阅。
    if req.callb.isSubscribe {
        // createSubscription这个函数会调用subscription的callback并返回subscription id或者是错误
        subid, err := s.createSubscription(ctx, codec, req)
        if err != nil {
            return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
        }
        // 当subscription创建成功并且已经把subscription id发送给client后,激活subscription
        // active the subscription after the sub id was successfully sent to the client
        activateSub := func() {
            notifier, _ := NotifierFromContext(ctx)
            notifier.activate(subid, req.svcname)
        }

        return codec.CreateResponse(req.id, subid), activateSub
    }

   ...
}

createSubscription方法会调用指定的注册上来的方法,并得到回应。

// go-ethereum/rpc/server.go

// createSubscription will call the subscription callback and returns the subscription id or error.
func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) {
    // subscription have as first argument the context following optional arguments
    args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
    args = append(args, req.args...)
    // 调用req指定的注册subscription的方法
    reply := req.callb.method.Func.Call(args)

    if !reply[1].IsNil() { // subscription creation failed
        return "", reply[1].Interface().(error)
    }
    // 返回 subscription ID
    return reply[0].Interface().(*Subscription).ID, nil
}

在来看看我们的activate方法,这个方法激活了subscription。 subscription在subscription ID被发送给客户端之后被激活,避免客户端还没有收到subscription ID的时候就收到了subscription信息。因为处理Request可以是并行的,所以操作subscription时需要加锁才行。

// go-ethereum/rpc/subscription.go

// activate enables a subscription. Until a subscription is enabled all
// notifications are dropped. This method is called by the RPC server after
// the subscription ID was sent to client. This prevents notifications being
// send to the client before the subscription ID is send to the client.
func (n *Notifier) activate(id ID, namespace string) {
    // 上锁保证线程安全
    n.subMu.Lock()
    defer n.subMu.Unlock()
    // 用id从inactive队列找到对应的subscription,放入active当中
    if sub, found := n.inactive[id]; found {
        sub.namespace = namespace
        n.active[id] = sub
        delete(n.inactive, id)
        // Send buffered notifications.
        // 开始发送buffer[id]里面的notifications
        for _, data := range n.buffer[id] {
            n.send(sub, data)
        }
        delete(n.buffer, id)
    }
}

我们再来看一个取消订阅的函数。同样他会先加锁,然后根据id找到对应的subscription,然后调用close和delete用于删除subscription

// go-ethereum/rpc/subscription.go

// unsubscribe a subscription.
// If the subscription could not be found ErrSubscriptionNotFound is returned.
func (n *Notifier) unsubscribe(id ID) error {
    n.subMu.Lock()
    defer n.subMu.Unlock()
    if s, found := n.active[id]; found {
        // err 是一个 chan error 需要在unsubscribe时关闭
        close(s.err)
        delete(n.active, id)
        return nil
    }
    return ErrSubscriptionNotFound
}

最后是一个发送订阅的函数Notify,调用这个函数把数据发送到客户端,这个也比较简单。还是先加锁,然后如果subscription是active就直接发送,如果不是那就送入buffer

// go-ethereum/rpc/subscription.go

// Notify sends a notification to the client with the given data as payload.
// If an error occurs the RPC connection is closed and the error is returned.
func (n *Notifier) Notify(id ID, data interface{}) error {
    n.subMu.Lock()
    defer n.subMu.Unlock()

    if sub, active := n.active[id]; active {
        n.send(sub, data)
    } else {
        n.buffer[id] = append(n.buffer[id], data)
    }
    return nil
}

如何使用,建议通过subscription_test.go的TestNotifications来查看完整的流程。

client.go RPC客户端源码分析

客户端的主要功能是把请求发送到服务端,然后接收回应,再把回应传递给调用者。Client就相当于一个连接,用于连接RPC的服务器

客户端的数据结构如下:

// go-ethereum/rpc/client.go

// Client represents a connection to an RPC server.
type Client struct {
    idCounter   uint32
    // 生成连接的函数,客户端会调用这个函数生成一个网络连接对象。
    connectFunc func(ctx context.Context) (net.Conn, error)
    isHTTP      bool

    // writeConn is only safe to access outside dispatch, with the
    // write lock held. The write lock is taken by sending on
    // requestOp and released by sending on sendDone.
    // writeConn是用来写入请求的网络连接对象,
    // 只有在dispatch方法外面调用才是安全的,而且需要通过给requestOp队列发送请求来获取锁,
    // 获取锁之后就可以把请求写入网络,写入完成后发送请求给sendDone队列来释放锁,供其它的请求使用。
    writeConn net.Conn

    // for dispatch
    // 下面有很多的channel,channel一般来说是goroutine之间用来通信的通道,后续会随着代码介绍channel是如何使用的。

    close       chan struct{}
    didQuit     chan struct{}                  // closed when client quits
    reconnected chan net.Conn                  // where write/reconnect sends the new connection
    readErr     chan error                     // errors from read
    readResp    chan []*jsonrpcMessage         // valid messages from read
    requestOp   chan *requestOp                // for registering response IDs
    sendDone    chan error                     // signals write completion, releases write lock
    respWait    map[string]*requestOp          // active requests
    subs        map[string]*ClientSubscription // active subscriptions
}

newClient,新建一个客户端。通过调用connectFunc方法来获取一个网络连接,如果网络连接是httpConn对象的化,那么isHTTP设置为true。然后是对象的初始化,如果是HTTP连接的化,直接返回,否者就启动一个goroutine调用dispatch方法。dispatch方法是整个client的指挥中心,通过上面提到的channel来和其他的goroutine来进行通信,获取信息,根据信息做出各种决策。后续会详细介绍dispatch。因为HTTP的调用方式非常简单,这里先对HTTP的方式做一个简单的阐述。

// go-ethereum/rpc/client.go

func newClient(initctx context.Context, connectFunc func(context.Context) (net.Conn, error)) (*Client, error) {
    // 通过调用connectFunc方法来获取一个网络连接
    conn, err := connectFunc(initctx)
    if err != nil {
        return nil, err
    }
    // 判断是否为HTTP连接
    _, isHTTP := conn.(*httpConn)
    // 对象初始化
    c := &Client{
        writeConn:   conn,
        isHTTP:      isHTTP,
        connectFunc: connectFunc,
        close:       make(chan struct{}),
        didQuit:     make(chan struct{}),
        reconnected: make(chan net.Conn),
        readErr:     make(chan error),
        readResp:    make(chan []*jsonrpcMessage),
        requestOp:   make(chan *requestOp),
        sendDone:    make(chan error, 1),
        respWait:    make(map[string]*requestOp),
        subs:        make(map[string]*ClientSubscription),
    }
    // 如果不是HTTP连接,则执行dispatch
    // dispatch is the main loop of the client.
    // It sends read messages to waiting calls to Call and BatchCall
    // and subscription notifications to registered subscriptions.
    if !isHTTP {
        go c.dispatch(conn)
    }
    return c, nil
}

请求调用通过调用client的 Call方法来进行RPC调用。

// go-ethereum/rpc/client.go

// Call performs a JSON-RPC call with the given arguments and unmarshals into
// result if no error occurred.
// 
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
// 
// 返回值必须是一个指针,这样才能把json值转换成对象。 如果你不关心返回值,也可以通过传nil来忽略。

func (c *Client) Call(result interface{}, method string, args ...interface{}) error {
    ctx := context.Background()
    return c.CallContext(ctx, result, method, args...)
}

// CallContext performs a JSON-RPC call with the given arguments. If the context is
// canceled before the call has successfully returned, CallContext returns immediately.
//
// The result must be a pointer so that package json can unmarshal into it. You
// can also pass nil, in which case the result is ignored.
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
    msg, err := c.newMessage(method, args...)
    if err != nil {
        return err
    }
    // 这里构建了一个requestOp对象。 resp是读取返回的队列,队列的长度是1。
    op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
    // 用HTTP来发送
    if c.isHTTP {
        // 这里直接在sendHTTP里面都会收到回复,并存储在requestOp.resp里面
        err = c.sendHTTP(ctx, op, msg)
    } else {
        err = c.send(ctx, op, msg)
    }
    if err != nil {
        return err
    }

    // dispatch has accepted the request and will close the channel when it quits.
    switch resp, err := op.wait(ctx); {
    case err != nil:
        return err
    case resp.Error != nil:
        return resp.Error
    case len(resp.Result) == 0:
        return ErrNoResult
    default:
        return json.Unmarshal(resp.Result, &result)
    }
}

sendHTTP使用HTTP协议来发送,这个方法直接调用doRequest方法进行请求拿到回应。然后写入到resp队列就返回了。doRequest当中会调用hc.client.Do(req)。本质上是http.Client.Do(req)来发送request。

// go-ethereum/rpc/client.go

func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
    // 转换成httpConn
    hc := c.writeConn.(*httpConn)
    // doRequest会使用hc.client.Do(req)来发送request
    respBody, err := hc.doRequest(ctx, msg)
    if respBody != nil {
        defer respBody.Close()
    }

    if err != nil {
        if respBody != nil {
            buf := new(bytes.Buffer)
            if _, err2 := buf.ReadFrom(respBody); err2 == nil {
                return fmt.Errorf("%v %v", err, buf.String())
            }
        }
        return err
    }
    var respmsg jsonrpcMessage
    if err := json.NewDecoder(respBody).Decode(&respmsg); err != nil {
        return err
    }
    // 写入op.resp的chan中
    op.resp <- &respmsg
    return nil
}

在看看CallContext的另一个方法,op.wait()方法,这个方法会查看两个队列的信息。如果是http那么从resp队列获取到回应就会直接返回。 这样整个HTTP的请求过程就完成了。 中间没有涉及到多线程问题,都在一个线程内部完成了。

// go-ethereum/rpc/client.go

func (op *requestOp) wait(ctx context.Context) (*jsonrpcMessage, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    // 直接从op.resp当中取出response就好了  
    case resp := <-op.resp:
        return resp, op.err
    }
}

如果不是HTTP请求,那处理的流程就比较复杂了。如果不是HTTP请求。在newClient的时候是启动了一个goroutine调用了dispatch方法。

我们先看非http的send方法。从注释来看,这个方法把op写入到requestOp这个队列,注意的是这个队列是没有缓冲区的,也就是说如果这个时候这个队列没有人处理的化,这个调用是会阻塞在这里的。 这就相当于一把锁,如果发送op到requestOp成功了就拿到了锁,可以继续下一步,下一步是调用write方法把请求的全部内容发送到网络上。然后发送消息给sendDone队列。sendDone可以看成是锁的释放,后续在dispatch方法里面会详细分析这个过程。 然后返回。返回之后方法会阻塞在op.wait方法里面。直到从op.resp队列收到一个回应,或者是收到一个ctx.Done()消息(这个消息一般会在完成或者是强制退出的时候获取到。)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK