流
到目前为止,在本章中,我们主要坚持使用单个 future。一个大的例外是我们使用的异步通道。回想一下我们在本章早些时候的 “消息传递” 中如何使用异步通道的接收器。异步 recv
方法会随着时间的推移生成一系列项。这是一个更通用模式的实例,通常称为流。
当我们查看第 13 章中的 Iterator
trait 时,我们之前已经见过项目序列,但是迭代器和异步通道接收器之间有两个区别。第一个区别是时间元素:迭代器是同步的,而通道接收器是异步的。第二个区别是 API。当直接使用 Iterator
时,我们调用其同步 next
方法。对于 trpl::Receiver
,我们改为调用异步 recv
方法,但这些 API 在其他方面感觉非常相似。
这种相似性并非巧合。流就像迭代的异步形式。虽然 trpl::Receiver
专门等待接收消息,但通用的流 API 需要更加通用:它将像 Iterator
一样提供下一个项,但以异步方式进行。实际上,这大致是它在 Rust 中的工作方式,因此我们实际上可以从任何迭代器创建流。与迭代器一样,我们可以通过调用流的 next
方法,然后等待输出,来使用流,如清单 17-30 所示。
extern crate trpl; // required for mdbook test
fn main() {
trpl::run(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
我们从一个数字数组开始,将其转换为迭代器,然后调用 map
将所有值加倍。然后,我们使用 trpl::stream_from_iter
函数将迭代器转换为流。然后我们使用 while let
循环遍历流中的项,当它们到达时
不幸的是,当我们尝试运行代码时,它无法编译。相反,正如我们在输出中看到的那样,它报告没有可用的 next
方法。
error[E0599]: no method named `next` found for struct `Iter` in the current scope
--> src/main.rs:8:40
|
8 | while let Some(value) = stream.next().await {
| ^^^^
|
= note: the full type name has been written to '/Users/chris/dev/rust-lang/book/listings/ch17-async-await/listing-17-30/target/debug/deps/async_await-bbd5bb8f6851cb5f.long-type-18426562901668632191.txt'
= note: consider using `--verbose` to print the full type name to the console
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
1 + use trpl::StreamExt;
|
help: there is a method `try_next` with a similar name
|
8 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
For more information about this error, try `rustc --explain E0599`.
正如输出所建议的那样,问题是我们需要在作用域中使用正确的 trait 才能使用 next
方法。鉴于我们目前的讨论,您可能会合理地期望它是 Stream
,但我们这里需要的 trait 实际上是 StreamExt
。这里的 Ext
代表 “extension”(扩展):这是 Rust 社区中用于使用另一个 trait 扩展一个 trait 的常见模式。
您可能想知道为什么是 StreamExt
而不是 Stream
,以及是否根本存在 Stream
类型。简而言之,答案是在整个 Rust 生态系统中,Stream
trait 定义了一个底层接口,该接口有效地结合了 Iterator
和 Future
trait。StreamExt
trait 在 Stream
之上提供了一组更高级别的 API,包括 next
方法以及许多其他实用方法,例如来自 Iterator
的方法。我们将在本章末尾更详细地回到 Stream
和 StreamExt
trait。目前,这足以让我们继续前进。
我们在这里需要做的就是为 trpl::StreamExt
添加一个 use
语句,如清单 17-31 所示。
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let iter = values.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(value) = stream.next().await { println!("The value was: {value}"); } }); }
将所有这些部分放在一起后,事情就按照我们想要的方式工作了!更重要的是,既然我们在作用域中有了 StreamExt
,我们就可以像使用迭代器一样使用它的所有实用方法。例如,在清单 17-32 中,我们使用 filter
方法来过滤掉除三和五的倍数之外的所有内容。
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = 1..101; let iter = values.map(|n| n * 2); let stream = trpl::stream_from_iter(iter); let mut filtered = stream.filter(|value| value % 3 == 0 || value % 5 == 0); while let Some(value) = filtered.next().await { println!("The value was: {value}"); } }); }
StreamExt::filter
方法过滤 Stream
当然,这不是很令人兴奋。我们可以使用普通的迭代器来完成,而根本不需要任何异步。因此,让我们看看我们可以做的一些其他事情,这些事情是流独有的。
组合流
许多事物自然地表示为流:队列中可用的项,或者通过一次仅从文件系统中拉取块来处理比计算机内存更多的的数据,或者通过网络随时间到达的数据。由于流是 future,我们也可以将它们与任何其他类型的 future 一起使用,并且我们可以以有趣的方式组合它们。例如,我们可以批量处理事件以避免触发过多的网络调用,在长时间运行的操作序列上设置超时,或者限制用户界面事件以避免执行不必要的工作。
让我们从构建一个小的消息流开始,类似于我们可能从 WebSocket 或其他实时通信协议中看到的那样。在清单 17-33 中,我们创建了一个函数 get_messages
,它返回 impl Stream<Item = String>
。为了实现它,我们创建了一个异步通道,遍历英文字母表的前十个字母,并将它们发送到通道中。
我们还使用了一种新类型:ReceiverStream
,它将 trpl::channel
中的 rx
接收器转换为具有 next
方法的 Stream
。回到 main
中,我们使用 while let
循环来打印来自流的所有消息。
extern crate trpl; // required for mdbook test use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = get_messages(); while let Some(message) = messages.next().await { println!("{message}"); } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
rx
接收器用作 ReceiverStream
当我们运行此代码时,我们得到了完全符合预期的结果
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
虽然我们可以使用常规的 Receiver
API,甚至常规的 Iterator
API 来做到这一点。让我们添加一些需要流的东西,例如添加一个超时,该超时应用于流中的每个项,以及对我们发出的项的延迟。
在清单 17-34 中,我们首先使用 timeout
方法向流添加超时,该方法来自 StreamExt
trait。然后我们更新 while let
循环的主体,因为流现在返回 Result
。Ok
变体表示消息及时到达;Err
变体表示在任何消息到达之前超时已过。我们对该结果进行 match
,并在我们成功接收到消息时打印该消息,或者打印关于超时的通知。最后,请注意,我们在对消息应用超时后将其固定,因为超时助手会生成一个 future,该 future 需要固定才能被轮询。
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
StreamExt::timeout
方法在流中的项上设置时间限制但是,由于消息之间没有延迟,因此此超时不会更改程序的行为。让我们向我们发送的消息添加可变延迟。在 get_messages
中,我们将 enumerate
迭代器方法与 messages
数组一起使用,以便我们可以获取我们正在发送的每个项的索引以及项本身。然后,我们对偶数索引项应用 100 毫秒的延迟,对奇数索引项应用 300 毫秒的延迟,以模拟我们在现实世界中可能从消息流中看到的不同延迟。由于我们的超时为 200 毫秒,因此这应该会影响一半的消息。
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) }
tx
发送消息,并使用异步延迟,而无需使 get_messages
成为异步函数为了在 get_messages
函数中的消息之间休眠而不阻塞,我们需要使用异步。但是,我们不能将 get_messages
本身变成异步函数,因为那样我们会返回 Future<Output = Stream<Item = String>>
而不仅仅是 Stream<Item = String>>
。调用者将不得不等待 get_messages
本身才能访问流。但请记住:给定 future 中的一切都是线性发生的;并发发生在 future 之间。等待 get_messages
将需要它发送所有消息,包括在发送每条消息之间休眠,然后再返回接收器流。结果,超时最终将毫无用处。流本身不会有延迟:延迟将全部发生在流可用之前。
相反,我们将 get_messages
保留为一个返回流的常规函数,并生成一个任务来处理异步 sleep
调用。
注意:像这样调用 spawn_task
可以工作,因为我们已经设置了运行时。在没有首先设置运行时的情况下调用 spawn_task
的这种特定实现将导致 panic。其他实现选择了不同的权衡:它们可能会生成一个新的运行时,从而避免 panic,但最终会增加一些额外的开销,或者只是不提供在不引用运行时的情况下生成任务的独立方法。您应该确保您知道您的运行时选择了什么权衡,并相应地编写代码!
现在我们的代码有了更有趣的结果!在每隔一对消息之间,我们都会看到报告的错误:Problem: Elapsed(())
。
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
超时不会阻止消息最终到达——我们仍然会收到所有原始消息。这是因为我们的通道是无界的:它可以容纳我们内存中可以容纳的尽可能多的消息。如果消息在超时之前未到达,我们的流处理程序将对此进行说明,但是当它再次轮询流时,消息现在可能已经到达。
如果需要,您可以通过使用其他类型的通道或更通用的其他类型的流来获得不同的行为。让我们在本节的最后一个示例中实际看一下其中一个,方法是将时间间隔流与此消息流结合起来。
合并流
首先,让我们创建另一个流,如果我们让它直接运行,它将每毫秒发出一个项。为简单起见,我们可以使用 sleep
函数在延迟后发送消息,并将其与我们在 get_messages
中使用的从通道创建流的相同方法结合起来。不同之处在于,这次我们将返回已过去的时间间隔计数,因此返回类型将为 impl Stream<Item = u32>
,我们可以调用函数 get_intervals
。
在清单 17-36 中,我们首先在任务中定义一个 count
。(我们也可以在任务外部定义它,但是限制任何给定变量的作用域更清晰。)然后我们创建一个无限循环。循环的每次迭代都异步休眠一毫秒,递增计数,然后通过通道发送它。由于这一切都包装在 spawn_task
创建的任务中,因此所有这些都将与运行时一起清理,包括无限循环。
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
这种无限循环,只有在整个运行时被拆除时才会结束,在异步 Rust 中非常常见:许多程序需要无限期地运行。使用异步,这不会阻塞任何其他内容,只要在循环的每次迭代中至少有一个 await 点即可。
回到我们 main 函数的异步块中,我们首先调用 get_intervals
。然后我们使用 merge
方法合并 messages
和 intervals
流。最后,我们遍历组合流而不是 messages
(清单 17-37)。
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
while let Some(result) = merged.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
此时,messages
和 intervals
都不需要固定或可变,因为两者都将组合成单个 merged
流。但是,对 merge
的此调用无法编译!(while let
循环中的 next
调用也无法编译,但我们将在修复此问题后返回到该调用。)这两个流具有不同的类型。messages
流的类型为 Timeout<impl Stream<Item = String>>
,其中 Timeout
是为 timeout
调用实现 Stream
的类型。同时,intervals
流的类型为 impl Stream<Item = u32>
。要合并这两个流,我们需要转换其中一个流以匹配另一个流。
在清单 17-38 中,我们使用 intervals
流重新工作,因为 messages
已经处于我们想要的基本格式,并且必须处理超时错误。首先,我们可以使用 map
辅助方法将 intervals
转换为字符串。其次,我们需要匹配来自 messages
的 Timeout
。虽然我们实际上不希望 intervals
超时,但我们可以创建一个比我们正在使用的其他持续时间更长的超时。在这里,我们使用 Duration::from_secs(10)
创建一个 10 秒超时。最后,我们需要使 stream
可变,以便 while let
循环的 next
调用可以遍历流,并将其固定以便安全地执行此操作。
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
while let Some(result) = stream.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
intervals
流的类型与 messages
流的类型对齐这使我们几乎达到了我们需要达到的目标。一切类型检查都通过了。但是,如果您运行此代码,则会出现两个问题。首先,它永远不会停止!您需要使用 ctrl-c 停止它。其次,来自英文字母表的消息将被埋没在所有间隔计数器消息中
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
清单 17-39 显示了一种解决最后两个问题的方法。首先,我们在 intervals
流上使用 throttle
方法,以便它不会淹没 messages
流。节流是一种限制函数被调用速率的方法——或者,在这种情况下,流被轮询的频率。每百毫秒一次应该可以,因为这与我们的消息到达的频率大致相同。
为了限制我们将从流中接受的项数,我们可以使用 take
方法。我们将其应用于合并的流,因为我们想要限制最终输出,而不仅仅是一个流或另一个流。
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval #{count}")) .throttle(Duration::from_millis(100)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
throttle
和 take
管理合并的流现在,当我们运行程序时,它会在从流中拉取二十个项后停止,并且间隔不会淹没消息。我们也没有得到 Interval: 100
或 Interval: 200
等等,而是只得到 Interval: 1
、Interval: 2
等等——即使我们有一个源流可以每毫秒生成一个事件。这是因为 throttle
调用会生成一个新的流,包装原始流,以便原始流仅以节流速率而不是其自身的“本机”速率被轮询。我们没有一堆未处理的间隔消息,我们只是选择忽略它们。相反,我们从一开始就没有生成这些间隔消息!这是 Rust 的 future 固有的“惰性”再次发挥作用,使我们能够选择我们的性能特征。
Interval #1
Message: 'a'
Interval #2
Interval #3
Problem: Elapsed(())
Interval #4
Message: 'b'
Interval #5
Message: 'c'
Interval #6
Interval #7
Problem: Elapsed(())
Interval #8
Message: 'd'
Interval #9
Message: 'e'
Interval #10
Interval #11
Problem: Elapsed(())
Interval #12
还有最后一件事我们需要处理:错误!对于这两个基于通道的流,当通道的另一端关闭时,send
调用可能会失败——这只是运行时如何执行构成流的 future 的问题。到目前为止,我们通过调用 unwrap
忽略了这一点,但在一个行为良好的应用程序中,我们应该显式地处理错误,至少通过结束循环,这样我们就不会尝试发送更多消息!清单 17-40 显示了一个简单的错误策略:打印问题,然后 break
跳出循环。与往常一样,处理消息发送错误的正确方法会有所不同——只需确保您有一个策略。
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval #{count}")) .throttle(Duration::from_millis(500)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(item) => println!("{item}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; if let Err(send_error) = tx.send(format!("Message: '{message}'")) { eprintln!("Cannot send message '{message}': {send_error}"); break; } } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; if let Err(send_error) = tx.send(count) { eprintln!("Could not send interval {count}: {send_error}"); break; }; } }); ReceiverStream::new(rx) }
这是一个很好的提示,可以转向我们的最后一节,并通过讨论 future(包括流)、任务和线程如何相互关联以及如何一起使用它们来结束对 Rust 中异步的介绍。