取消

当一个挂起的future被丢弃时会发生什么?运行时将不再对其轮询,因此它不会进一步推进。换句话说,其执行已经被取消了。

在实际应用中,这种情况经常发生在处理超时场景中。例如:

#![allow(unused)]
fn main() {
use tokio::time::timeout;
use tokio::sync::oneshot;
use std::time::Duration;

async fn http_call() {
    // [...]
}

async fn run() {
    // 用设置为10毫秒后超时的`Timeout`包裹future。
    let duration = Duration::from_millis(10);
    if let Err(_) = timeout(duration, http_call()).await {
        println!("10毫秒内未收到值");
    }
}
}

当超时时,由http_call返回的future会被取消。让我们想象这是http_call的主体:

#![allow(unused)]
fn main() {
use std::net::TcpStream;

async fn http_call() {
    let (stream, _) = TcpStream::connect(/* */).await.unwrap();
    let request: Vec<u8> = /* */;
    stream.write_all(&request).await.unwrap();
}
}

每个await点都变成了一个取消点。由于http_call不能被运行时抢占,因此只能在通过.await将控制权交回执行器后被丢弃。这递归适用,例如,stream.write_all(&request)在其实现中可能有多个yield点。完全有可能看到http_call在被取消前推送了_部分_请求,从而断开连接,永远无法完成消息体的传输。

清理

Rust的取消机制非常强大,它允许调用者无需任何来自任务本身的协作就能取消正在进行的任务。同时,这也相当危险。可能需要进行优雅的取消,以确保在中止操作之前执行某些清理任务。

例如,考虑这个虚构的SQL事务API:

#![allow(unused)]
fn main() {
async fn transfer_money(
    connection: SqlConnection,
    payer_id: u64,
    payee_id: u64,
    amount: u64
) -> Result<(), anyhow::Error> {
    let transaction = connection.begin_transaction().await?;
    update_balance(payer_id, amount, &transaction).await?;
    decrease_balance(payee_id, amount, &transaction).await?;
    transaction.commit().await?;
}
}

在取消时,理想情况下应明确地中止挂起的事务,而非使其悬而未决。遗憾的是,Rust并未提供一种针对这种异步清理操作的万无一失的机制。

最常见的策略是依赖Drop特质来安排所需的清理工作。这可以通过:

  • 在运行时上派生一个新的任务
  • 在通道上排队一条消息
  • 派生一个后台线程

最佳选择视情况而定。

取消已派生的任务

使用tokio::spawn派生任务后,你将无法再丢弃它;它属于运行时。尽管如此,如果有需要,你可以使用其JoinHandle来取消它:

#![allow(unused)]
fn main() {
async fn run() {
    let handle = tokio::spawn(/* 某个异步任务 */);
    // 取消已派生的任务
    handle.abort();
}
}

进一步阅读

  • 使用tokioselect!宏来“竞速”两个不同的future时要极度小心。除非你能确保取消安全,否则在循环中重试同一任务是危险的。更多细节请查看select!的文档。 如果你需要交织两个异步数据流(如socket和通道),推荐使用StreamExt::merge代替。
  • 与“突然”的取消相比,依赖于CancellationToken可能是更可取的。