Is there a recommended model for multi-threaded TCP communication in Rust?


While learning Rust, I saw a lot of tutorials that use two very simple models. On the server side, each accepted TcpStream is moved to a new thread. On the client side, the program does blocking reads and prints the output.

But for a real project this is not enough. For example, on the client side it is usually not acceptable to block the main thread while reading data. So you have to use non-blocking sockets, multiple threads, or asynchronous I/O.

Since I am new to Rust, I don't plan to use async I/O or the tokio library for this.

Suppose I use a dedicated thread for blocking reads, and send data or close the TCP connection from the main thread.

Since the TCP connection is then used by two threads, the usual practice would be to wrap it in Arc<Mutex<TcpStream>>.

But the read thread has to call Mutex::lock() to get the TcpStream, and the main thread also has to call Mutex::lock() to send or close. If the read thread holds the lock while it is blocked in read(), won't this cause a deadlock?
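
To make the concern concrete: this is not a deadlock in the strict sense, but the reader keeps the mutex guard for as long as read() blocks, so the main thread cannot send or close until data arrives. If the peer is waiting for us to send first, neither side makes progress. A minimal sketch, assuming a loopback peer that never sends anything; the function name writer_is_locked_out is hypothetical:

```rust
use std::io::Read;
use std::net::{TcpListener, TcpStream};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Returns true if the main thread found the mutex locked while the
/// reader thread was blocked inside read().
fn writer_is_locked_out() -> std::io::Result<bool> {
    // Loopback peer that accepts the connection but never sends.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let client = TcpStream::connect(listener.local_addr()?)?;
    let (silent_peer, _) = listener.accept()?;

    let shared = Arc::new(Mutex::new(client));
    let (locked_tx, locked_rx) = mpsc::channel();

    let reader = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || -> std::io::Result<usize> {
            let mut guard = shared.lock().unwrap();
            locked_tx.send(()).unwrap(); // tell main that we hold the lock
            let mut buf = [0u8; 16];
            // Blocks here, guard still alive, until the peer sends or closes.
            let n = guard.read(&mut buf)?;
            Ok(n)
        })
    };

    locked_rx.recv().unwrap();
    // The reader holds the guard, so a writer could not get the stream now.
    let locked_out = shared.try_lock().is_err();

    drop(silent_peer); // closing the peer lets the blocked read() return
    reader.join().unwrap()?;
    Ok(locked_out)
}
```

Calling writer_is_locked_out() from main shows that try_lock() fails for as long as the read is pending.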

Another way is to have one thread poll a message queue: a command is queued when the socket has a read event, or when the main thread needs to send data or close the connection. That way all access to the TcpStream happens in one thread. However, it seems to add a lot of extra code for maintaining the message queue.
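
For comparison, the single-owner approach can be sketched with std::sync::mpsc channels and a short read timeout. This is one possible shape, not a recommended design: the Command enum, io_loop, queue_demo, the 50 ms timeout, and the loopback echo peer are all assumptions made for illustration.

```rust
use std::io::{self, ErrorKind, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::thread;
use std::time::Duration;

/// Requests the main thread can queue for the I/O thread (hypothetical names).
enum Command {
    Send(Vec<u8>),
    Close,
}

/// Runs on the only thread that ever touches the stream.
fn io_loop(
    mut stream: TcpStream,
    commands: Receiver<Command>,
    incoming: Sender<Vec<u8>>,
) -> io::Result<()> {
    // A short read timeout turns the blocking read into a poll.
    stream.set_read_timeout(Some(Duration::from_millis(50)))?;
    let mut buf = [0u8; 1024];
    loop {
        // Drain everything the main thread has queued so far.
        loop {
            match commands.try_recv() {
                Ok(Command::Send(bytes)) => stream.write_all(&bytes)?,
                Ok(Command::Close) | Err(TryRecvError::Disconnected) => return Ok(()),
                Err(TryRecvError::Empty) => break,
            }
        }
        match stream.read(&mut buf) {
            Ok(0) => return Ok(()), // peer closed the connection
            Ok(n) => {
                if incoming.send(buf[..n].to_vec()).is_err() {
                    return Ok(()); // main thread stopped listening
                }
            }
            // A timeout surfaces as WouldBlock or TimedOut depending on the platform.
            Err(e) if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => {}
            Err(e) => return Err(e),
        }
    }
}

/// Demo against a loopback echo peer; returns the bytes echoed back.
fn queue_demo() -> io::Result<Vec<u8>> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let echo = thread::spawn(move || -> io::Result<()> {
        let (mut peer, _) = listener.accept()?;
        let mut buf = [0u8; 5];
        peer.read_exact(&mut buf)?;
        peer.write_all(&buf)?;
        let _ = peer.read(&mut buf); // wait until the client closes
        Ok(())
    });

    let stream = TcpStream::connect(addr)?;
    let (cmd_tx, cmd_rx) = mpsc::channel();
    let (data_tx, data_rx) = mpsc::channel();
    let io_thread = thread::spawn(move || io_loop(stream, cmd_rx, data_tx));

    cmd_tx.send(Command::Send(b"hello".to_vec())).unwrap();
    let mut echoed = Vec::new();
    while echoed.len() < 5 {
        echoed.extend(data_rx.recv().unwrap());
    }
    cmd_tx.send(Command::Close).unwrap();
    io_thread.join().unwrap()?;
    echo.join().unwrap()?;
    Ok(echoed)
}
```

The cost of this design is visible in the sketch: a queued command may wait up to one read timeout before it is handled, and the loop has to treat timeouts as a normal outcome.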

If a TcpStream could be split into two ends, like a channel, with a read end and a write end, I could conveniently use them in different threads. But no such function seems to be provided.
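
The standard library's TcpStream::try_clone() comes close to this: it returns a second, independently owned handle to the same socket, so one handle can serve as the read end and the other as the write end. A minimal sketch, assuming a loopback peer that echoes one line; split and echo_one_line_via_split are hypothetical helper names:

```rust
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Splits one connection into a read handle and a write handle.
fn split(stream: TcpStream) -> std::io::Result<(TcpStream, TcpStream)> {
    let reader = stream.try_clone()?; // second handle to the same socket
    Ok((reader, stream))
}

/// Sends one line from the calling thread and reads the echo in another thread.
fn echo_one_line_via_split() -> std::io::Result<String> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    // Loopback peer: reads one line and writes it back.
    let server = thread::spawn(move || -> std::io::Result<()> {
        let (mut peer, _) = listener.accept()?;
        let mut line = String::new();
        BufReader::new(&peer).read_line(&mut line)?;
        peer.write_all(line.as_bytes())?;
        Ok(())
    });

    let (read_half, mut write_half) = split(TcpStream::connect(addr)?)?;
    // The read half lives in its own thread and blocks there.
    let reader = thread::spawn(move || -> std::io::Result<String> {
        let mut line = String::new();
        BufReader::new(read_half).read_line(&mut line)?;
        Ok(line)
    });

    // The write half stays with the calling thread.
    write_half.write_all(b"ping\n")?;
    let line = reader.join().unwrap()?;
    server.join().unwrap()?;
    Ok(line)
}
```

Both handles refer to the same underlying socket, so options set on one, or a shutdown() called on one, also affect the other.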

Is there a recommended approach?

user4815162342 On BEST ANSWER

I'm not sure about a "recommended" approach, but you don't need a mutex to read from or write to a TcpStream, because the I/O traits (Read and Write) are implemented for &TcpStream in addition to TcpStream. This lets you call methods like read() on a shared reference to the stream, which can easily be shared among threads using Arc. For example:

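use std::io::{BufRead, BufReader, BufWriter, Write};
use std::net::TcpStream;
use std::sync::Arc;

fn main() -> std::io::Result<()> {
    let stream = Arc::new(TcpStream::connect("127.0.0.1:34254")?);
    // Two handles to the same stream, one per thread.
    let (r, w) = (Arc::clone(&stream), stream);

    // Reader thread: Read is implemented for &TcpStream, so a shared reference is enough.
    let thr1 = std::thread::spawn(move || -> std::io::Result<()> {
        let r = BufReader::new(r.as_ref());
        for line in r.lines() {
            println!("received: {}", line?);
        }
        Ok(())
    });
    // Writer thread: Write is implemented for &TcpStream as well.
    let thr2 = std::thread::spawn(move || -> std::io::Result<()> {
        let mut w = BufWriter::new(w.as_ref());
        w.write_all(b"Hello\n")?;
        // Flush explicitly: BufWriter ignores write errors when it is dropped.
        w.flush()
    });
    thr1.join().unwrap()?;
    thr2.join().unwrap()?;
    Ok(())
}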
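
The question also asks about closing the connection from the main thread while another thread is blocked reading. With a stream shared through Arc, TcpStream::shutdown() takes &self, so it can be called without a mutex as well. A minimal sketch, assuming a loopback peer that stays idle; the function name close_from_main is hypothetical:

```rust
use std::io::Read;
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;

/// Returns the number of bytes the reader thread got once the main
/// thread shut the connection down.
fn close_from_main() -> std::io::Result<usize> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let stream = Arc::new(TcpStream::connect(listener.local_addr()?)?);
    let (_idle_peer, _) = listener.accept()?; // kept open, never sends

    let reader = {
        let stream = Arc::clone(&stream);
        thread::spawn(move || -> std::io::Result<usize> {
            let mut buf = [0u8; 64];
            // Read is implemented for &TcpStream, so no lock is needed.
            let n = (&*stream).read(&mut buf)?;
            Ok(n)
        })
    };

    // Shut down both directions from this thread; the pending read in the
    // reader thread then returns instead of blocking forever.
    stream.shutdown(Shutdown::Both)?;
    reader.join().unwrap()
}
```

After the shutdown the reader sees end of stream (a read of zero bytes on common platforms) and can exit its loop normally.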
Yilmaz On

But when I need to read in the read thread, I will do Mutex::lock() to get the TcpStream, and when I send or close in the main thread, I also need to do Mutex::lock(). Won't this cause a deadlock?

I think the only place you need a lock is if you want to cap the number of threads. Say you want to allow only 5 threads: you create a shared counter and update it inside each thread. To mutate the counter you must lock it first, and you should do that inside its own code block so that the lock is released before the connection is handled.

use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Placeholder handler (not part of the original answer): put the real logic here.
fn handle_incoming_connection(_stream: TcpStream) {}

fn main() {
    let listener = TcpListener::bind("127.0.0.1:5000").unwrap();
    // Shared, mutable counter of active connections.
    let active_threads = Arc::new(Mutex::new(0));

    for stream in listener.incoming() {
        let active_requests = Arc::clone(&active_threads);
        let stream = stream.unwrap();
        thread::spawn(move || {
            // Lock inside its own block so the guard is dropped at the closing
            // brace. Otherwise the counter would stay locked until
            // handle_incoming_connection() returned, no other thread could
            // access it, and the purpose of using threads would fail.
            let at_limit = {
                let mut connection = active_requests.lock().unwrap();
                *connection += 1;
                *connection >= 5
            };
            if at_limit {
                // This is the last allowed thread: wait 2 seconds to give the
                // others time to finish. The sleep happens after the lock is
                // released, so finished threads can still decrement the counter.
                thread::sleep(Duration::from_secs(2));
            }
            handle_incoming_connection(stream);
            // After handling the connection, decrease the number of active connections.
            {
                let mut connection = active_requests.lock().unwrap();
                *connection -= 1;
            }
        });
    }
}