RxCpp: How to write a custom scheduler


I want to write a custom RxCpp scheduler that executes actions on an event queue that is part of my application. To do this, I am learning about the relevant interfaces.

My simple code below fails with a SIGSEGV -- and when stepping through the code, it looks like the RxCpp code is indeed dereferencing a null pointer. What is the right pattern to follow?

#include <rxcpp/rx.hpp>
#include <iostream>

class simple_worker : public rxcpp::schedulers::worker_interface {
public:
  clock_type::time_point now() const override {
    return clock_type::now();
  }

  void schedule(const rxcpp::schedulers::schedulable& scbl) const override {
    scbl();
  }

  void schedule(clock_type::time_point when, const rxcpp::schedulers::schedulable& scbl) const override {
    // TODO: Cheating, not using "when".
    scbl();
  }
};

class simple_service : public rxcpp::schedulers::scheduler_interface {
public:
  virtual clock_type::time_point now() const override {
    return clock_type::now();
  }

  virtual rxcpp::schedulers::worker create_worker(rxcpp::composite_subscription cs) const override {
    return rxcpp::schedulers::worker(std::move(cs), std::make_shared<simple_worker>());
  }
};

void testSubjects() {
  rxcpp::subjects::subject<int> s;

  auto threads = rxcpp::observe_on_one_worker(rxcpp::schedulers::make_scheduler<simple_service>());

  s.get_observable().observe_on(threads) |
  rxcpp::operators::subscribe<int>(
      [](int j) {
        std::cout << "  observer received " << j << std::endl;
      },
      [](const std::exception_ptr&) {
        std::cout << "  observer received error" << std::endl;
      },
      []() {
        std::cout << "  observer received complete" << std::endl;
      });

  for (int i = 0; i < 100; i++) {
    s.get_subscriber().on_next(i);
  }
}

int main() {
  testSubjects();
}

There is 1 best solution below

Victimsnino On

RxCpp uses a recursion object to manage recursive calls inside a scheduler, so the worker has to pass one when it invokes the schedulable. You need to invoke the schedulable something like this:

if (scbl.is_subscribed()) {
  // allow recursion
  rxcpp::schedulers::recursion r(true);
  scbl(r.get_recurse());
}

This is how the immediate scheduler is implemented: https://github.com/ReactiveX/RxCpp/blob/main/Rx/v2/src/rxcpp/schedulers/rx-immediate.hpp#L40
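
Since the goal in the question is to run actions on an application event queue rather than inline, here is a minimal sketch of what such a queue could look like. It uses only the standard library. `event_queue`, `post`, `post_at` and `run_due` are hypothetical names made up for illustration and are not part of RxCpp; the sketch is also single-threaded, so a real queue shared between threads would need locking.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical application event queue (illustrative only, not an RxCpp type).
// Tasks are ordered by due time; tasks with the same due time run in post order.
class event_queue {
public:
  using clock_type = std::chrono::steady_clock;
  using task = std::function<void()>;

  // Post a task that is due immediately.
  void post(task t) { post_at(clock_type::now(), std::move(t)); }

  // Post a task that must not run before `when`.
  void post_at(clock_type::time_point when, task t) {
    items_.push(item{when, next_seq_++, std::move(t)});
  }

  // Run every task whose due time is <= now and return how many ran.
  std::size_t run_due(clock_type::time_point now) {
    std::size_t ran = 0;
    while (!items_.empty() && items_.top().when <= now) {
      task t = items_.top().run;  // copy out first so the task may post again
      items_.pop();
      t();
      ++ran;
    }
    return ran;
  }

  bool empty() const { return items_.empty(); }

private:
  struct item {
    clock_type::time_point when;
    std::size_t seq;  // tie-breaker that preserves post order
    task run;
  };

  // Comparator that puts the earliest (when, seq) pair on top of the heap.
  struct later {
    bool operator()(const item& a, const item& b) const {
      if (a.when != b.when) return a.when > b.when;
      return a.seq > b.seq;
    }
  };

  std::priority_queue<item, std::vector<item>, later> items_;
  std::size_t next_seq_ = 0;
};
```

With something like this in place, the worker's `schedule(when, scbl)` could capture `scbl` by value in a lambda, wrap the `is_subscribed()` / `recursion` invocation from the answer inside it, and hand that lambda to `post_at(when, ...)`. Because the `schedule` overrides are `const`, the worker would presumably hold the queue through a pointer or `std::shared_ptr` rather than as a plain member.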