处理任意数量的 Future

在前一节中,当我们从使用两个 future 切换到三个时,我们也必须从使用 join 切换到使用 join3。每次更改要 join 的 future 数量时都必须调用不同的函数,这将非常烦人。幸运的是,我们有一个宏形式的 join,我们可以将任意数量的参数传递给它。它还处理等待 future 本身。因此,我们可以重写 Listing 17-13 中的代码,以使用 join! 代替 join3,如 Listing 17-14 所示

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}
Listing 17-14: 使用 join! 等待多个 future

这绝对比需要在 joinjoin3join4 等之间来回切换要好得多!然而,即使是这种宏形式也只在提前知道 future 数量时才有效。但在现实世界的 Rust 中,将 future 推送到集合中,然后等待该集合中的某些或所有 future 完成是一种常见的模式。

要检查某些集合中的所有 future,我们将需要迭代并 join 所有 future。trpl::join_all 函数接受任何实现了 Iterator trait 的类型,我们在第 13 章中学习过它,所以它看起来正是我们需要的。让我们尝试将 future 放入 vector 中,并将 join! 替换为 join_all

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures = vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}
Listing 17-15: 将匿名 future 存储在 vector 中并调用 join_all

不幸的是,这无法编译。相反,我们得到这个错误

error[E0308]: mismatched types
  --> src/main.rs:43:37
   |
8  |           let tx1_fut = async move {
   |  _______________________-
9  | |             let vals = vec![
10 | |                 String::from("hi"),
11 | |                 String::from("from"),
...  |
19 | |             }
20 | |         };
   | |_________- the expected `async` block
21 |
22 |           let rx_fut = async {
   |  ______________________-
23 | |             while let Some(value) = rx.recv().await {
24 | |                 println!("received '{value}'");
25 | |             }
26 | |         };
   | |_________- the found `async` block
...
43 |           let futures = vec![tx1_fut, rx_fut, tx_fut];
   |                                       ^^^^^^ expected `async` block, found a different `async` block
   |
   = note: expected `async` block `{async block@src/main.rs:8:23: 20:10}`
              found `async` block `{async block@src/main.rs:22:22: 26:10}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and and casting it to a trait object

这可能令人惊讶。毕竟,它们都没有返回任何内容,因此每个代码块都生成一个 Future<Output = ()>。但是,Future 是一个 trait,而不是具体的类型。具体的类型是编译器为 async 代码块生成的各个数据结构。您不能将两个不同的手写结构体放入 Vec 中,编译器生成的不同结构体也适用相同的道理。

为了使这项工作正常进行,我们需要使用 trait 对象,就像我们在第 12 章的 “从 run 函数返回错误” 中所做的那样。(我们将在第 18 章中详细介绍 trait 对象。)使用 trait 对象使我们可以将这些类型生成的每个匿名 future 视为相同的类型,因为它们都实现了 Future trait。

注意:在第 8 章中,我们讨论了在 Vec 中包含多种类型的另一种方法:使用枚举来表示 vector 中可能出现的每种不同类型。但是,我们在这里无法做到这一点。首先,我们无法命名不同的类型,因为它们是匿名的。其次,我们首先想到使用 vector 和 join_all 的原因是为了能够处理 future 的动态集合,在运行时之前我们不知道它们都会是什么。

我们首先将 vec! 中的每个 future 包装在 Box::new 中,如 Listing 17-16 所示。

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;
    });
}
Listing 17-16: 尝试使用 Box::new 对齐 Vec 中 future 的类型

不幸的是,这仍然无法编译。实际上,我们遇到了与之前相同的基本错误,但是我们为第二个和第三个 Box::new 调用都得到一个错误,并且我们还得到了引用 Unpin trait 的新错误。我们稍后会回到 Unpin 错误。首先,让我们通过显式提供 futures 的类型作为 trait 对象(Listing 17-17)来修复 Box::new 调用中的类型错误。

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::{future::Future, time::Duration};

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures: Vec<Box<dyn Future<Output = ()>>> =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;
    });
}
Listing 17-17: 通过使用显式类型声明来修复其余的类型不匹配错误

我们在这里必须编写的类型有点复杂,所以让我们逐步分解它

  • 最内层的类型是 future 本身。我们通过编写 Future<Output = ()> 显式地指出 future 的输出是 unit 类型 ()
  • 然后,我们用 dyn 注释 trait 以将其标记为动态。
  • 整个 trait 引用都包装在 Box 中。
  • 最后,我们显式声明 futures 是一个包含这些条目的 Vec

这已经有了很大的不同。现在,当我们运行编译器时,我们只有提及 Unpin 的错误。尽管其中有三个错误,但请注意,每个错误的内容都非常相似。

error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
   --> src/main.rs:46:24
    |
46  |         trpl::join_all(futures).await;
    |         -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
    |         |
    |         required by a bound introduced by this call
    |
    = note: consider using the `pin!` macro
            consider using `Box::pin` if you need to access the pinned value outside of the current scope
    = note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `join_all`
   --> /Users/chris/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
    |
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
    |        -------- required by a bound in this function
...
105 |     I::Item: Future,
    |              ^^^^^^ required by this bound in `join_all`

error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
  --> src/main.rs:46:9
   |
46 |         trpl::join_all(futures).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
  --> /Users/chris/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

error[E0277]: `{async block@src/main.rs:8:23: 20:10}` cannot be unpinned
  --> src/main.rs:46:33
   |
46 |         trpl::join_all(futures).await;
   |                                 ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:8:23: 20:10}`, which is required by `Box<{async block@src/main.rs:8:23: 20:10}>: std::future::Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:8:23: 20:10}>` to implement `std::future::Future`
note: required by a bound in `JoinAll`
  --> /Users/chris/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.

非常多,需要消化,所以让我们把它分解开来。消息的第一部分告诉我们,第一个 async 代码块 (src/main.rs:8:23: 20:10) 没有实现 Unpin trait,并建议使用 pin!Box::pin 来解决它。在本章的后面,我们将深入探讨关于 PinUnpin 的更多细节。但是,目前,我们可以按照编译器的建议来摆脱困境!在 Listing 17-18 中,我们首先更新 futures 的类型注释,用 Pin 包装每个 Box。其次,我们使用 Box::pin 来 pin future 本身。

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::{
    future::Future,
    pin::{pin, Pin},
    time::Duration,
};

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = pin!(async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let rx_fut = pin!(async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        });

        let tx_fut = pin!(async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
            vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)];

        trpl::join_all(futures).await;
    });
}
Listing 17-18: 使用 PinBox::pin 使 Vec 类型检查通过

如果我们编译并运行它,我们最终会得到我们希望的输出

received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'

唷!

这里还有更多我们可以探索的地方。首先,使用 Pin<Box<T>> 会带来一些额外的开销,因为使用 Box 将这些 future 放在堆上——而我们这样做只是为了使类型对齐。毕竟,我们实际上并不需要堆分配:这些 future 是此特定函数的本地 future。如上所述,Pin 本身是一种包装类型,因此我们可以获得在 Vec 中拥有单一类型的好处——这是我们最初选择 Box 的原因——而无需进行堆分配。我们可以直接将 Pin 与每个 future 一起使用,使用 std::pin::pin 宏。

但是,我们仍然必须显式声明 pinned 引用的类型;否则 Rust 仍然不知道将它们解释为动态 trait 对象,这是我们在 Vec 中需要的。因此,我们在定义每个 future 时使用 pin!,并将 futures 定义为包含对动态 Future 类型的 pinned 可变引用的 Vec,如 Listing 17-19 所示。

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::{
    future::Future,
    pin::{pin, Pin},
    time::Duration,
};

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let rx_fut = pin!(async {
            // --snip--
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        });

        let tx_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
            vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}
Listing 17-19: 直接将 Pinpin! 宏一起使用,以避免不必要的堆分配

我们走到这一步时忽略了我们可能具有不同的 Output 类型的事实。例如,在 Listing 17-20 中,a 的匿名 future 实现了 Future<Output = u32>b 的匿名 future 实现了 Future<Output = &str>,而 c 的匿名 future 实现了 Future<Output = bool>

文件名: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let a = async { 1u32 };
        let b = async { "Hello!" };
        let c = async { true };

        let (a_result, b_result, c_result) = trpl::join!(a, b, c);
        println!("{a_result}, {b_result}, {c_result}");
    });
}
Listing 17-20: 三个具有不同类型的 future

我们可以使用 trpl::join! 来等待它们,因为它允许您传入多种 future 类型,并生成这些类型的元组。我们不能使用 trpl::join_all,因为它要求传入的 future 都具有相同的类型。记住,这个错误是我们开始这次 Pin 冒险的原因!

这是一个基本的权衡:我们可以使用 join_all 处理动态数量的 future,只要它们都具有相同的类型,或者我们可以使用 join 函数或 join! 宏处理固定数量的 future,即使它们具有不同的类型。但这与在 Rust 中使用任何其他类型相同。Future 并不特殊,即使我们有一些很好的语法来使用它们,这是一件好事。

竞速 future

当我们使用 join 系列函数和宏 “join” future 时,我们需要所有 future 都完成才能继续。但是,有时我们只需要一组 future 中的某个 future 完成才能继续——有点像一个 future 与另一个 future 竞速。此操作通常因此被命名为 race

注意:在底层,race 构建在更通用的函数 select 之上,您将在现实世界的 Rust 代码中更频繁地遇到它。select 函数可以做很多 trpl::race 函数无法做到的事情,但它也具有一些额外的复杂性,我们现在可以跳过它。

在 Listing 17-21 中,我们使用 trpl::race 来运行两个 future,slowfast,使它们相互竞争。每个 future 在开始运行时都会打印一条消息,通过调用并等待 sleep 暂停一段时间,然后在完成时打印另一条消息。然后我们将两者都传递给 trpl::race 并等待其中一个完成。(这里的结果不会太令人惊讶:fast 获胜!)请注意,与我们在 我们的第一个 Async 程序 中首次使用 race 时不同,我们在这里只是忽略了它返回的 Either 实例,因为所有有趣的行为都发生在 async 代码块的主体中。

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            println!("'slow' started.");
            trpl::sleep(Duration::from_millis(100)).await;
            println!("'slow' finished.");
        };

        let fast = async {
            println!("'fast' started.");
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'fast' finished.");
        };

        trpl::race(slow, fast).await;
    });
}
Listing 17-21: 使用 race 获取最先完成的 future 的结果

请注意,如果您翻转 race 的参数顺序,即使 fast future 始终首先完成,“started” 消息的顺序也会改变。这是因为此特定 race 函数的实现是不公平的。它始终按照传递的顺序运行作为参数传递的 future。其他实现公平的,并且会随机选择首先轮询哪个 future。但是,无论我们使用的 race 实现是否公平,在另一个任务可以开始之前,其中一个 future 都将运行到其主体中的第一个 .await

回想一下 我们的第一个 Async 程序,在每个 await 点,如果正在等待的 future 尚未准备好,Rust 都会给运行时一个暂停任务并切换到另一个任务的机会。反之亦然:Rust 在 await 点暂停 async 代码块并将控制权交还给运行时。await 点之间的所有内容都是同步的。

这意味着如果您在没有 await 点的 async 代码块中执行大量工作,则该 future 将阻止任何其他 future 取得进展。您有时可能会听到这被称为一个 future 饿死 其他 future。在某些情况下,这可能没什么大不了的。但是,如果您正在进行某种昂贵的设置或长时间运行的工作,或者如果您有一个 future 将无限期地执行某个特定任务,则需要考虑何时以及何处将控制权交还给运行时。

同样,如果您有长时间运行的阻塞操作,async 可以成为一种有用的工具,可以为程序的不同部分提供相互关联的方式。

但是,在这些情况下,您如何将控制权交还给运行时呢?

让步

让我们模拟一个长时间运行的操作。Listing 17-22 引入了一个 slow 函数。它使用 std::thread::sleep 而不是 trpl::sleep,以便调用 slow 会阻塞当前线程一段时间(以毫秒为单位)。我们可以使用 slow 来代替实际世界中既耗时又阻塞的操作。

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        // We will call `slow` here later
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-22: 使用 thread::sleep 模拟慢速操作

在 Listing 17-23 中,我们使用 slow 来模拟在一对 future 中执行这种 CPU 密集型工作。首先,每个 future 仅在执行完大量慢速操作才将控制权交还给运行时。

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-23: 使用 thread::sleep 模拟慢速操作

如果您运行此代码,您将看到此输出

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

与我们之前的示例一样,race 仍然在 a 完成后立即完成。但是,两个 future 之间没有交错。 a future 完成其所有工作,直到 trpl::sleep 调用被 await,然后 b future 完成其所有工作,直到其自身的 trpl::sleep 调用被 await,然后 a future 完成。为了使两个 future 都可以在其慢速任务之间取得进展,我们需要 await 点,以便我们可以将控制权交还给运行时。这意味着我们需要一些可以 await 的东西!

我们已经可以在 Listing 17-23 中看到这种交接的发生:如果我们删除了 a future 末尾的 trpl::sleep,它将在 b future 根本没有运行的情况下完成。也许我们可以使用 sleep 函数作为起点?

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 35);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-24: 使用 sleep 让操作切换停止进行

在 Listing 17-24 中,我们在每次调用 slow 之间添加了带有 await 点的 trpl::sleep 调用。现在,两个 future 的工作是交错的

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

a future 仍然会在将控制权交给 b 之前运行一段时间,因为它在首次调用 trpl::sleep 之前调用了 slow,但此后 future 会在每次其中一个 future 命中 await 点时来回切换。在这种情况下,我们在每次调用 slow 后都执行了此操作,但是我们可以根据对我们最有意义的方式来分解工作。

但是,我们实际上并不想在这里休眠:我们希望尽可能快地取得进展。我们只需要将控制权交还给运行时。我们可以直接使用 yield_now 函数来做到这一点。在 Listing 17-25 中,我们将所有这些 sleep 调用替换为 yield_now

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 35);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
Listing 17-25: 使用 yield_now 让操作切换停止进行

这既更清楚地表达了实际意图,并且可以比使用 sleep 快得多,因为像 sleep 使用的计时器通常对其粒度有限制。例如,我们正在使用的 sleep 版本始终至少休眠一毫秒,即使我们传递给它一个纳秒的 Duration 也是如此。同样,现代计算机非常快:它们可以在一毫秒内完成很多事情!

您可以通过设置一个小基准来亲自查看这一点,例如 Listing 17-26 中的基准。(这不是执行性能测试的特别严格的方法,但足以显示此处的差异。)在这里,我们跳过所有状态打印,将一纳秒的 Duration 传递给 trpl::sleep,并让每个 future 单独运行,future 之间没有切换。然后我们运行 1,000 次迭代,并查看使用 trpl::sleep 的 future 与使用 trpl::yield_now 的 future 相比花费的时间。

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::{Duration, Instant};

fn main() {
    trpl::run(async {
        let one_ns = Duration::from_nanos(1);
        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::sleep(one_ns).await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'sleep' version finished after {} seconds.",
            time.as_secs_f32()
        );

        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::yield_now().await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'yield' version finished after {} seconds.",
            time.as_secs_f32()
        );
    });
}
Listing 17-26: 比较 sleepyield_now 的性能

带有 yield_now 的版本快得多

这意味着 async 即使对于计算密集型任务也可能有用,具体取决于您的程序还在做什么,因为它提供了一个有用的工具来构建程序不同部分之间的关系。这是一种协作式多任务处理,其中每个 future 都具有通过 await 点确定何时交出控制权的能力。因此,每个 future 也都有责任避免阻塞太长时间。在某些基于 Rust 的嵌入式操作系统中,这是唯一一种多任务处理!

当然,在现实世界的代码中,您通常不会在每行代码上交替函数调用和 await 点。虽然像这样让出控制权相对便宜,但它并非免费!在许多情况下,尝试分解计算密集型任务可能会使其速度明显降低,因此有时为了整体性能,最好让操作短暂阻塞。您应该始终进行测量以查看代码的实际性能瓶颈在哪里。但是,如果您确实看到大量工作以串行方式发生,而您期望它们并发发生,那么底层的动态是一个需要牢记的重要因素!

构建我们自己的 Async 抽象

我们还可以将 future 组合在一起以创建新模式。例如,我们可以使用我们已经拥有的 async 构建块构建一个 timeout 函数。完成后,结果将是另一个构建块,我们可以使用它来构建更进一步的 async 抽象。

Listing 17-27 显示了我们期望此 timeout 如何与慢速 future 一起工作。

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_millis(100)).await;
            "I finished!"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}
Listing 17-27: 使用我们想象的 timeout 运行具有时间限制的慢速操作

让我们实现它!首先,让我们考虑一下 timeout 的 API

  • 它本身需要是一个 async 函数,以便我们可以 await 它。
  • 它的第一个参数应该是要运行的 future。我们可以使其泛型以使其可以与任何 future 一起使用。
  • 它的第二个参数将是最大等待时间。如果我们使用 Duration,这将使其易于传递给 trpl::sleep
  • 它应该返回一个 Result。如果 future 成功完成,则 Result 将为 Ok,其中包含 future 生成的值。如果超时首先经过,则 Result 将为 Err,其中包含超时等待的持续时间。

Listing 17-28 显示了此声明。

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::{future::Future, time::Duration};

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}
Listing 17-28: 定义 timeout 的签名

这满足了我们对类型的目标。现在让我们考虑一下我们需要的行为:我们想要将传入的 future 与持续时间进行竞速。我们可以使用 trpl::sleep 从持续时间创建计时器 future,并使用 trpl::race 将该计时器与调用者传入的 future 一起运行。

我们还知道 race 是不公平的,并且按照传递的顺序轮询参数。因此,我们首先将 future_to_try 传递给 race,以便即使 max_time 是非常短的持续时间,它也有机会完成。如果 future_to_try 首先完成,race 将返回 Left,其中包含 future 的输出。如果 timer 首先完成,race 将返回 Right,其中包含计时器的输出 ()

在 Listing 17-29 中,我们匹配等待 trpl::race 的结果。如果 future_to_try 成功并且我们得到 Left(output),则返回 Ok(output)。如果休眠计时器反而经过并且我们得到 Right(()),则忽略带有 _() 并返回 Err(max_time)

文件名: src/main.rs
extern crate trpl; // required for mdbook test

use std::{future::Future, time::Duration};

use trpl::Either;

// --snip--

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::race(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}
Listing 17-29: 使用 racesleep 定义 timeout

这样,我们就有了一个可工作的 timeout,它由另外两个 async 助手构建而成。如果我们运行我们的代码,它将在超时后打印失败模式

Failed after 2 seconds

由于 future 可以与其他 future 组合,因此您可以使用较小的 async 构建块来构建非常强大的工具。例如,您可以使用相同的方法将超时与重试结合起来,然后将它们与网络调用之类的东西一起使用——这是本章开头的示例之一!

在实践中,您通常会直接使用 async.await,其次是使用 joinjoin_allrace 等函数和宏。您只需要偶尔使用 pin 才能将它们与这些 API 一起使用。

我们现在已经看到了许多同时处理多个 future 的方法。接下来,我们将了解如何在一段时间内按顺序处理多个 future,使用 stream。以下是您可能想要首先考虑的更多事项

  • 我们使用带有 join_allVec 来等待某个组中的所有 future 完成。您如何使用 Vec 来按顺序处理一组 future 呢?这样做有什么权衡?

  • 查看 futures crate 中的 futures::stream::FuturesUnordered 类型。使用它与使用 Vec 有什么不同?(不要担心它来自 crate 的 stream 部分;它可以与任何 future 集合完美配合。)