Future、任务和线程
正如我们在前一章看到的,线程提供了一种并发方法。在本章中,我们看到了另一种并发方法,即使用 async 以及 future 和 stream。你可能想知道为什么你要选择其中一个。答案是:视情况而定!在许多情况下,不是线程*或* async,而是线程*和* async。
许多操作系统几十年来都提供了基于线程的并发模型,许多编程语言也因此支持它们。然而,它们并非没有缺点。在许多操作系统上,每个线程都会占用相当多的内存,并且启动和关闭它们会带来一些开销。只有当你的操作系统和硬件支持线程时,线程才是一种选择!与主流桌面和移动计算机不同,一些嵌入式系统根本没有操作系统,因此它们也没有线程!
async 模型提供了一组不同的——最终是互补的——权衡。在 async 模型中,并发操作不需要自己的线程。相反,它们可以在任务上运行,就像我们在整个 stream 章节中使用 trpl::spawn_task
从同步函数启动工作一样。任务很像线程——但它不是由操作系统管理的,而是由库级别的代码(运行时)管理的。
在上一节中,我们看到可以通过使用 async channel 并生成一个可以从同步代码调用的 async 任务来构建 Stream
。我们可以用线程做完全相同的事情!在 Listing 17-40 中,我们使用了 trpl::spawn_task
和 trpl::sleep
。在 Listing 17-41 中,我们在 get_intervals
函数中用标准库中的 thread::spawn
和 thread::sleep
API 替换了它们。
extern crate trpl; // required for mdbook test use std::{pin::pin, thread, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval #{count}")) .throttle(Duration::from_millis(500)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(item) => println!("{item}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; if let Err(send_error) = tx.send(format!("Message: '{message}'")) { eprintln!("Cannot send message '{message}': {send_error}"); break; } } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); // This is *not* `trpl::spawn` but `std::thread::spawn`! thread::spawn(move || { let mut count = 0; loop { // Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`! thread::sleep(Duration::from_millis(1)); count += 1; if let Err(send_error) = tx.send(count) { eprintln!("Could not send interval {count}: {send_error}"); break; }; } }); ReceiverStream::new(rx) }
get_intervals
函数使用 std::thread
API 而不是 async trpl
API如果你运行它,输出是相同的。请注意,从调用代码的角度来看,这里几乎没有变化!更重要的是,即使我们的一个函数在运行时生成了一个 async 任务,而另一个函数生成了一个操作系统线程,结果 stream 并没有受到这些差异的影响。
然而,这两种方法的行为之间存在显着差异,尽管在这个非常简单的示例中我们可能很难衡量它。我们可以在任何现代个人计算机上生成数十万甚至数百万个 async 任务。如果我们尝试用线程来做这件事,我们真的会耗尽内存!
然而,这些 API 如此相似是有原因的。线程充当同步操作集的边界;并发性在线程*之间*是可能的。任务充当*异步*操作集的边界;并发性在任务*之间*和*内部*都是可能的。在这方面,任务有点像轻量级的、运行时管理的线程,它们具有额外的功能,这些功能来自于由运行时而不是操作系统管理。Future 是更细粒度的并发单元,其中每个 future 可能代表其他 future 的树。也就是说,运行时——特别是它的执行器——管理任务,而任务管理 future。
然而,这并不意味着 async 任务总是比线程更好,就像线程总是比任务更好一样。
一方面,线程并发在某些方面比 async
并发更简单的编程模型。线程有点“发射后不管”,它们没有 future 的原生等价物,因此它们只是运行到完成,除非操作系统本身中断,否则不会中断。也就是说,它们没有像 future 那样的*任务内并发*。Rust 中的线程也没有取消机制——我们没有在本章中深入讨论这个主题,但这隐含在我们每次结束 future 时,它的状态都会被正确清理的事实中。
这些限制使得线程比 future 更难组合。例如,构建像我们在 “构建我们自己的 Async 抽象” 中构建的 timeout
或我们在 “组合 Stream” 中与 stream 一起使用的 throttle
方法要困难得多。Future 是更丰富的数据结构这一事实意味着它们*可以*更自然地组合在一起,正如我们所见。
然后,任务提供了对 future 的*额外*控制,允许你选择在何处以及如何对它们进行分组。事实证明,线程和任务通常可以很好地协同工作,因为任务可以(至少在某些运行时中)在线程之间移动。我们直到现在还没有提到,但实际上,我们一直在使用的 Runtime
,包括 spawn_blocking
和 spawn_task
函数,默认情况下是多线程的!许多运行时使用一种称为*工作窃取*的方法,根据线程的当前利用率,在线程之间透明地移动任务,目的是提高系统的整体性能。要构建它,实际上需要线程*和*任务,以及 future。
作为思考何时使用哪种方法的默认方式
- 如果任务是*非常可并行化的*,例如处理大量数据,其中每个部分都可以单独处理,那么线程是更好的选择。
- 如果任务是*非常并发的*,例如处理来自大量不同来源的消息,这些消息可能以不同的间隔或不同的速率到达,那么 async 是更好的选择。
如果你需要并行性和并发性的某种组合,你不必在线程和 async 之间做出选择。你可以自由地将它们一起使用,让每个都服务于它最擅长的部分。例如,Listing 17-TODO 展示了现实世界 Rust 代码中这种混合的一个相当常见的例子。
extern crate trpl; // for mdbook test // ANCHOR: all use std::{thread, time::Duration}; fn main() { let (tx, mut rx) = trpl::channel(); thread::spawn(move || { for i in 1..11 { tx.send(i).unwrap(); thread::sleep(Duration::from_secs(1)); } }); trpl::run(async { while let Some(message) = rx.recv().await { println!("{message}"); } }); } // ANCHOR_END: all
我们首先创建一个 async channel。然后我们生成一个线程,该线程拥有 channel 发送端的所有权。在线程中,我们发送数字 1 到 10,并在每个数字之间休眠一秒钟。最后,我们运行一个使用传递给 trpl::run
的 async 块创建的 future,就像我们在整章中所做的那样。在该 future 中,我们等待这些消息,就像我们在其他消息传递示例中看到的那样。
回到我们在本章开头举的例子:你可以想象使用专用线程运行一组视频编码任务,因为视频编码是计算密集型的,但使用 async channel 通知 UI 这些操作已完成。这种混合的例子比比皆是!
总结
这不是你在这本书中最后一次看到并发:第 21 章中的项目将在比这里讨论的较小示例更真实的情况下使用本章中的概念——并更直接地比较使用线程与使用任务和 future 解决这类问题的方式。
无论是使用线程、future 和任务,还是将它们全部结合使用,Rust 都为你提供了编写安全、快速、并发代码所需的工具——无论是用于高吞吐量 Web 服务器还是嵌入式操作系统。
接下来,我们将讨论随着你的 Rust 程序变得更大,对问题进行建模和构建解决方案的惯用方法。此外,我们将讨论 Rust 的惯用语与你可能熟悉的面向对象编程的惯用语有何关系。