锁、SendArc

你刚实现的打补丁策略有一个重大缺陷:它存在竞争条件。如果两个客户端几乎同时为同一张票证发送补丁,服务器将以任意顺序应用这些补丁。无论谁最后排队等候其补丁,都会覆盖另一个客户端所做的更改。

版本号

我们可以通过使用版本号来尝试解决这个问题。每张票证创建时都会分配一个版本号,初始值设为0。每当客户端发送补丁时,他们必须包含票证的当前版本号以及期望的更改。只有当版本号与服务器存储的一致时,服务器才会应用该补丁。

在上述场景中,服务器会拒绝第二个补丁,因为版本号会被第一个补丁增量,从而与第二个客户端发送的不匹配。这种方法在分布式系统中相当常见(例如,当客户端和服务器不共享内存时),并被称为乐观并发控制。其思想是大多数时候冲突不会发生,因此我们可以针对常见情况进行优化。如果你愿意,你现在对Rust的了解足以让你作为额外练习自行实现这一策略。

加锁

我们也可以通过引入来修复竞态条件。每当客户端想要更新票证时,他们必须首先获取对该票证的锁。在锁激活期间,其他客户端不能修改票证。

Rust标准库提供了两种不同的锁原语:Mutex<T>RwLock<T>。我们先从Mutex<T>开始。它代表“互斥”,是最简单的锁类型:无论读取还是写入,都只允许一个线程访问数据。

Mutex<T>包裹了它所保护的数据,并且是泛型于数据类型。你不能直接访问数据:类型系统强制你先使用Mutex::lockMutex::try_lock获取锁。前者直到获取锁为止会阻塞,后者如果无法获取锁则会立即返回错误。两种方法都会返回一个守卫对象,该对象解引用后可访问数据,允许你修改它。当守卫被释放时,锁也会被释放。

#![allow(unused)]
fn main() {
use std::sync::Mutex;

// 由互斥锁保护的整数
let lock = Mutex::new(0);

// 获取互斥锁
let mut guard = lock.lock().unwrap();

// 通过守卫间接修改数据,
// 利用其`Deref`实现
*guard += 1;

// 当`guard`超出作用域时释放锁
// 可以通过显式丢弃守卫来完成
// 或者守卫自然离开作用域时隐式发生
drop(guard);
}

锁的粒度

我们的Mutex应该包装什么?最简单的选择是将整个TicketStore用单个Mutex包裹。这虽然可行,但会严重限制系统的性能:你无法并行读取票证,因为每次读取都必须等待锁释放。这被称为粗粒度锁定

更好的做法是使用细粒度锁定,即每张票证都有自己的锁。这样,只要客户端不尝试访问同一张票证,它们就可以继续并行处理票证。

#![allow(unused)]
fn main() {
// 新结构,每张票证都有自己的锁
struct TicketStore {
    tickets: BTreeMap<TicketId, Mutex<Ticket>>,
}
}

这种方法效率更高,但也有缺点:TicketStore必须开始意识到系统的多线程性质;到目前为止,TicketStore一直忽略了线程的存在。尽管如此,我们还是采用这种方法。

谁持有锁?

为了使整个方案工作,锁必须传递给想要修改票证的客户端。客户端随后可以直接修改票证(就像他们拥有&mut Ticket一样),并在完成后释放锁。

这有点棘手。我们不能通过通道发送Mutex<Ticket>,因为Mutex不可克隆,而且我们不能将其移出TicketStore。那我们能发送MutexGuard吗?

让我们用一个小例子测试这个想法:

use std::thread::spawn;
use std::sync::Mutex;
use std::sync::mpsc::sync_channel;

fn main() {
    let lock = Mutex::new(0);
    let (sender, receiver) = sync_channel(1);
    let guard = lock.lock().unwrap();

    spawn(move || {
        receiver.recv().unwrap();
    });

    // 尝试通过通道发送守卫到另一个线程
    sender.send(guard);
}

编译器对此代码不满意:

错误[E0277]: `MutexGuard<'_, i32>`不能安全地在线程间发送
   --> src/main.rs:10:7
    |
10  |   spawn(move || {
    |  _-----_^
    | | |
    | | 需要此边界的调用
11  | |     receiver.recv().unwrap();;
12  | | });
    | |_^ `MutexGuard<'_, i32>`不能安全地在线程间发送
    |
    = 帮助: 类型`MutexGuard<'_, i32>`没有实现`Send`特质,这是`{closure@src/main.rs:10:7: 10:14}: Send`所需要的
    = 注意: 这是因为`std::sync::mpsc::Receiver<MutexGuard<'_, i32>>`需要实现`Send`
注意: 因为它在这个闭包内被使用

MutexGuard<'_, i32>不是Send:这意味着什么?

Send

Send是一个标记特质,表明一种类型可以安全地从一个线程转移到另一个线程。Send也是一个自动特质,就像Sized一样;编译器会根据类型的定义自动实现(或不实现)它。你也可以手动为你的类型实现Send,但这需要unsafe,因为你必须保证类型确实可以在线程间安全发送,而这是编译器无法自动验证的原因。

通道需求

Sender<T>SyncSender<T>Receiver<T>只有当TSend时才是Send。这是因为它们用于在线程间发送值,如果值本身不是Send,那么在线程间发送它是不安全的。

MutexGuard

MutexGuard不是Send,因为在某些平台上,Mutex用来实现锁的底层操作系统原语要求必须由获取它的同一线程释放锁。如果我们把MutexGuard发送到另一个线程,锁就会被不同的线程释放,导致未定义行为。

我们的挑战

总结一下:

  • 我们不能通过通道发送MutexGuard。所以我们不能在服务器端加锁然后在客户端修改票证。
  • 我们可以发送Mutex通过通道,只要它保护的数据是Send,对于Ticket来说就是这种情况。 同时,我们不能将Mutex移出TicketStore,也不能克隆它。

我们如何解决这个难题?我们需要从不同的角度审视问题。 锁定Mutex时,我们不需要拥有值。共享引用就足够了,因为Mutex使用内部可变性:

#![allow(unused)]
fn main() {
impl<T> Mutex<T> {
    // `&self`,而不是`self`!
    pub fn lock(&self) -> LockResult<MutexGuard<'_, T>> {
        // 实现细节
    }
}
}

因此,发送给客户端一个共享引用就足够了。然而,我们不能直接这么做,因为引用必须是'static,而实际情况并非如此。在某种程度上,我们需要一个“拥有式的共享引用”。事实证明,Rust有一个符合要求的类型:Arc

Arc来救援

Arc代表原子引用计数Arc包裹着一个值并跟踪对这个值存在的引用数量。当最后一个引用被释放时,值就被回收。被Arc包裹的值是不可变的:你只能获取到它的共享引用。

#![allow(unused)]
fn main() {
use std::sync::Arc;

let data: Arc<u32> = Arc::new(0);
let data_clone = Arc::clone(&data);

// `Arc<T>`实现了`Deref<T>`,所以可以将`&Arc<T>`转换为`&T`,使用解引用强制转换
let data_ref: &u32 = &data;
}

如果你觉得似曾相识,你是对的:Arc听起来非常类似于我们在讨论内部可变性时介绍的Rc,引用计数指针。不同之处在于线程安全性:Rc不是Send,而Arc是。这归结于引用计数的实现方式:Rc使用一个“正常”的整数,而Arc使用一个原子整数,可以在线程间安全共享和修改。

Arc<Mutex<T>>

如果我们将ArcMutex配对,最终得到一个类型,它可以:

  • 在线程间发送,因为:
    • 如果TSend,则ArcSend
    • 如果TSend,则Mutex也是Send
    • TTicket,它是Send
  • 可以克隆,因为无论T是什么,Arc都是Clone。克隆Arc会增加引用计数,数据不会被复制。
  • 可以用来修改它包装的数据,因为Arc让你可以获得Mutex<T>的共享引用,进而可以获取锁。

我们现在有了实现票证存储锁定策略所需的所有部件。

深入阅读