使用 Async 实现并发
在本节中,我们将把 async 应用于我们在第 16 章中使用线程解决的一些相同的并发挑战。由于我们已经在那里讨论了很多关键思想,因此在本节中,我们将重点关注线程和 future 之间的区别。
在许多情况下,使用 async 进行并发的 API 与使用线程的 API 非常相似。在其他情况下,它们的最终形状却大相径庭。即使线程和 async 之间的 API *看起来* 相似,它们通常也具有不同的行为——并且几乎总是具有不同的性能特征。
计数
我们在第 16 章中解决的第一个任务是在两个单独的线程上计数。让我们使用 async 做同样的事情。trpl
crate 提供了一个 spawn_task
函数,它看起来与 thread::spawn
API 非常相似,以及一个 sleep
函数,它是 thread::sleep
API 的异步版本。我们可以一起使用这些函数来实现与线程相同的计数示例,如 Listing 17-6 所示。
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }); }
spawn_task
与两个计数作为我们的起点,我们使用 trpl::run
设置我们的 main
函数,以便我们的顶层函数可以是异步的。
注意:从本章的这一点开始,每个示例都将包含这个完全相同的包装代码,其中 main
中包含 trpl::run
,因此我们通常会像对 main
一样跳过它。不要忘记将其包含在您的代码中!
然后我们在该块中编写两个循环,每个循环中都有一个 trpl::sleep
调用,它等待半秒(500 毫秒)然后再发送下一条消息。我们将一个循环放在 trpl::spawn_task
的主体中,另一个循环放在顶层 for
循环中。我们还在 sleep
调用后添加了 .await
。
这与基于线程的实现做的事情类似——包括当您运行时,您可能会在自己的终端中看到消息以不同的顺序出现这一事实。
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
此版本在 main 异步块主体中的 for 循环完成后立即停止,因为当 main 函数结束时,spawn_task
生成的任务将关闭。如果您想一直运行到任务完成,则需要使用 join handle 来等待第一个任务完成。对于线程,我们使用 join
方法“阻塞”直到线程完成运行。在 Listing 17-7 中,我们可以使用 await
来做同样的事情,因为任务句柄本身就是一个 future。它的 Output
类型是 Result
,因此我们在等待它之后也将其解包。
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let handle = trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } handle.await.unwrap(); }); }
.await
运行任务直至完成此更新版本运行到 *两个* 循环都完成。
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
到目前为止,async 和线程看起来给了我们相同的基本结果,只是语法不同:在 join handle 上使用 .await
而不是调用 join
,以及等待 sleep
调用。
更大的区别是我们不需要生成另一个操作系统线程来执行此操作。实际上,我们甚至不需要在此处生成任务。由于异步块编译为匿名 future,我们可以将每个循环放在一个异步块中,并让运行时使用 trpl::join
函数运行它们直到完成。
在第 16 章中,我们展示了如何在调用 std::thread::spawn
时返回的 JoinHandle
类型上使用 join
方法。trpl::join
函数与之类似,但用于 future。当您给它两个 future 时,它会生成一个新的 future,其输出是一个元组,其中包含您传入的每个 future 的输出,一旦 *两者* 都完成。因此,在 Listing 17-8 中,我们使用 trpl::join
来等待 fut1
和 fut2
都完成。我们 *不* 等待 fut1
和 fut2
,而是等待 trpl::join
生成的新 future。我们忽略输出,因为它只是一个包含两个单元值的元组。
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let fut1 = async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }; let fut2 = async { for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }; trpl::join(fut1, fut2).await; }); }
trpl::join
等待两个匿名 future当我们运行此代码时,我们看到两个 future 都运行完成
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
在这里,您每次都会看到完全相同的顺序,这与我们在线程中看到的非常不同。这是因为 trpl::join
函数是 *公平的*,这意味着它会同等频率地检查每个 future,在它们之间交替,并且永远不会让一个 future 在另一个 future 准备好时抢先一步。对于线程,操作系统决定检查哪个线程以及允许其运行多长时间。使用异步 Rust,运行时决定检查哪个任务。(实际上,细节变得复杂,因为异步运行时可能会在底层使用操作系统线程作为其管理并发的一部分,因此保证公平性对于运行时来说可能需要更多的工作——但这仍然是可能的!)运行时不必为任何给定的操作保证公平性,并且运行时通常提供不同的 API,让您选择是否需要公平性。
尝试一些等待 future 的不同变体,看看它们的作用
- 从任一或两个循环周围删除异步块。
- 在定义每个异步块后立即等待它。
- 仅将第一个循环包装在异步块中,并在第二个循环的主体之后等待生成的 future。
对于额外的挑战,看看您是否可以在运行代码 *之前* 弄清楚每种情况下的输出是什么!
消息传递
在 future 之间共享数据也将是熟悉的:我们将再次使用消息传递,但这使用异步版本的类型和函数。我们将采用与第 16 章中稍有不同的路径,以说明基于线程和基于 future 的并发之间的一些关键区别。在 Listing 17-9 中,我们将从一个异步块开始——*不* 生成单独的任务,就像我们生成单独的线程一样。
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let val = String::from("hi"); tx.send(val).unwrap(); let received = rx.recv().await.unwrap(); println!("Got: {received}"); }); }
tx
和 rx
在这里,我们使用 trpl::channel
,它是我们在第 16 章中使用线程使用的多生产者、单消费者通道 API 的异步版本。API 的异步版本与基于线程的版本只有一点不同:它使用可变而不是不可变的接收器 rx
,并且其 recv
方法生成一个我们需要等待而不是直接生成值的 future。现在我们可以从发送者向接收者发送消息。请注意,我们不必生成单独的线程甚至任务;我们只需要等待 rx.recv
调用即可。
std::mpsc::channel
中的同步 Receiver::recv
方法会阻塞,直到它收到消息。trpl::Receiver::recv
方法不会阻塞,因为它是异步的。它不会阻塞,而是将控制权交还给运行时,直到收到消息或通道的发送端关闭。相比之下,我们不等待 send
调用,因为它不会阻塞。它不需要阻塞,因为我们正在发送到的通道是无界的。
注意:由于所有这些异步代码都在 trpl::run
调用中的异步块中运行,因此其中的所有内容都可以避免阻塞。但是,*外部* 的代码将阻塞 run
函数返回。这就是 trpl::run
函数的全部意义:它允许您 *选择* 在一组异步代码上阻塞的位置,从而选择在同步代码和异步代码之间转换的位置。在大多数异步运行时中,run
实际上被命名为 block_on
,正是因为这个原因。
请注意此示例中的两件事:首先,消息将立即到达!其次,尽管我们在这里使用了 future,但还没有并发。列表中的所有内容都按顺序发生,就像没有涉及 future 一样。
让我们通过发送一系列消息并在它们之间休眠来解决第一部分,如 Listing 17-10 所示
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
}
.await
休眠除了发送消息之外,我们还需要接收它们。在这种情况下,我们可以手动执行此操作,只需执行四次 rx.recv().await
即可,因为我们知道有多少消息传入。但是,在现实世界中,我们通常会等待一些 *未知* 数量的消息。在这种情况下,我们需要一直等待,直到我们确定没有更多消息为止。
在 Listing 16-10 中,我们使用 for
循环来处理从同步通道接收到的所有项目。但是,Rust 还没有办法对 *异步* 项目序列编写 for
循环。相反,我们需要使用一种我们以前从未见过的新型循环,即 while let
条件循环。while let
循环是我们在第 6 章中看到的 if let
构造的循环版本。只要它指定的模式继续匹配该值,循环就会继续执行。
rx.recv
调用生成一个 Future
,我们等待它。运行时将暂停 Future
,直到它准备就绪。一旦消息到达,future 将解析为 Some(message)
,次数与消息到达的次数相同。当通道关闭时,无论是否到达 *任何* 消息,future 都将解析为 None
,以指示没有更多值,我们应该停止轮询——也就是说,停止等待。
while let
循环将所有这些组合在一起。如果调用 rx.recv().await
的结果是 Some(message)
,我们可以访问该消息,并且可以在循环体中使用它,就像我们对 if let
所做的那样。如果结果是 None
,则循环结束。每次循环完成时,它都会再次命中等待点,因此运行时会再次暂停它,直到另一条消息到达。
代码现在成功发送和接收所有消息。不幸的是,仍然存在一些问题。首先,消息不是以半秒的间隔到达的。它们在程序启动两秒(2,000 毫秒)后一次性全部到达。其次,此程序也永远不会退出!相反,它永远等待新消息。您需要使用 ctrl-c 将其关闭。
让我们首先了解为什么消息在完全延迟后一次性全部到达,而不是在每个消息之间都有延迟。在给定的异步块中,.await
关键字在代码中出现的顺序也是它们在运行程序时发生的顺序。
Listing 17-10 中只有一个异步块,因此其中的所有内容都线性运行。仍然没有并发。所有 tx.send
调用都会发生,其中穿插着所有 trpl::sleep
调用及其相关的等待点。只有这样,while let
循环才能遍历 recv
调用上的任何 .await
点。
为了获得我们想要的行为,即在接收每条消息之间发生睡眠延迟,我们需要将 tx
和 rx
操作放在它们自己的异步块中。然后,运行时可以使用 trpl::join
分别执行它们,就像在计数示例中一样。再次,我们等待调用 trpl::join
的结果,而不是单独的 future。如果我们按顺序等待单独的 future,我们最终只会回到顺序流中——这正是我们试图 *不* 做的事情。
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
send
和 recv
分离到它们自己的 async
块中并等待这些块的 future使用 Listing 17-11 中的更新代码,消息以 500 毫秒的间隔打印,而不是在两秒后匆忙打印。
但是,程序仍然永远不会退出,因为 while let
循环与 trpl::join
交互的方式
- 只有在传递给它的 *两个* future 都完成后,从
trpl::join
返回的 future 才会完成。 tx
future 在发送vals
中的最后一条消息后完成睡眠后完成。- 在
while let
循环结束之前,rx
future 不会完成。 - 在等待
rx.recv
产生None
之前,while let
循环不会结束。 - 只有在通道的另一端关闭后,等待
rx.recv
才会返回None
。 - 只有当我们调用
rx.close
或当发送端tx
被丢弃时,通道才会关闭。 - 我们没有在任何地方调用
rx.close
,并且在传递给trpl::run
的最外层异步块结束之前,tx
不会被丢弃。 - 该块无法结束,因为它被阻塞在等待
trpl::join
完成,这使我们回到此列表的顶部!
我们可以通过在某处调用 rx.close
手动关闭 rx
,但这没有多大意义。在处理一些任意数量的消息后停止将使程序关闭,但我们可能会错过消息。我们需要一些其他方法来确保在函数结束 *之前* 丢弃 tx
。
现在,我们在其中发送消息的异步块仅借用 tx
,但如果我们可以将 tx
移动到该异步块中,则一旦该块结束,它将被丢弃。在第 13 章中,我们学习了如何在闭包中使用 move
关键字,在第 16 章中,我们看到在使用线程时,我们经常需要将数据移动到闭包中。相同的基本动态适用于异步块,因此 move
关键字在异步块中的工作方式与在闭包中的工作方式相同。
在 Listing 17-12 中,我们将用于发送消息的异步块从普通 async
块更改为 async move
块。当我们运行 *此* 版本的代码时,它会在发送和接收最后一条消息后正常关闭。
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { eprintln!("received '{value}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
此异步通道也是一个多生产者通道,因此如果我们要从多个 future 发送消息,我们可以在 tx
上调用 clone
。在 Listing 17-13 中,我们克隆 tx
,在第一个异步块外部创建 tx1
。我们将 tx1
移动到该块中,就像我们之前对 tx
所做的那样。然后,稍后,我们将原始 tx
移动到一个 *新的* 异步块中,我们在其中以稍慢的延迟发送更多消息。我们碰巧将这个新的异步块放在用于接收消息的异步块之后,但它也可以放在它之前。关键是 future 等待的顺序,而不是它们创建的顺序。
用于发送消息的两个异步块都需要是 async move
块,以便在这些块完成时丢弃 tx
和 tx1
。否则,我们将最终回到我们开始时所在的同一个无限循环中。最后,我们从 trpl::join
切换到 trpl::join3
以处理额外的 future。
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(1500)).await; } }; trpl::join3(tx1_fut, tx_fut, rx_fut).await; }); }
现在我们看到了来自两个发送 future 的所有消息。由于发送 future 在发送后使用稍微不同的延迟,因此消息也以这些不同的间隔接收。
received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'
这是一个良好的开端,但它将我们限制为仅处理少量 future:两个使用 join
,或三个使用 join3
。让我们看看我们如何处理更多 future。