深入理解 Async Traits
在本章中,我们以各种方式使用了 Future、Pin、Unpin、Stream 和 StreamExt traits。然而,到目前为止,我们一直避免深入研究它们的工作原理或它们如何组合在一起的细节。在日常编写 Rust 代码的大部分时间里,这样做是没问题的。但是,有时您会遇到一些情况,理解更多这些细节至关重要。在本节中,我们将足够深入地探讨这些细节,以帮助您应对这些情况,同时将真正深入的探讨留给其他文档!
Future
回到 Future 和 Async 语法,我们注意到 Future 是一个 trait。让我们首先仔细看看它是如何工作的。以下是 Rust 定义 Future 的方式
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } }
这个 trait 定义包含了一堆新类型,以及一些我们以前没有见过的语法,所以让我们逐段分解这个定义。
首先,Future 的关联类型 Output 说明了 future 解析为何值。这类似于 Iterator trait 的 Item 关联类型。其次,Future 也有 poll 方法,它为 self 参数接受一个特殊的 Pin 引用,以及一个对 Context 类型的可变引用,并返回一个 Poll<Self::Output>。我们将在本节稍后讨论更多关于 Pin 和 Context 的内容。现在,让我们关注该方法返回的内容,即 Poll 类型
#![allow(unused)] fn main() { enum Poll<T> { Ready(T), Pending } }
Poll 类型很像 Option:它有一个带有值的变体 (Ready(T)),和一个没有值的变体 (Pending)。但它的含义却大相径庭!Pending 变体表示 future 仍有工作要做,因此调用者需要稍后再次检查。Ready 变体表示 Future 已完成其工作,并且 T 值可用。
注意:对于大多数 future,在 future 返回 Ready 后,调用者不应再次调用 poll。许多 future 在变为 ready 后再次轮询会 panic!可以安全再次轮询的 Future 将在其文档中明确说明。这类似于 Iterator::next 的行为!
在底层,当您调用 .await 时,Rust 会将其编译为调用 poll 的代码,有点像(虽然不完全)像这样
match hello("async").poll() {
Ready(_) => {
// We’re done!
}
Pending => {
// But what goes here?
}
}
当 Future 仍然是 Pending 时,我们应该做什么?我们需要某种方法再次尝试……一遍又一遍,直到 future 最终 ready。换句话说,一个循环
let hello_fut = hello("async");
loop {
match hello_fut.poll() {
Ready(_) => {
break;
}
Pending => {
// continue
}
}
}
但是,如果 Rust 将其编译成完全相同的代码,那么每个 .await 都会是阻塞的——这与我们想要达到的目标正好相反!相反,Rust 需要确保循环可以将控制权交给可以暂停此 future 的工作并处理其他 future,并在稍后再次检查此 future 的东西。“东西” 是一个 async 运行时,而这种调度和协调工作是运行时的主要工作之一。
回想一下我们在 计数 章节中对等待 rx.recv 的描述。recv 调用返回一个 Future,并且 await 它会轮询它。在我们最初的讨论中,我们注意到运行时将暂停 future,直到它准备好,并带有 Some(message) 或当通道关闭时的 None。凭借我们对 Future 更深入的理解,特别是 Future::poll,我们可以看到它是如何工作的。当 future 返回 Poll::Pending 时,运行时知道 future 尚未准备好。相反,当 poll 返回 Poll::Ready(Some(message)) 或 Poll::Ready(None) 时,运行时知道 future 已准备就绪并推进它。
运行时如何做到这一点的确切细节超出了即使是本深入探讨章节的范围。这里的关键是了解 future 的基本机制:运行时轮询它负责的每个 future,并在 future 尚未准备就绪时将其放回睡眠状态。
Pinning 以及 Pin 和 Unpin Traits
当我们在处理 Listing 17-17 时引入 pinning 的概念时,我们遇到了一个非常棘手的错误消息。这是再次相关的部分
error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
--> src/main.rs:46:33
|
46 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
--> /Users/chris/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.
当我们仔细阅读此错误消息时,它不仅告诉我们需要 pin 值,还告诉我们为什么需要 pinning。trpl::join_all 函数返回一个名为 JoinAll 的 struct。该 struct 又泛型化于类型 F,该类型被约束为实现 Future trait。最后,直接 await Future 需要所讨论的 future 实现 Unpin trait。这很多!但是如果我们更深入地研究 Future 类型实际是如何工作的,特别是围绕 pinning,我们就可以理解它了。
让我们再次查看 Future 的定义,特别关注 poll 方法的 self 类型
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; // Required method fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } }
这是我们第一次看到一个方法,其中 self 具有这样的类型注解。当我们像这样指定 self 的类型时,我们是在告诉 Rust 调用此方法时 self 必须是什么类型。这些 self 的类型注解类似于其他函数参数的类型注解,但限制是类型注解必须是方法实现的类型,或者是对该类型的引用或智能指针,或者是包装对该类型引用的 Pin。我们将在第 18 章中看到更多关于此语法的内容。现在,我们只需要知道,如果我们想要轮询 future(以检查它是 Pending 还是 Ready(Output)),我们需要对该类型的可变引用,该引用被包装在 Pin 中。
Pin 是一种包装类型。在某些方面,它类似于我们在第 15 章中看到的 Box、Rc 和其他智能指针类型,它们也包装了其他类型。然而,与这些类型不同,Pin 仅适用于其他指针类型,如引用(& 和 &mut)和智能指针(Box、Rc 等)。确切地说,Pin 适用于实现 Deref 或 DerefMut traits 的类型,我们在第 15 章中介绍了这些 traits。您可以将此限制视为等同于仅适用于指针,因为实现 Deref 或 DerefMut 意味着您的类型表现得像指针类型。Pin 本身也不是指针,它也没有任何自己的行为,例如 Rc 或 Arc 的引用计数。它纯粹是编译器可以用来维护相关保证的工具,通过将指针包装在类型中。
回想一下 .await 是如何根据对 poll 的调用来实现的,这开始解释了我们上面看到的错误消息——但这与 Unpin 有关,而不是 Pin。那么,Pin 和 Unpin 到底是什么,它们之间有什么关系,以及为什么 Future 需要 self 在 Pin 类型中才能调用 poll 呢?
在 我们的第一个 Async 程序 中,我们描述了 future 中的一系列 await 点如何被编译成一个状态机——并注意到编译器如何帮助确保状态机遵循 Rust 关于安全性的所有正常规则,包括借用和所有权。为了实现这一点,Rust 会查看每个 await 点和下一个 await 点或 async 代码块的结尾之间需要哪些数据。然后,它在它创建的状态机中创建一个相应的变体。每个变体都会获得访问权限,以访问将在源代码的该部分中使用的数据,无论是通过取得该数据的所有权,还是通过获得对它的可变或不可变引用。
到目前为止一切顺利:如果我们在给定的 async 代码块中弄错了所有权或引用,借用检查器会告诉我们。当我们想要移动与该代码块对应的 future 时——例如将其移动到 Vec 中以传递给 join_all——事情变得棘手起来。
当我们移动 future 时——无论是通过推入数据结构以用作 join_all 的迭代器,还是从函数返回它们——这实际上意味着移动 Rust 为我们创建的状态机。与 Rust 中的大多数其他类型不同,Rust 为 async 代码块创建的 future 可能会在任何给定变体的字段中最终包含对自身的引用。任何具有对自身引用的对象移动都是不安全的,因为引用总是指向它们引用的事物的实际内存地址。如果您移动数据结构本身,则必须更新对它的任何引用,否则它们将指向旧的位置。
原则上,您可以让 Rust 编译器尝试在每次对象移动时更新对它的每个引用。这可能会带来很大的性能开销,特别是考虑到可能存在需要更新的整个引用网络。另一方面,如果我们能确保所讨论的数据结构不会在内存中移动,我们就不必更新任何引用。而这正是 Rust 的借用检查器已经保证的:您不能使用安全代码移动任何具有活动引用的条目。
Pin 基于此为我们提供了我们需要的确切保证。当我们通过将指向值的指针包装在 Pin 中来 pin 值时,它将不再移动。因此,如果您有 Pin<Box<SomeType>>,您实际上是 pin 了 SomeType 值,而不是 Box 指针。事实上,pinned box 指针可以自由移动。请记住:我们关心确保最终被引用的数据保持在原位。如果指针移动,但它指向的数据在同一位置,则没有问题。
但是,即使它们碰巧位于 Pin 指针后面,大多数类型也可以安全地移动。我们只需要在条目具有内部引用时才考虑 pinning。像数字和布尔值这样的原始值没有任何像这样的内部结构,因此它们显然是安全的。您通常在 Rust 中使用的大多数类型也是如此。例如,Vec 没有任何内部引用需要这样保持最新,因此您可以移动它而无需担心。如果您有 Pin<Vec<String>>,即使 Vec<String> 在没有其他对其的引用的情况下始终可以安全移动,您也必须通过 Pin 的安全但限制性的 API 来完成所有操作。我们需要一种方法来告诉编译器,在这些情况下移动条目实际上是可以的。为此,我们有 Unpin。
Unpin 是一个 marker trait,就像我们在第 16 章中看到的 Send 和 Sync 一样。回想一下,marker traits 本身没有任何功能。它们的存在只是为了告诉编译器,在特定上下文中,使用实现给定 trait 的类型是安全的。Unpin 通知编译器,给定的类型不需要维护关于所讨论的值是否可以移动的任何特定保证。
就像 Send 和 Sync 一样,编译器会自动为所有它可以证明安全的类型实现 Unpin。手动实现 Unpin 是不安全的,因为它要求您 为具有内部引用的类型维护所有使 Pin 和 Unpin 本身安全的保证。在实践中,这是一种非常罕见的事情!
现在我们已经了解了足够多的信息来理解为 join_all 调用报告的错误。我们最初尝试将 async 代码块生成的 future 移动到 Vec<Box<dyn Future<Output = ()>>> 中,但正如我们所见,这些 future 可能具有内部引用,因此它们没有实现 Unpin。它们需要被 pin,然后我们可以将 Pin 类型传递到 Vec 中,确信 future 中的底层数据不会被移动。
Pin 和 Unpin 主要对于构建较低级别的库,或者当您构建运行时本身时很重要,而不是对于日常 Rust 代码。但是,当您看到它们时,现在您就会知道该怎么做了!
注意:Pin 和 Unpin 的这种组合允许一整类复杂的类型在 Rust 中是安全的,否则这些类型很难实现,因为它们是自引用的。今天,需要 Pin 的类型最常出现在 async Rust 中,但您也可能会——非常罕见地!——在其他上下文中看到它。
Pin 和 Unpin 在底层如何工作的具体机制在 std::pin 的 API 文档中得到了广泛的介绍,因此如果您想更深入地了解它们,这是一个很好的起点。
如果您想更详细地了解事情的 “底层” 工作原理,官方的 Rust 异步编程 书籍已涵盖了您
Stream Trait
现在我们对 Future、Pin 和 Unpin traits 有了更深入的了解,我们可以将注意力转向 Stream trait。正如介绍 stream 的部分所述,stream 就像异步迭代器。与 Iterator 和 Future 不同,截至撰写本文时,标准库中没有 Stream trait 的定义,但是在整个生态系统中使用的非常常见的定义。
让我们回顾一下 Iterator 和 Future traits 的定义,以便我们可以构建一个将它们合并在一起的 Stream trait 的外观。从 Iterator 中,我们有序列的概念:它的 next 方法提供了一个 Option<Self::Item>。从 Future 中,我们有随着时间推移的 readiness 概念:它的 poll 方法提供了一个 Poll<Self::Output>。为了表示随时间变为 ready 的条目序列,我们定义了一个 Stream trait,它将这些特性组合在一起
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>; } }
Stream trait 为 stream 生成的条目的类型定义了一个关联类型 Item。这类似于 Iterator:可能存在零到多个条目,并且与 Future 不同,Future 始终只有一个 Output(即使它是 unit 类型 ())。
Stream 还定义了一个获取这些条目的方法。我们称之为 poll_next,以明确它像 Future::poll 一样轮询,并像 Iterator::next 一样生成一系列条目。它的返回类型结合了 Poll 和 Option。外部类型是 Poll,因为它必须像 future 一样检查 readiness。内部类型是 Option,因为它需要像迭代器一样发出信号,指示是否还有更多消息。
非常类似于此的内容很可能会最终标准化为 Rust 标准库的一部分。与此同时,它是大多数运行时的工具包的一部分,因此您可以依赖它,并且我们下面介绍的所有内容通常都适用!
但是,在我们看到的流式处理章节的示例中,我们没有使用 poll_next 或 Stream,而是使用了 next 和 StreamExt。当然,我们可以通过手动编写我们自己的 Stream 状态机,直接使用 poll_next API,就像我们可以通过它们的 poll 方法直接使用 future 一样。但是,使用 await 要好得多,因此 StreamExt trait 提供了 next 方法,以便我们可以做到这一点。
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>; } trait StreamExt: Stream { async fn next(&mut self) -> Option<Self::Item> where Self: Unpin; // other methods... } }
注意:我们将使用的实际定义看起来与此略有不同,因为它支持尚不支持在 traits 中使用 async 函数的 Rust 版本。因此,它看起来像这样
fn next(&mut self) -> Next<'_, Self> where Self: Unpin;
Next 类型只是一个简单的 struct,它实现了 Future 并提供了一种使用 Next<'_, Self> 命名对 self 的引用的生命周期的方法,以便 .await 可以与此一起使用!
StreamExt trait 也是所有可用于 stream 的有趣方法的所在地。StreamExt 会自动为每个实现 Stream 的类型实现,但它们是分开的,以便社区可以从便利 API 中区分出基础 trait 并对其进行迭代。
在 trpl crate 中使用的 StreamExt 版本中,该 trait 不仅定义了 next 方法,它还提供了 next 的实现,该实现正确处理了调用 Stream::poll_next 的细节。这意味着即使您需要编写自己的流式数据类型,您只需实现 Stream,然后任何使用您的数据类型的人都可以自动使用 StreamExt 及其方法。
这就是我们将要介绍的关于这些 traits 的较低级别细节的全部内容。为了总结,让我们考虑一下 future(包括 stream)、任务和线程是如何组合在一起的!