Is this the correct way to use ReactiveX in Python?


I am trying to learn reactive programming in Python and wrote a small script to track battery plug events:

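from time import sleep

import psutil
import reactivex as rx


class BatteryStream:
    """Infinite iterator that polls the battery state every two seconds."""

    def __iter__(self):
        return self

    def __next__(self) -> bool:
        sleep(2)  # blocks the calling thread between polls
        battery = psutil.sensors_battery()
        if battery is None:  # psutil returns None when no battery is present
            raise RuntimeError("no battery detected")
        return battery.power_plugged


def print_battery_status(plugged_in: bool) -> None:
    print("Plugged in" if plugged_in else "On battery")


battery_events = rx.from_iterable(BatteryStream())
battery_events.subscribe(
    on_next=print_battery_status,  # pass the function directly, no lambda needed
    on_error=lambda err: print("error: ", err),
)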

Am I on the right track, or is there a better way to implement this?
