Is it possible to launch this worker in a separate thread instead of blocking the main one?

I have a Rust app, basically an axum web server with some routes.

Now I also need to periodically check a database table for new rows.

When there are new rows, I need to run some HEAVY computations that can take minutes (especially in an undersized Docker container).

The code I'm using is below, and the output is:

The main thread should not be blocked and should print every second: Instant { t: 6479.5889821s }
The main thread should not be blocked and should print every second: Instant { t: 6480.5996495s }
The main thread should not be blocked and should print every second: Instant { t: 6481.6152853s }
Starting an heavy CPU computation...
The main thread should not be blocked and should print every second: Instant { t: 6502.5748215s }
The main thread should not be blocked and should print every second: Instant { t: 6503.5917731s }
The main thread should not be blocked and should print every second: Instant { t: 6504.5990575s }

As you can see, the `Starting an heavy CPU computation...` job blocks the main thread for about 20 seconds.

Is there a way to avoid this? Perhaps by using tokio::task::spawn_blocking() for each heavy job?

Can I start the entire "worker" in a separate thread? Because I have many different jobs.

I mean this code in main():

let worker = Queue::new();

tokio::spawn(async move { worker.run().await }); // Is there a way to launch this in a separate thread?

The code is here: Rust Playground

Or here: Rust Explorer

Or here:

use futures::StreamExt;
use rand::Rng;
use std::time::{Duration, Instant};

const CONCURRENCY: usize = 5;

struct Job {
    id: u16,
}

struct Queue {
    // some needed fields like DB connection
}

impl Queue {
    fn new() -> Self {
        Self {}
    }

    async fn run(&self) {
        loop {
            // I'll get jobs from the DB here; for this demo they are randomly generated

            let mut jobs: Vec<Job> = Vec::new();

            for _ in 0..2 {
                jobs.push(Job {
                    id: get_random_id(),
                })
            }

            futures::stream::iter(jobs)
                .for_each_concurrent(CONCURRENCY, |job| async {
                    match self.handle_job(job).await {
                        Ok(_) => {
                            // I will remove the job from queue
                        }
                        Err(_) => {
                            // I will handle this error
                        }
                    };
                })
                .await;

            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }

    async fn handle_job(&self, job: Job) -> Result<(), String> {
        if job.id % 2 == 0 {
            println!("Starting an heavy CPU computation...");

            // I'm simulating a heavy CPU computation with this thread-blocking sleep
            std::thread::sleep(Duration::from_secs(10));

            // I think I can use spawn_blocking instead, right?

            // tokio::task::spawn_blocking(move || {
            //     std::thread::sleep(Duration::from_secs(8));
            // }).await.unwrap()
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let worker = Queue::new();

    // Can I start the below worker.run() in a separate thread?

    tokio::spawn(async move { worker.run().await });

    loop {
        println!(
            "The main thread should not be blocked and should print every second: {:?}",
            Instant::now()
        );
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

fn get_random_id() -> u16 {
    let mut rng = rand::thread_rng();

    rng.gen::<u16>()
}

1 Answer

Answered by kmdreko

Yes, you should use spawn_blocking. That will ensure that the async loop in main is not blocked by your CPU-heavy code.

Tokio keeps a pool of threads dedicated to blocking work. Spawning CPU-heavy tasks without bound could exhaust that pool, but you are already using for_each_concurrent to impose your own limit on parallelism, so that isn't a problem here.