Not getting result from Mio poll


I've built a basic runtime using smol, with flume as the queue and futures_lite for blocking on tasks until they complete. I am now trying to run Mio polling directly in the poll function of a standard Future. When I run a TCP listener with Mio entirely in the main function, it works fine. When I move the Mio TCP polling into the poll function of a Future, I get the "no events" printout every 5 seconds, which means the future is being put back on the task queue to be polled again. When I send a data packet over TCP, the "no events" printout happens instantly and the client does not error. So the data is reaching the socket, but the Mio polling doesn't seem to yield any events from it. Below is the code of the future:

use mio::net::{TcpListener, TcpStream};
use mio::{Events, Interest, Poll as MioPoll, Token};
use std::io::{self, Read, Write};
use std::time::Duration;
use std::error::Error;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

const SERVER: Token = Token(0);

struct ServerWorker {
    server: TcpListener,
    poll: MioPoll,
}


impl ServerWorker {

    pub fn new() -> Result<Self, Box<dyn Error>> {
        let addr = "127.0.0.1:13265".parse()?;
        let mut server = TcpListener::bind(addr)?;
        let poll: MioPoll = MioPoll::new()?;
        poll.registry()
        .register(&mut server, SERVER, Interest::READABLE)?;
        Ok(ServerWorker{
            server,
            poll,
        })
    }
}

impl Future for ServerWorker {
    type Output = String;


    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) 
        -> Poll<Self::Output> {

        let mut events = Events::with_capacity(128);

        let outcome = self.poll.poll(
            &mut events, 
            Some(Duration::from_secs(5))
        ).unwrap();

        println!("outcome: {:?}", outcome);

        if outcome == () {
            println!("no events");
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        println!("events: {:?}", events);
        for event in events.iter() {
            println!("event: {:?}", event);
            match event.token() {
                SERVER => {
                    let (mut stream, _) = self.server.accept().unwrap();
                    let mut received_data = [0; 4096];
                    let bytes_read = 0;
                    let _ = stream.read(&mut received_data[bytes_read..]);
                    let received_data_string = String::from_utf8_lossy(&received_data).to_string();
                    return Poll::Ready(received_data_string)
                }
                // We don't expect any events with tokens other than those we provided.
                _ => unreachable!(),
            }
        }
        cx.waker().wake_by_ref();
        return Poll::Pending
    }
}

I appreciate that there are better ways to run a server, but I'm just looking into this approach. I've read about the Mio waker, but calling it doesn't seem to work. I still think it's relevant, though, as the Mio waker documentation talks about polling across threads, and futures do move between threads. Any help would be much appreciated.

I'm awaiting the future in the main function as seen below:

fn main() {
    Runtime::new().with_low_num(2).with_high_num(4).run();

    let server_worker = ServerWorker::new().unwrap();
    let test = spawn_task!(server_worker);
    let _outcome = future::block_on(test);
}

I then call the TCP socket from a Python script for testing (this script works when the Mio TCP listener is running directly in main). The spawn_task! macro is just a wrapper around a function similar to the following:

pub fn spawn_task<F, T>(future: F) -> Task<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
        let (tx, rx) = flume::unbounded::<Runnable>();

        thread::spawn(move || {
            while let Ok(runnable) = rx.recv() {
                let _ = catch_unwind(|| runnable.run());
            }
        });
        tx

    });

    let schedule = |runnable| QUEUE.send(runnable).unwrap();
    let (runnable, task) = async_task::spawn(future, schedule);

    runnable.schedule();
    return task
}

I'm leaving out the task stealing and the multiple threads, as they would just bloat the question, but the runtime works for a lot of other tasks.
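One detail in the poll function above may be relevant to the symptom. Mio's `Poll::poll` returns `io::Result<()>`, so after `unwrap()` the value of `outcome` is always `()` and the `outcome == ()` branch runs on every call, whether or not an event arrived. Readiness is reported only through the `Events` buffer, which can be checked with `events.is_empty()`. The sketch below illustrates the difference using only the standard library. `fake_poll` and the two check functions are hypothetical stand-ins written for this illustration, not part of Mio, and a `Vec<u32>` plays the role of the `Events` buffer.

```rust
use std::io;

// Hypothetical stand-in for `mio::Poll::poll`: like the real call, it returns
// `io::Result<()>` and reports readiness only by filling the `events` buffer.
fn fake_poll(events: &mut Vec<u32>, ready: &[u32]) -> io::Result<()> {
    events.clear();
    events.extend_from_slice(ready);
    Ok(())
}

// Mirrors the check in the question: the unwrapped value is always `()`,
// so this reports "no events" even when the buffer has entries.
fn unit_check_says_no_events(ready: &[u32]) -> bool {
    let mut events = Vec::new();
    let outcome = fake_poll(&mut events, ready).unwrap();
    outcome == ()
}

// Inspects the buffer instead, which is what actually carries the result.
fn buffer_check_says_no_events(ready: &[u32]) -> bool {
    let mut events = Vec::new();
    fake_poll(&mut events, ready).unwrap();
    events.is_empty()
}

fn main() {
    // One pending event with token 0, as when a client connects.
    println!("unit check: {}", unit_check_says_no_events(&[0]));
    println!("buffer check: {}", buffer_check_says_no_events(&[0]));
}
```

With one pending event, the unit comparison still reports "no events" while the buffer check does not. This would match the "no events" printout appearing instantly when a packet is sent: the Mio poll returns early because an event is ready, but the early-return branch is taken before the events are inspected. Whether this is the only problem is an assumption; a stream returned by a Mio listener's `accept` is non-blocking, so a read issued immediately afterwards may also return `WouldBlock`.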