3

broker and event in go-micro

 2 years ago
source link: https://ioridy.github.io/2022/01/22/go-micro-broker-and-event/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在调研broker的具体实现时,发现之前项目中的消息发送,是使用的Client(publish)/Server(subscribe)的方式,并没有直接使用broker,于是决定调研下这两者是什么关系。

Broker

broker是go-micro自身定义的异步Pub/Sub interface, 不同的机制(kafka、mqtt、nats…)最终只需要实现对应的接口,即可支持go-micro的异步消息发布/订阅。

type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}

Event

event是go-micro基于broker的interface封装的一个基于protobuf的消息发送/订阅模块, 即最终还是依赖broker的实现(go-micro默认提供一个点对点http代理),所以只需要使用plugin的方式,修改了broker的实现, event即可应用。

  • Event只定义了Publish接口 (micro.go)
    // Event is used to publish messages to a topic
    type Event interface {
    // Publish publishes a message to the event topic
    Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
    }

    // Type alias to satisfy the deprecation
    type Publisher = Event
  • Client中Publish (client/client.go)
    // Client is the interface used to make requests to services.
    // It supports Request/Response via Transport and Publishing via the Broker.
    // It also supports bidirectional streaming of requests.
    type Client interface {
    Init(...Option) error
    Options() Options
    NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
    NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
    Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
    Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
    Publish(ctx context.Context, msg Message, opts ...PublishOption) error
    String() string
    }
  • Server中Subscribe (server/server.go)
    // Server is a simple micro server abstraction
    type Server interface {
    Options() Options
    Init(...Option) error
    Handle(Handler) error
    NewHandler(interface{}, ...HandlerOption) Handler
    NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
    Subscribe(Subscriber) error
    Start() error
    Stop() error
    String() string
    }
  • micro中RegisterSubscriber (micro.go)
    // RegisterSubscriber is syntactic sugar for registering a subscriber
    func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
    return s.Subscribe(s.NewSubscriber(topic, h, opts...))
    }
  • grpc实现的Server中Subscribe (server/grpc/grpc.go)
    func newGRPCServer(opts ...server.Option) server.Server {
    options := newOptions(opts...)

    // create a grpc server
    srv := &grpcServer{
    opts: options,
    rpc: &rServer{
    serviceMap: make(map[string]*service),
    },
    handlers: make(map[string]server.Handler),
    subscribers: make(map[*subscriber][]broker.Subscriber),
    exit: make(chan chan error),
    wg: wait(options.Context),
    }

    // configure the grpc server
    srv.configure()

    return srv
    }

基于以上的分析, 直接使用Broker实现的Publish/Subscribe和使用go-micro中封装的Event实现的Publish/Subscribe本质是相同的,但是在使用的时候还是有一点差异:

  • Event的Body可以使用proto定义的message,Broker的body只能是[]byte
  • Event的Header需要通过context传递到底层, Broker直接设置header
  • 同一个topic使用两种方式都可以接受到,但是通过Broker直接发布的消息, Event订阅接受后,会提示序列化错误

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK