40

由浅入深聊聊Golang的sync.Map

 5 years ago
source link: https://www.tuicool.com/articles/qENJni6
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

今天在技术群中有小伙伴讨论并发安全的东西,其实之前就有写过map相关文章: 由浅入深聊聊Golang的map 。但是没有详细说明sync.Map是怎么一回事。 回想了一下,竟然脑中只剩下“两个map、一个只读一个读写,xxxxx”等,关键词。有印象能扯,但是有点乱,还是写一遍简单记录一下吧。

1.为什么需要sync.Map? 2.sync.Map如何使用? 3.理一理sync.Map源码实现? 4.sync.Map的优缺点? 5.思维扩散?

正文

1.为什么需要sync.Map?

关于map可以直接查看 由浅入深聊聊Golang的map ,不再赘述。

为什么需要呢? 原因很简单,就是:map在并发情况虚啊,只读是线程安全的,同时写线程不安全,所以为了 并发安全 & 高效 ,官方实现了一把。

1.1 并发写map会有什么问题?

来看看不使用sync.Map的map是如何实现并发安全的:

func main() {
	m := map[int]int {1:1}
	go do(m)
	go do(m)

	time.Sleep(1*time.Second)
	fmt.Println(m)
}

func do (m map[int]int) {
	i := 0
	for i < 10000 {
		m[1]=1
		i++
	}
}
复制代码

输出:

fatal error: concurrent map writes
复制代码

oh,no。 报错说的很明显,这哥们不能同时写。

1.2 低配版解决方案

加一把大锁。

// 大家好,我是那把大锁
var s sync.RWMutex
func main() {
	m := map[int]int {1:1}
	go do(m)
	go do(m)

	time.Sleep(1*time.Second)
	fmt.Println(m)
}

func do (m map[int]int) {
	i := 0
	for i < 10000 {
	    // 加锁
		s.Lock()
		m[1]=1
		// 解锁
		s.Unlock()
		i++
	}
}
复制代码

输出:

map[1:1]
复制代码

这回终于正常了,但是会有什么问题呢? 加大锁大概率都不是最优解,一般都会有 效率问题 。 通俗说就是 加大锁影响其他的元素操作 了。

解决思路:减少加锁时间。
方法: 1.空间换时间。  2.降低影响范围。
复制代码

sync.Map就是用了以上的思路。继续往下看。

nmQfiy6.jpg!web

2.sync.Map如何使用?

上代码:

func main() {
    // 关键人物出场
	m := sync.Map{}
	m.Store(1,1)
	go do(m)
	go do(m)

	time.Sleep(1*time.Second)
	fmt.Println(m.Load(1))
}

func do (m sync.Map) {
	i := 0
	for i < 10000 {
		m.Store(1,1)
		i++
	}
}
复制代码

输出:

1 true
复制代码

运行ok。这把秀了。

3.理一理sync.Map源码实现?

先白话文说下大概逻辑。让下文看的更快。(大概只有是这样流程就好) 写:直写。 读:先读read,没有再读dirty。

nIRj6jj.jpg!web

从“基础结构 + 增删改查”的思路来详细过一遍源码。

3.1 基础结构

sync.Map的核心数据结构:

type Map struct {
	mu Mutex
	read atomic.Value // readOnly
	dirty map[interface{}]*entry
	misses int
}
复制代码
说明 类型 作用 mu Mutex 加锁作用。保护后文的dirty字段 read atomic.Value 存读的数据。因为是atomic.Value类型,只读,所以并发是安全的。实际存的是readOnly的数据结构。 misses int 计数作用。每次从read中读失败,则计数+1。 dirty map[interface{}]*entry 包含最新写入的数据。当misses计数达到一定值,将其赋值给read。

这里有必要简单描述一下,大概的逻辑,

readOnly的数据结构:

type readOnly struct {
    m  map[interface{}]*entry
    amended bool 
}
复制代码
说明 类型 作用 m map[interface{}]*entry 单纯的map结构 amended bool Map.dirty的数据和这里的 m 中的数据不一样的时候,为true

entry的数据结构:

type entry struct {
    //可见value是个指针类型,虽然read和dirty存在冗余情况(amended=false),但是由于是指针类型,存储的空间应该不是问题
    p unsafe.Pointer // *interface{}
}
复制代码

这个结构体主要是想说明。虽然前文read和dirty存在冗余的情况,但是由于value都是指针类型,其实存储的空间其实没增加多少。

3.2 查询

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    // 因read只读,线程安全,优先读取
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    
    // 如果read没有,并且dirty有新数据,那么去dirty中查找
    if !ok && read.amended {
        m.mu.Lock()
        // 双重检查(原因是前文的if判断和加锁非原子的,害怕这中间发生故事)
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        
        // 如果read中还是不存在,并且dirty中有新数据
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // m计数+1
            m.missLocked()
        }
        
        m.mu.Unlock()
    }
    
    if !ok {
        return nil, false
    }
    return e.load()
}

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    
    // 将dirty置给read,因为穿透概率太大了(原子操作,耗时很小)
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}
复制代码

流程图:

UZ7vemu.png!web

这边有几个点需要强调一下:

如何设置阀值?

这里采用 miss计数和dirty长度 的比较,来进行阀值的设定。

为什么dirty可以直接换到read?

因为写操作只会操作dirty,所以保证了dirty是最新的,并且数据集是肯定包含read的。 (可能有同学疑问,dirty不是下一步就置为nil了,为何还包含?后文会有解释。)

为什么dirty置为nil?

我不确定这个原因。猜测:一方面是当read完全等于dirty的时候,读的话read没有就是没有了,即使穿透也是一样的结果,所以存的没啥用。另一方是当存的时候,如果元素比较多,影响插入效率。

3.3 删

func (m *Map) Delete(key interface{}) {
    // 读出read,断言为readOnly类型
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // 如果read中没有,并且dirty中有新元素,那么就去dirty中去找。这里用到了amended,当read与dirty不同时为true,说明dirty中有read没有的数据。
    
    if !ok && read.amended {
        m.mu.Lock()
        // 再检查一次,因为前文的判断和锁不是原子操作,防止期间发生了变化。
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        
        if !ok && read.amended {
            // 直接删除
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    
    if ok {
    // 如果read中存在该key,则将该value 赋值nil(采用标记的方式删除!)
        e.delete()
    }
}

func (e *entry) delete() (hadValue bool) {
    for {
    	// 再次再一把数据的指针
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        
        // 原子操作
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}
复制代码

流程图:

yAfQBvf.png!web

这边有几个点需要强调一下:

1.为什么dirty是直接删除,而read是标记删除?

read的作用是在dirty前头优先度,遇到相同元素的时候为了不穿透到dirty,所以采用标记的方式。 同时正是因为这样的机制+amended的标记,可以保证read找不到&&amended=false的时候,dirty中肯定找不到

2.为什么dirty是可以直接删除,而没有先进行读取存在后删除?

删除成本低。读一次需要寻找,删除也需要寻找,无需重复操作。

3.如何进行标记的?

将值置为nil。(这个很关键)

3.4 增(改)

func (m *Map) Store(key, value interface{}) {
    // 如果m.read存在这个key,并且没有被标记删除,则尝试更新。
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }
    
    // 如果read不存在或者已经被标记删除
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
   
    if e, ok := read.m[key]; ok { // read 存在该key
    // 如果entry被标记expunge,则表明dirty没有key,可添加入dirty,并更新entry。
        if e.unexpungeLocked() { 
            // 加入dirty中,这儿是指针
            m.dirty[key] = e
        }
        // 更新value值
        e.storeLocked(&value) 
        
    } else if e, ok := m.dirty[key]; ok { // dirty 存在该key,更新
        e.storeLocked(&value)
        
    } else { // read 和 dirty都没有
        // 如果read与dirty相同,则触发一次dirty刷新(因为当read重置的时候,dirty已置为nil了)
        if !read.amended { 
            // 将read中未删除的数据加入到dirty中
            m.dirtyLocked() 
            // amended标记为read与dirty不相同,因为后面即将加入新数据。
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value) 
    }
    m.mu.Unlock()
}

// 将read中未删除的数据加入到dirty中
func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }
    
    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    
    // 遍历read。
    for k, e := range read.m {
        // 通过此次操作,dirty中的元素都是未被删除的,可见标记为expunged的元素不在dirty中!!!
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

// 判断entry是否被标记删除,并且将标记为nil的entry更新标记为expunge
func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    
    for p == nil {
        // 将已经删除标记为nil的数据标记为expunged
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}

// 对entry尝试更新 (原子cas操作)
func (e *entry) tryStore(i *interface{}) bool {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}

// read里 将标记为expunge的更新为nil
func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

// 更新entry
func (e *entry) storeLocked(i *interface{}) {
    atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
复制代码

流程图:

ERnInmf.png!web

这边有几个点需要强调一下:

  1. read中的标记为已删除的区别?

标记为nil,说明是正常的delete操作,此时dirty中不一定存在 a. dirty赋值给read后,此时dirty不存在 b. dirty初始化后,肯定存在

标记为expunged,说明是在dirty初始化的时候操作的,此时dirty中肯定不存在。

  1. 可能存在性能问题?

初始化dirty的时候,虽然都是指针赋值,但read如果较大的话,可能会有些影响。

4.sync.Map的优缺点?

先说结论,后来证明。

优点:是官方出的,是亲儿子;通过读写分离,降低锁时间来提高效率; 缺点:不适用于大量写的场景,这样会导致read map读不到数据而进一步加锁读取,同时dirty map也会一直晋升为read map,整体性能较差。 适用场景:大量读,少量写

这里主要证明一下,为什么适合 大量读,少量写 。 代码的大概思路:通过比较单纯的map和sync.Map,在并发安全的情况下,只写和读写的效率

var s sync.RWMutex
var w sync.WaitGroup
func main() {
	mapTest()
	syncMapTest()
}
func mapTest() {
	m := map[int]int {1:1}
	startTime := time.Now().Nanosecond()
	w.Add(1)
	go writeMap(m)
	w.Add(1)
	go writeMap(m)
	//w.Add(1)
	//go readMap(m)

	w.Wait()
	endTime := time.Now().Nanosecond()
	timeDiff := endTime-startTime
	fmt.Println("map:",timeDiff)
}

func writeMap (m map[int]int) {
	defer w.Done()
	i := 0
	for i < 10000 {
		// 加锁
		s.Lock()
		m[1]=1
		// 解锁
		s.Unlock()
		i++
	}
}

func readMap (m map[int]int) {
	defer w.Done()
	i := 0
	for i < 10000 {
		s.RLock()
		_ = m[1]
		s.RUnlock()
		i++
	}
}

func syncMapTest() {
	m := sync.Map{}
	m.Store(1,1)
	startTime := time.Now().Nanosecond()
	w.Add(1)
	go writeSyncMap(m)
	w.Add(1)
	go writeSyncMap(m)
	//w.Add(1)
	//go readSyncMap(m)

	w.Wait()
	endTime := time.Now().Nanosecond()
	timeDiff := endTime-startTime
	fmt.Println("sync.Map:",timeDiff)
}

func writeSyncMap (m sync.Map) {
	defer w.Done()
	i := 0
	for i < 10000 {
		m.Store(1,1)
		i++
	}
}

func readSyncMap (m sync.Map) {
	defer w.Done()
	i := 0
	for i < 10000 {
		m.Load(1)
		i++
	}
}
复制代码
情况 结果 只写 map: 1,022,000 sync.Map: 2,164,000 读写 map: 8,696,000 sync.Map: 2,047,000

会发现大量写的场景下,由于sync.Map里头操作更多其实,所以效率没有单纯的map+metux高。

5.思维扩散?

想一想,mysql加锁,是不是有表级锁、行级锁,前文的sync.RWMutex加锁方式相当于表级锁。

而sync.Map其实也是相当于表级锁,只不过多读写分了两个map,本质还是一样的。 既然这样,那就自然知道优化方向了:就是把锁的粒度尽可能降低来提高运行速度。

思路:对一个大map进行hash,其内部是n个小map,根据key来来hash确定在具体的那个小map中,这样加锁的粒度就变成1/n了。 网上找了下,真有大佬实现了: 点这里

(是的,我偷懒了,哈哈,这是拷贝自己之前写的文章)

R3YZBbv.jpg!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK