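broker is the asynchronous Pub/Sub interface defined by go-micro itself. Each messaging backend (kafka, mqtt, nats…) only needs to implement this interface to support go-micro's asynchronous message publish/subscribe.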

type Broker interface {
    Init(...Option) error
    Options() Options
    Address() string
    Connect() error
    Disconnect() error
    Publish(topic string, m *Message, opts ...PublishOption) error
    Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
    String() string
}

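To make the Publish/Subscribe contract concrete, here is a minimal in-process sketch. It is not go-micro code: `memoryBroker`, the reduced `Handler` signature, and the missing options and connection methods are all simplifications assumed for illustration. Only the `Message` shape (string headers plus a raw byte body) follows the broker package.

```go
package main

import (
	"fmt"
	"sync"
)

// Message has the same shape as a broker message: headers plus raw bytes.
type Message struct {
	Header map[string]string
	Body   []byte
}

// Handler is a simplified, hypothetical handler signature.
// go-micro's own Handler type differs between versions.
type Handler func(*Message) error

// memoryBroker is a toy broker that keeps subscriptions in a map.
type memoryBroker struct {
	mu       sync.RWMutex
	handlers map[string][]Handler
}

func newMemoryBroker() *memoryBroker {
	return &memoryBroker{handlers: make(map[string][]Handler)}
}

// Subscribe registers h for every message published on topic.
func (b *memoryBroker) Subscribe(topic string, h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[topic] = append(b.handlers[topic], h)
}

// Publish delivers m synchronously to each handler on topic
// and returns the first handler error, if any.
func (b *memoryBroker) Publish(topic string, m *Message) error {
	b.mu.RLock()
	hs := append([]Handler(nil), b.handlers[topic]...)
	b.mu.RUnlock()
	for _, h := range hs {
		if err := h(m); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	b := newMemoryBroker()
	b.Subscribe("greetings", func(m *Message) error {
		fmt.Printf("id=%s body=%s\n", m.Header["id"], m.Body)
		return nil
	})
	err := b.Publish("greetings", &Message{
		Header: map[string]string{"id": "1"},
		Body:   []byte("hello"),
	})
	if err != nil {
		fmt.Println("publish failed:", err)
	}
}
```

A real backend would deliver over the network inside Publish and Subscribe; the calling code stays the same, which is the point of the interface.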

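event is a protobuf-based message publish/subscribe module that go-micro builds on top of the broker interface, so it ultimately depends on the broker implementation (by default go-micro provides a point-to-point HTTP broker). Swapping the broker implementation through a plugin is therefore all that is needed for event to work on a different backend.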

  • Event only defines the Publish method (micro.go)
    // Event is used to publish messages to a topic
    type Event interface {
        // Publish publishes a message to the event topic
        Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
    }

    // Type alias to satisfy the deprecation
    type Publisher = Event
  • Publish in Client (client/client.go)
    // Client is the interface used to make requests to services.
    // It supports Request/Response via Transport and Publishing via the Broker.
    // It also supports bidirectional streaming of requests.
    type Client interface {
        Init(...Option) error
        Options() Options
        NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
        NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
        Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
        Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
        Publish(ctx context.Context, msg Message, opts ...PublishOption) error
        String() string
    }
  • Subscribe in Server (server/server.go)
    // Server is a simple micro server abstraction
    type Server interface {
        Options() Options
        Init(...Option) error
        Handle(Handler) error
        NewHandler(interface{}, ...HandlerOption) Handler
        NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
        Subscribe(Subscriber) error
        Start() error
        Stop() error
        String() string
    }
  • RegisterSubscriber in micro (micro.go)
    // RegisterSubscriber is syntactic sugar for registering a subscriber
    func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
        return s.Subscribe(s.NewSubscriber(topic, h, opts...))
    }
  • Subscribe in the gRPC Server implementation (server/grpc/grpc.go)
    func newGRPCServer(opts ...server.Option) server.Server {
        options := newOptions(opts...)

        // create a grpc server
        srv := &grpcServer{
            opts: options,
            rpc: &rServer{
                serviceMap: make(map[string]*service),
            },
            handlers:    make(map[string]server.Handler),
            subscribers: make(map[*subscriber][]broker.Subscriber),
            exit:        make(chan chan error),
            wg:          wait(options.Context),
        }

        // configure the grpc server

        return srv
    }

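The layering traced through the interfaces above (Event binds a topic, Client encodes the message, Broker carries the bytes) can be sketched in a few lines. This is a simplified model, not go-micro's implementation: the `client` and `event` structs, the `rawPublish` function type, the `Greeting` payload, and the use of JSON in place of the protobuf codec are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rawPublish stands in for Broker.Publish: a topic plus an opaque byte body.
type rawPublish func(topic string, body []byte) error

// client stands in for client.Client: it encodes a typed message
// and hands the resulting bytes to the broker.
type client struct {
	publish rawPublish
}

func (c *client) Publish(topic string, msg interface{}) error {
	body, err := json.Marshal(msg) // JSON is a stand-in codec here
	if err != nil {
		return fmt.Errorf("encode: %w", err)
	}
	return c.publish(topic, body)
}

// event stands in for micro.Event: a client bound to a single topic.
type event struct {
	topic string
	c     *client
}

func (e *event) Publish(msg interface{}) error {
	return e.c.Publish(e.topic, msg)
}

// Greeting is a hypothetical payload type.
type Greeting struct {
	Text string `json:"text"`
}

func main() {
	// A fake broker that only records what reaches the lowest layer.
	var gotTopic string
	var gotBody []byte
	fake := func(topic string, body []byte) error {
		gotTopic, gotBody = topic, body
		return nil
	}

	ev := &event{topic: "greetings", c: &client{publish: fake}}
	if err := ev.Publish(Greeting{Text: "hi"}); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Printf("%s %s\n", gotTopic, gotBody) // prints: greetings {"text":"hi"}
}
```

Because the broker only ever sees a topic and bytes, replacing the `fake` function with a different backend leaves the event layer untouched.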
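Based on the analysis above, Publish/Subscribe done directly through Broker and Publish/Subscribe done through go-micro's Event wrapper are essentially the same, but there are still a few differences in use: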

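  • An Event body can be a proto-defined message; a Broker body can only be []byte
  • Event headers have to be passed down to the lower layer through the context; with Broker the header is set directly
  • Messages on the same topic can be received either way, but a message published directly through Broker triggers a serialization error when it is received by an Event subscriber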

