Mutex是最常用的一种同步原语,它提供了互斥锁的功能,多线程可以互斥访问共享数据以及通过锁保护临界区。Rust标准库提供了Mutex的实现,接下来我们看看它是怎么实现的。
Mutex的定义
Mutex包含三个字段。
一个是内部实现的锁(sys::Mutex),根据不同的操作系统,可能选择不同的实现。
一个是poison,用来标记锁是否被破坏,是否中毒了。
最后一个是data,用来存储被保护的数据。
1 2 3 4 5 6 7 8 9 10 11
| pub struct Mutex<T: ?Sized> { inner: sys::Mutex, poison: poison::Flag, data: UnsafeCell<T>, }
impl<T> Mutex<T> { pub const fn new(t: T) -> Mutex<T> { Mutex { inner: sys::Mutex::new(), poison: poison::Flag::new(), data: UnsafeCell::new(t) } } }
|
另外一个关联的数据结构是MutexGuard, 它是Mutex的一个智能指针,用来管理锁的生命周期。它实现了Deref和Drop,所以可以通过*来访问被保护的数据,当MutexGuard离开作用域时,会自动释放锁。
1 2 3 4 5
| ```rust pub struct MutexGuard<'a, T: ?Sized + 'a> { lock: &'a Mutex<T>, poison: poison::Guard, }
|
当请求锁时,调用内部的sys::Mutex上锁,并且返回一个MutexGuard,其实严格的说,返回一个LockResult,它是一个Result,当锁中毒时,返回Err,否则返回Ok,Ok中包含了MutexGuard。
1 2 3 4 5 6 7 8 9
| ```rust pub fn lock(&self) -> LockResult<MutexGuard<'_, T>> { unsafe { self.inner.lock(); MutexGuard::new(self) } }
pub type LockResult<Guard> = Result<Guard, PoisonError<Guard>>;
|
poison如果你不太了解,可以看看这篇文章,我们不进一步介绍它了。
MutexGuard功能已经介绍了, 接下来我们看看它的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13
| impl<T: ?Sized> Deref for MutexGuard<'_, T> { type Target = T;
fn deref(&self) -> &T { unsafe { &*self.lock.data.get() } } }
impl<T: ?Sized> DerefMut for MutexGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.lock.data.get() } } }
|
MutexGuard的解引用返回被保护的数据,返回的是一个引用和可变引用。
当MutexGuard离开作用域时,会自动释放锁,它的实现如下,可以看到它调用了self.lock.inner.unlock():
1 2 3 4 5 6 7 8 9
| impl<T: ?Sized> Drop for MutexGuard<'_, T> { #[inline] fn drop(&mut self) { unsafe { self.lock.poison.done(&self.poison); self.lock.inner.unlock(); } } }
|
try_lock调用inner的try_lock,每什么好说的:
1 2 3 4 5 6 7 8 9
| pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>> { unsafe { if self.inner.try_lock() { Ok(MutexGuard::new(self)?) } else { Err(TryLockError::WouldBlock) } } }
|
目前rust还提供了一个不稳定的方法unlock,主动的立即释放锁,其实就是调用了drop(guard):
1 2 3
| pub fn unlock(guard: MutexGuard<'_, T>) { drop(guard); }
|
那么看起来,锁的主要逻辑是inner实现的,接下来我们看看inner的实现。
inner的类型是sys::Mutex,它位于library/std/src/sys/mod.rs,根据操作系统的不同,有不同的实现,我们主要看linux(unix)的实现。
unix的实现位于/library/std/src/sys/unix/locks, 根据具体的操作系统,也有两种实现方案,我们主要看linux的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| if #[cfg(any( target_os = "linux", target_os = "android", all(target_os = "emscripten", target_feature = "atomics"), target_os = "freebsd", target_os = "openbsd", target_os = "dragonfly", ))] { mod futex_mutex; mod futex_rwlock; mod futex_condvar; pub(crate) use futex_mutex::Mutex; pub(crate) use futex_rwlock::RwLock; pub(crate) use futex_condvar::Condvar; } ...
|
它有两种实现futex_mutex和pthread_mutex.rs, Linux操作系统下,使用的是futex_mutex。
futex_mutex会使用Linux操作系统的futex_wait和futex_wake系统调用。
它只包含一个futex字段,它是一个AtomicU32,用来表示锁的状态,它有三种状态:
- 0: 未加锁状态·
- 1: 加锁,且没有其它线程等待
- 2: 加锁,且有其它线程等待(竞争)
1 2 3 4 5 6 7 8 9
| use crate::sync::atomic::{ AtomicU32, Ordering::{Acquire, Relaxed, Release}, }; use crate::sys::futex::{futex_wait, futex_wake};
pub struct Mutex { futex: AtomicU32, }
|
new创建一个未加锁的Muext, 而try_lock会尝试将futex从0改为1,如果成功,表示加锁成功,否则失败。
注意这里使用了 Acquire和Relaxed Ordering,交换成功新值内存可见(Acquire),失败无所谓了(Relaxed):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| impl Mutex { #[inline] pub const fn new() -> Self { Self { futex: AtomicU32::new(0) } }
#[inline] pub fn try_lock(&self) -> bool { self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_ok() }
... }
接下来是重头戏`lock`:
```rust impl Mutex { ... #[inline] pub fn lock(&self) { if self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_err() { self.lock_contended(); } }
#[cold] fn lock_contended(&self) { let mut state = self.spin();
if state == 0 { match self.futex.compare_exchange(0, 1, Acquire, Relaxed) { Ok(_) => return, Err(s) => state = s, } }
loop { if state != 2 && self.futex.swap(2, Acquire) == 0 { return; }
futex_wait(&self.futex, 2, None);
state = self.spin(); } }
fn spin(&self) -> u32 { let mut spin = 100; loop { let state = self.futex.load(Relaxed);
if state != 1 || spin == 0 { return state; }
crate::hint::spin_loop(); spin -= 1; } } ...
}
|
总得来说,抢锁的线程通过spin,避免上下文切换,能够提高性能。如果spin次数用完了,就进入等待状态,等待其它线程释放锁,然后再抢。
剩下一个方法就是解锁了。解锁比较简单,就是把锁的状态从1或者2改为0,如果原来是2,表示还有其它线程等待,就唤醒一个。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| impl Mutex { ... pub unsafe fn unlock(&self) { if self.futex.swap(0, Release) == 2 { self.wake(); } }
#[cold] fn wake(&self) { futex_wake(&self.futex); } ... }
|
可以看到,Rust的Mutex的实现还是比较简单的,它的核心是利用操作系统的futex相关方法,加一个AtomicU32标志来实现。