C++中的同步原语

C++ 标准库中包含了一些基本的同步原语,尤其是C++ 20标准库又补充了一些。本文简单介绍这些并发原语,并通过示例演示它们基本的功能。

std::thread

std::thread是在C++11中引入的,它表示一个可执行的线程。线程允许多个函数并发执行。

线程在关联的线程对象构造完成后立即开始执行(受操作系统调度延迟影响),从作为构造函数参数提供的顶级函数开始。顶级函数的返回值会被忽略,如果它通过抛出异常终止,std::terminate 会被调用。顶级函数可以通过 std::promise 或通过修改共享变量(可能需要同步,参见 std::mutex 和 std::atomic)向调用者传达其返回值或异常。

std::thread 对象也可能处于不表示任何线程的状态(在默认构造、移动构造、分离或加入之后),而且执行线程可能不与任何线程对象关联(在分离之后)。

没有两个 std::thread 对象可以表示同一个执行线程;std::thread 不可复制构造或复制赋值,尽管它是可移动构造和可移动赋值的。

你需要手动管理线程的生命周期,包括启动和加入(或分离)线程。如果你忘记在一个std::thread对象销毁之前加入(join)或分离(detach)它的线程,程序将会终止(因为std::thread的析构函数会调用std::terminate)。

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <iostream>
#include <thread>
void threadFunction() {
std::cout << "Hello from thread!\n";
}
int main() {
std::thread t(threadFunction);
// 必须在t销毁之前对其调用join或detach
t.join(); // 等待线程结束
return 0;
}

std::jthread

std::jthread是在C++20中引入的,它提供了一些改进和附加功能,相比于std::thread,使得线程管理变得更加容易和安全。

它在std::thread的基础上增加了自动的线程加入功能。std::jthread的一个关键特性是它的析构函数会自动请求线程停止(如果支持的话)并等待线程完成,从而减少了程序员需要手动管理线程生命周期的需求。

此外,std::jthread支持协作式中断,它提供了一种机制,使得线程可以被请求停止执行。这是通过传递一个std::stop_token来实现的,线程函数可以定期检查这个stop_token来决定是否应该停止执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <thread>
void threadFunction(std::stop_token stoken) {
while (!stoken.stop_requested()) { // 检查是否请求停止线程
std::cout << "Hello from jthread!\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Stopping as requested.\n";
}
int main() {
std::jthread jt(threadFunction);
std::this_thread::sleep_for(std::chrono::seconds(3));
// 不需要手动调用join,析构函数会自动处理
// jt.request_stop(); 如果需要提前请求停止线程,则调用这个
return 0;
}

在上面的例子中,std::jthread的析构函数会自动调用request_stop来请求线程函数停止,然后等待线程完成。这使得使用std::jthreadstd::thread更加安全和方便,因为它消除了忘记加入或分离线程时可能出现的问题。

总结来说,std::jthreadstd::thread的基础上提供了自动加入和协作式中断的功能,从而简化了线程的管理。如果你使用的是C++20或更高版本,优先考虑使用std::jthread

atomic

std::atomic

std::atomic 是一个模板类,提供了一种机制来安全地在多线程环境中操作共享数据,而不需要使用互斥锁。std::atomic 类型保证了基本的原子操作,比如读取、写入、递增和递减等,都是原子性的,也就是说在一个操作执行完毕前,不会被其他线程打断。

原子性意味着当一个线程正在执行原子操作时,没有其他线程可以同时执行任何其他对同一数据的原子操作。此外,C++11 引入的内存模型定义了原子操作的内存顺序(memory order),这是一个非常复杂的主题,决定了在不同线程中操作之间的可见性和排序。我在《并发编程顶峰对决: Go vs Rust》讲了Rust的内存顺序模型,也提到了Rust的内存顺序模型和C++的内存顺序模型,这里就不赘述了,总之内存序包含下面几种类型,你应该正确且清晰的使用它们:

  • std::memory_order_relaxed
  • std:: memory_order_consume
  • std::memory_order_acquire
  • std::memory_order_release
  • std::memory_order_acq_rel
  • std::memory_order_seq_cst

std::atomic 类型的对象可以通过调用成员函数loadstore来读取和写入,也可以通过operator++operator--来递增和递减。std::atomic 类型的对象还可以通过调用成员函数exchange来交换值,通过调用成员函数compare_exchange_weakcompare_exchange_strong来比较和交换值。

waitnotifynotify_all函数可以用来等待和通知其他线程,这些函数在C++20中引入。有点像条件变量。

通过std::atomic类模板,我们可以创建原子类型的对象,比如std::atomic<int>std::atomic<bool>std::atomic<std::string>等等。std::atomic类模板还提供了一些特化版本,比如std::atomic_flagstd::atomic_boolstd::atomic_intstd::atomic_uintstd::atomic_llong等等。

下面是一个计数器的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
std::atomic<int> counter(0);
void increment() {
for (int i = 0; i < 10000; ++i) {
counter.fetch_add(1);
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
threads.push_back(std::thread(increment));
}
for (auto& t : threads) {
t.join();
}
std::cout << "Counter: " << counter.load() << std::endl;
return 0;
}

在这个例子中,10个线程并发地递增一个std::atomic计数器。由于counter是原子类型,所以每个线程的修改都是原子操作,不会相互干扰。最终的计数器值应该是100000。

std::atomic_flag

std::atomic_flag 是 C++11 中引入的原子类型,它是最简单的原子类型,提供了一个布尔标志,可以用来进行简单的锁定操作。由于其简单性,std::atomic_flag 通常可以实现为一个非常高效的原子类型,因此它在实现自旋锁等低级同步原语时非常有用。

std::atomic_flag 保证是 lock-free 的,即不会引起调用线程的阻塞。这是 std::atomic_flag 相对于其他原子类型的一个独特优点,因为其他原子类型在一些平台上可能不是 lock-free 的。

std::atomic_flag 提供以下几个主要操作:

  • clear(): 将标志设置为 false。
  • test_and_set(): 测试标志的当前值,然后将其设置为 true。这个操作是原子的,即测试和设置是一个不可分割的步骤。
  • test(): C++20 新增的操作,测试标志的当前值而不修改它。
  • wait(): C++20 新增的操作,如果标志为 true,则阻塞调用线程。
  • notify_one(): C++20 新增的操作,通知等待线程中的一个线程。
  • notify_all(): C++20 新增的操作,通知等待线程中的所有线程。

下面这个例子是检查 atomic_flag 是否已被设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <atomic>
#include <iostream>
std::atomic_flag flag = ATOMIC_FLAG_INIT;
int main() {
// 设置标志
flag.test_and_set();
// 检查标志是否已经设置
if (flag.test()) {
std::cout << "Flag is set.\n";
}
// 清除标志
flag.clear();
// 再次检查标志
if (!flag.test()) {
std::cout << "Flag is cleared.\n";
}
return 0;
}

std::atomic_ref

std::atomic_ref 是 C++20 引入的一个模板类,它提供了对非原子类型的原子操作。这意味着你可以将 std::atomic_ref 对象绑定到非原子类型的引用上,并执行原子操作,而无需将该类型本身声明为原子类型。这在你需要对现有数据结构中的某个成员进行原子操作,而不想更改数据结构定义时非常有用。

std::atomic_ref 的特性

  • std::atomic_ref 通过引用传递给它的对象,并提供原子访问。
  • 它对绑定的对象执行原子操作,如加载(load)、存储(store)、增加(fetch_add)、减少(fetch_sub)等。
  • std::atomic_ref 不拥有它所绑定的对象,故该对象的生命周期必须超过 std::atomic_ref 对象的生命周期。
  • 你可以在多个 std::atomic_ref 实例之间共享同一个对象,但是要保证这些实例不会同时访问该对象。

std::atomic_ref 的主要成员函数

  • store(T desired, std::memory_order order = std::memory_order_seq_cst): 将 desired 值原子地存储到引用的对象中。
  • T load(std::memory_order order = std::memory_order_seq_cst) const: 原子地加载并返回引用的对象的值。
  • T fetch_add(T arg, std::memory_order order = std::memory_order_seq_cst): 原子地将 arg 添加到引用的对象的当前值,并返回之前的值。
  • T fetch_sub(T arg, std::memory_order order = std::memory_order_seq_cst): 原子地从引用的对象的当前值中减去 arg,并返回之前的值。
  • bool compare_exchange_weak(T& expected, T desired, std::memory_order order = std::memory_order_seq_cst): 原子地比较引用的对象的值与 expected,如果相同,则将该对象的值设置为 desired。
  • bool compare_exchange_strong(T& expected, T desired, std::memory_order order = std::memory_order_seq_cst): 类似于 compare_exchange_weak,但具有更强的保证,防止假失败。
  • wait(): C++20 新增的操作,如果标志为 true,则阻塞调用线程。
  • notify_one(): C++20 新增的操作,通知等待线程中的一个线程。
  • notify_all(): C++20 新增的操作,通知等待线程中的所有线程。

以下是一个使用 std::atomic_ref 的简单示例,演示了如何对一个共享的 int 变量执行原子操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
int main() {
int shared_value = 0;
std::atomic_ref<int> atomic_ref(shared_value);
auto incrementer = [&atomic_ref]() {
for (int i = 0; i < 100; ++i) {
atomic_ref.fetch_add(1, std::memory_order_relaxed); // 原子地增加 shared_value 的值
}
};
std::vector<std::thread> threads;
// 创建 10 个线程,每个线程都增加 shared_value 的值
for (int i = 0; i < 10; ++i) {
threads.emplace_back(incrementer);
}
// 等待所有线程完成
for (auto& thread : threads) {
thread.join();
}
std::cout << "Final value of shared_value: " << shared_value << std::endl;
// 正确的输出应该是 1000
return 0;
}

在这个例子中,我们创建了一个普通的 int 类型变量 shared_value 和一个 std::atomic_ref<int> 实例 atomic_ref,后者引用了前者。然后我们启动了 10 个线程,每个线程都通过 atomic_ref 原子地对 shared_value 执行 100 次增加操作。在所有线程完成后,我们期望 shared_value 的最终值为 1000。

std::mutex

std::mutex 用于保护共享数据,避免多个线程同时访问导致的数据竞争和不一致性。当多个线程尝试同时修改同一数据时,std::mutex 提供了一种机制来确保只有一个线程能够访问数据,其余试图访问该数据的线程将被阻塞,直到拥有互斥锁的线程释放锁为止。

std::mutex 的主要操作

  • lock(): 阻塞当前线程,直到能够锁定该互斥锁。如果互斥锁已被其他线程锁定,则当前线程将等待(阻塞),直到互斥锁被解锁。
  • unlock(): 解锁互斥锁,允许其他正在等待的线程能够尝试锁定互斥锁。
  • try_lock(): 尝试锁定互斥锁,如果互斥锁当前未被锁定,则锁定它并立即返回 true;如果已被其他线程锁定,则不会阻塞当前线程,立即返回 false。

使用 std::mutex 保护共享数据的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
std::mutex mtx; // 用于同步的互斥锁
int counter = 0; // 共享数据
void increment_counter() {
mtx.lock(); // 获取锁
++counter; // 修改共享数据
mtx.unlock(); // 释放锁
}
int main() {
std::vector<std::thread> threads;
// 创建多个线程,模拟并发环境
for (int i = 0; i < 100; ++i) {
threads.push_back(std::thread(increment_counter));
}
// 等待所有线程完成
for (auto& th : threads) {
th.join();
}
std::cout << "Final counter value: " << counter << std::endl;
return 0;
}

在这个例子中,100个线程尝试并发地增加一个共享计数器。没有互斥锁的情况下,多个线程可能同时读写同一内存位置,导致计数器的值不正确。通过使用 std::mutex,我们确保了每次只有一个线程能够增加计数器,从而保证了最终结果的正确性。

可以使用std::lock_guard 管理 std::mutex 的锁定和解锁,类似Rust:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
std::mutex mtx; // 用于同步的互斥锁
int counter = 0; // 共享数据
void increment_counter() {
std::lock_guard<std::mutex> lock(mtx); // 创建 lock_guard 对象时自动获取锁,并在作用域结束时自动释放锁
++counter; // 修改共享数据
// lock_guard 对象析构时自动调用 unlock
}
int main() {
std::vector<std::thread> threads;
// 创建多个线程,模拟并发环境
for (int i = 0; i < 100; ++i) {
threads.push_back(std::thread(increment_counter));
}
// 等待所有线程完成
for (auto& th : threads) {
th.join();
}
std::cout << "Final counter value: " << counter << std::endl;
return 0;
}

除了基本的 std::mutex,还提供了几种其他类型的锁,用于满足不同的同步需求。以下是一些常见的锁类型:

  • std::recursive_mutex
    std::recursive_mutex 是一种特殊类型的互斥锁,它允许同一个线程多次对同一个互斥锁加锁(即递归锁定)。每次对 std::recursive_mutex 的成功锁定都必须由相应数量的解锁操作与之匹配。这适用于递归函数调用,其中函数可能会直接或间接地多次请求同一互斥锁。
  • std::timed_mutex
    std::timed_mutex 是互斥锁的一个扩展,它提供了尝试锁定一段时间的功能。如果锁在指定时间内未被获取,则尝试锁定操作失败并返回。它提供了两个额外的成员函数:try_lock_for()try_lock_until(),分别用于指定等待锁定的时间长度和绝对时间点。
  • std::recursive_timed_mutex
    std::recursive_timed_mutex 结合了 std::recursive_mutexstd::timed_mutex 的功能,允许一个线程对同一个互斥锁进行多次锁定,并提供了基于时间的锁定尝试。
  • std::shared_mutex
    std::shared_mutex 是一个读写锁,它允许多个线程同时读取共享数据(共享锁定),但一次只允许一个线程写入(独占锁定)。它提供了 lock_shared()unlock_shared() 来管理共享锁定,以及 lock()unlock() 来管理独占锁定。
  • std::shared_timed_mutex
    std::shared_timed_mutex 结合了 std::shared_mutexstd::timed_mutex 的特性,提供了时间限制的读写锁。它允许多个线程在一段时间内尝试以共享或独占方式锁定互斥锁。

还有一些辅助管理mutex的类:

  • std::lock_guard
    std::lock_guard 是一个作用域锁,当创建它的对象时自动获取互斥锁,并在该对象的生命周期结束时自动释放互斥锁。std::lock_guard 不支持显式的解锁操作或条件等待。

  • std::unique_lock
    std::unique_lock 是一个灵活的作用域锁,它提供了比 std::lock_guard 更多的功能,包括延迟锁定、时间限制的锁定尝试、递归锁定以及条件变量的支持。std::unique_lock 对象可以在其生命周期中多次锁定和解锁关联的互斥锁。

  • std::scoped_lock (C++17)
    std::scoped_lock 是 C++17 引入的一个作用域锁,它可以锁定一个或多个互斥锁,而无需担心死锁。它在内部使用了一个死锁避免算法(如锁的排序获取),确保在多个互斥锁的情况下不会发生死锁。

一个读写锁的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <iostream>
#include <shared_mutex>
#include <thread>
#include <vector>
std::shared_mutex rw_mutex;
int shared_data = 0; // 共享数据
void reader(int id) {
std::shared_lock<std::shared_mutex> lock(rw_mutex); // 获取共享锁
std::cout << "Reader #" << id << " read value: " << shared_data << std::endl;
}
void writer(int id) {
std::unique_lock<std::shared_mutex> lock(rw_mutex); // 获取独占锁
++shared_data;
std::cout << "Writer #" << id << " wrote value: " << shared_data << std::endl;
}
int main() {
std::vector<std::thread> readers;
std::vector<std::thread> writers;
// 创建读者线程
for (int i = 0; i < 5; ++i) {
readers.push_back(std::thread(reader, i));
}
// 创建写者线程
for (int i = 0; i < 2; ++i) {
writers.push_back(std::thread(writer, i));
}
// 等待读者线程完成
for (auto& th : readers) {
th.join();
}
// 等待写者线程完成
for (auto& th : writers) {
th.join();
}
return 0;
}

在这个例子中,std::shared_mutex 被用作一个读写锁来保护共享数据。读者线程使用 std::shared_lock 获取共享锁,这允许多个读者线程同时读取数据。写者线程使用 std::unique_lock 获取独占锁,这确保了在写入数据时只有一个写者线程可以访问数据。

std::condition_variable

std::condition_variable 用于在多线程程序中进行线程间的通知和等待操作。它允许一个或多个线程在某些条件成立之前挂起(等待),直到另一个线程通知它们条件已经满足。
std::condition_variable 通常与 std::mutex(或 std::unique_lock)一起使用,以保护共享数据并提供安全的同步机制。

主要方法:

  • wait(): 阻塞当前线程,直到其他线程调用 notify_one() 或 notify_all()。在等待期间,互斥锁会被释放,以允许其他线程修改共享数据。当条件变量被通知时,线程会被唤醒,并在返回前重新获取互斥锁。
  • notify_one(): 唤醒一个等待的线程。如果没有线程在等待,则调用没有任何效果。
  • notify_all(): 唤醒所有等待的线程。如果没有线程在等待,则调用没有任何效果。
  • wait_for(): 阻塞当前线程一段时间,或直到被通知。如果在指定的时间段内没有接收到通知,线程会自动唤醒。
  • wait_until(): 阻塞当前线程直到指定的时间点,或直到被通知。如果到达指定的时间点时没有接收到通知,线程会自动唤醒。

下面是一个使用 std::condition_variable 的简单示例,演示了生产者-消费者问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
std::mutex mtx; // 用于同步的互斥锁
std::condition_variable cv; // 条件变量
std::queue<int> product_queue; // 共享数据队列
void producer(int id) {
std::unique_lock<std::mutex> lck(mtx); // 加锁
product_queue.push(id); // 生产产品
std::cout << "Producer " << id << " produced a product." << std::endl;
lck.unlock(); // 解锁
cv.notify_one(); // 通知一个消费者
}
void consumer() {
std::unique_lock<std::mutex> lck(mtx); // 加锁
while (product_queue.empty()) { // 如果队列为空,则等待
cv.wait(lck); // 在这里,互斥锁会被释放
}
int product = product_queue.front();
product_queue.pop();
lck.unlock(); // 解锁
std::cout << "Consumer consumed product " << product << std::endl;
}
int main() {
std::thread consumers[2];
std::thread producers[2];
// 创建消费者线程
for (int i = 0; i < 2; ++i) {
consumers[i] = std::thread(consumer);
}
// 创建生产者线程
for (int i = 0; i < 2; ++i) {
producers[i] = std::thread(producer, i+1);
}
// 等待生产者线程结束
for (int i = 0; i < 2; ++i) {
producers[i].join();
}
// 等待消费者线程结束
for (int i = 0; i < 2; ++i) {
consumers[i].join();
}
return 0;
}

在这个例子中,生产者线程生产产品并将其放入队列中,然后通过调用 cv.notify_one() 唤醒一个等待的消费者线程。消费者线程在队列为空时调用 cv.wait() 进入等待状态,等待生产者的通知。当生产者生产了一个产品后,消费者线程被唤醒,从队列中取出产品并消费它。

注意事项: (和Go的Cond类似)

  • 使用 std::condition_variable 时,应该总是和一个互斥锁一起使用,以避免竞争条件。
  • 在调用 wait()wait_for()wait_until() 时,互斥锁必须已被锁定。这些函数会在开始等待时自动释放锁,并在线程被唤醒时重新获取锁。
  • std::condition_variablewait() 函数可能会出现"虚假唤醒",即在没有收到通知的情况下线程可能被唤醒。因此,通常需要在一个循环中使用 wait(),并检查等待条件是否满足。
  • notify_one()notify_all() 不需要持有互斥锁,但通常会在更新共享数据并持有互斥锁后调用它们。
  • std::condition_variable 只能与 std::unique_lock<std::mutex> 一起使用,不能直接与 std::mutex 一起使用。如果你需要和 std::mutex 一起使用条件变量,请使用 std::condition_variable_any

semaphore

在 C++20 之前,标准库没有提供信号量(semaphore)的实现,但是在 C++20 中引入了两种类型的信号量:std::counting_semaphorestd::binary_semaphore

std::counting_semaphore

std::counting_semaphore 是一种通用的同步原语,用于控制对有限数量资源的访问。它维护一个内部的计数器,表示可用资源的数量。计数器的值可以增加(通过 release() 函数)或减少(通过 acquire() 函数)。

信号量的主要操作包括:

  • acquire(): 减少信号量的计数器值。如果计数器的当前值大于零,调用 acquire() 将减少计数器的值,并允许线程继续执行。如果计数器值为零,则调用线程将阻塞,直到其他线程释放资源。
  • release(): 增加信号量的计数器值。调用 release() 会将计数器的值增加一定数量(默认为1),并可能唤醒正在等待的线程。
  • try_acquire(): 尝试获取资源,如果信号量的计数器值大于零,则减少计数器并返回 true;如果计数器值为零,则不阻塞,直接返回 false。

std::counting_semaphore 的计数器值可以大于1,因此它可以用于多个资源的同步。例如,可以用它来实现连接池,限制同时运行的线程数量,等等。

std::binary_semaphore

std::binary_semaphorestd::counting_semaphore 的一个特例,其计数器值限定为最多1。这意味着它可以被看作是一个可以阻塞线程的布尔标志。

std::binary_semaphore 的行为类似于互斥锁(mutex),但与互斥锁不同的是,std::binary_semaphore 不要求同一个线程执行 acquire()release()。这使得信号量可以用于线程间的通知和同步,而不仅仅是互斥。

下面是一个使用std::counting_semaphore的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include <semaphore>
#include <thread>
#include <vector>
// 初始化信号量,允许同时有3个线程访问资源。
std::counting_semaphore<3> sem(3);
void access_resource(int thread_id) {
// 请求访问资源
sem.acquire();
std::cout << "Thread " << thread_id << " is accessing the resource." << std::endl;
// 模拟资源访问
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Thread " << thread_id << " is releasing the resource." << std::endl;
// 释放资源
sem.release();
}
int main() {
std::vector<std::thread> threads;
// 创建多个线程,模拟并发资源访问
for (int i = 0; i < 10; ++i) {
threads.emplace_back(access_resource, i);
}
// 等待所有线程完成
for (auto& thread : threads) {
thread.join();
}
return 0;
}

std::barrier

std::barrier 也是 C++20 引入的一个同步原语,它允许一组线程相互等待,直到所有线程都达到某个同步点(称为屏障点或栅栏点),然后再继续执行。std::barrier 可以用于协调并行算法中的线程,确保所有线程都完成了某个阶段的工作,然后再一起进入下一个阶段。

std::barrier 的主要特性

  • std::barrier 可以配置一个可调用对象(通常是一个函数或 lambda 表达式),当所有线程都到达屏障点时,这个可调用对象会被执行。这可以用于在所有线程继续之前进行一些初始化或清理工作。
  • std::barrier 是可重用的,这意味着一旦所有线程通过了屏障点,它可以被用于下一个同步点。
  • std::barrier 的构造函数接受一个表示线程总数的参数,以及一个可选的可调用对象。

std::barrier 的主要成员函数

  • arrive_and_wait(): 该函数使调用线程到达屏障点并等待其他线程。当最后一个线程调用 arrive_and_wait() 时,所有线程都被释放,并且可调用对象(如果有)被执行。
  • arrive(): 该函数使调用线程到达屏障点但不等待。它可以用于线程通知已到达屏障点,但随后立即继续执行其他任务。
  • wait(): 该函数使已到达屏障点的线程等待其他线程。它通常与 arrive() 配合使用。
  • arrive_and_drop(): 该函数使调用线程到达屏障点并永久退出屏障。它将屏障点的期望线程总数减少一个。这对于动态线程管理很有用。

以下是一个使用 std::barrier 的简单示例,演示了如何同步多个线程在屏障点上相互等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <barrier>
#include <thread>
#include <vector>
// 创建一个屏障,用于同步三个线程
std::barrier sync_point(3);
void work(int id) {
std::cout << "Thread #" << id << " is doing some work before the barrier." << std::endl;
// 执行前半部分的工作
// ...
// 等待屏障点,等待其他线程
sync_point.arrive_and_wait();
// 当所有线程都达到屏障点时,继续执行后半部分的工作
std::cout << "Thread #" << id << " is doing some work after the barrier." << std::endl;
// ...
}
int main() {
std::vector<std::thread> threads;
// 启动三个工作线程
for (int i = 0; i < 3; ++i) {
threads.emplace_back(work, i);
}
// 等待所有线程完成
for (auto& thread : threads) {
thread.join();
}
return 0;
}

在这个例子中,我们创建了一个 std::barrier 对象 sync_point,它配置为同步三个线程。每个线程都执行一些工作,然后调用 sync_point.arrive_and_wait() 来等待其他线程。一旦所有三个线程都到达屏障点(即都调用了 arrive_and_wait()),它们将一起继续执行后面的代码。

std::barrier 是一种强大的同步工具,特别适用于需要分阶段执行的并行算法,确保在算法的每个阶段开始前,所有线程都已完成前一个阶段的工作。这有助于避免竞争条件,确保算法的正确执行。

std::latch

std::latch 也是 C++20 引入的一个同步原语,它使一组线程可以等待直到一个给定数量的操作完成。它是一个一次性的屏障,一旦触发打开,就不能再重置或再次使用。std::latch 用于在多个线程之间同步操作,允许一个线程等待一个或多个线程完成某些操作。类似Java中的CountDownLatch。

std::latch 的主要特性

  • std::latch 在构造时接受一个计数值,这个值表示需要等待的操作数。
  • 当线程完成它的操作时,它调用 count_down() 方法来减少 std::latch 的计数器。
  • 线程可以调用 wait() 方法来阻塞,直到 std::latch 的计数器达到零。
  • std::latch 可以有多个线程同时等待计数器达到零。
  • 一旦 std::latch 的计数器达到零,所有调用 wait() 的线程都将被释放,之后的任何 wait() 调用都会立即返回。
  • std::latch 提供了一个 try_wait() 方法,该方法立即返回并告知调用者 std::latch 是否已经触发(计数器是否已经为零)。

std::latch 的主要成员函数

  • count_down(): 减少 std::latch 的计数器。如果计数器达到零,所有等待的线程都将被释放。
  • wait(): 阻塞调用线程,直到 std::latch 的计数器为零。
  • try_wait(): 检查 std::latch 的计数器是否为零,不阻塞调用线程。

以下是一个使用 std::latch 的简单示例,演示了如何同步多个线程完成初始化操作后,主线程才继续执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <latch>
#include <thread>
#include <vector>
// 创建一个 std::latch 对象,用于等待三个线程完成初始化
std::latch initialization_latch(3);
void initialize_system(int id) {
// 模拟一些初始化工作
std::cout << "System #" << id << " is initializing." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "System #" << id << " initialization complete." << std::endl;
// 完成工作后,调用 count_down() 减少 latch 的计数
initialization_latch.count_down();
}
int main() {
std::vector<std::thread> threads;
// 启动三个线程以执行系统初始化
for (int i = 0; i < 3; ++i) {
threads.emplace_back(initialize_system, i);
}
// 主线程等待所有初始化完成
initialization_latch.wait();
std::cout << "All systems are initialized. Main thread is proceeding." << std::endl;
// 等待所有线程完成
for (auto& thread : threads) {
thread.join();
}
return 0;
}

在这个例子中,std::latch 用于确保三个并发运行的初始化操作都完成了,主线程才开始执行后续的任务。每个初始化线程在完成初始化后调用 initialization_latch.count_down(),这将减少 std::latch 的计数器。主线程调用 initialization_latch.wait() 来等待所有的初始化操作完成。一旦所有的初始化操作都调用了 count_down()std::latch 的计数器达到零,主线程将继续执行。

std::latch 是一个非常有用的同步工具,特别是在涉及一次性事件或需要多个线程完成启动步骤之后才能继续的场景中。它简化了在这些情况下的线程协调。

std::promise、std::future 和 std::async

std::promisestd::future 是 C++11 引入的同步原语,它们提供了一种在线程之间传递值的机制,也可以用于线程之间的同步。std::async 是 C++11 引入的一个函数模板,用于异步执行一个函数或可调用对象,并返回一个 std::future 对象,以便在将来某个时间点获取该函数的结果。

std::promise

std::promise 允许你在某个线程中存储一个值或异常,该值或异常可以在将来某个时刻被另一个线程检索。当创建 std::promise 对象时,它与一个 std::future 对象相关联,std::future 对象可用于访问 std::promise 中存储的值。

主要成员函数包括:

  • set_value(const T& value): 用来设置值,这会导致与之相关联的 std::future 对象变为 ready 状态,之后可以从中获取这个值。
  • set_exception(std::exception_ptr p): 用来设置异常,这也会导致相关联的 std::future 对象变为 ready 状态,但尝试从中获取值会引发异常。
  • get_future(): 返回与 std::promise 对象相关联的 std::future 对象。

std::future

std::future 提供了一种访问异步操作结果的机制。它与 std::promise 对象相关联,用于获取通过 promise 设置的值或异常。

主要成员函数包括:

  • get(): 获取存储在 std::promise 中的值,如果值还未被设置,会阻塞调用线程直到值变为可用。如果 promise 中存储了异常,则调用 get() 会抛出该异常。
  • wait(): 等待异步操作完成,不返回结果。
  • valid(): 检查 future 对象是否与一个共享状态相关联(即它是否有值可以获取)。

std::async

std::async 是一个函数模板,用于启动一个异步任务,它的返回类型是 std::future,通过该 future 可以访问异步任务的结果。当调用 std::async 时,可以指定一个函数或可调用对象,以及传递给该函数的参数。std::async 可以指定启动策略,例如 std::launch::async(在新线程中运行)或 std::launch::deferred(延迟执行,直到调用 std::future::get()wait())。

以下是一个使用 std::promisestd::future 的简单示例,模拟了一个异步计算任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <future>
#include <thread>
int main() {
// 创建一个 promise 对象
std::promise<int> prom;
// 从 promise 中获取 future
std::future<int> fut = prom.get_future();
// 启动一个线程来完成一个计算任务并设置 promise 的值
std::thread t([&prom]() {
// 模拟一些计算
std::this_thread::sleep_for(std::chrono::seconds(2));
prom.set_value(42); // 设置 promise 的值
});
// 在主线程中,我们可以等待 future 变为 ready 并获取值
int result = fut.get(); // 这里会阻塞直到线程设置了 promise 的值
std::cout << "The result is: " << result << std::endl;
t.join(); // 等待线程完成
return 0;
}

在这个例子中,std::async 用于启动一个新线程来执行 compute 函数。通过返回的 std::future 对象,主线程可以在稍后获取异步计算的结果。如果在此期间尚未完成计算,调用 fut.get() 将会阻塞主线程。

std::promisestd::futurestd::async 提供了 C++ 中进行异步编程的基础设施,允许开发者在不同线程之间传递数据和同步操作,同时将并发复杂性降到最低。