[译]构建你自己的block_on

原文: Build your own block_on()

如果你想搞清楚 future crate中的block_on是如何工作的,那么今天就让我们写一个自己的block_on函数。

这篇博文的灵感来自两个crate: wakefulextremewakeful设计了一种从函数中创建Waker的简单方法,而extreme则是block_on()的及其简洁的实现。

我们的实现目标将与extreme略有不同。与其追求零依赖和最少的代码行数,不如追求一个安全高效但仍然非常简单的实现。

我们将使用的依赖项是pin-utils, crossbeam, 和 async-task

函数签名

block_on的签名如下。使用future作为参数,在当前线程中运行它(如果future是pending状态则阻塞),然后返回它的输出:

1
2
3
fn block_on<F: Future>(future: F) -> F::Output {
todo!()
}

现在让我们实现todo!()部分。

初次尝试

注意Futurepoll方法的第一个参数是pinned future,所以首先我们需要pin住这个future。有一个简单方法可以安全的实现,就是使用Box::pin()。我们最好pin这个future在栈上,而不是堆上。

不幸的是,pin future到栈上的唯一的安全办法就是使用pin-utils crate: pin_utils::pin_mut!(future);

pin_mut宏把future从类型F的变量转换成Pin<&mut F>的变量。

下一步我们需要实现当这个future被唤醒后的处理逻辑。在我们的场景下,唤醒应该简单的解锁运行这个future的线程。

构造一个Waker很麻烦 - 只需看看extreme的实现就知道了。 extreme是手工构建Waker的最简单的实现, 还包括那么多的 raw pointer,那么多的非安全代码... 现在先让我们跳过这一部分,留个空格以后填。

1
let waker = todo!();

最后,让我们从Waker中创建一个task context, 在一个循环中轮询这个future。如果它已经完成,则返回输出结果,如果它是pending状态,则则色当前线程:

1
2
3
4
5
6
7
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => thread::park(),
}
}

如果你对Context感到困惑,那么只需理解它就是一个Waker的包装器 - 直此而已。当设计Rust中的async/await的时候,我们并不确定除了Waker之外是否传递给poll其它东西是否有用,因为我们设计了这个包装器,可以传递更多的信息。

不管怎样...我们快完成任务了。让我们回到Waker的构建,开始完成填空todo!()

如果你仔细想想,Waker真的是一个仔细优化的奇幻的Arc<dyn Fn() + Send + Sync>版本,wake()调用这个函数。换言之,Waker是一个回调,当future继续执行的时候就会被调用。

既然Waker很难去构建, sagebind提出了waker_fn(), 一个直接的把任意函数转换成Waker的方式。我借用waker_fn(),把它放在我的crate async-task中。

在我们的block_on实现中,回调只需解锁运行当前future的线程:

1
2
let thread = thread::current();
let waker = async_task::waker_fn(move || thread.unpark());

太简单了,比摆弄RawWakerRawWakerVTable好太多了。

内部实现上,waker_fn()创建一个Arc<impl Fn() + Send + Sync>,通过非安全代码把它转换成Waker,就像我们在extreme中看到的那样。

现在让我们列出block_on的完整实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fn block_on<F: Future>(future: F) -> F::Output {
pin_utils::pin_mut!(future);
let thread = thread::current();
let waker = async_task::waker_fn(move || thread.unpark());
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => thread::park(),
}
}
}

如果你想运行代码,你可以下载代码v1.rs

parking的问题

但是,先别忙着庆祝,这里有个问题。如果future中的用户代码使用到了park/unpark API,它可能“偷走”回调中的unpark通知,查看这个issue以了解更详细的情况。

一个可能的解决方案就是使用不同于std::thread的park/unpark api。这种方案下future内部的代码不会干扰唤醒机能。

crossbeam有一个类似的park/unpark机制,而且它可以让我们创建任意多个packer,而不是每个线程一个。让我们在block_on每次调用的时候都创建一个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fn block_on<F: Future>(future: F) -> F::Output {
pin_utils::pin_mut!(future);
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = async_task::waker_fn(move || unparker.unpark());
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => parker.park(),
}
}
}

好啦,问题解决。

如果你想运行代码,可以执行文件v2.rs

通过cache优化

创建WakerParker并不是没有代价,创建对象都是有花费的,太不幸了,如何提升?

既然每次调用block_on都需要创建WakerParker,为什么我们不在thread-local storage中缓存它们呢?这样调用block_on()时线程可以重用相同的对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fn block_on<F: Future>(future: F) -> F::Output {
pin_utils::pin_mut!(future);
thread_local! {
static CACHE: (Parker, Waker) = {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = async_task::waker_fn(move || unparker.unpark());
(parker, waker)
};
}
CACHE.with(|(parker, waker)| {
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => parker.park(),
}
}
})
}

如果future可以很快执行,那么这个小小的改变可以使block_on显著地提高性能。

v3.rs代码。

递归怎么办?

我们完成了么? 嗯...还差最后一项。

如果在block_on中的future的代码中再递归调用block_on会怎样?我们可以允许递归调用或者禁止递归。

如果我们允许递归,我们需要确保block_on的递归调用不会共享相同的ParkerWaker,否则没有办法区分哪个block_on需要唤醒。

futures crate的block_on在递归调用的时候会panic。我对允许还是禁止递归调用没有强烈的倾向性 - 它们都有理。 但是既然我们在模仿futures版本,就让我们禁止吧。

为了探测递归调用,我们需要引入另一个thread-local变量,指示我们是否已经在block_on中还是不在,如果一个mutable borrow已经active则panic:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
fn block_on<F: Future>(future: F) -> F::Output {
pin_utils::pin_mut!(future);
thread_local! {
static CACHE: RefCell<(Parker, Waker)> = {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = async_task::waker_fn(move || unparker.unpark());
RefCell::new((parker, waker))
};
}
CACHE.with(|cache| {
let (parker, waker) = &mut *cache.try_borrow_mut().ok()
.expect("recursive `block_on`");
let cx = &mut Context::from_waker(&waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => parker.park(),
}
}
})
}

现在,我保证,我们的block_on()已经实现好了。最终版本的block_on()是正确的,健壮的,并且效率也高。

看代码v4.rs

benchmark

效率高不高拉出来溜溜。让我们和futures中实现做比较。

首先让我们写一个辅助future类型,它会yield多次然后完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct Yields(u32);
impl Future for Yields {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.0 == 0 {
Poll::Ready(())
} else {
self.0 -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

例如yield 10次:

1
2
3
4
#[bench]
fn custom_block_on_10_yields(b: &mut Bencher) {
b.iter(|| block_on(Yields(10)));
}

让我们测试三次,分别yield 0次、10次、50次,分别使用我们自己实现的block_onfutures中的block_on。你可以在yield.rs找到全部代码。

以下是我机器上的运行结果:

1
2
3
test custom_block_on_0_yields ... bench: 3 ns/iter (+/- 0)
test custom_block_on_10_yields ... bench: 130 ns/iter (+/- 12)
test custom_block_on_50_yields ... bench: 638 ns/iter (+/- 20)
1
2
3
test futures_block_on_0_yields ... bench: 10 ns/iter (+/- 0)
test futures_block_on_10_yields ... bench: 236 ns/iter (+/- 10)
test futures_block_on_50_yields ... bench: 1,139 ns/iter (+/- 30)

结果显示我们的实现在这个场景下2到3倍的快。

还不错。

结论

Rust异步编程可能让人害怕,因为它包含太多机器相关的东西:Future trait、pinning、Context类型、Waker以及它们的朋友RawWakerRawWakerVTable、async和await、非安全的代码、raw pointer等等。

但问题是,很多丑陋的东西并不重要 - 它们只是无聊的样板,你可以使用pin-utils, async-taskcrossbeam等。

事实上,这次我们成功地使用几十行代码就构建了一个健壮高效的block_on(),无需理解大多数样板文件。在另一篇博文中,我们将构建一个真正的executor。