Does FuturesUnordered use the Tokio thread pool?

53 Views Asked by At

I am trying to execute a bunch of futures in parallel. The tasks return more work that the main thread would use to spawn more tasks - ending when all the tasks have settled and no tasks return new work (I am crawling a graph in parallel).

The order of completion is not important, what's important is maximizing CPU utilization so I want to create new tasks as soon as any existing task completes and provides new work.

futures::stream::FuturesUnordered seems to do what I want however when I plug it into my real world use case, only 1 CPU is pegged at 100% usage.

My project is using Tokio as an async runtime however it does not appear that FuturesUnordered is using the thread pool (or perhaps I have misconfigured it).

As a toy example:

// If this returns a number, spawn that many more tasks
async fn do_work(c: usize) -> usize {
  println!("[{c}] START");
  tokio::time::sleep(Duration::from_secs(1)).await;
  let jobs_to_spawn = rand::thread_rng().gen_range(0..10);
  println!("[{c}] END");
  return jobs_to_spawn;
}

async fn main_async() {
  let mut jobs = futures::stream::FuturesUnordered::new();
  let mut c = 0; // to keep track of the task completion

  jobs.push(do_work(c.clone())); // initial task

  while let Some(result) = jobs.next().await { 
    println!("Spawning {:?}\n", result);
    for _ in 0..result {
      c+= 1;
      jobs.push(do_work(c.clone()));
    }
  }
}

// Bootstrap Tokio with a configurable number of threads
fn main() {
  tokio::runtime::Builder::new_multi_thread()
    .worker_threads(config.threads)
    .enable_all()
    .build()
    .unwrap()
    .block_on(main_async())
    .unwrap();
}

The output of this indicates that the tasks are running in concurrently

[0] START
[0] END
Spawning 6

[1] START
[2] START
[3] START
[4] START
[5] START
[6] START
[1] END
Spawning 3

[2] END
Spawning 9

[3] END
Spawning 9

[4] END
Spawning 6
...

However only 1 CPU is used for this work.

Initially I thought that futures was a utility crate that you could use with a runtime like Tokio. Is that the case?

If not, how could I achieve this with Tokio?

EDIT: I have achieved a working example with Tokio by using tokio::task::spawn, channels and mutexes to synchronize work - however I much prefer the control flow above as it's easier to reason about.

0

There are 0 best solutions below