Stream std::sync::mpsc::Receiver results from function


How do I stream the contents of an std::sync::mpsc::Receiver from a function?

The caller should be able to receive results before all the results are available.

Ideally, I'd like the return type of fetch_all() to be independent of std::sync::mpsc so something like impl Iterator<Item = &'a Result<FooResult<'a>, FooError<'a>>> is preferred.

The code below doesn't work because rx is allocated on the stack so it cannot be returned.

I suppose the caller could pass tx and rx into fetch_all(). Is there a way to avoid this?

use std::sync::mpsc;
use threadpool::ThreadPool; // assuming the threadpool crate

fn fetch<'a>(item: &'a str) -> Result<FooResult<'a>, FooError<'a>> {}

fn fetch_all<'a>(items: &'a [&'a str]) -> ??? { // What's the best return type?
    let (tx, rx) = mpsc::channel();

    let pool = ThreadPool::new(10);

    for i in items {
        let txx = tx.clone();
        let item = i.to_owned();
        pool.execute(move || {
            let r = fetch(item);
            txx.send(r).expect("Thread communication error");
        });
    }

    pool.join();
    std::mem::drop(tx); // drop the original sender so the receiver can finish

    rx.into_iter() // <- what should this be?
}

Chayim Friedman On

That cannot be done, because it would be unsound.

For the code to be sound, items must be alive while the threads are running. But assuming we return immediately without letting the threads finish, the caller might destroy items and we'll have a use-after-free.

Of course, as @user4815162342 suggested, we can let the threads finish, but then this isn't "streaming" anymore. We essentially just fill a Vec with the results.

We can do a little better, though, by moving the pool creation to the caller. The threads can still only run while the pool is alive, but now the caller decides when to shut it down, so the threads' scope extends as far as the caller wants. The caller can process the streamed values inside that scope, which gives us true streaming.

Example:

use scoped_threadpool::{Pool, Scope};
use std::sync::mpsc;

fn fetch_all<'a: 'scope, 'pool, 'scope>(
    items: &'a [&'a str],
    scope: &Scope<'pool, 'scope>,
) -> impl Iterator<Item = Result<FooResult<'a>, FooError<'a>>> {
    let (tx, rx) = mpsc::channel();

    for i in items {
        let tx = tx.clone();
        scope.execute(move || {
            let r = fetch(i);
            tx.send(r).expect("Thread communication error");
        });
    }

    rx.into_iter()
}

fn main() {
    let mut pool = Pool::new(10);
    pool.scoped(|scope| {
        let iter = fetch_all(&["a", "b", "c"], scope);
        for item in iter {
            // Process `item`...
        }
    });
}
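
For comparison, the same idea can be sketched with std::thread::scope from the standard library (Rust 1.63+) instead of scoped_threadpool. This is only a minimal sketch under some assumptions: it spawns one thread per item rather than using a fixed-size pool, the fetch() body and the plain &str result types are hypothetical stand-ins for FooResult/FooError, and collect_sorted() is an illustrative caller that is not part of the question.

```rust
use std::sync::mpsc;
use std::thread::{self, Scope};

// Hypothetical stand-in for the question's `fetch()`: the result borrows from `item`.
fn fetch<'a>(item: &'a str) -> Result<&'a str, &'a str> {
    if item.is_empty() { Err(item) } else { Ok(item) }
}

// Spawns one scoped thread per item and returns an iterator over the results.
// The iterator yields each result as soon as a worker sends it.
fn fetch_all<'scope, 'env>(
    items: &'env [&'env str],
    scope: &'scope Scope<'scope, 'env>,
) -> impl Iterator<Item = Result<&'env str, &'env str>> {
    let (tx, rx) = mpsc::channel();
    for &item in items {
        let tx = tx.clone();
        scope.spawn(move || {
            // Ignore the error case where the receiver was dropped early.
            let _ = tx.send(fetch(item));
        });
    }
    // The original `tx` is dropped here, so the iterator ends once
    // every worker has finished and dropped its clone.
    rx.into_iter()
}

// Illustrative caller: results are consumed inside the scope, while workers still run.
fn collect_sorted(items: &[&str]) -> Vec<String> {
    thread::scope(|s| {
        let mut out: Vec<String> = fetch_all(items, s)
            .map(|r| match r {
                Ok(v) => format!("ok:{}", v),
                Err(_) => String::from("err"),
            })
            .collect();
        out.sort();
        out
    })
}
```

As with the pool-based version, the borrow of items stays valid because the scope does not end until every spawned thread has finished.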
Jay On

The first problem with the original code is that the results of fetch() cannot borrow from the arguments to fetch_all(), since fetch() runs on a thread that may continue to exist after fetch_all() has returned. In other words, we need to allow the input argument items to be destroyed before the results, by removing the lifetime parameters and making any necessary str copies (unless we want to share the data through std::sync::Arc, e.g. Arc<str>, to avoid copying).

The purpose of the lifetime parameters in the original code was to allow FooResult and FooError to have a reference to item without copying it, but the correct way to do this is with std::sync::Arc, not lifetime parameters.
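
A rough sketch of what the Arc approach could look like, assuming the caller already holds its strings as Arc<str> (an owned, shared string; Arc<&str> would still carry a borrow). The FooResult fields, the String error type, and the use of plain std::thread::spawn instead of a pool are all assumptions made for illustration.

```rust
use std::sync::{mpsc, Arc};
use std::thread;

// Hypothetical result type: keeps a shared handle to the input instead of a reference.
#[derive(Debug)]
struct FooResult {
    item: Arc<str>,
    len: usize,
}

// Hypothetical stand-in for the question's `fetch()`.
fn fetch(item: Arc<str>) -> Result<FooResult, String> {
    let len = item.len();
    Ok(FooResult { item, len })
}

fn fetch_all(items: &[Arc<str>]) -> impl Iterator<Item = Result<FooResult, String>> {
    let (tx, rx) = mpsc::channel();
    for item in items {
        // Cloning an Arc bumps a reference count; the string data is not copied.
        let item = Arc::clone(item);
        let tx = tx.clone();
        thread::spawn(move || {
            // Ignore the error case where the receiver was dropped early.
            let _ = tx.send(fetch(item));
        });
    }
    rx.into_iter()
}
```

Each result shares the allocation of the input it came from, so nothing is tied to the lifetime of the items slice once the Arc has been cloned.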

The function signatures should be:

fn fetch(item: &str) -> Result<FooResult, FooError> {}
fn fetch_all(items: &[&str]) -> impl Iterator<Item = Result<FooResult, FooError>> {}

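The second problem with the original code is that let item = i.to_owned(); doesn't copy the string data, and neither does let item = i.clone();. Because i is a &&str there, both calls only copy the reference and yield a &str. The string itself needs to be copied since it must exist inside the thread after fetch_all() returns.

The correct code is let item = String::from(i); where i is a &str (for example, when iterating with for &i in items).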
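
A small sketch of the difference, using a hypothetical helper owned_copies() that is not part of the original code. The type annotations show what each call produces when iterating over a &[&str].

```rust
// Returns owned copies of the borrowed strings so they can outlive `items`.
fn owned_copies(items: &[&str]) -> Vec<String> {
    let mut out = Vec::new();
    for i in items {
        // `i` is a `&&str` here, so `to_owned()` resolves on `&str` and
        // only copies the reference.
        let borrowed: &str = i.to_owned();
        // Dereferencing first reaches `str`, whose owned form is `String`.
        let owned: String = (*i).to_owned();
        debug_assert_eq!(borrowed, owned);
        out.push(owned);
    }
    out
}
```

String::from(*i) is equivalent to (*i).to_owned() in this position.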


The complete solution:

use std::sync::mpsc;
use threadpool::ThreadPool; // assuming the threadpool crate

pub fn fetch(item: &str) -> Result<FooResult, FooError> {}

pub fn fetch_all(
  items: &[&str],
) -> impl Iterator<Item = Result<FooResult, FooError>> {
  let (tx, rx) = mpsc::channel();
  let pool = ThreadPool::new(10);

  for &i in items {
    let item = String::from(i);
    let txx = tx.clone();
    pool.execute(move || {
      let r = fetch(&item);
      txx.send(r).expect("Thread communication error");
    });
  }

  rx.into_iter()
}

This works, but an ideal solution would allow a single implementation of fetch() that supports &String, &str, Arc<str>, and Arc<String> as the item type, copying in the case of &String or &str but not in the case of Arc. I haven't figured this out yet.
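
One possible direction, offered as a sketch rather than a finished answer, is to make fetch_all() generic over anything that converts into Arc<str>. The standard library provides From<&str> and From<String> for Arc<str>, and an Arc<str> converts into itself without copying the data. A &String is not covered directly and would need .as_str() first. The usize result type, the String error type, and the use of std::thread::spawn instead of a pool are assumptions for illustration.

```rust
use std::sync::{mpsc, Arc};
use std::thread;

// Hypothetical stand-in for the question's `fetch()`; returns the input's length.
fn fetch(item: &str) -> Result<usize, String> {
    Ok(item.len())
}

// Accepts `&str`, `String`, or `Arc<str>` items. Borrowed strings are copied
// into a new allocation; an existing `Arc<str>` is moved in without copying.
fn fetch_all<I, S>(items: I) -> impl Iterator<Item = Result<usize, String>>
where
    I: IntoIterator<Item = S>,
    S: Into<Arc<str>>,
{
    let (tx, rx) = mpsc::channel();
    for item in items {
        let item: Arc<str> = item.into();
        let tx = tx.clone();
        thread::spawn(move || {
            // `&item` derefs from `&Arc<str>` to `&str`.
            let _ = tx.send(fetch(&item));
        });
    }
    rx.into_iter()
}
```

A caller could then write fetch_all(["a", "bb"]) for borrowed literals or fetch_all(vec_of_arcs) for data that is already shared.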