I'm looking to execute a batch of processes in parallel, but process each batch in series using RXPY (we're using v3 right now). Each process is kicked off, then I use RXPY to wait for a set amount of time before ending the process. Here's a basic version:
```python
import rx
from rx import operators as ops

def start_task(value):
    print(f"Started {value}")
    return value

def end_task(value):
    print(f"End: {value}")

def main():
    print("Start main")
    rx.interval(1).pipe(
        ops.flat_map(lambda time: rx.from_([1, 2]).pipe(
            ops.map(lambda value: [time, value])
        )),
        ops.map(lambda value: start_task(value)),
        ops.delay(2),
        ops.map(lambda value: end_task(value)),
    ).run()
```
The problem with this is that the long-running processes overlap each other. In other words, I do not want new processes to start before the last batch has finished. In the above example, the output is:
```
Start main
Started [0, 1]
Started [0, 2]
Started [1, 1]
Started [1, 2]
Started [2, 1]
Started [2, 2]
End: [0, 1]
End: [0, 2]
End: [1, 1]
Started [3, 1]
End: [1, 2]
Started [3, 2]
End: [2, 1]
End: [2, 2]
...
```
As you can see, times 1 and 2 started before time 0 ended.
I can solve this by adding a boolean variable `working`, somewhat like a semaphore:
```python
import rx
from rx import operators as ops

def start_task(value):
    print(f"Started {value}")
    return value

def end_task(value):
    print(f"End: {value}")

def main():
    print("Start main")
    global working
    working = False

    def set_working(input):
        global working
        working = input

    rx.interval(1).pipe(
        ops.filter(lambda time: not working),
        ops.do_action(lambda value: set_working(True)),
        ops.flat_map(lambda time: rx.from_([1, 2]).pipe(
            ops.map(lambda value: [time, value])
        )),
        ops.map(lambda value: start_task(value)),
        ops.delay(2),
        ops.map(lambda value: end_task(value)),
        ops.do_action(lambda value: set_working(False)),
    ).run()
```
With the following output:
```
Start main
Started [0, 1]
Started [0, 2]
End: [0, 1]
End: [0, 2]
Started [3, 1]
Started [3, 2]
End: [3, 1]
End: [3, 2]
```
But this feels wrong. Is there an existing operator in RXPY that would accomplish this same functionality?
Even in your second solution you don't ensure that the next tasks won't start before the tasks from the first batch are finished. To test it, simply change `end_task` so that the two tasks in a batch take different amounts of time to finish; the output then shows new tasks starting while earlier ones are still running. It happens because the sequence whose task ended first continues and sets the `working` variable back to `False` even though `[0, 2]` is still being processed.

Solution using a threading Lock
If you want, you can use `Lock` from the `threading` package to achieve this in a "more Pythonic" way: acquire the lock at the beginning of your sequence, and release it only once the whole batch has finished.