Creating custom operators in rxcpp


I am trying to learn how to create custom operators in rxcpp, and I was able to create operators as cited here. However, I would like to learn how to create more generic operators by implementing rxo::operator_base and using the lift operator. Is there any documentation available that covers this with some simple examples?



Wang Evander On

Here's a way to use the rxcpp v2 observable lift function:

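#include <exception>
#include <iostream>
#include <rxcpp/rx.hpp>

class MyTestOp //: public rxcpp::operators::operator_base<int>
{
public:
    MyTestOp(){}
    ~MyTestOp(){}

    rxcpp::subscriber<int> operator() (rxcpp::subscriber<int> s) const {
        // Capture s by value in every handler: s is a by-value parameter,
        // so a reference to it would dangle once operator() returns.
        return rxcpp::make_subscriber<int>([s](const int & next) {
            s.on_next(next + 1);
        },  [s](const std::exception_ptr & e) {
            s.on_error(e);
        },  [s]() {
            s.on_completed();
        });
    }
};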


int main()
{
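    auto keys = rxcpp::observable<>::create<int>(
        [](rxcpp::subscriber<int> dest){
            // Stop at end of input or once the subscriber has unsubscribed.
            while (dest.is_subscribed()) {
                int key = std::cin.get();
                if (!std::cin) break;
                dest.on_next(key);
            }
            dest.on_completed();
        }).
    publish();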

    keys.lift<int>(MyTestOp()).subscribe([](int key){ 
        std::cout << key << std::endl;
    });

    // equivalent to using the class (handlers capture s by value)
    //keys.lift<int>([](rxcpp::subscriber<int> s) { 
    //    return rxcpp::make_subscriber<int>([s](const int & next) {
    //        s.on_next(next + 1);
    //    },  [s](const std::exception_ptr & e) {
    //        s.on_error(e);
    //    },  [s]() {
    //        s.on_completed();
    //    });
    //}).subscribe([](int key){ 
    //    std::cout << key << std::endl;
    //});

    // run the loop in create
    keys.connect();

    return 0;
}

Since lift only checks its argument through templates, you don't need to inherit from any interface. Implementing an operator() that takes a subscriber and returns a subscriber, as above, is enough.
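To illustrate why no base class is needed, here is a minimal sketch that does not use rxcpp at all. The names toy_subscriber, toy_lift, add_one, run_both_forms and check_toy_lift are hypothetical stand-ins invented for this example; they only mimic the shape of the idea (a callable that maps one subscriber to another) and are not how rxcpp implements lift.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Toy stand-in for rxcpp::subscriber<T>; NOT the rxcpp type.
template <class T>
struct toy_subscriber {
    std::function<void(T)> on_next;
};

// Toy stand-in for lift: accepts ANY callable that maps a
// toy_subscriber<T> to a toy_subscriber<T>. Nothing here asks
// for a common base class; the template only needs op(downstream)
// to compile.
template <class T, class Operator>
toy_subscriber<T> toy_lift(Operator op, toy_subscriber<T> downstream) {
    return op(downstream);
}

// Functor form, mirroring MyTestOp from the answer.
struct add_one {
    toy_subscriber<int> operator()(toy_subscriber<int> s) const {
        // s is captured by value because the parameter is destroyed
        // as soon as operator() returns.
        return toy_subscriber<int>{[s](int v) { s.on_next(v + 1); }};
    }
};

// Pushes one value through the functor form and one through the
// lambda form, and returns what the final subscriber received.
std::vector<int> run_both_forms() {
    std::vector<int> seen;
    toy_subscriber<int> sink{[&seen](int v) { seen.push_back(v); }};

    auto via_class = toy_lift<int>(add_one{}, sink);
    auto via_lambda = toy_lift<int>(
        [](toy_subscriber<int> s) {
            return toy_subscriber<int>{[s](int v) { s.on_next(v + 1); }};
        },
        sink);

    via_class.on_next(1);
    via_lambda.on_next(10);
    return seen;
}

// Small self-check: both forms behave the same way.
void check_toy_lift() {
    assert((run_both_forms() == std::vector<int>{2, 11}));
}
```

Calling check_toy_lift() from a main function exercises both forms. The functor and the lambda are interchangeable because toy_lift only cares that the argument is callable with a subscriber.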

I think the library author would prefer the lambda form shown in the comments.

It is probably also worth checking that the subscriber is still subscribed before forwarding each notification:

if (s.is_subscribed()) { /* call s.on_xxx */ }

Tom Huntington On

I found the following slide from Kirk's 2016 presentation quite helpful, even though it is about rxcpp v3 rather than v2.

Sequence concepts

struct observable {
    void bind(observer);
};

struct observer {
    template<class T>
    void next(T);
};

struct lifter {
    observer lift(observer);
};

Sequence Implementations

const auto ints = [](auto first, auto last){
  return make_observable([=](auto r){ // Define observable::bind
    for(auto i = first;; ++i){
      r.next(i);
      if (i == last) break;
    }
  });
};

const auto copy_if = [](auto pred){
  return make_lifter([=](auto r){
    return make_observer(r, [=](auto& r, auto v){ // Define observer::next
        if (pred(v)) r.next(v);
    });
  });
};
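
The slide leaves make_observable, make_observer and make_lifter undefined. As a rough sketch of how the concepts fit together, the block below supplies minimal versions of those helpers so that ints and copy_if can run. Everything apart from ints and copy_if is an assumption made for this example: the helper structs, the operator| used for composition, and collect_even are hypothetical and are not taken from rxcpp v3. It needs C++14 for generic lambdas and return type deduction.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Terminal observer: next(v) calls the stored function.
template <class Next>
struct basic_observer {
    Next next_fn;
    template <class T> void next(T v) { next_fn(v); }
};
template <class Next>
basic_observer<Next> make_observer(Next n) { return {std::move(n)}; }

// Chained observer: next(v) hands the downstream observer and v
// to the stored function, which decides what to forward.
template <class Dest, class Next>
struct chained_observer {
    Dest dest;
    Next next_fn;
    template <class T> void next(T v) { next_fn(dest, v); }
};
template <class Dest, class Next>
chained_observer<Dest, Next> make_observer(Dest d, Next n) {
    return {std::move(d), std::move(n)};
}

// Observable: bind(observer) runs the stored function.
template <class Bind>
struct basic_observable {
    Bind bind_fn;
    template <class Observer> void bind(Observer r) const { bind_fn(r); }
};
template <class Bind>
basic_observable<Bind> make_observable(Bind b) { return {std::move(b)}; }

// Lifter: lift(observer) returns a new observer wrapping it.
template <class Lift>
struct basic_lifter {
    Lift lift_fn;
    template <class Observer> auto lift(Observer r) const { return lift_fn(r); }
};
template <class Lift>
basic_lifter<Lift> make_lifter(Lift l) { return {std::move(l)}; }

// Hypothetical composition: binding the result lifts the observer
// first, then binds the lifted observer to the source.
template <class Bind, class Lift>
auto operator|(basic_observable<Bind> source, basic_lifter<Lift> l) {
    return make_observable([=](auto r) { source.bind(l.lift(r)); });
}

// From the slide: emits first..last inclusive (assumes first <= last).
const auto ints = [](auto first, auto last) {
    return make_observable([=](auto r) {
        for (auto i = first;; ++i) {
            r.next(i);
            if (i == last) break;
        }
    });
};

// From the slide: forwards only values accepted by pred.
const auto copy_if = [](auto pred) {
    return make_lifter([=](auto r) {
        return make_observer(r, [=](auto& dest, auto v) {
            if (pred(v)) dest.next(v);
        });
    });
};

// Collects the even numbers in [first, last].
std::vector<int> collect_even(int first, int last) {
    std::vector<int> out;
    auto evens = ints(first, last) | copy_if([](int v) { return v % 2 == 0; });
    evens.bind(make_observer([&out](auto v) { out.push_back(v); }));
    return out;
}

void check_collect_even() {
    assert((collect_even(0, 6) == std::vector<int>{0, 2, 4, 6}));
}
```

The point of the sketch is that a lifter never touches the source: it only turns one observer into another, which is the same role the subscriber-to-subscriber callable plays in the v2 lift call.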