6

Go - atomic包使用及atomic.Value源码分析

 4 years ago
source link: https://studygolang.com/articles/26288
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

原子性:一个或多个操作在CPU的执行过程中不被中断的特性,称为原子性。这些操作对外表现成一个不可分割的整体,他们要么都执行,要么都不执行,外界不会看到他们只执行到一半的状态。

原子操作: 进行过程中不能被中断的操作,原子操作由底层硬件支持,而锁则是由操作系统提供的API实现,若实现相同的功能,前者通常会更有效率

最小案例:

package main

import (
	"sync"
	"fmt"
)

var count int

func add(wg *sync.WaitGroup) {
	defer wg.Done()
	count++
}

func main() {
	wg := sync.WaitGroup{}
	wg.Add(1000)
	for i := 0; i < 1000; i++ {
		go add(&wg)
	}
	wg.Wait()
	fmt.Println(count)
}
复制代码

count 不会等于1000,因为 count++ 这一步实际是三个操作:

count
count = count + 1
count

因此就会出现多个goroutine读取到相同的数值,然后更新同样的数值到内存,导致最终结果比预期少

2. Go中sync/atomic包

Go语言提供的原子操作都是非入侵式的,由标准库中 sync/aotomic 中的众多函数代表

atomic包中支持六种类型

int32
uint32
int64
uint64
uintptr
unsafe.Pointer

对于每一种类型,提供了五类原子操作:

  • LoadXXX(addr) : 原子性的获取 *addr 的值,等价于:
    return *addr
    复制代码
  • StoreXXX(addr, val) : 原子性的将 val 的值保存到 *addr ,等价于:
    addr = val
    复制代码
  • AddXXX(addr, delta) : 原子性的将 delta 的值添加到 *addr 并返回新值( unsafe.Pointer 不支持),等价于:
    *addr += delta
    return *addr
    复制代码
  • SwapXXX(addr, new) old : 原子性的将 new 的值保存到 *addr 并返回旧值,等价于:
    old = *addr
    *addr = new
    return old
    复制代码
  • CompareAndSwapXXX(addr, old, new) bool : 原子性的比较 *addrold ,如果相同则将 new 赋值给 *addr 并返回 true ,等价于:
    if *addr == old {
        *addr = new
        return true
    }
    return false
    复制代码
6reYn2n.png!web

因此第一部分的案例可以修改如下,即可通过

// 修改方式1
func add(wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		if atomic.CompareAndSwapInt32(&count, count, count+1) {
			break
		}
	}
}
// 修改方式2
func add(wg *sync.WaitGroup) {
	defer wg.Done()
	atomic.AddInt32(&count, 1)
}
复制代码

3. 扩大原子操作的适用范围:atomic.Value

Go语言在1.4版本的时候向 sync/atomic 包中添加了新的类型 Value ,此类型相当于一个容器,被用来"原子地"存储(Store)和加载任意类型的值

  • type Value
    atomic.Value
    

比如作者写文章时是22岁,写着写着就23岁了..

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	// 此处依旧选用简单的数据类型,因为代码量少
	config := atomic.Value{}
	config.Store(22)

	wg := sync.WaitGroup{}
	wg.Add(10)
	for i := 0; i < 10; i++ {
		go func(i int) {
			defer wg.Done()
			// 在某一个goroutine中修改配置
			if i == 0 {
				config.Store(23)
			}
			// 输出中夹杂22,23
			fmt.Println(config.Load())
		}(i)
	}
	wg.Wait()
}
复制代码

4. atomic.Value源码分析

atomic.Value 被设计用来存储任意类型的数据,所以它内部的字段是一个 interface{} 类型

type Value struct {
	v interface{}
}
复制代码

还有一个 ifaceWords 类型,作为空interface的内部表示格式, typ代表原始类型,data代表真正的值

// ifaceWords is interface{} internal representation.
type ifaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}
复制代码

4.1 unsafe.Pointer

Go语言并不支持直接操作内存,但是它的标准库提供一种 不保证向后兼容的指针类型 unsafe.Pointer , 让程序可以灵活的操作内存,它的特别之处在于: 可以绕过Go语言类型系统的检查

也就是说: 如果两种类型具有相同的内存结构,我们可以将 unsafe.Pointer 当作桥梁,让这两种类型的指针相互转换,从而实现同一份内存拥有两种解读方式

例如int类型和int32类型内部的存储结构是一致的,但是对于指针类型的转换需要这么做:

var a int32
// 获得a的*int类型指针
(*int)(unsafe.Pointer(&a))
复制代码

4.2 实现原子性的读取任意结构操作

func (v *Value) Load() (x interface{}) {
    // 将*Value指针类型转换为*ifaceWords指针类型
	vp := (*ifaceWords)(unsafe.Pointer(v))
	// 原子性的获取到v的类型typ的指针
	typ := LoadPointer(&vp.typ)
	// 如果没有写入或者正在写入,先返回,^uintptr(0)代表过渡状态,见下文
	if typ == nil || uintptr(typ) == ^uintptr(0) {
		return nil
	}
	// 原子性的获取到v的真正的值data的指针,然后返回
	data := LoadPointer(&vp.data)
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	xp.typ = typ
	xp.data = data
	return
}
复制代码

4.3 实现原子性的存储任意结构操作

在此之前有一段较为重要的代码,其中 runtime_procPin 方法可以将一个goroutine死死占用当前使用的 P (此处参考 Goroutine调度器(一):P、M、G关系 , 不发散了) 不允许其他的goroutine抢占,而 runtime_procUnpin 则是释放方法

// Disable/enable preemption, implemented in runtime.
func runtime_procPin()
func runtime_procUnpin()
复制代码

Store 方法

func (v *Value) Store(x interface{}) {
	if x == nil {
		panic("sync/atomic: store of nil value into Value")
	}
	// 将现有的值和要写入的值转换为ifaceWords类型,这样下一步就能获取到它们的原始类型和真正的值
	vp := (*ifaceWords)(unsafe.Pointer(v))
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	for {
		// 获取现有的值的type
		typ := LoadPointer(&vp.typ)
		// 如果typ为nil说明这是第一次Store
		if typ == nil {
			// 如果你是第一次,就死死占住当前的processor,不允许其他goroutine再抢
			runtime_procPin()
			// 使用CAS操作,先尝试将typ设置为^uintptr(0)这个中间状态
			// 如果失败,则证明已经有别的线程抢先完成了赋值操作
			// 那它就解除抢占锁,然后重新回到 for 循环第一步
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
				runtime_procUnpin()
				continue
			}
			// 如果设置成功,说明当前goroutine中了jackpot
			// 那么就原子性的更新对应的指针,最后解除抢占锁
			StorePointer(&vp.data, xp.data)
			StorePointer(&vp.typ, xp.typ)
			runtime_procUnpin()
			return
		}
		// 如果typ为^uintptr(0)说明第一次写入还没有完成,继续循环等待
		if uintptr(typ) == ^uintptr(0) {
			continue
		}
		// 如果要写入的类型和现有的类型不一致,则panic
		if typ != xp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}
		// 更新data
		StorePointer(&vp.data, xp.data)
		return
	}
}
复制代码

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK