I want to write a custom RxCpp scheduler that executes actions on an event queue that is part of my application. To do this, I am learning about the relevant interfaces.
My simple code below fails with a SIGSEGV -- and when stepping through the code, it looks like the RxCpp code is indeed dereferencing a null pointer. What is the right pattern to follow?
#include <rxcpp/rx.hpp>
#include <iostream>
class simple_worker : public rxcpp::schedulers::worker_interface {
public:
clock_type::time_point now() const override {
return clock_type::now();
}
void schedule(const rxcpp::schedulers::schedulable& scbl) const override {
scbl();
}
void schedule(clock_type::time_point when, const rxcpp::schedulers::schedulable& scbl) const override {
// TODO: Cheating, not using "when".
scbl();
}
};
/// Scheduler implementation that hands out workers backed by simple_worker.
class simple_service : public rxcpp::schedulers::scheduler_interface {
public:
    /// Returns the current time as reported by clock_type.
    clock_type::time_point now() const override {
        return clock_type::now();
    }

    /// Creates a worker tied to the lifetime `cs`, backed by a freshly
    /// allocated simple_worker.
    rxcpp::schedulers::worker create_worker(rxcpp::composite_subscription cs) const override {
        auto impl = std::make_shared<simple_worker>();
        return rxcpp::schedulers::worker(std::move(cs), std::move(impl));
    }
};
void testSubjects() {
rxcpp::subjects::subject<int> s;
auto threads = rxcpp::observe_on_one_worker(rxcpp::schedulers::make_scheduler<simple_service>());
s.get_observable().observe_on(threads) |
rxcpp::operators::subscribe<int>(
[](int j) {
std::cout << " observer received " << j << std::endl;
},
[](const std::exception_ptr&) {
std::cout << " observer received error" << std::endl;
},
[]() {
std::cout << " observer received complete" << std::endl;
});
for (int i = 0; i < 100; i++) {
s.get_subscriber().on_next(i);
}
}
/// Program entry point: runs the subject demo and exits successfully.
int main() {
    testSubjects();
    return 0;
}
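RxCpp uses a `recursion` / `recurse` pair to make recursive calls inside a scheduler. Calling `scbl()` with no arguments only *requests* recursion and is valid only from inside a running action; to actually run the schedulable you need to invoke it with a `recurse` object, something like this:
    if (scbl.is_subscribed()) { rxcpp::schedulers::recursion r(true); scbl(r.get_recurse()); }
This is how the immediate scheduler is implemented: https://github.com/ReactiveX/RxCpp/blob/main/Rx/v2/src/rxcpp/schedulers/rx-immediate.hpp#L40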