I am trying to execute a bunch of futures in parallel. Each task returns more work, which the main thread uses to spawn further tasks; the process ends when all tasks have settled and none of them returns new work (I am crawling a graph in parallel).
The order of completion is not important; what matters is maximizing CPU utilization, so I want to create new tasks as soon as any existing task completes and provides new work.
`futures::stream::FuturesUnordered` seems to do what I want. However, when I plug it into my real-world use case, only one CPU is pegged at 100% usage.
My project uses Tokio as its async runtime, but `FuturesUnordered` does not appear to be using the thread pool (or perhaps I have misconfigured it).
As a toy example:

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use rand::Rng;
use std::time::Duration;

// Number of Tokio worker threads (in the real project this comes from `config.threads`)
const THREADS: usize = 4;

// If this returns a number, spawn that many more tasks
async fn do_work(c: usize) -> usize {
    println!("[{c}] START");
    tokio::time::sleep(Duration::from_secs(1)).await;
    let jobs_to_spawn = rand::thread_rng().gen_range(0..10);
    println!("[{c}] END");
    jobs_to_spawn
}

async fn main_async() {
    let mut jobs = FuturesUnordered::new();
    let mut c = 0; // job ID, incremented for every job pushed
    jobs.push(do_work(c)); // initial task
    while let Some(result) = jobs.next().await {
        println!("Spawning {:?}\n", result);
        for _ in 0..result {
            c += 1;
            jobs.push(do_work(c));
        }
    }
}

// Bootstrap Tokio with a configurable number of threads
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(THREADS)
        .enable_all()
        .build()
        .unwrap()
        .block_on(main_async()); // main_async returns (), so there is nothing to unwrap
}
```
The output of this indicates that the tasks are running concurrently:
```
[0] START
[0] END
Spawning 6
[1] START
[2] START
[3] START
[4] START
[5] START
[6] START
[1] END
Spawning 3
[2] END
Spawning 9
[3] END
Spawning 9
[4] END
Spawning 6
...
```
However, only one CPU is used for this work.
Initially, I thought that `futures` was a utility crate that could be used with a runtime like Tokio. Is that the case?
If not, how could I achieve this with Tokio?
EDIT:
I have achieved a working example with Tokio by using `tokio::task::spawn`, channels and mutexes to synchronize work. However, I much prefer the control flow above, as it is easier to reason about.