How can I proactively close a synchronous tungstenite websocket connection?

51 Views Asked by At

I am trying to write a websocket client to connect the server.

I put the websocket connection to another thread to read, then I'd like to close the connection from the main thread, but couldn't figure out a way to do it because the arc_ws_stream.lock() in the main thread will be blocked forever (locked by the mutex in the spawned thread).

Here are the dependencies in Cargo.toml

[dependencies]
tungstenite = "0.21.0"
url = "2.5.0"
native-tls = "0.2.11"
websocket = "0.27.0"

Here is the main.rs

#![allow(unused_imports)]
use std::{net::ToSocketAddrs, error, fmt::format, net, sync::{Arc, Mutex}, thread::{self, sleep}, time::Duration};
use tungstenite::{protocol::CloseFrame, WebSocket, Message, Error};
use url::Url;
use native_tls::TlsConnector;
use websocket::{result::WebSocketError, ClientBuilder, OwnedMessage};

fn main()  {
    connect_to_websocket("wss://127.0.0.1:7878/ws", String::from("login message"));
}

fn connect_to_websocket(uri: &str, login_msg: String){
  let ws_url = Url::parse(uri).unwrap();

  // Create a TLS connector that ignores invalid certificates
  let tls_connector = TlsConnector::builder().danger_accept_invalid_certs(true).build().unwrap();

  // Establish a TCP connection, then wrap the TCP stream with TLS and connect to the server
  let remote_addr = format!("{}:{}", ws_url.host().unwrap(), ws_url.port().unwrap());
  let tcp_stream = std::net::TcpStream::connect(remote_addr.clone()).unwrap();
  let tls_stream = tls_connector.connect(remote_addr.as_str(), tcp_stream).unwrap();
  let (mut raw_ws_stream, _) = tungstenite::client(uri, tls_stream).unwrap();

  raw_ws_stream.send(Message::Text(login_msg)).unwrap();

  let arc_ws_stream = Arc::new(Mutex::new(raw_ws_stream));
  let thread_ws_stream = arc_ws_stream.clone();
  let ws_recv_thread = thread::spawn(move ||{
    loop{
      let msg = thread_ws_stream.lock().unwrap().read();
      if !process_ws_msg(msg) { 
        break;
      }
    }
  });  

  sleep(Duration::from_millis(5_000));
  let _ = arc_ws_stream.lock().unwrap().close(None);  //this line will be blocked forever
  let _ = ws_recv_thread.join();  
}

fn process_ws_msg(msg: Result<Message, Error>)->bool {
    match msg{
        Ok(msg) => {
          match msg {
            Message::Text(msg) => {
                println!("WS received message: {}", msg);
            },
            Message::Close(cls_msg) => {
              match cls_msg{
                Some(cls_frame) =>{
                  println!("WS session closed message, close frame = {}", cls_frame);
                }
                None =>{
                  println!("WS session closed message, no close frame");
                }
              }
              return  false;
            },
            _ => {}
          }
        },
        Err(err) =>{
          println!("WS session error: {}", err);
          return false;
        }
    }
    true
}

I tried to use async solution but couldn't solve the invalid SSL certificate issue.

0

There are 0 best solutions below