63

[译] 使用 Go 实现一个简单的事件总线模式

 5 years ago
source link: https://www.tuicool.com/articles/YJJjumb
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
NryIRfy.png!web

事件驱动架构是计算机科学中一种高度可扩展的范例。它允许我们可以多方系统异步处理事件。

事件总线是 发布/订阅模式 的实现,其中发布者发布数据,并且感兴趣的订阅者可以监听这些数据并基于这些数据作出处理。这使发布者与订阅者松耦合。发布者将数据事件发布到事件总线,总线负责将它们发送给订阅者。

Y3maqaf.png!web

传统的实现事件总线的方法会涉及到使用回调。订阅者通常实现接口,然后事件总线通过接口传播数据。

使用 go 的并发模型,我们知道在大多数地方可以使用 channel 来替代回调。在本文中,我们将重点介绍如何使用 channel 来实现事件总线。

我们专注于 基于主题(topic)的事件 。发布者发布到主题,订阅者可以收听它们。

定义数据结构

为了实现事件总线,我们需要定义要传递的数据结构。我们可以使用 struct 简单地创建一个新的数据类型。我们定义一个 DataEvent 的结构体如下:

type DataEvent struct {
   Data interface{}
   Topic string
}
复制代码

在这里,我们已经将基础数据定义为接口,这意味着它可以是任何值。我们还将主题定义为结构的成员。订阅者可能会收听 多个主题 ,因此,我们通过主题来让订阅者可以区分不同的事件的做法是不错的。

介绍 channels

现在我们已经为事件总线定义了我们主要的数据结构,我们还需要一种方法来传递它。为此,我们可以定义一个可以传播 DataEventDataChannel 类型。

// DataChannel 是一个能接收 DataEvent 的 channel
type DataChannel chan DataEvent

// DataChannelSlice 是一个包含 DataChannels 数据的切片
type DataChannelSlice [] DataChannel
复制代码

DataChannelSlice 的创建是为了保留 DataChannel 的切片并轻松引用它们。

事件总线

// EventBus 存储有关订阅者感兴趣的特定主题的信息
type EventBus struct {
   subscribers map[string]DataChannelSlice
   rm sync.RWMutex
}
复制代码

EventBussubscribers ,这是一个包含 DataChannelSlices 的 map。我们使用互斥锁来保护并发访问的读写。

通过使用 map 和定义 topics ,它允许我们轻松地组织事件。主题被视为 map 的键。当有人发布它时,我们可以通过键轻松找到主题,然后将事件传播到 channel 中以进行进一步处理。

订阅主题

对于订阅主题,使用 channel 。它就像传统方法中的回调一样。当发布者向主题发布数据时, channel 将接收数据。

func (eb *EventBus)Subscribe(topic string, ch DataChannel)  {
   eb.rm.Lock()
   if prev, found := eb.subscribers[topic]; found {
      eb.subscribers[topic] = append(prev, ch)
   } else {
      eb.subscribers[topic] = append([]DataChannel{}, ch)
   }
   eb.rm.Unlock()
}
复制代码

简单地说,我们将订阅者添加到 channel 切片中然后给该结构加锁,最后在操作后将其解锁。

发布主题

要发布事件,发布者需要提供广播给订阅者所需要的主题和数据。

func (eb *EventBus) Publish(topic string, data interface{}) {
   eb.rm.RLock()
   if chans, found := eb.subscribers[topic]; found {
      // 这样做是因为切片引用相同的数组,即使它们是按值传递的
      // 因此我们正在使用我们的元素创建一个新切片,从而能正确地保持锁定
      channels := append(DataChannelSlice{}, chans...)
      go func(data DataEvent, dataChannelSlices DataChannelSlice) {
         for _, ch := range dataChannelSlices {
            ch <- data
         }
      }(DataEvent{Data: data, Topic: topic}, channels)
   }
   eb.rm.RUnlock()
}
复制代码

在此方法中,首先我们检查主题是否存在任何订阅者。然后我们只是简单地遍历与主题相关的 channel 切片并把事件发布给它们。

请注意,我们在发布方法中使用了 goroutine 来避免阻塞发布者

开始

首先,我们需要创建一个事件总线的实例。在实际场景中,你可以从包中导出单个 EventBus使其像单例一样运行

var eb = &EventBus{
   subscribers: map[string]DataChannelSlice{},
}
复制代码

为了测试新创建的事件总线,我们将创建一个以随机间隔时间发布到指定主题的方法。

func publisTo(topic string, data string)  {
   for {
      eb.Publish(topic, data)
      time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
   }
}
复制代码

接下来,我们需要一个可以收听主题的 main 函数。它使用辅助方法打印出事件的数据。

func printDataEvent(ch string, data DataEvent)  {
   fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}
func main()  {
   ch1 := make(chan DataEvent)
   ch2 := make(chan DataEvent)
   ch3 := make(chan DataEvent)
   eb.Subscribe("topic1", ch1)
   eb.Subscribe("topic2", ch2)
   eb.Subscribe("topic2", ch3)
   go publisTo("topic1", "Hi topic 1")
   go publisTo("topic2", "Welcome to topic 2")
   for {
      select {
      case d := <-ch1:
         go printDataEvent("ch1", d)
      case d := <-ch2:
         go printDataEvent("ch2", d)
      case d := <-ch3:
         go printDataEvent("ch3", d)
      }
   }
}
复制代码

我们创建了三个可以订阅主题的 channels 订阅者(ch1,ch2,ch3)。其中 ch2 和 ch3 这两个监听同一事件。

我们使用 select 语句从最快返回的 channel 中获取数据。然后它使用另一个 goroutine 打印输出数据。用 goroutine 也不是必需的。但在某些情况下,你必须对事件进行一些繁重的操作处理。为了防止阻塞 select,我们使用了 goroutine。

示例输出将如下所示

Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
...
复制代码

你可以看到事件总线通过 channel 分发事件。

基于简单 channel 的事件总线的源代码。

完整的代码

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type DataEvent struct {
	Data  interface{}
	Topic string
}

// DataChannel 是一个能接收 DataEvent 的 channel
type DataChannel chan DataEvent

// DataChannelSlice 是一个包含 DataChannels 数据的切片
type DataChannelSlice []DataChannel

// EventBus 存储有关订阅者感兴趣的特定主题的信息
type EventBus struct {
	subscribers map[string]DataChannelSlice
	rm          sync.RWMutex
}

func (eb *EventBus) Publish(topic string, data interface{}) {
	eb.rm.RLock()
	if chans, found := eb.subscribers[topic]; found {
		// 这样做是因为切片引用相同的数组,即使它们是按值传递的
		// 因此我们正在使用我们的元素创建一个新切片,从而正确地保持锁定
		channels := append(DataChannelSlice{}, chans...)
		go func(data DataEvent, dataChannelSlices DataChannelSlice) {
			for _, ch := range dataChannelSlices {
				ch <- data
			}
		}(DataEvent{Data: data, Topic: topic}, channels)
	}
	eb.rm.RUnlock()
}

func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
	eb.rm.Lock()
	if prev, found := eb.subscribers[topic]; found {
		eb.subscribers[topic] = append(prev, ch)
	} else {
		eb.subscribers[topic] = append([]DataChannel{}, ch)
	}
	eb.rm.Unlock()
}

var eb = &EventBus{
	subscribers: map[string]DataChannelSlice{},
}

func printDataEvent(ch string, data DataEvent) {
	fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}

func publisTo(topic string, data string) {
	for {
		eb.Publish(topic, data)
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	}
}

func main() {
	ch1 := make(chan DataEvent)
	ch2 := make(chan DataEvent)
	ch3 := make(chan DataEvent)

	eb.Subscribe("topic1", ch1)
	eb.Subscribe("topic2", ch2)
	eb.Subscribe("topic2", ch3)

	go publisTo("topic1", "Hi topic 1")
	go publisTo("topic2", "Welcome to topic 2")

	for {
		select {
		case d := <-ch1:
			go printDataEvent("ch1", d)
		case d := <-ch2:
			go printDataEvent("ch2", d)
		case d := <-ch3:
			go printDataEvent("ch3", d)
		}
	}
}
复制代码

使用 channel 取代回调的理由

传统的回调方式要求实现某种接口。

例如,

type Subscriber interface {
   onData(event Event)
}
复制代码

使用回调的话,如果你想订阅一个事件,你需要实现该接口,以便事件总线可以传播它。

type MySubscriber struct {
}
func (m MySubscriber) onData(event Event)  {
   // 处理事件
}
复制代码

channel 允许你在没有接口的情况下在一个简单的函数中注册订阅者。

func main() {
   ch1 := make(chan DataEvent)
   eb.Subscribe("topic1", ch1)
   fmt.Println((<-ch1).Data)
   ...
}
复制代码

结论

本文的目的是指出编写事件总线的不同实现方法。

这可能不是理想的解决方案。

例如, channel 被阻塞直到有人消费它们。这有一定的局限性。

我已经使用切片来存储主题的所有订阅者。这用于简化文章。这需要用 SET 替换,以至于列表中不存在重复的订阅者。

传统的回调方法可以使用提供的相同的原理去简单地实现。你可以轻松地在 goroutine 中进行异步装饰发布事件。

我很想听听你对这篇文章的看法。 :)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK