共享状态并发

消息传递是处理并发的一种很好的方式,但它不是唯一的方式。另一种方法是让多个线程访问相同的共享数据。再次考虑 Go 语言文档中这部分标语:“不要通过共享内存来通信。”

通过共享内存进行通信会是什么样子?此外,为什么消息传递爱好者会告诫不要使用内存共享?

在某种程度上,任何编程语言中的通道都类似于单所有权,因为一旦你通过通道传递一个值,你就应该不再使用该值。共享内存并发就像多所有权:多个线程可以同时访问相同的内存位置。正如你在第 15 章中看到的,智能指针使多所有权成为可能,多所有权会增加复杂性,因为这些不同的所有者需要管理。Rust 的类型系统和所有权规则极大地帮助正确地进行这种管理。例如,让我们看看互斥锁,这是共享内存更常见的并发原语之一。

使用互斥锁来允许一次从一个线程访问数据

互斥锁互斥(mutual exclusion)的缩写,顾名思义,互斥锁只允许一个线程在任何给定时间访问某些数据。要访问互斥锁中的数据,线程必须首先发出信号,表明它想要通过请求获取互斥锁的来访问。锁是互斥锁的一部分数据结构,用于跟踪当前谁拥有对数据的独占访问权。因此,互斥锁被描述为通过锁定系统保护它所持有的数据。

互斥锁因难以使用而闻名,因为你必须记住两条规则

  • 在使用数据之前,你必须尝试获取锁。
  • 当你完成对互斥锁保护的数据的操作后,你必须解锁数据,以便其他线程可以获取锁。

对于互斥锁的真实世界比喻,想象一下会议上的小组讨论,只有一个麦克风。在小组成员可以发言之前,他们必须请求或发出信号表明他们想要使用麦克风。当他们拿到麦克风时,他们可以随意发言,然后将麦克风交给下一个请求发言的小组成员。如果小组成员忘记在说完后交出麦克风,那么其他人就无法发言。如果共享麦克风的管理出现问题,小组讨论将无法按计划进行!

互斥锁的管理可能非常棘手,这就是为什么这么多人对通道充满热情的原因。但是,得益于 Rust 的类型系统和所有权规则,你不可能在锁定和解锁方面犯错。

Mutex<T> 的 API

作为如何使用互斥锁的示例,让我们首先在单线程上下文中使用互斥锁,如列表 16-12 所示

文件名:src/main.rs

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {m:?}");
}

列表 16-12:为了简单起见,在单线程上下文中探索 Mutex<T> 的 API

与许多类型一样,我们使用关联函数 new 创建 Mutex<T>。要访问互斥锁内部的数据,我们使用 lock 方法来获取锁。此调用将阻塞当前线程,使其在轮到我们拥有锁之前无法执行任何工作。

如果另一个持有锁的线程发生 panic,则对 lock 的调用将失败。在这种情况下,没有人能够获得锁,因此我们选择 unwrap,并且如果我们在这种情况下,则让此线程 panic。

在我们获取锁之后,我们可以将返回值(在本例中命名为 num)视为对内部数据的可变引用。类型系统确保我们在使用 m 中的值之前获取锁。m 的类型是 Mutex<i32>,而不是 i32,因此我们必须调用 lock 才能使用 i32 值。我们不会忘记;否则类型系统不会让我们访问内部 i32

你可能怀疑,Mutex<T> 是一个智能指针。更准确地说,对 lock 的调用返回一个名为 MutexGuard 的智能指针,它被包装在 LockResult 中,我们使用 unwrap 的调用处理了它。MutexGuard 智能指针实现了 Deref 以指向我们的内部数据;智能指针还具有 Drop 实现,该实现在 MutexGuard 超出作用域(发生在内部作用域的末尾)时自动释放锁。因此,我们不会冒忘记释放锁并阻止其他线程使用互斥锁的风险,因为锁释放是自动发生的。

在释放锁之后,我们可以打印互斥锁值,并看到我们能够将内部 i32 更改为 6。

在多个线程之间共享 Mutex<T>

现在,让我们尝试使用 Mutex<T> 在多个线程之间共享一个值。我们将启动 10 个线程,并让它们每个线程将计数器值递增 1,因此计数器从 0 变为 10。列表 16-13 中的下一个示例将出现编译器错误,我们将使用该错误来了解更多关于使用 Mutex<T> 的信息,以及 Rust 如何帮助我们正确使用它。

文件名:src/main.rs

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

列表 16-13:十个线程各自递增一个由 Mutex<T> 保护的计数器

我们创建一个 counter 变量来保存 Mutex<T> 内部的 i32,就像我们在列表 16-12 中所做的那样。接下来,我们通过迭代数字范围来创建 10 个线程。我们使用 thread::spawn 并为所有线程提供相同的闭包:一个将 counter 移动到线程中,通过调用 lock 方法获取 Mutex<T> 上的锁,然后将互斥锁中的值加 1 的闭包。当线程完成运行其闭包时,num 将超出作用域并释放锁,以便另一个线程可以获取它。

在主线程中,我们收集所有 join 句柄。然后,正如我们在列表 16-2 中所做的那样,我们在每个句柄上调用 join 以确保所有线程都完成。此时,主线程将获取锁并打印此程序的结果。

我们暗示过这个例子不会编译。现在让我们找出原因!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
  --> src/main.rs:21:29
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
8  |     for _ in 0..10 {
   |     -------------- inside of this loop
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved into closure here, in previous iteration of loop
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value borrowed here after move
   |
help: consider moving the expression out of the loop so it is only moved once
   |
8  ~     let mut value = counter.lock();
9  ~     for _ in 0..10 {
10 |         let handle = thread::spawn(move || {
11 ~             let mut num = value.unwrap();
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

错误消息指出 counter 值在循环的先前迭代中被移动。Rust 告诉我们,我们不能将 counter 的所有权移动到多个线程中。让我们使用我们在第 15 章中讨论的多所有权方法来修复编译器错误。

多线程的多所有权

在第 15 章中,我们通过使用智能指针 Rc<T> 创建引用计数的值,从而为一个值提供了多个所有者。让我们在这里也这样做,看看会发生什么。我们将列表 16-14 中的 Mutex<T> 包装在 Rc<T> 中,并在将所有权移动到线程之前克隆 Rc<T>

文件名:src/main.rs

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

列表 16-14:尝试使用 Rc<T> 允许多个线程拥有 Mutex<T>

再一次,我们编译并得到……不同的错误!编译器正在教我们很多东西。

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
  --> src/main.rs:11:36
   |
11 |           let handle = thread::spawn(move || {
   |                        ------------- ^------
   |                        |             |
   |  ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
   | |                      |
   | |                      required by a bound introduced by this call
12 | |             let mut num = counter.lock().unwrap();
13 | |
14 | |             *num += 1;
15 | |         });
   | |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
   |
   = help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`, which is required by `{closure@src/main.rs:11:36: 11:43}: Send`
note: required because it's used within this closure
  --> src/main.rs:11:36
   |
11 |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^
note: required by a bound in `spawn`
  --> /rustc/eeb90cda1969383f56a2637cbd3037bdf598841c/library/std/src/thread/mod.rs:688:1

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

哇,这个错误消息非常冗长!这里是需要关注的重点:`Rc<Mutex<i32>>` cannot be sent between threads safely`Rc<Mutex<i32>>` 无法在线程之间安全发送)。编译器还告诉我们原因:the trait `Send` is not implemented for `Rc<Mutex<i32>>` (trait `Send` 没有为 `Rc<Mutex<i32>>` 实现)。我们将在下一节讨论 Send:它是确保我们与线程一起使用的类型旨在用于并发情况的 trait 之一。

不幸的是,Rc<T> 不能安全地跨线程共享。当 Rc<T> 管理引用计数时,它会在每次调用 clone 时增加计数,并在每次克隆被丢弃时从计数中减去。但是它不使用任何并发原语来确保对计数的更改不会被另一个线程中断。这可能会导致错误的计数——细微的错误,反过来可能导致内存泄漏或在完成值之前将其丢弃。我们需要的是一种与 Rc<T> 完全相同的类型,但以线程安全的方式更改引用计数的类型。

使用 Arc<T> 进行原子引用计数

幸运的是,Arc<T> 一种类似于 Rc<T> 的类型,可以安全地在并发情况中使用。a 代表 atomic(原子),意思是它是一种原子引用计数类型。原子性是另一种并发原语,我们在此处不会详细介绍:有关更多详细信息,请参阅标准库文档 std::sync::atomic。在这一点上,你只需要知道原子性就像原始类型一样工作,但可以安全地跨线程共享。

你可能想知道为什么所有原始类型都不是原子类型,以及为什么标准库类型没有实现为默认使用 Arc<T>。原因是线程安全性会带来性能损失,你只想在真正需要时才付出这种代价。如果你只是在单个线程中对值执行操作,那么如果你的代码不必强制执行原子性提供的保证,则可以运行得更快。

让我们回到我们的示例:Arc<T>Rc<T> 具有相同的 API,因此我们通过更改 use 行、对 new 的调用以及对 clone 的调用来修复我们的程序。列表 16-15 中的代码最终将编译并运行

文件名:src/main.rs

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

列表 16-15:使用 Arc<T> 包装 Mutex<T> 以便能够在多个线程之间共享所有权

此代码将打印以下内容

Result: 10

我们成功了!我们从 0 数到 10,这看起来可能不是很令人印象深刻,但它确实教会了我们很多关于 Mutex<T> 和线程安全性的知识。你也可以使用此程序的结构来执行比仅递增计数器更复杂的操作。使用此策略,你可以将计算划分为独立的部分,将这些部分分配到线程中,然后使用 Mutex<T> 让每个线程使用其部分更新最终结果。

请注意,如果你正在执行简单的数值运算,则标准库的 std::sync::atomic 模块提供了比 Mutex<T> 类型更简单的类型。这些类型提供了对原始类型的安全、并发、原子访问。我们选择在本示例中使用带有原始类型的 Mutex<T>,以便我们可以专注于 Mutex<T> 的工作原理。

RefCell<T>/Rc<T>Mutex<T>/Arc<T> 之间的相似之处

你可能已经注意到 counter 是不可变的,但我们可以获得对其内部值的可变引用;这意味着 Mutex<T> 提供了内部可变性,正如 Cell 系列所做的那样。就像我们在第 15 章中使用 RefCell<T> 来允许我们改变 Rc<T> 内部的内容一样,我们使用 Mutex<T> 来改变 Arc<T> 内部的内容。

需要注意的另一个细节是,当你使用 Mutex<T> 时,Rust 无法保护你免受所有类型的逻辑错误的影响。回想一下第 15 章,使用 Rc<T> 会带来创建引用循环的风险,其中两个 Rc<T> 值相互引用,导致内存泄漏。同样,Mutex<T> 也会带来创建死锁的风险。当一个操作需要锁定两个资源,而两个线程各自获得其中一个锁时,就会发生死锁,导致它们永远互相等待。如果你对死锁感兴趣,请尝试创建一个具有死锁的 Rust 程序;然后研究任何语言中互斥锁的死锁缓解策略,并尝试在 Rust 中实现它们。Mutex<T>MutexGuard 的标准库 API 文档提供了有用的信息。

我们将通过讨论 SendSync trait 以及我们如何在自定义类型中使用它们来结束本章。