How do I stream the contents of an std::sync::mpsc::Receiver from a function?
The caller should be able to receive results before all the results are available.
Ideally, I'd like the return type of fetch_all() to be independent of std::sync::mpsc so something like impl Iterator<Item = &'a Result<FooResult<'a>, FooError<'a>>> is preferred.
The code below doesn't work because rx is allocated on the stack so it cannot be returned.
I suppose the caller could pass tx and rx into fetch_all(). Is there a way to avoid this?
use std::sync::mpsc::{self, Receiver, Sender};
use threadpool::ThreadPool; // assumed: the `threadpool` crate

fn fetch<'a>(item: &'a str) -> Result<FooResult<'a>, FooError<'a>> { todo!() }

fn fetch_all<'a>(items: &'a [&'a str]) -> ??? { // What's the best return type?
    let (tx, rx) = mpsc::channel();
    let pool = ThreadPool::new(10);
    for i in items {
        let txx = tx.clone();
        let item = i.to_owned();
        pool.execute(move || {
            let r = fetch(item);
            txx.send(r).expect("Thread communication error");
        });
    }
    pool.join();
    std::mem::drop(tx); // drop the original sender so the receiver can finish
    rx.into_iter() // <- what should this be?
}
That cannot be done, because it would be unsound.
For the code to be sound, items must be alive while the threads are running. But if we return immediately without letting the threads finish, the caller might destroy items and we'll have a use-after-free.
Of course, as @user4815162342 suggested, we can let the threads finish, but then this isn't "streaming" anymore. We essentially just fill a Vec with the results.
We can do a little better, though: we can shift the pool creation to the caller. The threads can still only run while the pool is alive, but the caller decides when to kill it, so the running scope extends as far as the caller wants. The caller may then include the processing of the streamed values in this scope, and we get true streaming.
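The idea of letting the caller own the running scope can be sketched with the standard library's scoped threads (std::thread::scope, stable since Rust 1.63) standing in for a thread pool. This is only a sketch under assumptions: FooResult, FooError, and the body of fetch are hypothetical placeholders, and it spawns one thread per item rather than using a pool of 10 workers.

```rust
use std::sync::mpsc;
use std::thread::Scope;

// Hypothetical stand-ins for the question's types; both borrow from the input.
#[derive(Debug, PartialEq)]
struct FooResult<'a>(&'a str);
#[derive(Debug, PartialEq)]
struct FooError<'a>(&'a str);

// Hypothetical fetch: succeeds for non-empty items, fails for empty ones.
fn fetch<'a>(item: &'a str) -> Result<FooResult<'a>, FooError<'a>> {
    if item.is_empty() {
        Err(FooError(item))
    } else {
        Ok(FooResult(item))
    }
}

// The caller supplies the scope, so the worker threads may borrow `items`
// for as long as that scope is alive. The return type hides the channel.
fn fetch_all<'scope, 'env>(
    scope: &'scope Scope<'scope, 'env>,
    items: &'env [&'env str],
) -> impl Iterator<Item = Result<FooResult<'env>, FooError<'env>>> {
    let (tx, rx) = mpsc::channel();
    for &item in items {
        let tx = tx.clone();
        scope.spawn(move || {
            // Ignore the error: it only means the caller dropped the iterator.
            let _ = tx.send(fetch(item));
        });
    }
    // Drop the original sender so the iterator ends once all workers finish.
    drop(tx);
    rx.into_iter()
}

fn main() {
    let items = ["a", "", "c"];
    std::thread::scope(|scope| {
        // Results are consumed here while the worker threads are still running.
        for result in fetch_all(scope, &items) {
            match result {
                Ok(FooResult(s)) => println!("ok: {s}"),
                Err(FooError(s)) => println!("error for item {s:?}"),
            }
        }
    });
}
```

The results arrive in whatever order the threads finish, so the printed order may vary between runs. The scope joins all threads when its closure returns, which is what keeps the borrow of items valid.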
Example: