9

golang中的锁源码实现:Mutex

 3 years ago
source link: http://legendtkl.com/2016/10/23/golang-mutex/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

golang中的锁源码实现:Mutex

2016-10-232019-11-08 底层技术

上一篇文章 中我提到了锁,准确地说是信号量(semaphore, mutext是semaphore的一种)的实现方式有两种:wait的时候忙等待或者阻塞自己。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//忙等待
wait(S) {
while(S<=0)
; //no-op
S--
}
//阻塞
wait(semaphore *S) {
S->value--;
if (S->value < 0) {
add this process to S->list;
block()
}
}

忙等待和阻塞方式各有优劣:

  • 忙等待会使CPU空转,好处是如果在当前时间片内锁被其他进程释放,当前进程直接就能拿到锁而不需要CPU进行进程调度了。适用于锁占用时间较短的情况,且不适合于单处理器。
  • 阻塞不会导致CPU空转,但是进程切换也需要代价,比如上下文切换,CPU Cache Miss。

下面看一下golang的源码里面是怎么实现锁的。golang里面的锁有两个特性:
1.不支持嵌套锁
2.可以一个goroutine lock,另一个goroutine unlock

golang中的互斥锁定义在src/sync/mutex.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}

const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)

看上去也是使用信号量的方式来实现的。sema就是信号量,一个非负数;state表示Mutex的状态。mutexLocked表示锁是否可用(0可用,1被别的goroutine占用),mutexWoken=2表示mutex是否被唤醒,mutexWaiterShift=2表示统计阻塞在该mutex上的goroutine数目需要移位的数值。将3个常量映射到state上就是

1
2
3
4
5
6
7
8
state:   |32|31|...|3|2|1|
\__________/ | |
| | |
| | mutex的占用状态(1被占用,0可用)
| |
| mutex的当前goroutine是否被唤醒
|
当前阻塞在mutex上的goroutine数

1.Lock

下面看一下mutex的lock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}

awoke := false
iter := 0
for {
old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
if runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
continue
}
new = old + 1<<mutexWaiterShift
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
break
}
runtime_Semacquire(&m.sema)
awoke = true
iter = 0
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

这里要解释一下atomic.CompareAndSwapInt32()atomic包是由golang提供的low-level的原子操作封装,主要用来解决进程同步为题,官方并不建议直接使用。我在上一篇文章中说过,操作系统级的锁的实现方案是提供原子操作,然后基本上所有锁相关都是通过这些原子操作来实现。CompareAndSwapInt32()就是int32型数字的compare-and-swap实现。cas(&addr, old, new)的意思是if *addr==old, *addr=new。大部分操作系统支持CAS,x86指令集上的CAS汇编指令是CMPXCHG。下面我们继续看上面的lock函数。

1
2
3
4
5
6
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}

首先先忽略race.Enabled相关代码,这个是go做race检测时候用的,这个时候需要带上-race,则race.Enabled被置为true。Lock函数的入口处先调用CAS尝试去获得锁,如果m.state==0,则将其置为1,并返回。

继续往下看,首先将m.state的值保存到old变量中,new=old|mutexLocked。直接看能让for退出的第三个if条件,首先调用CAS试图将m.state设置成new的值。然后看一下if里面,如果m.state之前的值也就是old如果没有被占用则表示当前goroutine拿到了锁,则break。我们先看一下new的值的变化,第一个if条件里面new = old + 1<<mutexWaiterShift,结合上面的mutex的state各个位的意义,这句话的意思表示mutex的等待goroutine数目加1。还有awoke为true的情况下,要将m.state的标志位取消掉,也就是这句new &^= mutexWoken的作用。继续看第三个if条件里面,如果里面的if判断失败,则走到runtime_Semacquire()。

看一下这个函数runtime_Semacquire()函数,由于golang1.5之后把之前C语言实现的代码都干掉了,所以现在很低层的代码都是go来实现的。通过源码中的定义我们可以知道这个其实就是信号量的wait操作:等待*s>0,然后减1。编译器里使用的是sync_runtime.semacquire()函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire(addr, true)
}

func semacquire(addr *uint32, profile bool) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}

// Easy case.
if cansemacquire(addr) {
return
}

// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
if profile && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
for {
lock(&root.lock)
// Add ourselves to nwait to disable "easy case" in semrelease.
atomic.Xadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
root.queue(addr, s)
goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
if cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3)
}
releaseSudog(s)
}

上面的代码有点多,我们只看和锁相关的代码。

1
2
3
4
5
root := semroot(addr)   //seg 1

atomic.Xadd(&root.nwait, 1) // seg 2

root.queue(addr, s) //seg 3

seg 1代码片段semroot()返回结构体semaRoot。存储方式是先对信号量的地址做移位,然后做哈希(对251取模,这个地方为什么是左移3位和对251取模不太明白)。semaRoot相当于和mutex.sema绑定。看一下semaRoot的结构:一个sudog链表和一个nwait整型字段。nwait字段表示该信号量上等待的goroutine数目。head和tail表示链表的头和尾巴,同时为了线程安全,需要使用一个互斥量来保护链表。这个时候细心的同学应该注意到一个问题,我们前面不是从Mutex跟过来的吗,相当于Mutex的实现了使用了Mutex本身?实际上semaRoot里面的mutex只是内部使用的一个简单版本,和sync.Mutex不是同一个。现在把这些倒推回去,runtime_Semacquire()的作用其实就是semaphore的wait(&s):如果*s<0,则将当前goroutine塞入信号量s关联的goroutine waiting list,并休眠。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func semroot(addr *uint32) *semaRoot {
return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

type semaRoot struct {
lock mutex
head *sudog
tail *sudog
nwait uint32 // Number of waiters. Read w/o the lock.
}

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
root semaRoot
pad [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte
}

现在mutex.Lock()还剩下runtime_canSpin(iter)这一段,这个地方其实就是锁的自旋版本。golang对于自旋锁的取舍做了一些限制:1.多核; 2.GOMAXPROCS>1; 3.至少有一个运行的P并且local的P队列为空。golang的自旋尝试只会做几次,并不会一直尝试下去,感兴趣的可以跟一下源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq on on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}

Unlock

Mutex的Unlock函数定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}

// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
panic("sync: unlock of unlocked mutex")
}

old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)
return
}
old = m.state
}
}

函数入口处的四行代码和race detection相关,暂时不用管。接下来的四行代码是判断是否是嵌套锁。new是m.state-1之后的值。我们重点看for循环内部的代码。

1
2
3
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}

这两句是说:如果阻塞在该锁上的goroutine数目为0或者mutex处于lock或者唤醒状态,则返回。

1
2
3
4
5
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema)
return
}

这里先将阻塞在mutex上的goroutine数目减一,然后将mutex置于唤醒状态。runtime_Semreleaseruntime_Semacquire的作用刚好相反,将阻塞在信号量上goroutine唤醒。有人可能会问唤醒的是哪个goroutine,那么我们可以看一下goroutine wait list的入队列和出队列代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (root *semaRoot) queue(addr *uint32, s *sudog) {
s.g = getg()
s.elem = unsafe.Pointer(addr)
s.next = nil
s.prev = root.tail
if root.tail != nil {
root.tail.next = s
} else {
root.head = s
}
root.tail = s
}

func (root *semaRoot) dequeue(s *sudog) {
if s.next != nil {
s.next.prev = s.prev
} else {
root.tail = s.prev
}
if s.prev != nil {
s.prev.next = s.next
} else {
root.head = s.next
}
s.elem = nil
s.next = nil
s.prev = nil
}

如上所示,wait list入队是插在队尾,出队是从头出。

  • 《Go语言学习笔记》

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK