使用 Async 实现并发

在本节中,我们将把 async 应用于我们在第 16 章中使用线程解决的一些相同的并发挑战。由于我们已经在那里讨论了很多关键思想,因此在本节中,我们将重点关注线程和 future 之间的区别。

在许多情况下,使用 async 进行并发的 API 与使用线程的 API 非常相似。在其他情况下,它们的最终形状却大相径庭。即使线程和 async 之间的 API *看起来* 相似,它们通常也具有不同的行为——并且几乎总是具有不同的性能特征。

计数

我们在第 16 章中解决的第一个任务是在两个单独的线程上计数。让我们使用 async 做同样的事情。trpl crate 提供了一个 spawn_task 函数,它看起来与 thread::spawn API 非常相似,以及一个 sleep 函数,它是 thread::sleep API 的异步版本。我们可以一起使用这些函数来实现与线程相同的计数示例,如 Listing 17-6 所示。

文件名:src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}
Listing 17-6: 使用 spawn_task 与两个计数

作为我们的起点,我们使用 trpl::run 设置我们的 main 函数,以便我们的顶层函数可以是异步的。

注意:从本章的这一点开始,每个示例都将包含这个完全相同的包装代码,其中 main 中包含 trpl::run,因此我们通常会像对 main 一样跳过它。不要忘记将其包含在您的代码中!

然后我们在该块中编写两个循环,每个循环中都有一个 trpl::sleep 调用,它等待半秒(500 毫秒)然后再发送下一条消息。我们将一个循环放在 trpl::spawn_task 的主体中,另一个循环放在顶层 for 循环中。我们还在 sleep 调用后添加了 .await

这与基于线程的实现做的事情类似——包括当您运行时,您可能会在自己的终端中看到消息以不同的顺序出现这一事实。

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

此版本在 main 异步块主体中的 for 循环完成后立即停止,因为当 main 函数结束时,spawn_task 生成的任务将关闭。如果您想一直运行到任务完成,则需要使用 join handle 来等待第一个任务完成。对于线程,我们使用 join 方法“阻塞”直到线程完成运行。在 Listing 17-7 中,我们可以使用 await 来做同样的事情,因为任务句柄本身就是一个 future。它的 Output 类型是 Result,因此我们在等待它之后也将其解包。

文件名:src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}
Listing 17-7: 使用带有 join handle 的 .await 运行任务直至完成

此更新版本运行到 *两个* 循环都完成。

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

到目前为止,async 和线程看起来给了我们相同的基本结果,只是语法不同:在 join handle 上使用 .await 而不是调用 join,以及等待 sleep 调用。

更大的区别是我们不需要生成另一个操作系统线程来执行此操作。实际上,我们甚至不需要在此处生成任务。由于异步块编译为匿名 future,我们可以将每个循环放在一个异步块中,并让运行时使用 trpl::join 函数运行它们直到完成。

在第 16 章中,我们展示了如何在调用 std::thread::spawn 时返回的 JoinHandle 类型上使用 join 方法。trpl::join 函数与之类似,但用于 future。当您给它两个 future 时,它会生成一个新的 future,其输出是一个元组,其中包含您传入的每个 future 的输出,一旦 *两者* 都完成。因此,在 Listing 17-8 中,我们使用 trpl::join 来等待 fut1fut2 都完成。我们 *不* 等待 fut1fut2,而是等待 trpl::join 生成的新 future。我们忽略输出,因为它只是一个包含两个单元值的元组。

文件名:src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
Listing 17-8: 使用 trpl::join 等待两个匿名 future

当我们运行此代码时,我们看到两个 future 都运行完成

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

在这里,您每次都会看到完全相同的顺序,这与我们在线程中看到的非常不同。这是因为 trpl::join 函数是 *公平的*,这意味着它会同等频率地检查每个 future,在它们之间交替,并且永远不会让一个 future 在另一个 future 准备好时抢先一步。对于线程,操作系统决定检查哪个线程以及允许其运行多长时间。使用异步 Rust,运行时决定检查哪个任务。(实际上,细节变得复杂,因为异步运行时可能会在底层使用操作系统线程作为其管理并发的一部分,因此保证公平性对于运行时来说可能需要更多的工作——但这仍然是可能的!)运行时不必为任何给定的操作保证公平性,并且运行时通常提供不同的 API,让您选择是否需要公平性。

尝试一些等待 future 的不同变体,看看它们的作用

  • 从任一或两个循环周围删除异步块。
  • 在定义每个异步块后立即等待它。
  • 仅将第一个循环包装在异步块中,并在第二个循环的主体之后等待生成的 future。

对于额外的挑战,看看您是否可以在运行代码 *之前* 弄清楚每种情况下的输出是什么!

消息传递

在 future 之间共享数据也将是熟悉的:我们将再次使用消息传递,但这使用异步版本的类型和函数。我们将采用与第 16 章中稍有不同的路径,以说明基于线程和基于 future 的并发之间的一些关键区别。在 Listing 17-9 中,我们将从一个异步块开始——*不* 生成单独的任务,就像我们生成单独的线程一样。

文件名:src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("Got: {received}");
    });
}
Listing 17-9: 创建异步通道并将两个半部分分配给 txrx

在这里,我们使用 trpl::channel,它是我们在第 16 章中使用线程使用的多生产者、单消费者通道 API 的异步版本。API 的异步版本与基于线程的版本只有一点不同:它使用可变而不是不可变的接收器 rx,并且其 recv 方法生成一个我们需要等待而不是直接生成值的 future。现在我们可以从发送者向接收者发送消息。请注意,我们不必生成单独的线程甚至任务;我们只需要等待 rx.recv 调用即可。

std::mpsc::channel 中的同步 Receiver::recv 方法会阻塞,直到它收到消息。trpl::Receiver::recv 方法不会阻塞,因为它是异步的。它不会阻塞,而是将控制权交还给运行时,直到收到消息或通道的发送端关闭。相比之下,我们不等待 send 调用,因为它不会阻塞。它不需要阻塞,因为我们正在发送到的通道是无界的。

注意:由于所有这些异步代码都在 trpl::run 调用中的异步块中运行,因此其中的所有内容都可以避免阻塞。但是,*外部* 的代码将阻塞 run 函数返回。这就是 trpl::run 函数的全部意义:它允许您 *选择* 在一组异步代码上阻塞的位置,从而选择在同步代码和异步代码之间转换的位置。在大多数异步运行时中,run 实际上被命名为 block_on,正是因为这个原因。

请注意此示例中的两件事:首先,消息将立即到达!其次,尽管我们在这里使用了 future,但还没有并发。列表中的所有内容都按顺序发生,就像没有涉及 future 一样。

让我们通过发送一系列消息并在它们之间休眠来解决第一部分,如 Listing 17-10 所示

文件名:src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}
Listing 17-10: 通过异步通道发送和接收多条消息,并在每条消息之间使用 .await 休眠

除了发送消息之外,我们还需要接收它们。在这种情况下,我们可以手动执行此操作,只需执行四次 rx.recv().await 即可,因为我们知道有多少消息传入。但是,在现实世界中,我们通常会等待一些 *未知* 数量的消息。在这种情况下,我们需要一直等待,直到我们确定没有更多消息为止。

在 Listing 16-10 中,我们使用 for 循环来处理从同步通道接收到的所有项目。但是,Rust 还没有办法对 *异步* 项目序列编写 for 循环。相反,我们需要使用一种我们以前从未见过的新型循环,即 while let 条件循环。while let 循环是我们在第 6 章中看到的 if let 构造的循环版本。只要它指定的模式继续匹配该值,循环就会继续执行。

rx.recv 调用生成一个 Future,我们等待它。运行时将暂停 Future,直到它准备就绪。一旦消息到达,future 将解析为 Some(message),次数与消息到达的次数相同。当通道关闭时,无论是否到达 *任何* 消息,future 都将解析为 None,以指示没有更多值,我们应该停止轮询——也就是说,停止等待。

while let 循环将所有这些组合在一起。如果调用 rx.recv().await 的结果是 Some(message),我们可以访问该消息,并且可以在循环体中使用它,就像我们对 if let 所做的那样。如果结果是 None,则循环结束。每次循环完成时,它都会再次命中等待点,因此运行时会再次暂停它,直到另一条消息到达。

代码现在成功发送和接收所有消息。不幸的是,仍然存在一些问题。首先,消息不是以半秒的间隔到达的。它们在程序启动两秒(2,000 毫秒)后一次性全部到达。其次,此程序也永远不会退出!相反,它永远等待新消息。您需要使用 ctrl-c 将其关闭。

让我们首先了解为什么消息在完全延迟后一次性全部到达,而不是在每个消息之间都有延迟。在给定的异步块中,.await 关键字在代码中出现的顺序也是它们在运行程序时发生的顺序。

Listing 17-10 中只有一个异步块,因此其中的所有内容都线性运行。仍然没有并发。所有 tx.send 调用都会发生,其中穿插着所有 trpl::sleep 调用及其相关的等待点。只有这样,while let 循环才能遍历 recv 调用上的任何 .await 点。

为了获得我们想要的行为,即在接收每条消息之间发生睡眠延迟,我们需要将 txrx 操作放在它们自己的异步块中。然后,运行时可以使用 trpl::join 分别执行它们,就像在计数示例中一样。再次,我们等待调用 trpl::join 的结果,而不是单独的 future。如果我们按顺序等待单独的 future,我们最终只会回到顺序流中——这正是我们试图 *不* 做的事情。

文件名:src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-11: 将 sendrecv 分离到它们自己的 async 块中并等待这些块的 future

使用 Listing 17-11 中的更新代码,消息以 500 毫秒的间隔打印,而不是在两秒后匆忙打印。

但是,程序仍然永远不会退出,因为 while let 循环与 trpl::join 交互的方式

  • 只有在传递给它的 *两个* future 都完成后,从 trpl::join 返回的 future 才会完成。
  • tx future 在发送 vals 中的最后一条消息后完成睡眠后完成。
  • while let 循环结束之前,rx future 不会完成。
  • 在等待 rx.recv 产生 None 之前,while let 循环不会结束。
  • 只有在通道的另一端关闭后,等待 rx.recv 才会返回 None
  • 只有当我们调用 rx.close 或当发送端 tx 被丢弃时,通道才会关闭。
  • 我们没有在任何地方调用 rx.close,并且在传递给 trpl::run 的最外层异步块结束之前,tx 不会被丢弃。
  • 该块无法结束,因为它被阻塞在等待 trpl::join 完成,这使我们回到此列表的顶部!

我们可以通过在某处调用 rx.close 手动关闭 rx,但这没有多大意义。在处理一些任意数量的消息后停止将使程序关闭,但我们可能会错过消息。我们需要一些其他方法来确保在函数结束 *之前* 丢弃 tx

现在,我们在其中发送消息的异步块仅借用 tx,但如果我们可以将 tx 移动到该异步块中,则一旦该块结束,它将被丢弃。在第 13 章中,我们学习了如何在闭包中使用 move 关键字,在第 16 章中,我们看到在使用线程时,我们经常需要将数据移动到闭包中。相同的基本动态适用于异步块,因此 move 关键字在异步块中的工作方式与在闭包中的工作方式相同。

在 Listing 17-12 中,我们将用于发送消息的异步块从普通 async 块更改为 async move 块。当我们运行 *此* 版本的代码时,它会在发送和接收最后一条消息后正常关闭。

文件名:src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                eprintln!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-12: 一个在 future 之间发送和接收消息的工作示例,该示例在完成时正确关闭

此异步通道也是一个多生产者通道,因此如果我们要从多个 future 发送消息,我们可以在 tx 上调用 clone。在 Listing 17-13 中,我们克隆 tx,在第一个异步块外部创建 tx1。我们将 tx1 移动到该块中,就像我们之前对 tx 所做的那样。然后,稍后,我们将原始 tx 移动到一个 *新的* 异步块中,我们在其中以稍慢的延迟发送更多消息。我们碰巧将这个新的异步块放在用于接收消息的异步块之后,但它也可以放在它之前。关键是 future 等待的顺序,而不是它们创建的顺序。

用于发送消息的两个异步块都需要是 async move 块,以便在这些块完成时丢弃 txtx1。否则,我们将最终回到我们开始时所在的同一个无限循环中。最后,我们从 trpl::join 切换到 trpl::join3 以处理额外的 future。

文件名:src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}
Listing 17-13: 将多个生产者与异步块一起使用

现在我们看到了来自两个发送 future 的所有消息。由于发送 future 在发送后使用稍微不同的延迟,因此消息也以这些不同的间隔接收。

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

这是一个良好的开端,但它将我们限制为仅处理少量 future:两个使用 join,或三个使用 join3。让我们看看我们如何处理更多 future。