进阶篇-多线程

  • 木头
  • 更新于 2023-03-22 11:30
  • 阅读 1775

多线程的使用

在大部分现代操作系统中,已执行程序的代码在一个 进程(process)中运行,操作系统则会负责管理多个进程。在程序内部,也可以拥有多个同时运行的独立部分。这些运行这些独立部分的功能被称为 线程(threads)

将程序中的计算拆分进多个线程可以改善性能,因为程序可以同时进行多个任务,不过这也会增加复杂性。因为线程是同时运行的,所以无法预先保证不同线程中的代码的执行顺序。这会导致诸如此类的问题:

  • 竞态状态,多个线程以不一致的顺序访问数据或资源。
  • 死锁,两个线程相互等待对方停止使用其所拥有的资源,造成两者都永久等待。
  • 只会发生在特定情况且难以稳定重现和修复的 bug。

编程语言提供的线程叫做绿色线程,如go语言,在底层实现了M:N的模型,即M个绿色线程对应NOS线程。但是,Rust标准库只提供1:1的线程模型的实现,即一个Rust线程对应一个OS线程。

线程

创建新线程

为了创建一个新线程,需要调用thread::spawn函数并传递一个闭包:

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("{} in spawn thread", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("{} in main thread", i);
        thread::sleep(Duration::from_millis(1));
    }
}

注意当 Rust 程序的主线程结束时,新线程也会结束,而不管其是否执行完毕。这个程序的输出可能每次都略有不同,不过它大体上看起来像这样:

1 in main thread
1 in spawn thread
2 in main thread
2 in spawn thread
3 in main thread
3 in spawn thread
4 in main thread
4 in spawn thread
5 in spawn thread

thread::sleep 调用强制线程停止执行一小段时间,这会允许其他不同的线程运行。这些线程可能会轮流运行,不过并不保证如此:这依赖操作系统如何调度线程。

等待所有线程结束

由于主线程结束,大部分时候不光会提早结束新建线程,因为无法保证线程运行的顺序,我们甚至不能实际保证新建线程会被执行!

可以通过将thread::spawn 的返回值储存在变量中来修复新建线程部分没有执行或者完全没有执行的问题。thread::spawn的返回值类型是 JoinHandleJoinHandle 是一个拥有所有权的值,当对其调用 join 方法时,它会等待其线程结束:

use std::{thread, time::Duration};

fn main() {
    let t1 = thread::spawn(|| {
        for i in 1..10 {
            println!("{} in spawn thread", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("{} in main thread", i);
        thread::sleep(Duration::from_millis(1));
    }

    t1.join().unwrap();
}

通过调用 t1join 会阻塞当前线程直到 t1 所代表的线程结束。阻塞线程意味着阻止该线程执行工作或退出。因为我们将 join 调用放在了主线程的 for 循环之后,运行输出:

1 in main thread
1 in spawn thread
2 in main thread
2 in spawn thread
3 in main thread
3 in spawn thread
4 in main thread
4 in spawn thread
5 in spawn thread
6 in spawn thread
7 in spawn thread
8 in spawn thread
9 in spawn thread

这两个线程仍然会交替执行,不过主线程会由于 t1.join() 调用会等待直到新建线程执行完毕。

不过让我们看看将 t1.join() 移动到mainfor循环之前会发生什么,如下:

use std::{thread, time::Duration};

fn main() {
    let t1 = thread::spawn(|| {
        for i in 1..10 {
            println!("{} in spawn thread", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    t1.join().unwrap();

    for i in 1..5 {
        println!("{} in main thread", i);
        thread::sleep(Duration::from_millis(1));
    }
}

运行输出:

1 in spawn thread
2 in spawn thread
3 in spawn thread
4 in spawn thread
5 in spawn thread
6 in spawn thread
7 in spawn thread
8 in spawn thread
9 in spawn thread
1 in main thread
2 in main thread
3 in main thread
4 in main thread

join放置于何处,会影响线程是否同时运行。

线程与 move 闭包

在之前示例thread::spawn的闭包并没有任何参数,并没有在新建线程代码中使用任何主线程的数据。为了在新建线程中使用来自于主线程的数据,需要新建线程的闭包获取它需要的值:

use std::thread;

fn main() {
    let s = "hello".to_string();
    let t1 = thread::spawn(|| {
        println!("{}", s);
    });
    t1.join().unwrap();
}

闭包使用了s,所以闭包会捕获 s并使其成为闭包环境的一部分。因为 thread::spawn 在一个新线程中运行这个闭包,所以可以在新线程中访问s。然而当编译这个例子时,会得到如下错误:

$ cargo run
   Compiling multithreading v0.1.0 (/rsut/multithreading)
error[E0373]: closure may outlive the current function, but it borrows `s`, which is owned by the current function
 --> src/main.rs:4:28
  |
4 |     let t1 = thread::spawn(|| {
  |                            ^^ may outlive borrowed value `s`
5 |         println!("{}", s);
  |                        - `s` is borrowed here
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:4:14
  |
4 |       let t1 = thread::spawn(|| {
  |  ______________^
5 | |         println!("{}", s);
6 | |     });
  | |______^
help: to force the closure to take ownership of `s` (and any other referenced variables), use the `move` keyword
  |
4 |     let t1 = thread::spawn(move || {
  |                            ++++

For more information about this error, try `rustc --explain E0373`.
error: could not compile `multithreading` due to previous error

Rust 会 推断 如何捕获 s,因为 println!只需要 s 的引用,闭包尝试借用s。然而这有一个问题:Rust不知道这个新建线程会执行多久,所以无法知晓 s 的引用是否一直有效。

看错误信息的建议在闭包之前增加move 关键字,我们强制闭包获取其使用的值的所有权,而不是任由 Rust 推断它应该借用值:

use std::thread;

fn main() {
    let s = "hello".to_string();
    let t1 = thread::spawn(move || {
        println!("{}", s);
    });
    t1.join().unwrap();
}

Rust 是保守的并只会为线程借用 s,这意味着主线程理论上可能使新建线程的引用无效。通过告诉Rusts 的所有权移动到新建线程,我们向 Rust 保证主线程不会再使用s

消息传递(线程间传送数据)

Rust中一个实现消息传递并发的主要工具是通道,Rust通道(channel)可以把一个线程的消息(数据)传递到另一个线程,从而让信息在不同的线程中流动,从而实现协作。通道由两部分组成,一个是发送端,一个是接收端,发送端用来发送消息,接收端用来接收消息。发送者或者接收者任一被丢弃时就可以认为通道被关闭了。

  • 通过mpsc::channel,创建通道,mpsc是多个生产者,单个消费者。
  • 通过spmc::channel,创建通道,spmc是一个生产者,多个消费者。

首先,创建了一个通道但没有做任何事。注意这还不能编译,因为 Rust不知道我们想要在信道中发送什么类型:

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

这里使用 mpsc::channel 函数创建一个新的通道;mpsc是 多个生产者,单个消费者。Rust 标准库实现通道的方式意味着一个通道可以有多个产生值的 发送端,但只能有一个消费这些值的 接收端。

mpsc::channel函数返回一个元组:第一个元素是发送端,而第二个元素是接收端。

让我们将发送端移动到一个新建线程中并发送一个字符串,这样新建线程就可以和主线程通讯了:

use std::{sync::mpsc, thread};

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("你好");
        tx.send(val).unwrap();
    });
}

这里再次使用 thread::spawn 来创建一个新线程并使用movetx 移动到闭包中这样新建线程就拥有 tx 了。新建线程需要拥有通道的发送端以便能向信道发送消息。通道的发送端有一个 send 方法用来获取需要放入通道的值。

我们在主线程中从通道的接收者获取值:

use std::{sync::mpsc, thread};

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("你好");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("rx: {}", received);
}

通道的接收者有两个有用的方法:recvtry_recv。这里,我们使用了recv,这个方法会阻塞主线程执行直到从信道中接收一个值。一旦发送了一个值,recv 会在一个 Result<T, E>中返回它。当通道发送端关闭,recv 会返回一个错误表明不会再有新的值到来了。

try_recv 不会阻塞,相反它立刻返回一个 Result<T, E>:Ok 值包含可用的信息,而 Err 值代表此时没有任何消息。如果线程在等待消息过程中还有其他工作时使用 try_recv 很有用:可以编写一个循环来频繁调用 try_recv,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。

通道的所有权转移

我们在新建线程中的通道中发送完 val值 之后 再使用它:

use std::{sync::mpsc, thread};

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("tx : {}", val);
    });

    let received = rx.recv().unwrap();
    println!("rx: {}", received);
}

这里尝试在通过 tx.send 发送 val 到通道中之后将其打印出来。在调用 send的时候,会发生move动作,所以此处不能再使用val

$ cargo run
   Compiling multithreading v0.1.0 (/rsut/multithreading)
error[E0382]: borrow of moved value: `val`
 --> src/main.rs:9:29
  |
7 |         let val = String::from("hi");
  |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
8 |         tx.send(val).unwrap();
  |                 --- value moved here
9 |         println!("tx : {}", val);
  |                             ^^^ value borrowed here after move
  |
  = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `multithreading` due to previous error

send 函数获取其参数的所有权并移动这个值归接收者所有。这可以防止在发送后再次意外地使用这个值;

发送多个值

新建线程现在会发送多个消息并在每个消息之间暂停一秒钟:

use std::{sync::mpsc, thread, time::Duration};

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("A"),
            String::from("B"),
            String::from("C"),
            String::from("D"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("rx: {}", received);
    }
}

这一次,在新建线程中有一个字符串vector 希望发送到主线程。我们遍历他们,单独的发送每一个字符串并通过一个Duration值调用 thread::sleep 函数来暂停一秒。

在主线程中,不再显式调用 recv 函数:而是将 rx 当作一个迭代器。对于每一个接收到的值,我们将其打印出来。当信道被关闭时,迭代器也将结束。

看到如下输出,每一行都会暂停一秒:

rx: A
rx: B
rx: C
rx: D

因为主线程中的 for 循环里并没有任何暂停或等待的代码,所以可以说主线程是在等待从新建线程中接收值。

多个生产者和单个消费者

之前有向同一接收者发送值的多个线程。这可以通过复制发送者来做到:

use std::{sync::mpsc, thread, time::Duration};

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone();

    thread::spawn(move || {
        let vals = vec![
            String::from("A1"),
            String::from("B1"),
            String::from("C1"),
            String::from("D1"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("A2"),
            String::from("B2"),
            String::from("C2"),
            String::from("D2"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("rx: {}", received);
    }
}

这一次,在创建新线程之前,我们对发送者调用了 clone 方法。这会给我们一个可以传递给第一个新建线程的发送端句柄。我们会将原始的信道发送端传递给第二个新建线程。这样就会有两个线程,每个线程将向信道的接收端发送不同的消息。

如果运行这些代码输出:

rx: A1
rx: A2
rx: B2
rx: B1
rx: C1
rx: C2
rx: D1
rx: D2

虽然你可能会看到这些值以不同的顺序出现;

互斥器

互斥器任意时刻,其只允许一个线程访问某些数据。为了访问互斥器中的数据,线程首先需要通过获取互斥器的锁(lock)来表明其希望访问数据。锁是一个作为互斥器一部分的数据结构,它记录谁有数据的排他访问权。因此,我们描述互斥器为通过锁系统 保护(guarding)其数据。

互斥器以难以使用著称,因为你不得不记住:

  1. 在使用数据之前尝试获取锁。
  2. 处理完被互斥器所保护的数据之后,必须解锁数据,这样其他线程才能够获取锁。

Mutex<T>

互斥器的单线程例子:

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);
    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }
    println!("m = {:?}", m);
}

像很多类型一样,我们使用关联函数new 来创建一个 Mutex&lt;T>。使用 lock 方法获取锁,以访问互斥器中的数据。这个调用会阻塞当前线程,直到我们拥有锁为止。

如果另一个线程拥有锁,并且那个线程 panic 了,则 lock 调用会失败。在这种情况下,没人能够再获取锁,所以这里选择 unwrap并在遇到这种情况时使线程 panic

一旦获取了锁,就可以将返回值(在这里是num)视为一个其内部数据的可变引用了。类型系统确保了我们在使用m中的值之前获取锁。m 的类型是 Mutex&lt;i32> 而不是 i32,所以 必须获取锁才能使用这个 i32 值。我们是不会忘记这么做的,因为反之类型系统不允许访问内部的 i32值。

Mutex&lt;T>其实一个智能指针。更准确的说,lock调用 返回 一个叫做MutexGuard的智能指针。这个智能指针实现了 Deref 来指向其内部数据;其也提供了一个 Drop 实现当 MutexGuard 离开作用域时自动释放锁。丢弃了锁之后,可以打印出互斥器的值,并发现能够将其内部的 i32 改为6

共享 Mutex<T'>

现在尝试使用Mutex&lt;T>在多个线程间共享值。我们将启动十个线程,并在各个线程中对同一个计数器值加一,这样计数器将从 0 变为10

use std::{sync::Mutex, thread};

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("counter = {}", *counter.lock().unwrap());
}

这里创建了一个 counter 变量来存放内含 i32Mutex&lt;T>,接下来遍历 range 创建了10 个线程。使用了 thread::spawn并对所有线程使用了相同的闭包:他们每一个都将调用 lock方法来获取 Mutex&lt;T> 上的锁,接着将互斥器中的值加一。当一个线程结束执行,num 会离开闭包作用域并释放锁,这样另一个线程就可以获取它了。

在主线程中,我们收集了所有的 join句柄,调用它们的 join方法来确保所有线程都会结束。这时,主线程会获取锁并打印出程序的结果。

这个例子不能编译:

$ cargo run               
   Compiling multithreading v0.1.0 (/rsut/multithreading)
error[E0382]: use of moved value: `counter`
 --> src/main.rs:8:36
  |
4 |     let counter = Mutex::new(0);
  |         ------- move occurs because `counter` has type `Mutex&lt;i32>`, which does not implement the `Copy` trait
...
8 |         let handle = thread::spawn(move || {
  |                                    ^^^^^^^ value moved into closure here, in previous iteration of loop
9 |             let mut num = counter.lock().unwrap();
  |                           ------- use occurs due to use in closure

For more information about this error, try `rustc --explain E0382`.
error: could not compile `multithreading` due to previous error

错误信息表明 counter 值在上一次循环中被移动了。所以 Rust 告诉我们不能将counter锁的所有权移动到多个线程中。

多所有权

通过使用智能指针Rc&lt;T>来创建引用计数的值,以便拥有多所有者。让我们在这也这么做看看会发生什么。将Mutex&lt;T>封装进 Rc&lt;T> 中并在将所有权移入线程之前克隆了Rc&lt;T>

use std::{rc::Rc, sync::Mutex, thread};

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("counter = {}", *counter.lock().unwrap());
}

再一次编译并出现了不同的错误:

`Rc&lt;Mutex&lt;i32>>` cannot be sent between threads safely

Rc&lt;T>并不能安全的在线程间共享。当Rc&lt;T>管理引用计数时,它必须在每一个 clone 调用时增加计数,并在每一个克隆被丢弃时减少计数。Rc&lt;T> 并没有使用任何并发原语,来确保改变计数的操作不会被其他线程打断。在计数出错时可能会导致诡异的 bug,比如可能会造成内存泄漏,或在使用结束之前就丢弃一个值。我们所需要的是一个完全类似 Rc&lt;T>,又以一种线程安全的方式改变引用计数的类型。

Arc<T'>

所幸 Arc&lt;T>正是 这么一个类似Rc&lt;T>并可以安全的用于并发环境的类型。

use std::{
    sync::{Arc, Mutex},
    thread,
};

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("counter = {}", *counter.lock().unwrap());
}

运行:

counter = 10

RefCell<T'>/Rc<T'> 与 Mutex<T'>/Arc<T'> 的相似性

你可能注意到了,因为 counter 是不可变的,不过可以获取其内部值的可变引用;这意味着 Mutex&lt;T>提供了内部可变性,就像 Cell 系列类型那样。RefCell&lt;T>可以改变Rc&lt;T>中的内容那样,同样的可以使用 Mutex&lt;T> 来改变 Arc&lt;T> 中的内容。

  • 原创
  • 学分: 7
  • 分类: Rust
  • 标签:
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。
124 订阅 31 篇文章

0 条评论

请先 登录 后评论
木头
木头
0xC020...10cf
江湖只有他的大名,没有他的介绍。