Canonical Rust equivalent to C++ capture `this` in closure


I'm having trouble finding an equivalent to this type of code, which makes me suspect it isn't idiomatic for Rust, but it's unclear what the canonical approach would be since I can't find instances of the problem discussed.

Consider this struct:

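use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::sync::Arc;
use std::thread::{self, sleep, JoinHandle};
use std::time::Duration;

struct Looper {
    shared_value: Arc<AtomicU64>,
    handle: Option<JoinHandle<()>>,
}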

impl Looper {
    pub fn new(task: Box<dyn Fn() + Send>) -> Self {
        let mut looper = Self { shared_value: Arc::new(AtomicU64::new(10)), handle: None };
        let shared_state = looper.shared_value.clone();

        looper.handle = Some(thread::spawn(move || {
            for _ in 0..1000 {
                sleep(Duration::from_millis(shared_state.load(Relaxed)));
                task();
            }
        }));

        looper
    }

    pub fn set_value(&self, value: u64) {
        self.shared_value.store(value, Relaxed)
    }
}

impl Drop for Looper {
    fn drop(&mut self) {
        self.handle.take().unwrap().join().unwrap();
    }
}

Now consider a very contrived problem: we want a wrapper object that reuses the struct but manages changes to the sleep period internally.

In C++, this is straightforward by capturing this in your lambda:

class Wrapper {
private:
    std::unique_ptr<Looper> looper;

public:
    Wrapper(Duration pollingRate, const std::function<uint64_t()>& task) {
        looper = std::make_unique<Looper>( [this, task]() { looper->set_value(task()); });
    }
};

In Rust, there are two issues. First, there is no constructor equivalent to capture this in. That type of issue is solvable by using options. I could create a None element first, then try to create a closure and capture the object currently set to None e.g.

struct WrappedLooper {
    looper: Option<Looper>
}

impl WrappedLooper {
    pub fn new(task: Box<dyn Fn() -> u64 + Send>) -> Self {
        let mut wrapped = WrappedLooper { looper: None };
        let this = &wrapped;

        wrapped.looper = Some(Looper::new(Box::new(move || {
            this.looper.unwrap().set_value(task());
        })));

        wrapped
    }
}

This obviously doesn't work, as you can't move out of this. I could switch the internal state to an Arc and wrap it in a Mutex, but this seems extremely heavy-handed and incorrect to me:

struct WrappedLooper {
    looper: Arc<Mutex<Option<Looper>>>
}

impl WrappedLooper {
    pub fn new(task: Box<dyn Fn() -> u64 + Send>) -> Self {
        let wrapped = WrappedLooper { looper: Arc::new(Mutex::new(None)) };
        let copy = wrapped.looper.clone();

        wrapped.looper.lock().unwrap().replace(Looper::new(Box::new(move || {
            copy.lock().unwrap().as_ref().unwrap().set_value(task());
        })));

        wrapped
    }
}

So what I am wondering is what is the right way to have this kind of self-referential initialization (the option, or something else) in scenarios where the access is always safe? (Here it is safe because the object outlives the closure + the shared mutable state is atomic)


There are 2 best solutions below

4
BrownieInMotion On

From my understanding, there are two separate issues. One has to do with the lifetime of the spawned thread; the other is self-reference.

Let's disregard that first issue by ignoring the thread entirely. First, let's try only requiring that the closure live as long as Looper::new:

    pub fn new<'a>(_task: Box<dyn Fn() + Send + Sync + 'a>) -> Self {
        Self {
            shared_value: Arc::new(AtomicU64::new(10)),
        }
    }

Then, everything else compiles just fine. But this isn't very useful: if we aren't guaranteed that the function lives past Looper::new, we obviously can't use it in the thread. Instead, we could tell Rust that task must live as long as the created Looper:

struct Looper<'a> {
    shared_value: Arc<AtomicU64>,
    _marker: PhantomData<dyn Fn() + Send + Sync + 'a>,
}

impl<'a> Looper<'a> {
    pub fn new(_task: Box<dyn Fn() + Send + Sync + 'a>) -> Self {
        Self {
            shared_value: Arc::new(AtomicU64::new(10)),
            _marker: PhantomData,
        }
    }

    pub fn set_value(&self, value: u64) {
        self.shared_value.store(value, Relaxed)
    }
}

Then, the immediate issue is that we want WrappedLooper to have an instance of Looper that depends on a reference to itself. We can't freely let Looper do that: what happens to that self-reference if Looper is moved? So, making this work would require some unsafe Rust using Pin, or at least a library that encapsulates that away.

However, even if this were implemented, we hit the other problem: there's no way to really know how long the thread lives. Let's add the thread spawning back in. The compiler rejects it:

11 |   impl<'a> Looper<'a> {
   |        -- lifetime `'a` defined here
12 |       pub fn new(task: Box<dyn Fn() + Send + Sync + 'a>) -> Self {
   |                  ---- `task` is a reference that is only valid in the associated function body
...
20 | /         thread::spawn(move || {
21 | |             sleep(Duration::from_millis(shared_state.load(Relaxed)));
22 | |             task();
23 | |         });
   | |          ^
   | |          |
   | |__________`task` escapes the associated function body here
   |            argument requires that `'a` must outlive `'static`

Telling Rust that our function lives as long as Looper is not enough: it has to be 'static. This makes sense: consider what happens if WrappedLooper is dropped before the delay is over. The task would then be left holding a reference to a looper that no longer exists. The C++ snippet has exactly this problem: if the Wrapper is destroyed, the this captured in the task for the Looper is left dangling.

Because thread::spawn gives the compiler no way to verify that the looper instance outlives the thread, it's probably best to just use reference counting here.
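As a rough sketch of what "use reference counting" could look like here: the closure only needs the sleep period, so the wrapper can create the Arc<AtomicU64> itself and hand one clone to the task and one to the Looper, which avoids the self-reference entirely. The with_shared_value constructor, the iterations parameter (added only to keep the run short) and the demo function are assumptions of this sketch, not part of the question's code.

```rust
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::sync::Arc;
use std::thread::{self, sleep, JoinHandle};
use std::time::Duration;

struct Looper {
    shared_value: Arc<AtomicU64>,
    handle: Option<JoinHandle<()>>,
}

impl Looper {
    // Hypothetical constructor: the caller supplies the shared period, so the
    // task can own a clone of it instead of a reference back to the Looper.
    pub fn with_shared_value(
        shared_value: Arc<AtomicU64>,
        iterations: u32,
        task: Box<dyn Fn() + Send>,
    ) -> Self {
        let shared_state = Arc::clone(&shared_value);
        let handle = thread::spawn(move || {
            for _ in 0..iterations {
                sleep(Duration::from_millis(shared_state.load(Relaxed)));
                task();
            }
        });
        Self { shared_value, handle: Some(handle) }
    }
}

impl Drop for Looper {
    fn drop(&mut self) {
        // Wait for the worker thread; it never owns the Looper itself,
        // so this join cannot end up running on the worker thread.
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

struct WrappedLooper {
    looper: Looper,
}

impl WrappedLooper {
    pub fn new(iterations: u32, task: Box<dyn Fn() -> u64 + Send>) -> Self {
        let period = Arc::new(AtomicU64::new(10));
        let period_for_task = Arc::clone(&period);
        let looper = Looper::with_shared_value(
            period,
            iterations,
            // The closure owns its own Arc clone, so it is 'static.
            Box::new(move || period_for_task.store(task(), Relaxed)),
        );
        WrappedLooper { looper }
    }
}

// Hypothetical usage: run three iterations, then read the final period.
fn demo() -> u64 {
    let wrapped = WrappedLooper::new(3, Box::new(|| 1));
    let period = Arc::clone(&wrapped.looper.shared_value);
    drop(wrapped); // joins the worker thread
    period.load(Relaxed)
}
```

The trade-off is that Looper's constructor has to accept the shared state from outside, which may not be possible if Looper is a type you cannot change.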


Sidenote:

"In Rust, there are two issues. First, there is no constructor equivalent to capture this in. That type of issue is solvable by using options."

This first issue is also present in the C++ version, just hidden. In C++, a unique_ptr can be null (and starts out null), so it acts just like an Option: looper->set_value in C++ is essentially equivalent to the unsafe looper.as_ref().unwrap_unchecked().set_value in Rust. The difference is that Rust forces us to acknowledge it explicitly.
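To make the comparison concrete, here is a small sketch in which the "not yet constructed" state is an explicit None rather than a null pointer. The stripped-down Looper with a single value field, the Wrapper type and the demo function are hypothetical stand-ins for illustration only.

```rust
// Hypothetical stand-in for the real Looper.
struct Looper {
    value: u64,
}

struct Wrapper {
    // Plays the role of std::unique_ptr<Looper>: it may be empty.
    looper: Option<Box<Looper>>,
}

impl Wrapper {
    // Starts empty, like a default-constructed unique_ptr.
    fn new() -> Self {
        Wrapper { looper: None }
    }

    // The empty case has to be handled; there is no silent null dereference.
    fn value(&self) -> Option<u64> {
        self.looper.as_ref().map(|looper| looper.value)
    }
}

fn demo() -> (Option<u64>, Option<u64>) {
    let mut wrapper = Wrapper::new();
    let before = wrapper.value();
    wrapper.looper = Some(Box::new(Looper { value: 10 }));
    let after = wrapper.value();
    (before, after)
}
```

Reading the value before the field is filled in yields None here, where the equivalent C++ access through a null unique_ptr would be undefined behavior.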

0
Chayim Friedman On

This kind of issue can be solved with Arc::new_cyclic():

use std::sync::{Arc, Weak};

struct WrappedLooper {
    looper: Arc<Looper>,
}

impl WrappedLooper {
    pub fn new(task: Box<dyn Fn() -> u64 + Send>) -> Self {
        let looper = Arc::<Looper>::new_cyclic(|looper| {
            let looper = Weak::clone(looper);
            Looper::new(Box::new(move || {
                let looper = looper.upgrade().expect("looper gone");
                looper.set_value(task());
            }))
        });

        WrappedLooper { looper }
    }
}

However, this code has an issue that is also present in your C++ code and in your Rust code with the Mutex: if the thread starts running before the Looper construction completes, it sees a null Looper. In both Rust snippets this causes a panic (in the unwrap() in your snippet, or in the upgrade().expect() in mine); in the C++ snippet it is UB, because we dereference a null pointer. This can be solved with a channel:

impl WrappedLooper {
    pub fn new(task: Box<dyn Fn() -> u64 + Send>) -> Self {
        let (construction_completed_tx, construction_completed_rx) = mpsc::sync_channel(1);
        let looper = Arc::<Looper>::new_cyclic(|looper| {
            let looper = Weak::clone(looper);
            Looper::new(Box::new(move || {
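                // Block until construction has completed. Only the first call receives
                // the message; once the sender is dropped, later calls return `Err`
                // immediately, so the result is deliberately ignored.
                let _ = construction_completed_rx.recv();

                let looper = looper.upgrade().expect("looper gone");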
                looper.set_value(task());
            }))
        });
        construction_completed_tx.send(()).unwrap();

        WrappedLooper { looper }
    }
}
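For reference, a self-contained sketch of the gated Arc::new_cyclic() approach that can be compiled as is. It makes several assumptions that are not in the snippets above: Looper::new takes an iterations count to keep the run short, the join-on-drop impl is left out so the sketch stays small, a failed upgrade() is skipped instead of panicking, and demo is a hypothetical driver.

```rust
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::sync::{mpsc, Arc, Weak};
use std::thread::{self, sleep, JoinHandle};
use std::time::Duration;

struct Looper {
    shared_value: Arc<AtomicU64>,
    handle: Option<JoinHandle<()>>,
}

impl Looper {
    // `iterations` is an assumption of this sketch (the question uses 1000).
    fn new(iterations: u32, task: Box<dyn Fn() + Send>) -> Self {
        let shared_value = Arc::new(AtomicU64::new(10));
        let shared_state = Arc::clone(&shared_value);
        let handle = thread::spawn(move || {
            for _ in 0..iterations {
                sleep(Duration::from_millis(shared_state.load(Relaxed)));
                task();
            }
        });
        Self { shared_value, handle: Some(handle) }
    }

    fn set_value(&self, value: u64) {
        self.shared_value.store(value, Relaxed)
    }
}

struct WrappedLooper {
    looper: Arc<Looper>,
}

impl WrappedLooper {
    fn new(iterations: u32, task: Box<dyn Fn() -> u64 + Send>) -> Self {
        let (ready_tx, ready_rx) = mpsc::sync_channel::<()>(1);
        let looper = Arc::new_cyclic(|weak: &Weak<Looper>| {
            let weak = Weak::clone(weak);
            Looper::new(iterations, Box::new(move || {
                // Wait for construction to finish. After the first call the
                // sender is gone and recv() returns Err at once; ignore it.
                let _ = ready_rx.recv();
                // Skip the update if the Looper has already been dropped.
                if let Some(looper) = weak.upgrade() {
                    looper.set_value(task());
                }
            }))
        });
        // The Arc is fully built now, so upgrade() can succeed.
        ready_tx.send(()).expect("looper thread should still be waiting");
        WrappedLooper { looper }
    }
}

// Hypothetical driver: wait for the worker to finish, then read the period.
fn demo() -> u64 {
    let wrapped = WrappedLooper::new(3, Box::new(|| 1));
    let handle = wrapped.looper.handle.as_ref().expect("thread handle present");
    while !handle.is_finished() {
        sleep(Duration::from_millis(1));
    }
    wrapped.looper.shared_value.load(Relaxed)
}
```

One thing to keep in mind if the joining Drop impl is added back: the closure briefly holds a strong Arc<Looper>, so if the wrapper is dropped at that moment, the last reference is released on the worker thread and the join would be attempted from the thread being joined.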