cache2go source code walkthrough - zhouweixin

 2 years ago
source link: https://www.cnblogs.com/zhouweixin/p/16538769.html

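cache2go is a very small open-source Go project, which makes it a good first codebase to read.

If you already have some Go development experience, it should read fairly easily.

If you are new to the Go language and still have gaps in the basics, treat each unfamiliar topic you meet in this article as something to look up and master, and learn the basics with the goal of understanding this source code. For example: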

  • time.Timer
  • defer
  • sync.RWMutex

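The author describes it as: Concurrency-safe golang caching library with expiration capabilities. In short, it is a concurrency-safe Go caching library with expiration support, so it has two main features: concurrency safety and expiration.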

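The project is very simple; all of its logic lives in three files:

  • cache.go: holds multiple cache tables.
  • cachetable.go: a single cache table.
  • cacheitem.go: a single item in a cache table.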

Data structure diagram:

[Figure: data structure diagram]

The sections below walk through the source code bottom-up.

cacheitem.go

This file contains two important pieces:

  • The struct CacheItem, which represents one item in a cache table.
  • The function NewCacheItem, which creates a CacheItem instance.

CacheItem

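CacheItem represents one item in a cache table. Its fields:

  • sync.RWMutex: read-write lock that makes concurrent reads and writes safe.
  • key: the key.
  • data: the value, i.e. the cached data.
  • lifeSpan: how long the item lives without being accessed, i.e. its expiration duration.
  • createdOn: creation time.
  • accessedOn: last access time.
  • accessCount: number of accesses.
  • aboutToExpire: callbacks triggered right before the item is removed from the cache, so custom logic can run before removal.

The source code: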

// CacheItem is an individual cache item
// Parameter data contains the user-set value in the cache.
type CacheItem struct {
	sync.RWMutex

	// The item's key.
	key interface{}
	// The item's data.
	data interface{}
	// How long will the item live in the cache when not being accessed/kept alive.
	lifeSpan time.Duration

	// Creation timestamp.
	createdOn time.Time
	// Last access timestamp.
	accessedOn time.Time
	// How often the item was accessed.
	accessCount int64

	// Callback method triggered right before removing the item from the cache
	aboutToExpire []func(key interface{})
}

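Getter methods

Below are some simple getter methods. Fields that can also be written take two extra lines to acquire and release the read lock; immutable fields are returned directly.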

// LifeSpan returns this item's expiration duration.
func (item *CacheItem) LifeSpan() time.Duration {
	// immutable
	return item.lifeSpan
}

// AccessedOn returns when this item was last accessed.
func (item *CacheItem) AccessedOn() time.Time {
	item.RLock()
	defer item.RUnlock()
	return item.accessedOn
}

// CreatedOn returns when this item was added to the cache.
func (item *CacheItem) CreatedOn() time.Time {
	// immutable
	return item.createdOn
}

// AccessCount returns how often this item has been accessed.
func (item *CacheItem) AccessCount() int64 {
	item.RLock()
	defer item.RUnlock()
	return item.accessCount
}

// Key returns the key of this cached item.
func (item *CacheItem) Key() interface{} {
	// immutable
	return item.key
}

// Data returns the value of this cached item.
func (item *CacheItem) Data() interface{} {
	// immutable
	return item.data
}

KeepAlive

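The keep-alive function:

  • The first two lines take the write lock so that concurrent reads and writes stay safe.
  • The last two lines update the access time and increment the access count whenever the item is accessed.
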
// KeepAlive marks an item to be kept for another expireDuration period.
func (item *CacheItem) KeepAlive() {
	item.Lock()
	defer item.Unlock()
	item.accessedOn = time.Now()
	item.accessCount++
}
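
The locking convention used by the getters and KeepAlive can be tried out in isolation. The following is a minimal sketch, not the library's code: the type entry and the helper touchConcurrently are hypothetical names introduced for illustration. It embeds sync.RWMutex, reads an immutable field without locking, and guards the mutable fields with the lock.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry is a hypothetical, stripped-down stand-in for CacheItem.
type entry struct {
	sync.RWMutex // embedded, so entry gets Lock/RLock methods directly

	key         string    // never changes after creation: read without locking
	accessedOn  time.Time // changes on every access: guarded by the lock
	accessCount int64     // changes on every access: guarded by the lock
}

// Key reads an immutable field, so no lock is needed.
func (e *entry) Key() string { return e.key }

// AccessCount reads a mutable field under the read lock.
func (e *entry) AccessCount() int64 {
	e.RLock()
	defer e.RUnlock()
	return e.accessCount
}

// touch mirrors KeepAlive: it updates the mutable fields under the write lock.
func (e *entry) touch() {
	e.Lock()
	defer e.Unlock()
	e.accessedOn = time.Now()
	e.accessCount++
}

// touchConcurrently calls touch from n goroutines and returns the final count.
func touchConcurrently(n int) int64 {
	e := &entry{key: "k"}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			e.touch()
		}()
	}
	wg.Wait()
	return e.AccessCount()
}

func main() {
	fmt.Println(touchConcurrently(100))
}
```

Because every increment happens under the write lock, no update is lost even when many goroutines touch the same entry at once.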

AddAboutToExpireCallback

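Appends a callback. The callback returns nothing and takes a single interface{} parameter, which receives the item's key and can therefore be of any type.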

// AddAboutToExpireCallback appends a new callback to the AboutToExpire queue
func (item *CacheItem) AddAboutToExpireCallback(f func(interface{})) {
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = append(item.aboutToExpire, f)
}

SetAboutToExpireCallback

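Setting a callback replaces the existing ones entirely. Unlike adding, it first clears the queue and then stores the new callback.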

// SetAboutToExpireCallback configures a callback, which will be called right
// before the item is about to be removed from the cache.
func (item *CacheItem) SetAboutToExpireCallback(f func(interface{})) {
	if len(item.aboutToExpire) > 0 {
		item.RemoveAboutToExpireCallback()
	}
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = append(item.aboutToExpire, f)
}

RemoveAboutToExpireCallback

Removes all callbacks by setting the slice to nil.

// RemoveAboutToExpireCallback empties the about to expire callback queue
func (item *CacheItem) RemoveAboutToExpireCallback() {
	item.Lock()
	defer item.Unlock()
	item.aboutToExpire = nil
}
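
The difference between adding, setting, and removing callbacks can be illustrated with a small standalone sketch. The type hooks and its methods are hypothetical names, not part of cache2go, and the set method here clears and stores under a single lock acquisition, which is a simplification compared with the listing above.

```go
package main

import (
	"fmt"
	"sync"
)

// hooks is a hypothetical holder for a callback queue, loosely modelled on
// the aboutToExpire field.
type hooks struct {
	mu  sync.Mutex
	fns []func(key interface{})
}

// add appends to the queue and keeps the callbacks registered earlier.
func (h *hooks) add(f func(key interface{})) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.fns = append(h.fns, f)
}

// set replaces the whole queue with a single callback.
func (h *hooks) set(f func(key interface{})) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.fns = []func(key interface{}){f}
}

// clear empties the queue.
func (h *hooks) clear() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.fns = nil
}

// fire runs every registered callback and reports how many ran.
func (h *hooks) fire(key interface{}) int {
	h.mu.Lock()
	fns := h.fns // copy the slice header so callbacks run without the lock held
	h.mu.Unlock()
	for _, f := range fns {
		f(key)
	}
	return len(fns)
}

// demo returns how many callbacks fire after two adds, after set, and after clear.
func demo() (afterAdd, afterSet, afterClear int) {
	h := &hooks{}
	noop := func(interface{}) {}
	h.add(noop)
	h.add(noop)
	afterAdd = h.fire("k")
	h.set(noop)
	afterSet = h.fire("k")
	h.clear()
	afterClear = h.fire("k")
	return
}

func main() {
	fmt.Println(demo())
}
```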

NewCacheItem

Creates a CacheItem instance.

func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
	t := time.Now()
	return &CacheItem{
		key:           key,
		lifeSpan:      lifeSpan,
		createdOn:     t,
		accessedOn:    t,
		accessCount:   0,
		aboutToExpire: nil,
		data:          data,
	}
}

cachetable.go

This file defines three types: CacheTable, CacheItemPair, and CacheItemPairList.

They are analyzed below one by one, from simple to complex.

CacheItemPair

CacheItemPair records how many times a cached key has been accessed.

// CacheItemPair maps key to access counter
type CacheItemPair struct {
	Key         interface{}
	AccessCount int64
}

CacheItemPairList

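CacheItemPairList is a slice of CacheItemPair. By implementing the methods Swap, Len, and Less, it satisfies sort.Interface and can be sorted.

Note the implementation of Less: it reports whether element i is greater than element j, which yields a descending sort. The descending order lets CacheTable.MostAccessed return the most frequently accessed items first.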

// CacheItemPairList is a slice of CacheItemPairs that implements sort.
// Interface to sort by AccessCount.
type CacheItemPairList []CacheItemPair

func (p CacheItemPairList) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p CacheItemPairList) Len() int           { return len(p) }
func (p CacheItemPairList) Less(i, j int) bool { return p[i].AccessCount > p[j].AccessCount }
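
To see the effect of the inverted Less, here is a small self-contained sketch. The names pair, byCountDesc, and sortedKeys are hypothetical and only mirror the shape of CacheItemPair and CacheItemPairList.

```go
package main

import (
	"fmt"
	"sort"
)

// pair and byCountDesc are hypothetical names mirroring CacheItemPair and
// CacheItemPairList.
type pair struct {
	Key   string
	Count int64
}

type byCountDesc []pair

func (p byCountDesc) Len() int      { return len(p) }
func (p byCountDesc) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

// Less reports "greater than", so sort.Sort puts the largest count first.
func (p byCountDesc) Less(i, j int) bool { return p[i].Count > p[j].Count }

// sortedKeys sorts a copy of the input by descending count and returns the keys.
func sortedKeys(in []pair) []string {
	p := make(byCountDesc, len(in))
	copy(p, in)
	sort.Sort(p)
	keys := make([]string, 0, len(p))
	for _, v := range p {
		keys = append(keys, v.Key)
	}
	return keys
}

func main() {
	fmt.Println(sortedKeys([]pair{{"a", 3}, {"b", 10}, {"c", 7}}))
}
```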

CacheTable

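CacheTable represents one cache table. Its fields:

  • sync.RWMutex: read-write lock that makes concurrent reads and writes safe.
  • name: the table name.
  • items: the items in the table, a map from key to *CacheItem.
  • cleanupTimer: the timer that triggers expiration cleanup.
  • cleanupInterval: the duration of the current cleanup timer.
  • logger: the logger used for output.
  • loadData: callback invoked when reading a key that does not exist; it can be used to populate the cache on demand.
  • addedItem: callbacks invoked when an item is added, for extra flexibility.
  • aboutToDeleteItem: callbacks invoked before an item is deleted, for extra flexibility.

The source code: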

// CacheTable is a table within the cache
type CacheTable struct {
	sync.RWMutex

	// The table's name.
	name string
	// All cached items.
	items map[interface{}]*CacheItem

	// Timer responsible for triggering cleanup.
	cleanupTimer *time.Timer
	// Current timer duration.
	cleanupInterval time.Duration

	// The logger used for this table.
	logger *log.Logger

	// Callback method triggered when trying to load a non-existing key.
	loadData func(key interface{}, args ...interface{}) *CacheItem
	// Callback method triggered when adding a new item to the cache.
	addedItem []func(item *CacheItem)
	// Callback method triggered before deleting an item from the cache.
	aboutToDeleteItem []func(item *CacheItem)
}

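The core methods are covered first, followed by the simple ones.

Add: adding an item

The logic is summarized in a flowchart; the "expiration check" step is factored out and analyzed later.

[Figure: flowchart of Add]

NotFoundAdd shares its core logic with Add, so the differences are not described separately. The source code: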

// Add adds a key/value pair to the cache.
// Parameter key is the item's cache-key.
// Parameter lifeSpan determines after which time period without an access the item
// will get removed from the cache.
// Parameter data is the item's value.
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
	item := NewCacheItem(key, lifeSpan, data)

	// Add item to cache.
	table.Lock()
	table.addInternal(item)

	return item
}

func (table *CacheTable) addInternal(item *CacheItem) {
	// Careful: do not run this method unless the table-mutex is locked!
	// It will unlock it for the caller before running the callbacks and checks
	table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
	table.items[item.key] = item

	// Cache values so we don't keep blocking the mutex.
	expDur := table.cleanupInterval
	addedItem := table.addedItem
	table.Unlock()

	// Trigger callback after adding an item to cache.
	if addedItem != nil {
		for _, callback := range addedItem {
			callback(item)
		}
	}

	// If we haven't set up any expiration check timer or found a more imminent item.
	if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
		table.expirationCheck()
	}
}

// NotFoundAdd checks whether an item is not yet cached. Unlike the Exists
// method this also adds data if the key could not be found.
func (table *CacheTable) NotFoundAdd(key interface{}, lifeSpan time.Duration, data interface{}) bool {
	table.Lock()

	if _, ok := table.items[key]; ok {
		table.Unlock()
		return false
	}

	item := NewCacheItem(key, lifeSpan, data)
	table.addInternal(item)

	return true
}
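
addInternal releases the table lock before it runs the addedItem callbacks. One plausible benefit, sketched below under simplifying assumptions, is that a callback can call back into the table without blocking on a lock its own caller holds. The type miniTable and the helper countSeenByCallback are hypothetical names, not library code.

```go
package main

import (
	"fmt"
	"sync"
)

// miniTable is a hypothetical, reduced version of CacheTable.
type miniTable struct {
	sync.RWMutex
	items   map[string]string
	onAdded []func(key string)
}

// Count takes the read lock, like CacheTable.Count.
func (t *miniTable) Count() int {
	t.RLock()
	defer t.RUnlock()
	return len(t.items)
}

// Add stores the value, then releases the lock before running callbacks so
// that a callback may safely call back into the table.
func (t *miniTable) Add(key, value string) {
	t.Lock()
	t.items[key] = value
	callbacks := t.onAdded // copy while the lock is still held
	t.Unlock()

	for _, cb := range callbacks {
		cb(key)
	}
}

// countSeenByCallback adds one item and returns the Count observed inside
// the added-item callback.
func countSeenByCallback() int {
	t := &miniTable{items: make(map[string]string)}
	seen := -1
	t.onAdded = append(t.onAdded, func(string) {
		seen = t.Count() // would block forever if Add still held the write lock
	})
	t.Add("k", "v")
	return seen
}

func main() {
	fmt.Println(countSeenByCallback())
}
```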

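expirationCheck: expiration check

The way expiration is handled is worth studying. It does not periodically scan in a loop for expired keys, as one might expect, and it does not create a separate timer for each item.

Instead, the table keeps a single timer. When an item is added and either no timer is set yet or the new item's lifeSpan is shorter than the current cleanupInterval, expirationCheck scans the items to find the one closest to expiring and arms the timer for that duration. When the timer fires, it removes the expired items and arms the next timer, relay-style.

The expiration check calls table.deleteInternal to remove expired keys; that method is analyzed in detail in the Delete section.

[Figure: flowchart of expirationCheck]
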
// Expiration check loop, triggered by a self-adjusting timer.
func (table *CacheTable) expirationCheck() {
	table.Lock()
	if table.cleanupTimer != nil {
		table.cleanupTimer.Stop()
	}
	if table.cleanupInterval > 0 {
		table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
	} else {
		table.log("Expiration check installed for table", table.name)
	}

	// To be more accurate with timers, we would need to update 'now' on every
	// loop iteration. Not sure it's really efficient though.
	now := time.Now()
	smallestDuration := 0 * time.Second
	for key, item := range table.items {
		// Cache values so we don't keep blocking the mutex.
		item.RLock()
		lifeSpan := item.lifeSpan
		accessedOn := item.accessedOn
		item.RUnlock()

		if lifeSpan == 0 {
			continue
		}
		if now.Sub(accessedOn) >= lifeSpan {
			// Item has excessed its lifespan.
			table.deleteInternal(key)
		} else {
			// Find the item chronologically closest to its end-of-lifespan.
			if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
				smallestDuration = lifeSpan - now.Sub(accessedOn)
			}
		}
	}

	// Setup the interval for the next cleanup run.
	table.cleanupInterval = smallestDuration
	if smallestDuration > 0 {
		table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
			go table.expirationCheck()
		})
	}
	table.Unlock()
}
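
The single self-adjusting timer can be reduced to a few lines. The sketch below is a simplified illustration rather than the library's implementation: it stores only a deadline per key, has no callbacks, and the names expiringSet, check, and demo are hypothetical. Like expirationCheck, it deletes what has expired and arms one timer for the nearest remaining deadline.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// expiringSet is a hypothetical miniature of CacheTable that stores only
// the deadline of each key.
type expiringSet struct {
	mu       sync.Mutex
	deadline map[string]time.Time
	timer    *time.Timer
}

// add registers a key and re-runs the check so the timer reflects it.
func (s *expiringSet) add(key string, ttl time.Duration) {
	s.mu.Lock()
	s.deadline[key] = time.Now().Add(ttl)
	s.mu.Unlock()
	s.check()
}

// check deletes expired keys and arms one timer for the nearest deadline.
func (s *expiringSet) check() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.timer != nil {
		s.timer.Stop()
	}
	now := time.Now()
	var next time.Duration // 0 means "nothing left to wait for"
	for k, d := range s.deadline {
		remaining := d.Sub(now)
		if remaining <= 0 {
			delete(s.deadline, k)
		} else if next == 0 || remaining < next {
			next = remaining
		}
	}
	if next > 0 {
		// AfterFunc runs check in its own goroutine, so the timer re-arms itself.
		s.timer = time.AfterFunc(next, s.check)
	}
}

func (s *expiringSet) count() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.deadline)
}

// demo returns the item count right after adding and after both TTLs have passed.
func demo() (before, after int) {
	s := &expiringSet{deadline: make(map[string]time.Time)}
	s.add("short", 50*time.Millisecond)
	s.add("long", 150*time.Millisecond)
	before = s.count()
	time.Sleep(500 * time.Millisecond)
	after = s.count()
	return
}

func main() {
	fmt.Println(demo())
}
```

No goroutine polls in the background here: between two deadlines nothing runs at all, and the number of timers stays at one regardless of how many keys are stored.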

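Delete

As the flowchart shows, most of the logic here is acquiring and releasing locks. There are so many lock operations for the following reasons:

  • Some locks are table-level and some are item-level.
  • The table-level lock is acquired and released twice. deleteInternal is shared by Delete and expirationCheck, both of which call it with the table lock already held, so it releases the lock while the callbacks run and re-acquires it before returning. Question to consider: if Mutex were a reentrant lock, would this release-and-reacquire step still be needed?

[Figure: flowchart of Delete]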

// Delete an item from the cache.
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
	table.Lock()
	defer table.Unlock()

	return table.deleteInternal(key)
}

func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
	r, ok := table.items[key]
	if !ok {
		return nil, ErrKeyNotFound
	}

	// Cache value so we don't keep blocking the mutex.
	aboutToDeleteItem := table.aboutToDeleteItem
	table.Unlock()

	// Trigger callbacks before deleting an item from cache.
	if aboutToDeleteItem != nil {
		for _, callback := range aboutToDeleteItem {
			callback(r)
		}
	}

	r.RLock()
	defer r.RUnlock()
	if r.aboutToExpire != nil {
		for _, callback := range r.aboutToExpire {
			callback(key)
		}
	}

	table.Lock()
	table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
	delete(table.items, key)

	return r, nil
}
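
The release-and-reacquire pattern, and the question about reentrancy, can be explored with a standalone sketch. Everything here is hypothetical illustration code (store, deleteLocked, relockSucceeds, demo), and it assumes Go 1.18 or later because it uses sync.Mutex.TryLock to probe the lock without blocking.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical reduced table with one delete callback.
type store struct {
	mu       sync.Mutex
	items    map[string]int
	onDelete func(key string)
}

// Delete is the public entry point: it owns the lock for the whole call.
func (s *store) Delete(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.deleteLocked(key)
}

// deleteLocked must be called with s.mu held and returns with s.mu held.
// In between it releases the lock so the callback can use the store.
func (s *store) deleteLocked(key string) bool {
	if _, ok := s.items[key]; !ok {
		return false
	}
	cb := s.onDelete
	s.mu.Unlock() // hand the lock back while user code runs
	if cb != nil {
		cb(key)
	}
	s.mu.Lock() // take it again so the caller's Unlock stays balanced
	delete(s.items, key)
	return true
}

// relockSucceeds reports whether a goroutine that already holds a
// sync.Mutex can acquire it a second time.
func relockSucceeds() bool {
	var mu sync.Mutex
	mu.Lock()
	defer mu.Unlock()
	ok := mu.TryLock()
	if ok {
		mu.Unlock()
	}
	return ok
}

// demo deletes one key and returns the item count seen inside the callback.
func demo() (deleted bool, seen int) {
	s := &store{items: map[string]int{"a": 1, "b": 2}}
	s.onDelete = func(string) {
		s.mu.Lock() // safe only because deleteLocked released the lock
		seen = len(s.items)
		s.mu.Unlock()
	}
	deleted = s.Delete("a")
	return
}

func main() {
	fmt.Println(demo())
	fmt.Println(relockSucceeds())
}
```

Go's sync.Mutex is not reentrant, which is why the second acquisition attempt in relockSucceeds fails. With a reentrant lock the nested acquisition in the callback would not deadlock, but the callbacks would then run while the table is still locked, so the trade-off is not only about convenience.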

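Value: reading a value

Reading a value is simple in itself, but some extra handling is involved:

  • On a miss, check whether custom logic is configured, for example falling back to another data source and caching the result.
  • On a hit, update the access time to keep the item alive.

[Figure: flowchart of Value]
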
// Value returns an item from the cache and marks it to be kept alive. You can
// pass additional arguments to your DataLoader callback function.
func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
	table.RLock()
	r, ok := table.items[key]
	loadData := table.loadData
	table.RUnlock()

	if ok {
		// Update access counter and timestamp.
		r.KeepAlive()
		return r, nil
	}

	// Item doesn't exist in cache. Try and fetch it with a data-loader.
	if loadData != nil {
		item := loadData(key, args...)
		if item != nil {
			table.Add(key, item.lifeSpan, item.data)
			return item, nil
		}

		return nil, ErrKeyNotFoundOrLoadable
	}

	return nil, ErrKeyNotFound
}
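
The miss-then-load flow is a read-through cache. A minimal sketch of the idea follows, using hypothetical names (loadingCache, Get, errNotFound, demo) and string values instead of CacheItem; it is not the library's API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotFound = errors.New("key not found")

// loadingCache is a hypothetical reduced table with a loader callback.
type loadingCache struct {
	mu     sync.RWMutex
	items  map[string]string
	loader func(key string) (string, bool)
}

// Get returns the cached value, or asks the loader on a miss and caches
// whatever the loader produced.
func (c *loadingCache) Get(key string) (string, error) {
	c.mu.RLock()
	v, ok := c.items[key]
	loader := c.loader
	c.mu.RUnlock()
	if ok {
		return v, nil
	}
	if loader == nil {
		return "", errNotFound
	}
	v, ok = loader(key) // runs without the lock; may be slow
	if !ok {
		return "", errNotFound
	}
	c.mu.Lock()
	c.items[key] = v
	c.mu.Unlock()
	return v, nil
}

// demo reads the same key twice and returns how often the loader ran.
func demo() (value string, loads int) {
	c := &loadingCache{items: make(map[string]string)}
	c.loader = func(key string) (string, bool) {
		loads++
		return "value-for-" + key, true
	}
	c.Get("k")
	value, _ = c.Get("k")
	return
}

func main() {
	fmt.Println(demo())
}
```

In this sketch the loader runs outside the lock, so two goroutines that miss on the same key at the same time may both invoke it; the second result simply overwrites the first.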

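MostAccessed: the most frequently accessed items

This method uses the CacheItemPair and CacheItemPairList types described earlier.

  • First it iterates over the items and stores each key and accessCount in p for sorting.
  • Then it walks the sorted p, looks each key up in the table, and returns up to count items in that order.
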
// MostAccessed returns the most accessed items in this cache table
func (table *CacheTable) MostAccessed(count int64) []*CacheItem {
	table.RLock()
	defer table.RUnlock()

	p := make(CacheItemPairList, len(table.items))
	i := 0
	for k, v := range table.items {
		p[i] = CacheItemPair{k, v.accessCount}
		i++
	}
	sort.Sort(p)

	var r []*CacheItem
	c := int64(0)
	for _, v := range p {
		if c >= count {
			break
		}

		item, ok := table.items[v.Key]
		if ok {
			r = append(r, item)
		}
		c++
	}

	return r
}

Foreach

Gives developers a hook for custom operations over all items.

// Foreach all items
func (table *CacheTable) Foreach(trans func(key interface{}, item *CacheItem)) {
	table.RLock()
	defer table.RUnlock()

	for k, v := range table.items {
		trans(k, v)
	}
}
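
Foreach holds the table's read lock while it calls trans, so a callback that tries to take the write lock (for example by deleting from the same table) would block. One way to work around this, sketched here with hypothetical names (table, deleteIf, demo) rather than the library's types, is to collect keys during iteration and modify the table afterwards.

```go
package main

import (
	"fmt"
	"sync"
)

// table is a hypothetical reduced CacheTable.
type table struct {
	sync.RWMutex
	items map[string]int
}

// Foreach calls fn for every item while holding the read lock.
func (t *table) Foreach(fn func(key string, value int)) {
	t.RLock()
	defer t.RUnlock()
	for k, v := range t.items {
		fn(k, v)
	}
}

// Delete needs the write lock, so it must not be called from inside Foreach.
func (t *table) Delete(key string) {
	t.Lock()
	defer t.Unlock()
	delete(t.items, key)
}

// deleteIf collects matching keys during iteration and deletes them afterwards.
func (t *table) deleteIf(match func(value int) bool) int {
	var doomed []string
	t.Foreach(func(k string, v int) {
		if match(v) {
			doomed = append(doomed, k)
		}
	})
	for _, k := range doomed {
		t.Delete(k)
	}
	return len(doomed)
}

// demo removes the even values and returns how many were removed and how many remain.
func demo() (removed, left int) {
	t := &table{items: map[string]int{"a": 1, "b": 2, "c": 3, "d": 4}}
	removed = t.deleteIf(func(v int) bool { return v%2 == 0 })
	left = len(t.items)
	return
}

func main() {
	fmt.Println(demo())
}
```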

Flush

The method that empties the cache is fairly simple: it clears the data and stops the timer.

// Flush deletes all items from this cache table.
func (table *CacheTable) Flush() {
	table.Lock()
	defer table.Unlock()

	table.log("Flushing table", table.name)

	table.items = make(map[interface{}]*CacheItem)
	table.cleanupInterval = 0
	if table.cleanupTimer != nil {
		table.cleanupTimer.Stop()
	}
}

Query methods

Count and Exists are simple and need no further explanation.

// Count returns how many items are currently stored in the cache.
func (table *CacheTable) Count() int {
	table.RLock()
	defer table.RUnlock()
	return len(table.items)
}

// Exists returns whether an item exists in the cache. Unlike the Value method
// Exists neither tries to fetch data via the loadData callback nor does it
// keep the item alive in the cache.
func (table *CacheTable) Exists(key interface{}) bool {
	table.RLock()
	defer table.RUnlock()
	_, ok := table.items[key]

	return ok
}

Setter methods

The setter methods below are also simple and are not discussed further.

// SetDataLoader configures a data-loader callback, which will be called when
// trying to access a non-existing key. The key and 0...n additional arguments
// are passed to the callback function.
func (table *CacheTable) SetDataLoader(f func(interface{}, ...interface{}) *CacheItem) {
	table.Lock()
	defer table.Unlock()
	table.loadData = f
}

// SetAddedItemCallback configures a callback, which will be called every time
// a new item is added to the cache.
func (table *CacheTable) SetAddedItemCallback(f func(*CacheItem)) {
	if len(table.addedItem) > 0 {
		table.RemoveAddedItemCallbacks()
	}
	table.Lock()
	defer table.Unlock()
	table.addedItem = append(table.addedItem, f)
}

//AddAddedItemCallback appends a new callback to the addedItem queue
func (table *CacheTable) AddAddedItemCallback(f func(*CacheItem)) {
	table.Lock()
	defer table.Unlock()
	table.addedItem = append(table.addedItem, f)
}

// SetAboutToDeleteItemCallback configures a callback, which will be called
// every time an item is about to be removed from the cache.
func (table *CacheTable) SetAboutToDeleteItemCallback(f func(*CacheItem)) {
	if len(table.aboutToDeleteItem) > 0 {
		table.RemoveAboutToDeleteItemCallback()
	}
	table.Lock()
	defer table.Unlock()
	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f)
}

// AddAboutToDeleteItemCallback appends a new callback to the AboutToDeleteItem queue
func (table *CacheTable) AddAboutToDeleteItemCallback(f func(*CacheItem)) {
	table.Lock()
	defer table.Unlock()
	table.aboutToDeleteItem = append(table.aboutToDeleteItem, f)
}

// SetLogger sets the logger to be used by this cache table.
func (table *CacheTable) SetLogger(logger *log.Logger) {
	table.Lock()
	defer table.Unlock()
	table.logger = logger
}

Removal methods

Too simple to need explanation.

// RemoveAddedItemCallbacks empties the added item callback queue
func (table *CacheTable) RemoveAddedItemCallbacks() {
	table.Lock()
	defer table.Unlock()
	table.addedItem = nil
}

// RemoveAboutToDeleteItemCallback empties the about to delete item callback queue
func (table *CacheTable) RemoveAboutToDeleteItemCallback() {
	table.Lock()
	defer table.Unlock()
	table.aboutToDeleteItem = nil
}

cache.go

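The Cache function is the entry point of the library. It contains a double-checked locking sequence, and the reason for it deserves attention:

  • While a goroutine waits for mutex.Lock(), another goroutine may already have finished the initialization. The existence check therefore has to be repeated after the lock is acquired.
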
// Cache returns the existing cache table with given name or creates a new one
// if the table does not exist yet.
func Cache(table string) *CacheTable {
	mutex.RLock()
	t, ok := cache[table]
	mutex.RUnlock()

	if !ok {
		mutex.Lock()
		t, ok = cache[table]
		// Double check whether the table exists or not.
		if !ok {
			t = &CacheTable{
				name:  table,
				items: make(map[interface{}]*CacheItem),
			}
			cache[table] = t
		}
		mutex.Unlock()
	}

	return t
}
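
The effect of the second check can be observed with a small experiment. The sketch below uses hypothetical names (tbl, get, raceToCreate, and a created counter that exists only for the demonstration) and follows the same read-lock, write-lock, re-check sequence.

```go
package main

import (
	"fmt"
	"sync"
)

// tbl is a hypothetical stand-in for CacheTable.
type tbl struct{ name string }

var (
	mu      sync.RWMutex
	tables  = make(map[string]*tbl)
	created int // how many tables were actually constructed
)

// get returns the table with the given name, creating it at most once.
func get(name string) *tbl {
	// Fast path: most calls find the table under the cheap read lock.
	mu.RLock()
	t, ok := tables[name]
	mu.RUnlock()
	if ok {
		return t
	}

	// Slow path: another goroutine may have created the table while this
	// one was waiting for the write lock, so check again before creating.
	mu.Lock()
	defer mu.Unlock()
	if t, ok = tables[name]; ok {
		return t
	}
	t = &tbl{name: name}
	tables[name] = t
	created++
	return t
}

// raceToCreate asks for the same table from n goroutines and returns how
// many tables were constructed.
func raceToCreate(n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			get("shared")
		}()
	}
	wg.Wait()
	mu.RLock()
	defer mu.RUnlock()
	return created
}

func main() {
	fmt.Println(raceToCreate(50))
}
```

Without the second lookup inside the write lock, several goroutines that all missed on the fast path could each create and store their own table, and callers would end up holding different instances for the same name.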

examples

The examples are also simple; readers can go through them on their own.

