将单线程服务器转换为多线程服务器
目前,服务器将依次处理每个请求,这意味着它在第一个连接完成处理之前不会处理第二个连接。如果服务器收到越来越多的请求,这种串行执行将变得越来越不理想。如果服务器收到一个需要很长时间才能处理的请求,则后续请求将不得不等待,直到长时间请求完成,即使新请求可以快速处理。我们需要解决这个问题,但首先,我们将看看实际运行中的问题。
在当前服务器实现中模拟慢速请求
我们将看看慢速处理请求如何影响对当前服务器实现的其他请求。列表 20-10 实现了处理对 /sleep 的请求,并模拟了慢速响应,这将导致服务器在响应前休眠 5 秒。
文件名:src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
列表 20-10:通过休眠 5 秒来模拟慢速请求
现在我们有三个案例,所以我们从 if
切换到 match
。我们需要显式地匹配 request_line
的切片,以针对字符串字面值进行模式匹配;match
不像相等方法那样进行自动引用和解引用。
第一个分支与列表 20-9 中的 if
代码块相同。第二个分支匹配对 /sleep 的请求。当收到该请求时,服务器将休眠 5 秒,然后再呈现成功的 HTML 页面。第三个分支与列表 20-9 中的 else
代码块相同。
你可以看到我们的服务器有多么原始:真正的库会以一种远没有那么冗长的方式处理多个请求的识别!
使用 cargo run
启动服务器。然后打开两个浏览器窗口:一个用于 http://127.0.0.1:7878/,另一个用于 http://127.0.0.1:7878/sleep。如果您像以前一样多次输入 / URI,您会看到它响应迅速。但是,如果您输入 /sleep,然后加载 /,您会看到 / 会等待 sleep
休眠满 5 秒后才加载。
我们可以使用多种技术来避免请求在慢速请求后面积压;我们将实现的技术是线程池。
通过线程池提高吞吐量
线程池 是一组已生成并准备好处理任务的线程。当程序收到新任务时,它会将池中的一个线程分配给该任务,并且该线程将处理该任务。池中剩余的线程可用于处理在第一个线程处理时收到的任何其他任务。当第一个线程完成处理其任务时,它将返回到空闲线程池,准备处理新任务。线程池允许您并发处理连接,从而提高服务器的吞吐量。
我们将限制池中线程的数量为一个较小的数字,以保护我们免受拒绝服务 (DoS) 攻击;如果我们让程序为每个传入的请求创建一个新线程,那么有人向我们的服务器发出 1000 万个请求可能会通过耗尽我们服务器的所有资源并使请求处理陷入停顿来制造混乱。
因此,我们将拥有固定数量的线程在池中等待,而不是生成无限数量的线程。传入的请求被发送到池中进行处理。池将维护一个传入请求队列。池中的每个线程将从该队列中弹出一个请求,处理该请求,然后向队列询问另一个请求。通过这种设计,我们可以并发处理最多 N
个请求,其中 N
是线程数。如果每个线程都在响应一个长时间运行的请求,则后续请求仍然可以在队列中积压,但是我们增加了在达到该点之前可以处理的长时间运行的请求的数量。
这种技术只是提高 Web 服务器吞吐量的众多方法之一。您可能探索的其他选项包括 fork/join 模型、单线程异步 I/O 模型 或 多线程异步 I/O 模型。如果您对这个主题感兴趣,您可以阅读有关其他解决方案的更多信息并尝试实现它们;使用像 Rust 这样的低级语言,所有这些选项都是可能的。
在我们开始实现线程池之前,让我们讨论一下使用该池应该是什么样的。当您尝试设计代码时,首先编写客户端接口可以帮助指导您的设计。编写代码的 API,使其结构符合您想要调用它的方式;然后在该结构中实现功能,而不是先实现功能,然后再设计公共 API。
类似于我们在第 12 章的项目中使用的测试驱动开发,我们将在这里使用编译器驱动开发。我们将编写调用我们想要的函数的代码,然后我们将查看来自编译器的错误,以确定接下来应该更改什么以使代码工作。但是,在此之前,我们将探索我们不打算用作起点的技术。
为每个请求生成一个线程
首先,让我们探索一下,如果我们的代码确实为每个连接创建了一个新线程,它会是什么样子。正如前面提到的,由于可能生成无限数量线程的问题,这不是我们的最终计划,但这首先是一个获得可工作的多线程服务器的起点。然后,我们将添加线程池作为改进,并且对比这两个解决方案将更容易。列表 20-11 显示了对 main
进行的更改,以在 for
循环中为每个流生成一个新线程。
文件名:src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
列表 20-11:为每个流生成一个新线程
正如你在第 16 章中学到的,thread::spawn
将创建一个新线程,然后在新线程中的闭包中运行代码。如果您运行此代码并在浏览器中加载 /sleep,然后在另外两个浏览器选项卡中加载 /,您确实会看到对 / 的请求不必等待 /sleep 完成。但是,正如我们提到的,这最终将使系统不堪重负,因为您将创建新线程而没有任何限制。
创建有限数量的线程
我们希望我们的线程池以类似、熟悉的方式工作,以便从线程切换到线程池不需要对使用我们 API 的代码进行大的更改。列表 20-12 显示了我们想要使用的 ThreadPool
结构体的假设接口,而不是 thread::spawn
。
文件名:src/main.rs
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
列表 20-12:我们理想的 ThreadPool
接口
我们使用 ThreadPool::new
创建一个新的线程池,其中包含可配置数量的线程,在本例中为四个。然后,在 for
循环中,pool.execute
具有与 thread::spawn
类似的接口,因为它接受一个闭包,池应该为每个流运行该闭包。我们需要实现 pool.execute
,以便它接受闭包并将其提供给池中的一个线程来运行。此代码尚无法编译,但我们将尝试编译,以便编译器可以指导我们如何修复它。
使用编译器驱动开发构建 ThreadPool
将列表 20-12 中的更改应用到 src/main.rs,然后让我们使用来自 cargo check
的编译器错误来驱动我们的开发。这是我们得到的第一个错误
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
太棒了!此错误告诉我们需要 ThreadPool
类型或模块,因此我们现在将构建一个。我们的 ThreadPool
实现将独立于我们的 Web 服务器正在执行的工作类型。因此,让我们将 hello
crate 从二进制 crate 切换到库 crate,以保存我们的 ThreadPool
实现。在我们更改为库 crate 后,我们还可以将单独的线程池库用于我们想要使用线程池执行的任何工作,而不仅仅是用于服务 Web 请求。
创建一个 src/lib.rs,其中包含以下内容,这是我们现在可以拥有的 ThreadPool
结构体的最简单定义
文件名:src/lib.rs
pub struct ThreadPool;
然后编辑 main.rs 文件,通过将以下代码添加到 src/main.rs 的顶部,将 ThreadPool
从库 crate 引入作用域
文件名:src/main.rs
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
此代码仍然无法工作,但让我们再次检查它以获取我们需要解决的下一个错误
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
此错误指示接下来我们需要为 ThreadPool
创建一个名为 new
的关联函数。我们也知道 new
需要有一个参数,该参数可以接受 4
作为参数,并且应该返回一个 ThreadPool
实例。让我们实现最简单的 new
函数,该函数将具有这些特征
文件名:src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
我们选择 usize
作为 size
参数的类型,因为我们知道负数个线程没有任何意义。我们也知道我们将使用此 4 作为线程集合中元素的数量,这正是 usize
类型的用途,如 “整数类型” 中讨论的那样第 3 章的节。
让我们再次检查代码
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
现在出现错误是因为我们在 ThreadPool
上没有 execute
方法。回想一下 “创建有限数量的线程”节,我们决定我们的线程池应该具有类似于 thread::spawn
的接口。此外,我们将实现 execute
函数,以便它接受给定的闭包,并将其提供给池中的空闲线程以运行。
我们将在 ThreadPool
上定义 execute
方法以将闭包作为参数。回想一下 “将捕获的值移出闭包和 Fn
trait”第 13 章中的节,我们可以使用三种不同的 trait 将闭包作为参数:Fn
、FnMut
和 FnOnce
。我们需要决定在此处使用哪种闭包。我们知道我们最终会做一些类似于标准库 thread::spawn
实现的事情,因此我们可以查看 thread::spawn
的签名对其参数的界限。文档向我们展示了以下内容
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
F
类型参数是我们在这里关心的;T
类型参数与返回值有关,我们不关心它。我们可以看到 spawn
使用 FnOnce
作为 F
的 trait 界限。这可能也是我们想要的,因为我们最终会将我们在 execute
中获得的参数传递给 spawn
。我们可以进一步确信 FnOnce
是我们想要使用的 trait,因为用于运行请求的线程将只执行该请求的闭包一次,这与 FnOnce
中的 Once
匹配。
F
类型参数还具有 trait 界限 Send
和生命周期界限 'static
,这在我们的情况下很有用:我们需要 Send
将闭包从一个线程传输到另一个线程,并且需要 'static
,因为我们不知道线程将执行多长时间。让我们在 ThreadPool
上创建一个 execute
方法,该方法将采用具有这些界限的 F
类型的泛型参数
文件名:src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
我们在 FnOnce
之后仍然使用 ()
,因为此 FnOnce
表示一个不带参数并返回单元类型 ()
的闭包。就像函数定义一样,返回类型可以从签名中省略,但即使我们没有参数,我们仍然需要括号。
同样,这是 execute
方法的最简单实现:它什么也不做,但我们只是尝试使我们的代码编译。让我们再次检查它
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
它编译了!但请注意,如果您尝试 cargo run
并在浏览器中发出请求,您将在浏览器中看到我们在本章开头看到的错误。我们的库实际上尚未调用传递给 execute
的闭包!
注意:您可能会听到关于具有严格编译器的语言(例如 Haskell 和 Rust)的一句话是“如果代码编译,它就可以工作。” 但这句话并非普遍正确。我们的项目编译了,但它绝对什么也没做!如果我们正在构建一个真正的、完整的项目,那么现在是开始编写单元测试以检查代码是否编译 并且 具有我们想要的行为的好时机。
验证 new
中的线程数
我们没有对 new
和 execute
的参数做任何事情。让我们用我们想要的行为来实现这些函数的主体。首先,让我们考虑一下 new
。早些时候,我们为 size
参数选择了一个无符号类型,因为具有负数个线程的池没有任何意义。但是,具有零个线程的池也没有任何意义,但零是一个完全有效的 usize
。我们将添加代码来检查 size
是否大于零,然后再返回 ThreadPool
实例,并且如果它收到零,则使用 assert!
宏使程序 panic,如列表 20-13 所示。
文件名:src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
列表 20-13:实现 ThreadPool::new
以在 size
为零时 panic
我们还为我们的 ThreadPool
添加了一些文档,其中包含文档注释。请注意,我们遵循了良好的文档实践,添加了一个部分,其中指出了我们的函数可能 panic 的情况,如第 14 章中所讨论的。尝试运行 cargo doc --open
并单击 ThreadPool
结构体以查看为 new
生成的文档是什么样的!
我们可以将 new
更改为 build
并返回一个 Result
,就像我们在列表 12-9 的 I/O 项目中使用 Config::build
所做的那样,而不是像我们在此处所做的那样添加 assert!
宏。但我们在此案例中已决定,尝试创建不带任何线程的线程池应该是一个不可恢复的错误。如果您感到雄心勃勃,请尝试编写一个名为 build
的函数,其签名如下,以便与 new
函数进行比较
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
创建空间来存储线程
现在我们有一种方法知道我们有有效的线程数可以存储在池中,我们可以在返回结构体之前创建这些线程并将它们存储在 ThreadPool
结构体中。但是我们如何“存储”一个线程呢?让我们再次查看 thread::spawn
签名
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
spawn
函数返回一个 JoinHandle<T>
,其中 T
是闭包返回的类型。让我们也尝试使用 JoinHandle
,看看会发生什么。在我们的案例中,我们传递给线程池的闭包将处理连接并且不返回任何内容,因此 T
将是单元类型 ()
。
列表 20-14 中的代码将编译,但尚未创建任何线程。我们已更改 ThreadPool
的定义以保存 thread::JoinHandle<()>
实例的 vector,使用 size
的容量初始化 vector,设置一个 for
循环,该循环将运行一些代码来创建线程,并返回包含它们的 ThreadPool
实例。
文件名:src/lib.rs
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
列表 20-14:创建一个 vector,供 ThreadPool
保存线程
我们在库 crate 中引入了 std::thread
作用域,因为我们使用 thread::JoinHandle
作为 ThreadPool
中 vector 中项的类型。
一旦收到有效大小,我们的 ThreadPool
就会创建一个新的 vector,该 vector 可以容纳 size
个项。with_capacity
函数执行与 Vec::new
相同的任务,但有一个重要的区别:它预先分配 vector 中的空间。因为我们知道我们需要在 vector 中存储 size
个元素,所以预先进行此分配比使用 Vec::new
稍微有效率,后者会在插入元素时调整自身大小。
当您再次运行 cargo check
时,它应该会成功。
负责将代码从 ThreadPool
发送到线程的 Worker
结构体
我们在列表 20-14 的 for
循环中留下了一条关于创建线程的注释。在这里,我们将看看我们实际上是如何创建线程的。标准库提供了 thread::spawn
作为创建线程的方法,并且 thread::spawn
希望在线程创建后立即获得一些线程应该运行的代码。但是,在我们的案例中,我们想要创建线程并让它们 等待 我们稍后发送的代码。标准库的线程实现不包含任何执行此操作的方法;我们必须手动实现它。
我们将通过在 ThreadPool
和线程之间引入一个新的数据结构来实现此行为,该数据结构将管理此新行为。我们将此数据结构称为 Worker,这是池实现中的常用术语。Worker 拾取需要运行的代码并在 Worker 的线程中运行代码。想想餐厅厨房里工作的人:工人们等待顾客的订单进来,然后他们负责接受这些订单并完成它们。
我们将存储 Worker
结构体的实例而不是在线程池中存储 JoinHandle<()>
实例的 vector。每个 Worker
将存储一个 JoinHandle<()>
实例。然后我们将在 Worker
上实现一个方法,该方法将接受要运行的代码闭包并将其发送到已运行的线程以执行。我们还将为每个 worker 提供一个 id
,以便我们在日志记录或调试时可以区分池中不同的 worker。
这是当我们创建 ThreadPool
时将发生的新过程。在以这种方式设置 Worker
后,我们将实现将闭包发送到线程的代码
- 定义一个
Worker
结构体,该结构体包含一个id
和一个JoinHandle<()>
。 - 更改
ThreadPool
以保存Worker
实例的 vector。 - 定义一个
Worker::new
函数,该函数接受一个id
号并返回一个Worker
实例,该实例包含id
和使用空闭包生成的线程。 - 在
ThreadPool::new
中,使用for
循环计数器生成一个id
,使用该id
创建一个新的Worker
,并将 worker 存储在 vector 中。
如果您准备好迎接挑战,请在查看列表 20-15 中的代码之前,尝试自行实现这些更改。
准备好了吗?以下是列表 20-15,其中包含一种进行上述修改的方法。
文件名:src/lib.rs
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
列表 20-15:修改 ThreadPool
以保存 Worker
实例,而不是直接保存线程
我们已将 ThreadPool
上字段的名称从 threads
更改为 workers
,因为它现在保存 Worker
实例而不是 JoinHandle<()>
实例。我们使用 for
循环中的计数器作为 Worker::new
的参数,并将每个新的 Worker
存储在名为 workers
的 vector 中。
外部代码(例如 src/main.rs 中的服务器)不需要知道有关在 ThreadPool
中使用 Worker
结构体的实现细节,因此我们将 Worker
结构体及其 new
函数设为私有。Worker::new
函数使用我们给它的 id
,并存储一个 JoinHandle<()>
实例,该实例是通过使用空闭包生成新线程创建的。
注意:如果操作系统由于系统资源不足而无法创建线程,则 thread::spawn
将 panic。这将导致我们的整个服务器 panic,即使某些线程的创建可能成功。为了简单起见,此行为是可以接受的,但在生产线程池实现中,您可能希望使用 std::thread::Builder
及其 spawn
方法,该方法返回 Result
。
此代码将编译,并将存储我们指定为 ThreadPool::new
参数的 Worker
实例数。但是我们 仍然 没有处理我们在 execute
中获得的闭包。让我们看看接下来如何做到这一点。
通过通道将请求发送到线程
我们将要解决的下一个问题是,提供给 thread::spawn
的闭包绝对没有任何作用。目前,我们获取要在 execute
方法中执行的闭包。但是我们需要在创建 ThreadPool
期间给 thread::spawn
一个闭包,以便在创建每个 Worker
时运行。
我们希望我们刚刚创建的每个 Worker
结构体都从 ThreadPool
中保存的队列中获取要运行的代码,并将该代码发送到 worker 的线程以运行。
我们在第 16 章中了解到的通道(一种在两个线程之间进行通信的简单方法)非常适合此用例。我们将使用通道作为作业队列,并且 execute
将作业从 ThreadPool
发送到 Worker
实例,然后这些实例将作业发送到其线程。这是计划
ThreadPool
将创建一个通道并持有发送者。- 每个
Worker
将持有接收者。 - 我们将创建一个新的
Job
结构体,该结构体将保存我们要通过通道发送的闭包。 execute
方法将通过发送者发送它想要执行的作业。- 在其线程中,
Worker
将循环遍历其接收者并执行它接收到的任何作业的闭包。
让我们首先在 ThreadPool::new
中创建一个通道,并将发送者保存在 ThreadPool
实例中,如列表 20-16 所示。Job
结构体暂时不保存任何内容,但将成为我们通过通道发送的项的类型。
文件名:src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
列表 20-16:修改 ThreadPool
以存储传输 Job
实例的通道的发送者
在 ThreadPool::new
中,我们创建了新的通道,并让池持有发送者。这将成功编译。
让我们尝试将通道的接收者作为线程池创建 worker 时传递给每个 worker。我们知道我们想在 worker 生成的线程中使用接收者,因此我们将在闭包中引用 receiver
参数。列表 20-17 中的代码尚无法编译。
文件名:src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
列表 20-17:将接收者传递给 worker
我们做了一些小的、直接的更改:我们将接收者传递给 Worker::new
,然后在闭包内部使用它。
当我们尝试检查此代码时,我们收到此错误
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
代码试图将 receiver
传递给多个 Worker
实例。正如您在第 16 章中回忆起的那样,这将不起作用:Rust 提供的通道实现是多 生产者,单 消费者。这意味着我们不能只克隆通道的消费端来修复此代码。我们也不想多次向多个消费者发送消息;我们想要一个消息列表和多个 worker,以便每条消息都被处理一次。
此外,从通道队列中取出作业涉及改变 receiver
,因此线程需要一种安全的方式来共享和修改 receiver
;否则,我们可能会遇到竞争条件(如第 16 章中所述)。
回想一下第 16 章中讨论的线程安全智能指针:为了跨多个线程共享所有权并允许线程改变值,我们需要使用 Arc<Mutex<T>>
。Arc
类型将允许多个 worker 拥有接收者,而 Mutex
将确保一次只有一个 worker 从接收者获取作业。列表 20-18 显示了我们需要进行的更改。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
列表 20-18:使用 Arc
和 Mutex
在 worker 之间共享接收者
在 ThreadPool::new
中,我们将接收者放入 Arc
和 Mutex
中。对于每个新的 worker,我们克隆 Arc
以增加引用计数,以便 worker 可以共享接收者的所有权。
经过这些更改,代码可以编译了!我们快成功了!
实现 execute
方法
让我们最终在 ThreadPool
上实现 execute
方法。我们还将把 Job
从结构体更改为 trait 对象的类型别名,该 trait 对象保存 execute
接收的闭包类型。正如在 “使用类型别名创建类型同义词” 中讨论的那样第 20 章的章节,类型别名允许我们为了易于使用而缩短长类型。请看示例 20-19。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
示例 20-19:为 Box
创建 Job
类型别名,该 Box
保存每个闭包,然后将作业发送到通道中
在使用我们在 execute
中获得的闭包创建新的 Job
实例后,我们将该作业发送到通道的发送端。我们对 send
调用 unwrap
是为了处理发送失败的情况。例如,如果我们停止所有线程执行,这意味着接收端已停止接收新消息,则可能会发生这种情况。目前,我们无法停止线程执行:只要线程池存在,我们的线程就会继续执行。我们使用 unwrap
的原因是,我们知道不会发生失败情况,但编译器不知道这一点。
但是,我们还没有完全完成!在 worker 中,传递给 thread::spawn
的闭包仍然只 *引用* 通道的接收端。相反,我们需要闭包永远循环,向通道的接收端请求作业,并在获得作业时运行它。让我们对 Worker::new
进行示例 20-20 中所示的更改。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
示例 20-20:在 worker 线程中接收和执行作业
在这里,我们首先在 receiver
上调用 lock
以获取互斥锁,然后我们调用 unwrap
以在任何错误时 panic。如果互斥锁处于 *poisoned* 状态,则获取锁可能会失败,如果其他线程在持有锁而不是释放锁时发生 panic,则可能会发生这种情况。在这种情况下,调用 unwrap
以使此线程 panic 是正确的操作。您可以随意将此 unwrap
更改为带有对您有意义的错误消息的 expect
。
如果我们获得了互斥锁,我们调用 recv
以从通道接收 Job
。最后的 unwrap
也解决了这里的任何错误,如果持有 sender 的线程已关闭,则可能会发生这种情况,类似于如果 receiver 关闭,send
方法如何返回 Err
。
对 recv
的调用会阻塞,因此如果没有作业,则当前线程将等待直到作业可用。 Mutex<T>
确保一次只有一个 Worker
线程尝试请求作业。
我们的线程池现在处于工作状态!运行 cargo run
并发出一些请求
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^
warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: `hello` (lib) generated 3 warnings
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
成功!我们现在有了一个异步执行连接的线程池。永远不会创建超过四个线程,因此如果服务器收到大量请求,我们的系统也不会过载。如果我们向 /sleep 发出请求,服务器将能够通过让另一个线程运行它们来处理其他请求。
注意:如果您同时在多个浏览器窗口中打开 /sleep,它们可能会以 5 秒的间隔一次加载一个。由于缓存原因,某些 Web 浏览器会按顺序执行同一请求的多个实例。此限制不是由我们的 Web 服务器引起的。
在学习了第 19 章中的 while let
循环之后,您可能想知道为什么我们没有像示例 20-21 中所示那样编写 worker 线程代码。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
示例 20-21:使用 while let
的 Worker::new
的替代实现
此代码可以编译和运行,但不会产生所需的线程行为:一个慢请求仍然会导致其他请求等待处理。原因有点微妙:Mutex
结构没有公共的 unlock
方法,因为锁的所有权基于 lock
方法返回的 LockResult<MutexGuard<T>>
中的 MutexGuard<T>
的生命周期。在编译时,借用检查器可以强制执行以下规则:除非我们持有锁,否则无法访问受 Mutex
保护的资源。但是,如果我们不注意 MutexGuard<T>
的生命周期,此实现也可能导致锁被持有时间超出预期。
示例 20-20 中使用 let job = receiver.lock().unwrap().recv().unwrap();
的代码有效,因为使用 let
时,等号右侧表达式中使用的任何临时值都会在 let
语句结束后立即被丢弃。但是,while let
(以及 if let
和 match
)不会在关联块结束之前丢弃临时值。在示例 20-21 中,锁在调用 job()
的持续时间内保持持有状态,这意味着其他 worker 无法接收作业。