Mutex是最常用的一种同步原语,它提供了互斥锁的功能,多线程可以互斥访问共享数据以及通过锁保护临界区。Rust标准库提供了Mutex的实现,接下来我们看看它是怎么实现的。
Mutex的定义
Mutex
包含三个字段。
一个是内部实现的锁(sys::Mutex
),根据不同的操作系统,可能选择不同的实现。
一个是poison
,用来标记锁是否被破坏,是否中毒了。
最后一个是data
,用来存储被保护的数据。
|
|
另外一个关联的数据结构是MutexGuard
, 它是Mutex
的一个智能指针,用来管理锁的生命周期。它实现了Deref
和Drop
,所以可以通过*
来访问被保护的数据,当MutexGuard
离开作用域时,会自动释放锁。
|
|
当请求锁时,调用内部的sys::Mutex
上锁,并且返回一个MutexGuard
,其实严格的说,返回一个LockResult
,它是一个Result
,当锁中毒时,返回Err
,否则返回Ok
,Ok
中包含了MutexGuard
。
|
|
poison
如果你不太了解,可以看看这篇文章,我们不进一步介绍它了。MutexGuard
功能已经介绍了, 接下来我们看看它的实现:
|
|
MutexGuard
的解引用返回被保护的数据,返回的是一个引用和可变引用。
当MutexGuard
离开作用域时,会自动释放锁,它的实现如下,可以看到它调用了self.lock.inner.unlock()
:
|
|
try_lock
调用inner
的try_lock
,每什么好说的:
|
|
目前rust还提供了一个不稳定的方法unlock
,主动的立即释放锁,其实就是调用了drop(guard)
:
|
|
那么看起来,锁的主要逻辑是inner
实现的,接下来我们看看inner
的实现。
inner
的类型是sys::Mutex
,它位于library/std/src/sys/mod.rs,根据操作系统的不同,有不同的实现,我们主要看linux(unix)的实现。
unix的实现位于/library/std/src/sys/unix/locks, 根据具体的操作系统,也有两种实现方案,我们主要看linux的实现。
|
|
它有两种实现futex_mutex
和pthread_mutex.rs
, Linux操作系统下,使用的是futex_mutex
。
futex_mutex
会使用Linux操作系统的futex_wait
和futex_wake
系统调用。
它只包含一个futex
字段,它是一个AtomicU32
,用来表示锁的状态,它有三种状态:
- 0: 未加锁状态·
- 1: 加锁,且没有其它线程等待
- 2: 加锁,且有其它线程等待(竞争)
|
|
new
创建一个未加锁的Muext, 而try_lock
会尝试将futex
从0改为1,如果成功,表示加锁成功,否则失败。
注意这里使用了 Acquire
和Relaxed
Ordering,交换成功新值内存可见(Acquire
),失败无所谓了(Relaxed
):
|
|
rust
impl Mutex {
...
#[inline]
pub fn lock(&self) {
if self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_err() {
self.lock_contended(); // 如果第一次抢不到锁,进入锁竞争场景的处理
}
}
#[cold]
fn lock_contended(&self) {
// 获取锁的状态
let mut state = self.spin();
// 如果未加速,去抢!
if state == 0 {
match self.futex.compare_exchange(0, 1, Acquire, Relaxed) {
Ok(_) => return, // 抢成功,返回
Err(s) => state = s, // 失败,把当前的状态赋值给state
}
}
loop {
// 运气来了,在多个人抢一个未加锁的锁时,你,命中天子抢到了锁,并且把锁状态从0改为2
if state != 2 && self.futex.swap(2, Acquire) == 0 {
// 成功把锁的状态从0改成了2,获取了锁,返回
return;
}
// 等待锁状态变化,如果锁状态不是2,就不等了
futex_wait(&self.futex, 2, None);
// 别的线程释放了锁,spin检查锁的状态,再集中精力继续抢
state = self.spin();
}
}
fn spin(&self) -> u32 {
let mut spin = 100;
loop {
// 获取当前锁的状态, 可能是0,1,2三种状态
let state = self.futex.load(Relaxed);
// 如果是未加锁,或者是有其它线程等待,则返回
// 如果spin次数用完了,也返回
if state != 1 || spin == 0 {
return state;
}
crate::hint::spin_loop();
spin -= 1;
}
}
...
}
|
|
rust
impl Mutex {
...
pub unsafe fn unlock(&self) {
if self.futex.swap(0, Release) == 2 {
// 如果还有等待的线程,就唤醒一个
self.wake();
}
}
#[cold]
fn wake(&self) {
futex_wake(&self.futex);
}
...
}`
可以看到,Rust的Mutex的实现还是比较简单的,它的核心是利用操作系统的futex
相关方法,加一个AtomicU32
标志来实现。