Sending a mutable reference to an async task won't give it back after the task is shut down


I have a stream, and I want to keep sending packets into it from an async function (spawned via tokio) until my main fn receives user input. Then the runtime that contains the spawned task should be shut down, and another function should send a disconnect packet into the stream.

However, I get an error about not being able to borrow a mutable reference twice. The function that received the first mutable reference has already terminated, so why can I still not borrow a new reference?

That's my main function:

fn main() -> io::Result<()> {
    
    let socket_path = "/run/user/1000/discord-ipc-0";
    let mut stream = UnixStream::connect(socket_path)?;

    let rt  = Runtime::new().unwrap();
    rt.spawn(update_presence(&mut stream));
    {
        let mut confirm = String::new();
        let _ = io::stdin().read_line(&mut confirm);
    }
    rt.shutdown_background(); 

    disconnect_stream(&mut stream)?;

    Ok(())

}

update_presence() contains a loop {} that keeps sending packets into the stream every few seconds until the runtime is shut down.

Answer by drewtato

I'm assuming you're using Tokio.

The surface-level issue is that spawn requires the future to be 'static, which means it cannot borrow local variables. The deeper issue is that shutdown_background doesn't synchronize your current thread with the runtime: it returns without waiting for spawned tasks to stop, so the task may still be using your &mut stream afterward.
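To illustrate why synchronization is what matters, here is a minimal std-only sketch (an analogy, not Tokio code). It uses std::thread::scope, which joins its threads before returning, so a &mut borrow can be handed to another thread and then reused. The Vec<u8> "stream" and the send_packet helper are hypothetical stand-ins, not part of the question's code.

```rust
use std::thread;

/// Hypothetical stand-in for sending a packet: appends one byte to a buffer.
fn send_packet(stream: &mut Vec<u8>, byte: u8) {
    stream.push(byte);
}

fn borrow_then_reuse() -> Vec<u8> {
    // A plain buffer stands in for the real stream (assumption).
    let mut stream: Vec<u8> = Vec::new();

    // `thread::scope` joins every thread spawned inside it before it
    // returns, so the compiler knows the `&mut stream` borrow has ended.
    thread::scope(|s| {
        s.spawn(|| {
            for byte in 1u8..=3 {
                send_packet(&mut stream, byte);
            }
        });
    });

    // The scoped thread is finished, so `stream` can be borrowed again.
    send_packet(&mut stream, 0xFF);
    stream
}

fn main() {
    println!("{:?}", borrow_then_reuse());
}
```

Because the scope only returns after the worker has finished, the second borrow is accepted. shutdown_background gives no such guarantee, which is why the borrow checker rejects the code in the question.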

If you must mix blocking IO (std::io::stdin) and async IO (UnixStream), then the simplest way is probably to move your stream into the task. When you want to terminate it, send a signal into the task that cancels update_presence and makes the task return the stream.

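use tokio::{runtime::Runtime, sync::oneshot};

let (send_shutdown, recv_shutdown) = oneshot::channel();
let runtime = Runtime::new().unwrap();

// Move `stream` into the task so the future is `'static`.
let task = runtime.spawn(async move {
    tokio::select! {
        // `update_presence` loops forever, so this arm never completes.
        () = update_presence(&mut stream) => unreachable!(),
        res = recv_shutdown => res.unwrap(),
    }
    // `update_presence` is dropped when `select!` ends
    stream
});

// Block the main thread until the user presses Enter.
let mut confirm = String::new();
std::io::stdin().read_line(&mut confirm).unwrap();

// Cancel the task, then wait for it to hand the stream back.
send_shutdown.send(()).unwrap();
let mut stream = runtime.block_on(task).unwrap();

// `?` assumes this runs inside the question's `main`, which returns `io::Result<()>`.
disconnect_stream(&mut stream)?;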
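The same hand-off pattern (move the stream in, signal shutdown, get the stream back) can be tried without Tokio. The following is a rough std-only sketch using a thread and an mpsc channel in place of the task and the oneshot channel. FakeStream, run_presence_worker, the 10 ms tick, and the sleep that stands in for waiting on user input are all assumptions made for illustration.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the real stream: records what was "sent".
struct FakeStream {
    sent: Vec<String>,
}

/// Owns the stream while running and returns it once shutdown is signalled.
fn run_presence_worker(mut stream: FakeStream, shutdown: mpsc::Receiver<()>) -> FakeStream {
    loop {
        stream.sent.push("presence".to_string());
        // Wait for the next tick, or stop early when shutdown arrives
        // (or when the sending side has been dropped).
        match shutdown.recv_timeout(Duration::from_millis(10)) {
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return stream,
            Err(RecvTimeoutError::Timeout) => continue,
        }
    }
}

fn hand_off_and_reclaim() -> FakeStream {
    let stream = FakeStream { sent: Vec::new() };
    let (send_shutdown, recv_shutdown) = mpsc::channel();

    // Move the stream into the worker; nothing is borrowed across threads.
    let worker = thread::spawn(move || run_presence_worker(stream, recv_shutdown));

    // Stands in for blocking on user input (assumption).
    thread::sleep(Duration::from_millis(35));

    // Signal shutdown, then join to synchronize and take the stream back.
    send_shutdown.send(()).unwrap();
    let mut stream = worker.join().unwrap();

    stream.sent.push("disconnect".to_string());
    stream
}

fn main() {
    let stream = hand_off_and_reclaim();
    println!("{:?}", stream.sent);
}
```

Here join plays the role of runtime.block_on(task): it is the point where the main thread waits for the worker and regains ownership of the stream.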

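If you use Tokio's stdin, then the solution is a little simpler. No task or channel is needed: block_on does not require a 'static future and only returns once the future has finished, so the borrow of stream has ended by then.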

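use tokio::io::{stdin, AsyncBufReadExt, BufReader};
use tokio::runtime::Runtime;

let runtime = Runtime::new().unwrap();

// Completes once the user has entered a line.
let shutdown = async {
    let mut confirm = String::new();
    BufReader::new(stdin()).read_line(&mut confirm).await
};

runtime.block_on(async {
    tokio::select! {
        // `update_presence` loops forever, so this arm never completes.
        () = update_presence(&mut stream) => unreachable!(),
        res = shutdown => res.unwrap(),
    }
});

// `?` assumes this runs inside the question's `main`, which returns `io::Result<()>`.
disconnect_stream(&mut stream)?;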