1

一个基于消息队列的的Go语言RPC框架

 6 months ago
source link: https://studygolang.com/articles/36525
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

一个基于消息队列的的Go语言RPC框架

项目地址: https://github.com/yc90s/xrpc.git

RPC作为分布式系统中的基础组件, 使用非常广泛。大多数的RPC框架都是基于点对点的网络连接, 比如golang原生的rpc框架、grpc等. 点对点连接的通讯方式, 随着集群节点的增加, 会导致集群的拓扑结构越来越复杂, 服务之间的耦合度越来越高, 服务的扩展性和可维护性都会受到影响. 而消息队列的通讯方式, 可以很好的解决这个问题。每个服务只需要关注自己订阅的消息, 不需要关心消息的发送者是谁, 也不需要关心消息的接收者是谁.

XRPC设计的原则是为了实现一套基于消息队列的、易于拓展和易于使用的轻量级RPC框架.

XRPC的特性

除了上面提到的使用消息队列作为RPC的通道之外, XRPC还有以下几个比较实用的特点

  • 支持任意参数数量的远程调用, 不需要把接口的参数都打包成一个结构体再调用, 可以像调用本地函数一样
  • 支持CallCast两种远程调用方式, Call会一直阻塞直到接收到返回值或者超时, 而Cast适用于不需要等待返回值的情况
  • 代码生成, 实现了一套IDL, 最大程度贴近go语法, 用来定义rpc服务的接口信息, 自动生成接口代码 此外, XRPC的核心代码非常精简, 而且非常容易拓展.

XRPC的实现

一个RPC框架可以大体分为三个部分:通信、编码/解码、服务调用. 我从这三个方面分别介绍XRPC是怎么做的. XRPC的每个服务都会订阅一个主题, 等待接收远程调用的消息, 每个服务可以注册多个接口. 客户端将要调用的接口和参数序列化后发布到对应的主题, 服务端收到消息后利用反射调用对应接口, 并将结果通过消息队列返回客户端.

XRPC抽象出了一套消息队列接口

type MQueen interface {
    GenerateSubj() string
    Publish(string, []byte) error
    Subscribe(string, MQCallback) error
    UnSubscribe() error
}
  • GenerateSubj 生成一个唯一的订阅主题名
  • Publish 发布一条消息到指定的主题
  • Subscribe 订阅指定的主题
  • UnSubscribe 取消订阅

要拓展使用其他的消息队列, 只需要实现MQueen接口即可, 目前实现的有nats.

编码/解码

编码/解码部分XRPC也抽象了一个接口

type Codec interface
{
    Unmarshal(b []byte, dst any) error
    Marshal(v any) ([]byte, error)
}

同样只要实现这个接口就可以拓展自己的序列化方式, 目前实现的有gob、protobuf, 默认采用gob

XRPC的支持注册任意数量参数的接口, 以及CallCast两种远程调用方式. 并且实现了一套IDL, 最大程度贴近go语法, 用来定义rpc接口信息, 并自动生成相关代码, 下面是一个简单的例子.

首先定义我们的RPC服务接口hello.service

package main 

service HelloService {
    Hello(string) (string, error)
}

可以看到语法和Go非常类似, 它里面定义了一个名叫HelloService服务, 包含一个Hello方法, 有一个参数两个返回值.

一个文件里面可以定义多个服务, 每个服务可以定义多个接口, 接口支持任意数量的参数.

然后生成接口代码, 执行下面的命令会在当前目录生成一个hello.service.go文件

xrpc hello.service

接下来就可以实现我们的RPC服务

type HelloRPCService struct {
    *xrpc.RPCServer
}

func (s *HelloRPCService) Hello(request string) (string, error) {
    reply := "hello:" + request
    return reply, nil
}

// 创建服务
func newHelloService(nc *nats.Conn) *HelloRPCService {
    s := &HelloRPCService{
        RPCServer: xrpc.NewRPCServer(
            xrpc.SetMQ(natsmq.NewMQueen(nc)),
            xrpc.SetSubj("hello_server"),
        ),
    }
    RegisterHelloServiceServer(s.RPCServer, s)
    return s
}
  • HelloRPCService 实现了我们定义的RPC接口,
  • Hello 接口将收到的消息添加"hello:"前缀并将结果返回
  • newHelloService 方法用来创建RPC服务, 采用nats消息队列, 指定订阅的主题为hello_server

接下来创建RPC客户端

func newHelloRPCServiceClient(nc *nats.Conn) *HelloServiceClient {
    return NewHelloServiceClient(xrpc.NewRPCClient(
        xrpc.SetMQ(natsmq.NewMQueen(nc)),
        xrpc.SetSubj("hello_client"),
    ))
}

现在我们就可以调用远程接口了

func main() {
    nc, err := nats.Connect("nats://127.0.0.1:4222", nats.MaxReconnects(1000))
    if err != nil {
        panic(err)
    }
    defer nc.Close()

    // 启动RPC服务
    s := newHelloService(nc)
    err = s.Start()
    if err != nil {
        panic(err)
    }
    defer s.Stop()

    // 创建RPC客户端
    c := newHelloRPCServiceClient(nc)
    defer c.Close()

    // 调用Hello发放, 第一个参数是rpc服务的名称
    reply, err := c.Hello("hello_server", "yc90s")
    if err != nil {
        panic(err)
    }
    fmt.Println(reply) // 输出: hello:yc90s
}

Hello接口的第一个参数是RPC服务订阅的主题名, 例子里是hello_server, 后面的参数是传递给远程接口实际调用的参数, 程序最后输出"hello:yc90s"

RPC作为分布式系统中基础又重要的一个组件, 所以我将其单独开源出来, 后续会再xrpc基础上, 再开源一个分布式服务器框架, 欢迎感兴趣的同学一起交流


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK