Channels
A channel is an easy way to use many threads that send to one place.它们相当流行,因为它们很容易组合在一起。你可以在Rust中用std::sync::mpsc
创建一个channel。mpsc
的意思是 "多个生产者,单个消费者",所以 "many threads sending to one place"。要启动一个通道,你可以使用 channel()
。这将创建一个 Sender
和一个 Receiver
,它们被绑在一起。你可以在函数签名中看到这一点。
所以你要选择一个发送者的名字和一个接收者的名字。通常你会看到像let (sender, receiver) = channel();
这样的开头。因为它是泛型函数,如果你只写这个,Rust不会知道类型。
use std::sync::mpsc::channel;
fn main() {
let (sender, receiver) = channel(); // ⚠️
}
编译器说:
error[E0282]: type annotations needed for `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`
--> src\main.rs:30:30
|
30 | let (sender, receiver) = channel();
| ------------------ ^^^^^^^ cannot infer type for type parameter `T` declared on the function `channel`
| |
| consider giving this pattern the explicit type `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`, where
the type parameter `T` is specified
它建议为Sender
和Receiver
添加一个类型。如果你愿意的话,可以这样做:
use std::sync::mpsc::{channel, Sender, Receiver}; // Added Sender and Receiver here
fn main() {
let (sender, receiver): (Sender<i32>, Receiver<i32>) = channel();
}
但你不必这样做: 一旦你开始使用Sender
和Receiver
,Rust就能猜到类型。
所以我们来看一下最简单的使用通道的方法。
use std::sync::mpsc::channel;
fn main() {
let (sender, receiver) = channel();
sender.send(5);
receiver.recv(); // recv = receive, not "rec v"
}
现在编译器知道类型了。sender
是Result<(), SendError<i32>>
,receiver
是Result<i32, RecvError>
。所以你可以用.unwrap()
来看看发送是否有效,或者使用更好的错误处理。我们加上.unwrap()
,也加上println!
,看看得到什么。
use std::sync::mpsc::channel;
fn main() {
let (sender, receiver) = channel();
sender.send(5).unwrap();
println!("{}", receiver.recv().unwrap());
}
这样就可以打印出5
。
channel
就像Arc
一样,因为你可以克隆它,并将克隆的内容发送到其他线程中。让我们创建两个线程,并将值发送到receiver
。这段代码可以工作,但它并不完全是我们想要的。
use std::sync::mpsc::channel;
fn main() {
let (sender, receiver) = channel();
let sender_clone = sender.clone();
std::thread::spawn(move|| { // move sender in
sender.send("Send a &str this time").unwrap();
});
std::thread::spawn(move|| { // move sender_clone in
sender_clone.send("And here is another &str").unwrap();
});
println!("{}", receiver.recv().unwrap());
}
两个线程开始发送,然后我们println!
。它可能会打印 Send a &str this time
或 And here is another &str
,这取决于哪个线程先完成。让我们创建一个join句柄来等待它们完成。
use std::sync::mpsc::channel;
fn main() {
let (sender, receiver) = channel();
let sender_clone = sender.clone();
let mut handle_vec = vec![]; // Put our handles in here
handle_vec.push(std::thread::spawn(move|| { // push this into the vec
sender.send("Send a &str this time").unwrap();
}));
handle_vec.push(std::thread::spawn(move|| { // and push this into the vec
sender_clone.send("And here is another &str").unwrap();
}));
for _ in handle_vec { // now handle_vec has 2 items. Let's print them
println!("{:?}", receiver.recv().unwrap());
}
}
这个将打印:
"Send a &str this time"
"And here is another &str"
现在我们不打印,我们创建一个results_vec
。
use std::sync::mpsc::channel;
fn main() {
let (sender, receiver) = channel();
let sender_clone = sender.clone();
let mut handle_vec = vec![];
let mut results_vec = vec![];
handle_vec.push(std::thread::spawn(move|| {
sender.send("Send a &str this time").unwrap();
}));
handle_vec.push(std::thread::spawn(move|| {
sender_clone.send("And here is another &str").unwrap();
}));
for _ in handle_vec {
results_vec.push(receiver.recv().unwrap());
}
println!("{:?}", results_vec);
}
现在结果在我们的vec中:["Send a &str this time", "And here is another &str"]
。
现在让我们假设我们有很多工作要做,并且想要使用线程。我们有一个大的VEC,里面有1百万个元素,都是0,我们想把每个0都变成1,我们将使用10个线程,每个线程将做十分之一的工作。我们将创建一个新的VEC,并使用.extend()
来收集结果。
use std::sync::mpsc::channel;
use std::thread::spawn;
fn main() {
let (sender, receiver) = channel();
let hugevec = vec![0; 1_000_000];
let mut newvec = vec![];
let mut handle_vec = vec![];
for i in 0..10 {
let sender_clone = sender.clone();
let mut work: Vec<u8> = Vec::with_capacity(hugevec.len() / 10); // new vec to put the work in. 1/10th the size
work.extend(&hugevec[i*100_000..(i+1)*100_000]); // first part gets 0..100_000, next gets 100_000..200_000, etc.
let handle =spawn(move || { // make a handle
for number in work.iter_mut() { // do the actual work
*number += 1;
};
sender_clone.send(work).unwrap(); // use the sender_clone to send the work to the receiver
});
handle_vec.push(handle);
}
for handle in handle_vec { // stop until the threads are done
handle.join().unwrap();
}
while let Ok(results) = receiver.try_recv() {
newvec.push(results); // push the results from receiver.recv() into the vec
}
// Now we have a Vec<Vec<u8>>. To put it together we can use .flatten()
let newvec = newvec.into_iter().flatten().collect::<Vec<u8>>(); // Now it's one vec of 1_000_000 u8 numbers
println!("{:?}, {:?}, total length: {}", // Let's print out some numbers to make sure they are all 1
&newvec[0..10], &newvec[newvec.len()-10..newvec.len()], newvec.len() // And show that the length is 1_000_000 items
);
for number in newvec { // And let's tell Rust that it can panic if even one number is not 1
if number != 1 {
panic!();
}
}
}