Rust 多线程程序和异步程序简单编写
Contents
程序介绍
这次我打算把一个 select 多路复用的程序改成多线程版本和异步版本的,他的工作流程如下
- 从
stdin
读取数据,处理数据 - 从
socket
读取数据,处理数据
而多线程和异步版本的工作流程则是
- 创建数据通道
channel
- 创建一个 线程/Future, 从
stdin
读取数据,将数据写入channel
- 创建一个 线程/Future, 从
socket
读取数据,将数据写入channel
- 创建一个 线程/Future, 从
channel
读取数据,处理数据
我们马上来做一下看吧
多线程程序
从 socket 读取数据的线程
let thread1 = thread::spawn(move || {
loop {
let (mut stream, _) = server.accept().unwrap();
let mut received = String::new();
if let Err(_) = stream.read_to_string(&mut received) {
eprintln!("recv error occur");
}
if let Err(e) = sender1.send(received) {
eprintln!("send error occur: {}", e);
}
}
});
从 stdin 读取数据的线程
let thread2 = thread::spawn(move || {
loop {
let mut input = io::stdin().lock().lines().next().unwrap().unwrap();
if input == "quit" {
process::exit(0);
}
if let Err(e) = sender2.send(input) {
eprintln!("send error occur: {}", e);
}
}
});
从 channel 读取数据的线程
let manager_thread = thread::spawn(move || {
loop {
match receiver.recv() {
Ok(received) => {
process(received);
},
Err(e) => {
eprintln!("{}", e);
}
}
}
});
完整代码
use std::net::TcpListener;
use std::sync::{Arc, mpsc::{self, Sender, Receiver}};
use std::thread;
use std::io;
use std::io::{BufRead, Read};
use std::process;
fn main() {
let server = TcpListener::bind("127.0.0.1:9999").unwrap();
let (sender, receiver) = mpsc::channel::<String>();
let sender1 = sender.clone();
let sender2 = sender.clone();
// recv from client
let thread1 = thread::spawn(move || {
loop {
let (mut stream, _) = server.accept().unwrap();
let mut received = String::new();
let _ = stream.read_to_string(&mut received);
if let Err(e) = sender1.send(received) {
println!("send error: {}", e);
}
}
});
// recv from stdin
let thread2 = thread::spawn(move || {
loop {
let mut input = io::stdin().lock().lines().next().unwrap().unwrap();
if input == "quit" {
process::exit(0);
}
if let Err(e) = sender2.send(input) {
eprintln!("send error: {}", e);
}
}
});
let manager_thread = thread::spawn(move || {
loop {
match receiver.recv() {
Ok(received) => {
process(received);
},
Err(e) => {
eprintln!("{}", e);
}
}
}
});
thread1.join().unwrap();
thread2.join().unwrap();
manager_thread.join().unwrap();
}
fn process(string: String) {
println!("recv {} bytes from peer end: {}", string.len(), string);
}
异步程序
注意
没事不要作死写 stdin + async 的程序,会出现莫名其妙的阻塞,就像我的代码
从 socket 读取数据的 task
let task1 = tokio::spawn(async move {
loop {
let (mut stream, _) = server.accept().unwrap();
let mut received = String::new();
if let Err(_) = stream.read_to_string(&mut received) {
eprintln!("recv error occur");
break;
}
if let Err(e) = sender1.send(received).await {
eprintln!("send error occur: {}", e);
break;
}
}
});
从 stdin 读取数据的 task
let task2 = tokio::spawn(async move {
loop {
let input = io::stdin().lock().lines().next().unwrap().unwrap();
if input.trim() == "quit" {
process::exit(0);
}
println!("before send2 send");
if let Err(e) = sender2.send_timeout(input, Duration::from_secs(10)).await {
eprintln!("send error occur: {}", e);
// sender2.send(String::from("quit"));
process::exit(0);
}
println!("after send2 send");
}
});
从 channel 读取数据的 task
let manager_task = tokio::spawn(async move {
loop {
if let Some(received) = receiver.recv().await {
process(received);
}
}
});
完整代码
use std::{process};
use std::io::{self, BufRead, Read};
use std::net::{TcpListener};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
#[tokio::main]
async fn main() {
let server = TcpListener::bind("127.0.0.1:9999").unwrap();
let (sender, mut receiver) = mpsc::channel::<String>(1);
let sender1 = sender.clone();
let sender2 = sender.clone();
let task1 = tokio::spawn(async move {
loop {
let (mut stream, _) = server.accept().unwrap();
let mut received = String::new();
if let Err(_) = stream.read_to_string(&mut received) {
eprintln!("recv error occur");
break;
}
if let Err(e) = sender1.send(received).await {
eprintln!("send error occur: {}", e);
break;
}
}
});
let task2 = tokio::spawn(async move {
loop {
let input = io::stdin().lock().lines().next().unwrap().unwrap();
if input.trim() == "quit" {
process::exit(0);
}
println!("before send2 send");
if let Err(e) = sender2.send_timeout(input, Duration::from_secs(10)).await {
eprintln!("send error occur: {}", e);
// sender2.send(String::from("quit"));
process::exit(0);
}
println!("after send2 send");
}
});
let manager_task = tokio::spawn(async move {
loop {
if let Some(received) = receiver.recv().await {
process(received);
}
}
});
task1.await.unwrap();
task2.await.unwrap();
manager_task.await.unwrap();
}
fn process(string: String) {
println!("read {} from peer: {}", string.len(), string);
}