[译]异步代码中的阻塞操作

Stjepan Glavina是Rust流行的库Crossbeam的作者,最近一年专注于async-std的开发。他最近写了两篇关于rust异步编程的文章,我翻译成中文,学习一下。

本篇原文是:Blocking inside async code

以下是翻译:

大家好,很久没写博文了,这次回来感觉真好。首先带来一个好消息。在Crossbeam上花了两年的时间后,2019年我把我的焦点放在了运行时异步编程研究上(比如async-stdtokio)。尤其是我想让异步运行时(async runtimes)更有效、更健壮,同时也更简单。

在这篇文章中,我想谈谈所有的运行时都面临的一个有趣的问题:从异步代码中调用阻塞函数。

Async(异步) 和 Sync(同步)

我们最终在stable rust版本中加上了async/await,现在准备重写所有的同步代码,让它们变成异步的。我们应该这么做吗?我不知道。

sync库和async库的差异愈来越大,比如stdasync-std库。看起来两者很相近,除了一个有阻塞函数(sync),另一个有非阻塞函数(async)。类似的还有surfattohttpc库,它们都是http client库,一个是sync库,一个是async库。现在所有的新库的开发者都面临一个简单的抉择:应该提供一个sync库还是async库,还是两者都提供?

目前重复的API看起来很不幸,但是我个人很乐观,相信最终我们会找出一个办法出来。在任何情况下,我们都需要找到尽可能无缝地集成sync和async代码的方法。

从同步到异步,再到同步

rust的main函数是同步的,所以为了进入异步世界,我们需要显式的去做。使用async-std,我们可以通过调用block_on()函数进入异步世界:

1
2
3
4
5
6
7
8
9
use async_std::task;
// This is sync code.
fn main() {
task::block_on(foo());
}
// This is async code.
async fn foo() {}

如果要从异步世界进入同步世界,可以在异步代码中调用同步代码:

1
2
3
4
5
6
7
// This is async code.
async fn foo() {
bar();
}
// This is sync code.
fn bar() {}

所以为了从异步进入同步,我们不需要做任何额外的设置-只需调用同步函数就这么简单,除了...额...我们需要仔细关注需要花费很长时间的同步函数。孔子说在异步世界调用同步函数一定要三思。

阻塞影响并发

异步运行时的一个假设就是每次对future轮询的时候,它能很快返回Ready状态或者Pending状态。在异步代码中长时间阻塞是异步编程的一个很大的禁忌,一定要避免发生。

为了说明为什么,下面使用surf并发获取40个网页:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
use async_std::task;
use std::time::Instant;
// Fetch the HTML contents of a web page.
async fn get(url: &str) -> String {
surf::get(url).recv_string().await.unwrap()
}
fn main() {
task::block_on(async {
let start = Instant::now();
let mut tasks = Vec::new();
// Fetch the list of contributors for the first 40 minor Rust releases.
for i in 0..40 {
let url = format!("https://thanks.rust-lang.org/rust/1.{}.0/", i);
// Spawn a task fetching the list.
tasks.push(task::spawn(async move {
let html = get(&url).await;
// Display the number of contributors to this Rust release.
for line in html.lines() {
if line.contains("individuals") {
println!("{}", line.trim());
}
}
}))
}
// Wait for all tasks to complete.
for t in tasks {
t.await;
}
// Display elapsed time.
dbg!(start.elapsed());
});
}

这个程序在我的机器上需要1.5秒就可以完成。注意因为get函数是异步的,所以获取40个页面是并发执行的。

现在让我们把get改成阻塞方式。我们使用attohttpc代替surf,它们比较类似,除了提供一个同步的接口:

1
2
3
async fn get(url: &str) -> String {
attohttpc::get(url).send().unwrap().text().unwrap()
}

不出所料,这个程序现在效率更低,需要3秒钟完成。我的计算机有8个逻辑内核,这意味着异步std执行器生成8个工作线程,因此我们一次只能获取8个web页面。

这个例子的教训是:阻塞会损害并发性。很重要的一点是,我们不要在异步代码内部阻塞,否则执行器将无法执行有用的工作-相反,它只会浪费时间阻塞。

阻塞无处不在

通过上面的我们看到了异步代码中的阻塞是如何影响性能的。当然,这个例子有点”做作“,因为您只需要使用surf而不是attohttpc,问题就解决了。但坏消息是,阻塞是不易察觉的:它无处不在,你甚至没有意识到!

考虑标准输入标准输出。很明显,读取标准输入块时,不应该在异步代码中使用std::io::Stdin。但是如果你看到println!你会皱眉头吗!我敢打赌,我们大部分时间都假设打印到标准输出不会阻塞,而它确实是阻塞的。

如果你想知道什么场景下println!()会阻塞,可以假想我们在shell中执行program1 | program2,这样program1的输出就通过管道传输到program2中。如果program2读取输入的速度非常慢,那么program1将不得不在打印某些内容并且管道已满时阻塞。

密集的计算也会导致阻塞。假设我们通过调用v.sort()对一个非常大的Vec进行排序。如果排序需要一秒钟左右的时间来完成,我们应该考虑将该计算从异步执行器中移除。

有时甚至有一些程序员不太小心会掉进的“陷阱”。例如,假设我们使用Rayon在异步代码中调用v.par_sort(),人们可能会天真地认为这是可以的,因为排序发生在Rayon的执行器中,而事实是异步执行器仍然会阻塞以等待Rayon的结果。

但性能下降并不是唯一的问题。如果异步执行器的每一个线程都陷在读取标准输入之类的事情上,那么整个程序也有可能陷入死锁状态,无法继续执行!

最后,值得一提的是,即使是简单的内存访问也可能被阻塞!例如,考虑驻留在旋转磁盘上的swap memory。如果线程正在访问磁盘上的swap memory,它将不得不阻塞,直到该页从物理磁盘中取出并移到主内存中。

所以阻塞是非常普遍的,很难从异步代码中完全分离出来。我相信我们必须接受这样一个事实:不管我们如何小心地消除阻塞,阻塞总是存在于异步代码中。

可能的解决方案

当我们预期在异步代码中阻塞时,我们应该考虑将阻塞逻辑移动到不同的线程池中,这样执行器就可以继续运行而不必等待它。像async stdtokio这样的运行时提供了spawn_blocking()函数来帮助解决这个问题。

为了演示如何使用该函数,让我们看看fs::read_to_string()是如何在async std中实现的:

1
2
3
4
async fn read_to_string<P: AsRef<Path>>(path: P) -> io::Result<String> {
let path = path.as_ref().to_owned();
spawn_blocking(move || std::fs::read_to_string(path)).await
}

函数spawn_blocking()将闭包生成到专用于运行阻塞函数的特殊线程池中。然后,异步执行器不必阻塞闭包的结果,而是异步await返回的JoinHandle的结果。

注意,我们不能将对path的引用传递到闭包中,因为在同步版本完成之前,可能会取消read_to_string()函数。不幸的是,将路径传递到闭包的唯一方法是克隆它。这有点低效,也有点笨重。

幸运的是,Tokio有一种运行阻塞函数的替代方法:它可以就地执行闭包,并告诉当前线程停止作为异步执行器的一部分,并将该职责移交给一个新线程。在某种程度上,它与spawn_blocking()相反——我们没有将闭包发送到新线程并继续事件循环,而是将事件循环发送到新线程并继续运行闭包。

这是block_in_place()实现异步read_to_string()的方式:

1
2
3
async fn read_to_string<P: AsRef<Path>>(path: P) -> io::Result<String> {
block_in_place(|| std::fs::read_to_string(path))
}

注意我们不必再clone path,这是因为在内部sync read_to_string()完成之前你不可能取消外部的async read_to_string()

虽然spawn_blocking()block_in_place()都解决了异步执行器陷入阻塞代码的问题,但它们之间有一个重要的区别。注意spawn_blocking()实际上是一个异步函数,因为它返回一个可以等待的future,而block_in_place()只是一个普通的同步函数。

通过例子看看有什么区别:

1
let (s1, s2) = futures::join!(read_to_string("foo.txt"), read_to_string("bar.txt"));

如果read_to_string()是通过spawn_blocking()实现的,那么这两个文件可以并行的读取,而如果是通过block_in_place()实现的,那么这两个文件是串行读取的,一个读完才读下一个。

结论

关键结论是:

  • 在异步代码中阻塞将使性能受损,甚至导致死锁。
  • 我们需要使用spawn_blocking()block_in_place()隔离程序的阻塞部分。
  • 阻塞无处不在,很难完全隔离它。

此外,有时甚至很难说什么代码是阻塞的,什么代码不是阻塞的。如果一个函数需要1秒来完成,我们可能会认为它是阻塞的。但如果需要1毫秒呢?好吧,取决于特定的用例-有时我们应该考虑阻塞,有时我们不应该。这完全取决于你的场景。

阻塞是可怕的,我们需要防御地将它与异步代码隔离开来。但是我们只能做这么多,阻塞仍然不可避免地会潜入到我们的异步代码中。这听起来可能是一个令人悲伤和失望的状况,但我很乐观。我相信有比spawn_blocking()block_in_place() 更好的解决方案,我将在下面的博客文章中讨论。