sync.Mutex是Go标准库中常用的一个排外锁。当一个 goroutine 获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在 Lock
方法的调用上,直到锁被释放。
sync.Mutex
的实现也是经过多次的演化,功能逐步加强,增加了公平的处理和饥饿机制。
初版的 Mutex
首先我们来看看Russ Cox在2008提交的第一版的Mutex
实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| type Mutex struct { key int32; sema int32; } func xadd(val *int32, delta int32) (new int32) { for { v := *val; if cas(val, v, v+delta) { return v+delta; } } panic("unreached") } func (m *Mutex) Lock() { if xadd(&m.key, 1) == 1 { return; } sys.semacquire(&m.sema); } func (m *Mutex) Unlock() { if xadd(&m.key, -1) == 0 { return; } sys.semrelease(&m.sema); }
|
可以看到,这简单几行就可以实现一个排外锁。通过cas
对 key
进行加一, 如果key
的值是从0
加到1
, 则直接获得了锁。否则通过semacquire
进行sleep, 被唤醒的时候就获得了锁。
2012年, commit dd2074c8做了一次大的改动,它将waiter count
(等待者的数量)和锁标识
分开来(内部实现还是合用使用state
字段)。新来的 goroutine 占优势,会有更大的机会获取锁。
获取锁, 指当前的gotoutine拥有锁的所有权,其它goroutine只有等待。
2015年, commit edcad863, Go 1.5中mutex
实现为全协作式的,增加了spin机制,一旦有竞争,当前goroutine就会进入调度器。在临界区执行很短的情况下可能不是最好的解决方案。
2016年 commit 0556e262, Go 1.9中增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在1毫秒,并且修复了一个大bug,唤醒的goroutine总是放在等待队列的尾部会导致更加不公平的等待时间。
目前这个版本的mutex
实现是相当的复杂, 如果你粗略的瞄一眼,很难理解其中的逻辑, 尤其实现中字段的共用,标识的位操作,sync函数的调用、正常模式和饥饿模式的改变等。
本文尝试解析当前sync.Mutex
的实现,梳理一下Lock
和Unlock
的实现。
源代码分析
根据Mutex
的注释,当前的Mutex
有如下的性质。这些注释将极大的帮助我们理解Mutex
的实现。
互斥锁有两种状态:正常状态和饥饿状态。
在正常状态下,所有等待锁的goroutine按照FIFO顺序等待。唤醒的goroutine不会直接拥有锁,而是会和新请求锁的goroutine竞争锁的拥有。新请求锁的goroutine具有优势:它正在CPU上执行,而且可能有好几个,所以刚刚唤醒的goroutine有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的goroutine会加入到等待队列的前面。 如果一个等待的goroutine超过1ms没有获取锁,那么它将会把锁转变为饥饿模式。
在饥饿模式下,锁的所有权将从unlock的gorutine直接交给交给等待队列中的第一个。新来的goroutine将不会尝试去获得锁,即使锁看起来是unlock状态, 也不会去尝试自旋操作,而是放在等待队列的尾部。
如果一个等待的goroutine获取了锁,并且满足一以下其中的任何一个条件:(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将锁的状态转换为正常状态。
正常状态有很好的性能表现,饥饿模式也是非常重要的,因为它能阻止尾部延迟的现象。
在分析源代码之前, 我们要从多线程(goroutine)的并发场景去理解为什么实现中有很多的分支。
当一个goroutine获取这个锁的时候, 有可能这个锁根本没有竞争者, 那么这个goroutine轻轻松松获取了这个锁。
而如果这个锁已经被别的goroutine拥有, 就需要考虑怎么处理当前的期望获取锁的goroutine。
同时, 当并发goroutine很多的时候,有可能会有多个竞争者, 而且还会有通过信号量唤醒的等待者。
sync.Mutex
只包含两个字段:
1 2 3 4
| type Mutex struct { state int32 sema uint32 }
|
state
是一个共用的字段, 第0个 bit 标记这个mutex
是否已被某个goroutine所拥有, 下面为了描述方便称之为state
已加锁,或者mutex
已加锁。 如果第0个 bit为0, 下文称之为state
未被锁, 此mutex目前没有被某个goroutine所拥有。
第1个 bit 标记这个mutex
是否已唤醒, 也就是有某个唤醒的goroutine
要尝试获取锁。
第2个 bit 标记这个mutex
状态, 值为1表明此锁已处于饥饿状态。
同时,尝试获取锁的goroutine也有状态,有可能它是新来的goroutine,也有可能是被唤醒的goroutine, 可能是处于正常状态的goroutine, 也有可能是处于饥饿状态的goroutine。
Lock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
| func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } }
|
Unlock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| func (m *Mutex) Unlock() { new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false) return } old = m.state } } else { runtime_Semrelease(&m.sema, true) } }
|
出个问题
最后我出一个问题,你可以根据Unlock
的代码分析,下面的哪个答案正确?
如果一个goroutine g1
通过Lock
获取了锁, 在持有锁的期间, 另外一个goroutine g2
调用Unlock
释放这个锁, 会出现什么现象?
- A、
g2
调用 Unlock
panic
- B、
g2
调用 Unlock
成功,但是如果将来 g1
调用 Unlock
会 panic
- C、
g2
调用 Unlock
成功,如果将来 g1
调用 Unlock
也成功
答案: B
运行下面的例子试试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package main import ( "sync" "time" ) func main() { var mu sync.Mutex go func() { mu.Lock() time.Sleep(10 * time.Second) mu.Unlock() }() time.Sleep(time.Second) mu.Unlock() select {} }
|